CTE Materialization Framework in Presto

    In this blog, we’ll dive deep into the Common Table Expression (CTE) Materialization Framework in Presto, a framework developed by Uber and Meta and open-sourced as part of Presto. We will also showcase actual production gains observed at Uber.

    The goal of CTE Materialization is to minimize redundant computation within a query, conserving system resources and improving query performance.

    What Is a Common Table Expression (CTE)?

    As per the SQL standard, a Common Table Expression (CTE) is the result set of a query defined within a WITH clause; it exists temporarily and only within the context of the larger query. In Presto, CTEs are streamed and re-executed once per reference, which contradicts the SQL standard’s specification that a CTE be evaluated only once.

    -- CTE to gather all necessary customer order data
    WITH customer_orders
    AS (
    	SELECT c.customer_id
    		,o.order_id
    		,o.total_amount
    		,oi.product_id
    	FROM customers c
    	JOIN orders o ON c.customer_id = o.customer_id
    	JOIN order_items oi ON o.order_id = oi.order_id
    	)
    -- Main query to calculate all metrics
    SELECT customer_id
    	,COUNT(DISTINCT order_id) AS total_orders
    	,SUM(total_amount) AS total_spent
    	,AVG(total_amount) AS avg_order_value
    	,COUNT(DISTINCT product_id) AS products_purchased
    FROM customer_orders
    GROUP BY customer_id;

    Presto supports advanced CTEs, allowing them to be nested within each other and treating them as first-class objects for greater flexibility and reuse in queries.

    WITH cte1
    AS (
    	WITH cte2 AS (...)
    	…
    	)
    SELECT *
    FROM cte1;

    Understanding the Need for Reducing Redundant Computation

    As data grows and queries become more complex, users use CTEs to simplify and modularize SQL queries.

    However, without proper governance, CTEs can lead to redundant computation and increased resource consumption. Each CTE reference may trigger re-computation of the same subquery, causing unnecessary overhead. TPC-DS Q64, for example, contains a CTE that, when materialized, consumes half the CPU and reads half as much data.
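
    To make the redundancy concrete, consider a hypothetical query (the table and columns are illustrative, not from TPC-DS) in which the same CTE is referenced three times; without materialization, Presto recomputes the aggregation once per reference:

    WITH daily_sales AS (
    	-- An expensive aggregation over a large fact table
    	SELECT order_date
    		,SUM(total_amount) AS revenue
    	FROM orders
    	GROUP BY order_date
    	)
    SELECT 'max' AS metric, MAX(revenue) AS value FROM daily_sales
    UNION ALL
    SELECT 'min', MIN(revenue) FROM daily_sales
    UNION ALL
    SELECT 'avg', AVG(revenue) FROM daily_sales;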

    CTEs are user-defined units of computation, serving as a good starting point for detecting redundancy. Over time, the query optimizer can be extended to detect and eliminate such redundancies automatically.

    However, writing to and reading from disk also has a cost, so the optimization may not be beneficial for very simple CTEs or for CTEs that are referenced only a few times in a query.

    CTE Materialization Framework

    In the CTE Materialization Framework, intermediate results of eligible CTEs are stored as temporary tables on storage systems like HDFS or S3. These temporary tables exist only for the duration of the query execution and are automatically deleted once the query completes. This approach optimizes performance by reusing computed results without consuming long-term storage resources.

    Eligible CTE Detection

    CTEs can be independent, reference each other, or be referenced in the main query. This interconnectedness makes the choice of which CTEs to materialize particularly important, as it can significantly impact the efficiency of query execution.

    The image above shows a main query and its nested CTEs. Note that the CTE reference graph is always directed.

    To identify which CTEs should be materialized within a query, we currently employ a heuristic approach. The space of possible materialization choices grows factorially with the number of CTEs, O(N!), making exhaustive exploration computationally infeasible.

    To tackle this challenge, we adopt a greedy strategy. We start by selecting the earliest parent CTE that meets our eligibility criteria and proceed iteratively from there. This method simplifies the selection process while still capturing significant opportunities for optimization.

    Our current criteria for determining eligible CTEs are as follows:

    • Frequency of Use: The CTE is referenced four or more times within the query*.
    • Complexity of Operations: The CTE contains complex operations such as joins, aggregations, or window functions.

    * The default HDFS replication factor is 3, so materializing a CTE writes its data three times; the CTE must therefore be read at least four times for materialization to pay off.
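
    As an illustration, the hypothetical CTE below satisfies both criteria: it contains a join and an aggregation, and it is referenced four times in the main query (the tables and thresholds are assumptions for the example):

    WITH customer_totals AS (
    	SELECT c.customer_id
    		,SUM(o.total_amount) AS spent
    	FROM customers c
    	JOIN orders o ON c.customer_id = o.customer_id
    	GROUP BY c.customer_id
    	)
    SELECT *
    FROM (SELECT MAX(spent) AS max_spent FROM customer_totals) a
    CROSS JOIN (SELECT MIN(spent) AS min_spent FROM customer_totals) b
    CROSS JOIN (SELECT AVG(spent) AS avg_spent FROM customer_totals) c
    CROSS JOIN (SELECT COUNT(*) AS big_spenders FROM customer_totals WHERE spent > 1000) d;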

    Future Enhancements

    We recognize that while our heuristic approach is effective, it may not capture all opportunities for optimization and can have false positives. To address this, we plan to extend our strategy by adopting a cost-based approach. By integrating with our History-Based Optimizer, we can make more informed and powerful decisions about which CTEs to materialize.

    This heuristic approach strikes a balance between optimization potential and computational feasibility, allowing us to improve query efficiency without incurring excessive planning overhead.

    Planning and Optimization

    1. Generating Intermediate Plan

    After the analysis phase, we generate an intermediate plan where each instance of a CTE is represented by a CTEReferenceNode. These nodes assist the optimizer in determining which CTEs to materialize and where to place their corresponding CTEConsumer nodes.

    2. Generating Logical Plan

    Once we’ve identified the CTEs to materialize, we create a logical plan. This plan includes CTEProducer, CTEConsumer, and a new logical plan node called Sequence. The Sequence node stores the CTE dependencies and plays a crucial role in helping the scheduler avoid deadlocks and incorrect results by ensuring the proper execution order.
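
    Semantically, the materialized plan behaves as if the earlier customer_orders example had been rewritten into an explicit sequence of statements (a conceptual sketch, not the literal plan representation):

    -- CTEProducer: write the CTE result to a temporary table
    CREATE TABLE tmp_customer_orders AS
    SELECT c.customer_id
    	,o.order_id
    	,o.total_amount
    	,oi.product_id
    FROM customers c
    JOIN orders o ON c.customer_id = o.customer_id
    JOIN order_items oi ON o.order_id = oi.order_id;

    -- CTEConsumer: the main query scans the temporary table
    SELECT customer_id
    	,COUNT(DISTINCT order_id) AS total_orders
    FROM tmp_customer_orders
    GROUP BY customer_id;

    -- Cleanup: the coordinator drops the table when the query completes
    DROP TABLE tmp_customer_orders;

    The Sequence node encodes exactly this ordering: each producer must finish before any of its consumers runs, and producers of dependent CTEs execute in dependency order.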

    3. Optimizing the Logical Plan

    This logical plan then passes through all our various optimizers. Simultaneously, we apply CTE-specific optimizations, such as common filter pushdown (as shown above), which pushes filters shared across consumers down into the CTE producers, streamlining data processing.
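
    For example, if two consumers filter the same CTE on different values of a column (a hypothetical region column here), the disjunction of their filters can be pushed into the producer, so the temporary table only materializes rows that at least one consumer will read:

    -- Before pushdown: the producer materializes every row of orders
    WITH t AS (
    	SELECT region, total_amount FROM orders
    	)
    SELECT * FROM t WHERE region = 'US'
    UNION ALL
    SELECT * FROM t WHERE region = 'EU';

    -- After pushdown: the shared filter is applied inside the producer,
    -- and each consumer keeps its own filter
    WITH t AS (
    	SELECT region, total_amount FROM orders
    	WHERE region = 'US' OR region = 'EU'
    	)
    SELECT * FROM t WHERE region = 'US'
    UNION ALL
    SELECT * FROM t WHERE region = 'EU';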

    Future Enhancements

    However, this approach does not always select the best possible plan, because the optimizations are applied sequentially, one after another. In some cases, pushing individual filters or aggregates below the CTEs can be more efficient than materializing the CTE itself. Once a CTE is materialized, all common filters and projections must be applied after the scan of the materialized table, which might not be optimal.

    To address these limitations, we plan to explore options for maintaining both the CTE materialized plan and the non-materialized plan simultaneously. By comparing the two after all optimizations are applied, we can determine which one offers better performance. This approach will help us identify more optimal execution strategies and improve query efficiency in future iterations.

    4. Generating Physical Plan

    Finally, the logical plan is transformed into a physical execution plan. In this physical plan:

    • CTE Producers are implemented as table writers that materialize the intermediate results into temporary storage.
    • CTE Consumers are represented as table scans that read from these materialized tables.

    The scheduler leverages the information contained within the Sequence node to properly orchestrate the execution of the query. In the final physical plan, the query is segmented into distinct sections.

    Currently, all CTE producers are scheduled and executed concurrently before the main query runs. This means that the materialization of CTEs happens upfront, ensuring that all necessary intermediate results are ready for consumption when the main query begins execution.

    Future Enhancements

    We recognize that executing all CTE producers to completion before the main query begins may not always be the most efficient strategy. In the future, we plan to enhance the scheduler to allow the main query to run concurrently with the CTE producers.

    Currently, the CTE Materialization Framework is not supported in Presto on Spark. There, intermediate stages are already stored as reusable Spark RDDs, so there is less need for additional table writers to materialize results into external storage.

    Temporary Tables

    Temporary tables created during CTE materialization are not registered in external metadata services like the Hive Metastore (HMS). Instead, the Presto coordinator manages their metadata locally in memory and clears these temporary tables after the query completes.

    The temporary tables are also bucketed on a random hash, which avoids producing many small files and bounds the total number of files.
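
    With the Hive connector, bucketing bounds the number of files written at the bucket count. The sketch below illustrates the idea; the synthetic random key, the source table, and the exact table properties Presto uses internally are assumptions for illustration:

    -- Bucketing by a synthetic random key caps the file count at
    -- bucket_count while spreading rows evenly across buckets
    CREATE TABLE tmp_cte
    WITH (bucketed_by = ARRAY['rand_bucket'], bucket_count = 100)
    AS
    SELECT *
    	,CAST(random() * 100 AS INTEGER) AS rand_bucket
    FROM customer_orders_source;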

    We also store the temporary tables in the Presto PageFile format with Snappy compression. This is the same format used by Presto workers to exchange data between stages during query execution, so reads and writes avoid additional serialization and deserialization overhead. This optimization leads to more efficient data processing and faster query execution times.

    Results

    Over 600k queries have been successfully run in Uber production with CTE materialization. Heuristic CTE Materialization has been enabled in our production environment for heavy batch queries. For our interactive users, we’ve added query warnings through a session property to inform them when CTE materialization might help improve query performance.

    Performance

    We deployed Heuristic CTE Materialization for our top 15,000 CPU-intensive pipelines. Additionally, we enabled Heuristic CTE Materialization for queries that frequently encountered high memory usage or other failures. Queries that show performance regressions are automatically excluded from this optimization.

    For the affected queries, CPU usage dropped by 38% and data read by approximately 40% compared with their historical executions. Additionally, query latency improved by around 7%.

    Our benchmarks, which sample 2,000 queries, show similar results. We have also observed that the total data written to the temporary tables was less than 1% of the data read.

    We did not observe significant degradation in CPU usage or data read. We did observe some query-latency regressions in cases where pushing down individual filters would have been more efficient, as well as slow scheduling when a query contained a long chain of dependent CTEs.

    We expect that upcoming scheduling improvements and a cost-based solution will help detect and address these scenarios.

    Conclusion

    The CTE Materialization Framework in Presto significantly enhances query performance by reducing redundant computations and optimizing resource utilization.

    Our heuristic approach has proven effective in production for heavy batch queries, and we plan to further refine this by adopting a cost-based strategy integrated with a History-Based Optimizer.

    Future enhancements will also focus on improving the scheduler to allow more parallelism between CTE producers and the main query, and on Presto on Spark integration. These ongoing developments underscore our commitment to advancing query optimization techniques within the Presto ecosystem.

    Please check our docs to learn how to deploy this in your production environment and how to participate in development.