Querying Nested Data with Lambda Functions
Denormalized data with nested values (e.g. array/map) have become omnipresent in this Big Data era, as a lot of data naturally conforms to a nested representation [1, 2]. As a result it is important to provide an efficient and convenient way to query nested data. SQL traditionally does not include support for this.
The pioneering work of Dremel proposed an extension to SQL based on recursive relational algebra to allow querying nested records, and this approach is now available in BigQuery and the SQL:2016 standard. The following example shows how to transform array elements this way (adapted from the BigQuery docs):
SELECT
    elements,
    ARRAY(SELECT v * 2 FROM UNNEST(elements) AS v) AS multiplied_elements
FROM (
    VALUES
        (ARRAY[1, 2]),
        (ARRAY[1, 3, 9]),
        (ARRAY[1, 4, 16, 64])
) AS t(elements)

    elements    | multiplied_elements
----------------+---------------------
 [1, 2]         | [2, 4]
 [1, 3, 9]      | [2, 6, 18]
 [1, 4, 16, 64] | [2, 8, 32, 128]
(3 rows)
While nested relational algebra provides an elegant and unified approach to querying nested data, we found that it can be challenging for users to keep track of the “unnest stack” in their heads when writing a query. In our experience, users are more comfortable applying a given function (e.g. a lambda) to each element in a collection. This motivated us to introduce lambda expressions into Presto to help query nested data, as illustrated below:
SELECT
    elements,
    transform(elements, v -> v * 2) AS multiplied_elements
FROM (
    VALUES
        (ARRAY[1, 2]),
        (ARRAY[1, 3, 9]),
        (ARRAY[1, 4, 16, 64])
) AS t(elements)
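To make the contrast between the two query styles concrete, here is a small Python model of both. It is only a sketch of the semantics, not Presto code: the helper names `multiply_via_unnest`, `multiply_via_transform`, and the Python `transform` function are hypothetical stand-ins for the SQL constructs above.

```python
# Each inner list models one row's `elements` array.
rows = [[1, 2], [1, 3, 9], [1, 4, 16, 64]]


def multiply_via_unnest(rows):
    """Model of ARRAY(SELECT v * 2 FROM UNNEST(elements) AS v)."""
    # UNNEST: flatten every array into (row_index, value) pairs.
    flat = [(i, v) for i, elements in enumerate(rows) for v in elements]
    # Apply the projection to the flattened relation.
    doubled = [(i, v * 2) for i, v in flat]
    # ARRAY(...): regroup the values under the row they came from.
    result = [[] for _ in rows]
    for i, v in doubled:
        result[i].append(v)
    return result


def transform(elements, fn):
    """Model of the higher-order function: apply fn to each element."""
    return [fn(v) for v in elements]


def multiply_via_transform(rows):
    """Model of transform(elements, v -> v * 2)."""
    return [transform(elements, lambda v: v * 2) for elements in rows]
```

Both functions return the same arrays; the second never leaves the row it is working on, which is the bookkeeping the lambda form spares the user.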
In Presto, a lambda expression consists of an argument list and a lambda body, separated by ->:
x -> x + 1
(x, y) -> x + y
x -> regexp_like(x, 'a+')
x -> x / x
x -> IF(x > 0, x, -x)
x -> COALESCE(x, 0)
x -> CAST(x AS JSON)
x -> x + TRY(1 / 0)
Note that there is no type annotation in a lambda expression. The type of a lambda expression (e.g. function(integer, integer)) thus has to be inferred from the context of the function call. As a result, standalone lambda expressions are not allowed, since their types cannot be determined.
Lambda Type Inference
The initial lambda support in Presto was added in Presto#6198 with basic compilation and execution. One of the major challenges this pull request addressed was type inference for lambdas, as there is no type annotation in lambda expressions. Consider the following expression, which contains a lambda:
transform(elements, v -> v * 2)
where elements has type array(integer).
Presto allows function overloading, and the exact function match is resolved by looking up the function name and argument types. This is infeasible for higher-order functions, as the type of v -> v * 2 cannot be resolved without context.
As a result, Presto doesn’t allow function overloading for higher-order functions, so it can still resolve the function transform by name alone.
ExpressionAnalyzer will have the following information:
- The generic type for transform is (array(T), function(T, U)) -> array(U).
- The first argument type is array(integer).
- The exact type for the second argument v -> v * 2 is unknown since it’s a lambda expression. However, its type can be uniquely determined once the input parameter type is bound. This is done by the TypeSignatureProvider class.
The type parameter U needs to be determined to resolve the expression type. This is done by the SignatureBinder#bind method. SignatureBinder#appendConstraintSolvers is called under the hood to iteratively solve this constraint satisfaction problem. A new TypeConstraintSolver called FunctionSolver was added for updating type constraints related to lambda expressions.
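The order in which the type variables get bound can be sketched in a few lines of Python. This is a heavily simplified, single-pass illustration rather than Presto's iterative solver: `ArrayType`, `infer_transform_return_type`, and the two body-typing helpers are hypothetical names, and the numeric typing rule covers only three types.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ArrayType:
    element: str  # element type name, e.g. "integer"


def type_of_times_two(param_type):
    # Body type of `v -> v * 2`: in this simplified model, multiplying by an
    # integer literal keeps the numeric type of v.
    if param_type in ("integer", "bigint", "double"):
        return param_type
    raise TypeError(f"cannot multiply {param_type} by integer")


def type_of_is_positive(param_type):
    # Body type of `v -> v > 0`: a comparison yields boolean.
    return "boolean"


def infer_transform_return_type(first_arg_type, lambda_body_typer):
    """Resolve (array(T), function(T, U)) -> array(U) for one call site."""
    bindings = {}
    # Matching the first argument against array(T) binds T.
    bindings["T"] = first_arg_type.element
    # With T bound, the lambda parameter type is known, so the lambda body
    # can be analyzed to obtain U.
    bindings["U"] = lambda_body_typer(bindings["T"])
    # Substitute U into the declared return type array(U).
    return ArrayType(bindings["U"])
```

For example, `infer_transform_return_type(ArrayType("integer"), type_of_is_positive)` yields `ArrayType("boolean")`, showing that U need not equal T.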
Lambda Capture
Lambda capture allows users to refer to other columns in the lambda function, for example:
SELECT
    elements,
    transform(elements, v -> v * factor) AS multiplied_elements
FROM (
    VALUES
        (ARRAY[1, 2], 2),
        (ARRAY[1, 3, 9], 3),
        (ARRAY[1, 4, 16, 64], 4)
) AS t(elements, factor)

    elements    | multiplied_elements
----------------+---------------------
 [1, 2]         | [2, 4]
 [1, 3, 9]      | [3, 9, 27]
 [1, 4, 16, 64] | [4, 16, 64, 256]
(3 rows)
Lambda capture support was added in Presto#7210. It rewrites a lambda with capture into a lambda without capture via partial function application. A special internal higher-order function, BIND, is introduced to partially apply captured arguments to the lambda.
Taking the above example, the lambda call with capture
transform(elements, v -> v * factor)
is rewritten to
transform(
    elements,
    BIND(factor, (captured_factor, v) -> v * captured_factor)
)
The original unary lambda with capture, v -> v * factor, is rewritten into a binary lambda without capture: (captured_factor, v) -> v * captured_factor. The BIND call takes factor and this binary lambda as input and returns the partially applied function that multiplies its input by captured_factor (note that captured_factor will be different for each row!). This partially applied function is unary and is passed as the second argument to transform.
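The rewrite is ordinary partial application, which can be modeled in Python with `functools.partial`. This is a sketch of the idea only: the Python functions `transform`, `bind`, and `multiply_without_capture` are hypothetical stand-ins, not Presto internals.

```python
from functools import partial


def transform(elements, fn):
    # Model of the higher-order function: apply a unary fn to each element.
    return [fn(v) for v in elements]


def multiply_without_capture(captured_factor, v):
    # Model of the rewritten binary lambda:
    # (captured_factor, v) -> v * captured_factor
    return v * captured_factor


def bind(value, fn):
    # Model of BIND: fix the first argument, returning a unary function.
    return partial(fn, value)


# (elements, factor) pairs, mirroring the rows of the query above.
rows = [([1, 2], 2), ([1, 3, 9], 3), ([1, 4, 16, 64], 4)]

# A new partially applied function is created for every row, because the
# captured value differs from row to row.
results = [
    transform(elements, bind(factor, multiply_without_capture))
    for elements, factor in rows
]
```

Here `results` reproduces the `multiplied_elements` column of the query output shown earlier.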
Lambda Execution
In this section we discuss how a lambda is executed at runtime. The original implementation used MethodHandle objects to represent lambdas on the stack. Consider the same example:
transform(
    elements,
    BIND(factor, (captured_factor, v) -> v * captured_factor)
)
Each invocation of transform works in the following way:
- Push the Java object representing elements onto the stack. When elements has type array(integer), the corresponding Java stack type is Block.
- Push the MethodHandle object representing the captured lambda onto the stack, i.e. bind factor to (captured_factor, v) -> v * captured_factor. To this end:
  - Push the MethodHandle object representing the uncaptured lambda (captured_factor, v) -> v * captured_factor onto the stack.
  - Push captured_factor onto the stack.
  - Call MethodHandle#bindTo to get a BoundMethodHandle representing the captured lambda on top of the stack.
Unfortunately, this implementation causes Java to generate a separate customized LambdaForm class for every MethodHandle#bindTo call (i.e. once per row). Such excessive runtime class generation quickly fills the Metaspace and causes full GC; see Presto#7935 for a reproduction and details. JDK developers have confirmed that each BoundMethodHandle should be customized independently, and that MethodHandle#bindTo is not a good fit for implementing lambda capture.
To fix this, we redesigned Presto lambda execution in Presto#8031. The key observations are:
- Lambda capture has to be performed per invocation, as a different value is captured for each row.
- However, we should use the same class to represent the captured lambda for every BIND call; otherwise we will generate too many classes.
We use the same approach as Java uses to handle lambda and capture [3, 4]:
- A lambda is represented as an object whose type is a functional interface (a.k.a. Single Abstract Method class).
- The invokedynamic instruction is used to perform lambda capture:
  - During the first invokedynamic call, the class representing the captured lambda (which implements a functional interface) is created, and a method to perform the capture is generated in it. This step is also called linkage and is only done once.
  - Each invokedynamic call performs the capture and returns an instance of the desired functional interface.
With this design, the BIND function is always fused with the lambda generation step to produce a captured lambda in a single step: we cannot first generate an object representing the uncaptured lambda and then perform a separate partial application step. Note that this implementation also doesn't allow more general higher-order functions that return a function as their result.
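The difference between "one class, many instances" and "one class per capture" can be illustrated with a loose Python analogy. This does not reproduce JVM behavior (there is no Metaspace or invokedynamic in Python), and all names below are hypothetical; it only shows why the redesign keeps the number of generated classes constant while still capturing a different value per row.

```python
class CapturedMultiply:
    """One shared class; each instance holds the value captured for one row."""

    def __init__(self, captured_factor):
        self.captured_factor = captured_factor

    def __call__(self, v):
        return v * self.captured_factor


def capture_with_shared_class(factor):
    # Capture only creates a new instance; the class was defined once.
    return CapturedMultiply(factor)


def capture_with_fresh_class(factor):
    # Anti-pattern, analogous to the per-row class generation described
    # above: a brand-new class is defined on every capture.
    fresh_class = type(
        "CapturedMultiplyForOneRow",
        (),
        {"__call__": lambda self, v: v * factor},
    )
    return fresh_class()


factors = [2, 3, 4]
shared = [capture_with_shared_class(f) for f in factors]
fresh = [capture_with_fresh_class(f) for f in factors]

# Count the distinct classes behind each set of captured lambdas.
shared_class_count = len({type(fn) for fn in shared})
fresh_class_count = len({type(fn) for fn in fresh})
```

Both approaches compute the same values, but the second one grows the number of classes with the number of rows.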
Lambda in Aggregation
While lambda was originally introduced to help query nested data with scalar functions, we also noted it can be used in aggregation functions to allow more flexible analytics. The initial support for lambda in aggregation was added in Presto#12084, with a reduce_agg function for demonstration purposes.
reduce_agg conceptually allows the creation of a User-Defined Aggregation Function (UDAF) by making the input and combine functions lambdas. The following example shows how to use reduce_agg to compute a group-wise product (instead of a sum):
SELECT
    id,
    reduce_agg(value, 1, (a, b) -> a * b, (a, b) -> a * b) prod
FROM (
    VALUES
        (1, 2),
        (1, 3),
        (1, 4),
        (2, 20),
        (2, 30),
        (2, 40)
) AS t(id, value)
GROUP BY id;

 id | prod
----+-------
  2 | 24000
  1 |    24
(2 rows)
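The roles of the two lambdas can be sketched with a small Python model of a two-phase aggregation. This is an illustration under stated assumptions, not Presto's implementation: the Python function `reduce_agg` and the way values are split into partitions are hypothetical, and the sketch assumes the initial state is an identity for the combine function (as 1 is for multiplication).

```python
from functools import reduce


def reduce_agg(partitions, initial_state, input_fn, combine_fn):
    """Model of reduce_agg over values that arrive in separate partitions."""
    # Phase 1: within each partition, fold every value into a partial state
    # using the input function (state, value) -> state.
    partial_states = [
        reduce(input_fn, values, initial_state) for values in partitions
    ]
    # Phase 2: merge the partial states with the combine function
    # (state, state) -> state.
    return reduce(combine_fn, partial_states, initial_state)


def multiply(a, b):
    # Used as both the input and the combine function, like (a, b) -> a * b.
    return a * b


# Values of each group, arbitrarily split across two partitions.
group_1 = [[2], [3, 4]]
group_2 = [[20, 30], [40]]

prod_1 = reduce_agg(group_1, 1, multiply, multiply)
prod_2 = reduce_agg(group_2, 1, multiply, multiply)
```

The results match the `prod` column above regardless of how the values are partitioned, which is why the combine function must be consistent with the input function.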
Unfortunately, due to JDK-8017163, aggregation state with Block as a native container type is intentionally not supported yet, because it can result in excessive JVM remembered set memory usage. Aggregation state requires updates in unpredictable order, resulting in a huge number of cross-region references when each state is a separate object. This issue is also reported in Presto#9553. As a result, this function is not yet practically useful. Once JDK-8017163 is fixed in later versions of the JVM, we look forward to enabling it with more general types to allow more flexible analytics in aggregations!
 Dremel: Interactive Analysis of Web-Scale Datasets
 Everything You Always Wanted To Do in Table Scan
 Lambda: A peek under the hood