Do you ❤️ Trino? Give us a 🌟 on GitHub

Trino Community Broadcast

11: Dynamic filtering and dynamic partition pruning

Feb 18, 2021

Video

Audio

 

Release 352

Release notes discussed: https://trino.io/docs/current/release/release-352.html

No new release to discuss yet except that 353 will be around the corner to fix a low-impact correctness issue that came out in 352 https://github.com/trinodb/trino/pull/6895.

Concept of the week: Dynamic filtering

So we’ve covered a lot on the Trino Community Broadcast to build our way up to tackling this pretty big subject in the space called dynamic filtering. If you haven’t seen episodes five through nine, you may want to go back and watch those for some context for this episode. Episode eight actually diverted to the Trino rebrand so we won’t discuss that one. For the recap;

In episode five, we spoke about Hive partitions. In order to save you time when you run a query, Hive stores data under directories named by the values of the data written underneath that directory. Take this directory structure for the orders table partitioned by the orderdate field:

orders
├── orderdate=1992-01-01
│   ├── orders_1992-01-01_1.orc
│   ├── orders_1992-01-01_2.orc
│   ├── orders_1992-01-01_3.orc
│   └── ...
├── orderdate=1992-01-02
│   └── ...
├── orderdate=1992-01-03
│   └── ...
└── ...

When querying for data under January 1st, 1992, according to the Hive model, query engines like Hive and Trino will only scan ORC files under the orders/orderdate=1992-01-01 directory. The idea is to avoid scanning unnecessary data by grouping rows based on a field commonly used in a query.

In episode six and seven, we discussed a bit about how a query gets represented internally to Trino once you submit your SQL query. First, the Parser converts SQL to an abstract syntax tree (AST) format. Then the planner generates a different tree structure called the intermediate representation (IR) that contains nodes representing the steps that need to be performed in order to answer the query. The leaves of the tree get executed first, and the parents of each node are dependent on the action of its child completing before it can start. Finally, the planner and cost-based-optimizer (CBO) runs various updates on the IR to optimize the query plan until it is ready to be executed. To sum it all up, the planner and CBO generate and optimize the plan by running optimization rules. Refer to chapter four in Trino: The Definitive Guide pg. 50 for more information.

In episode nine, we discussed how hash-joins work by first drawing a nested-loop analogy to how joins work. We then discussed how it is advantageous to read the inner loop into memory to avoid a lot of extra disk calls. Since it is ideal to read an entire table into memory, you likely want to make sure the table that is built in memory is the smaller size of the two tables. This smaller table called the build table. The table that gets streamed is called the probe table. We discussed a bit how hash-joins work which is a common mechanism to execute joins in a distributed and parallel fashion.

Another nomenclature akin to build table and probe tables are dimension and fact table, respectively. This nomenclature comes from the star schema from data warehousing. Typically, there are large tables called fact tables would live at the center of the schema. These tables typically have many foreign keys, and a bit of quantitative or measuarable columns of the event or instance. The foreign keys connect these big fact tables to smaller dimension tables that, when joined, provide human readable context to enrich the recordings in the fact table. The schema ends up looking like a star with the fact table at the center. In essence, you just need to remember when someone is describing a fact table they are saying it is a bigger table that is likely going to end up on the probe side of a join, where a dimension is more likely a candidate to fit into memory on the build side of a join.

So let’s get onto the dynamic filtering shall we? First, let’s cover a few concepts about dynamic filtering, then compare some variations of this concept.

Dynamic filtering takes advantage of joins with big fact tables to smaller dimension tables. What makes this filtering different from other types of filtering is that you are using the smaller build table that is loaded at query time to generate a list of values that exist in the join column between the build table and probe table. We know that only values that match these criteria are going to be returned from the probe side, so we can use this dynamically generated list as a pushdown predicate on the join column of the probe side. This means we are still scanning this data, but only sending the subset that answers the query. We can look at the blog written for the original local dynamic filtering implementation by Roman Zeyde for more insights on the original implementation for dynamic filtering before Raunaq’s changes.

Local dynamic filtering is definitely beneficial as it allows skipping unnecessary stripes or row-groups in the ORC or Parquet reader. However, it works only for broadcast joins, and its effectiveness depends upon the selectivity of the min and max indices maintained in ORC or Parquet files. What if we could prune entire partitions from the query execution based on dynamic filters? In the next iteration of dynamic filtering, called dynamic partition pruning, we do just that. We take advantage of the partitioned layout of Hive tables to avoid generating splits on partitions that won’t exist in the final query result. The coordinator can identify partitions for pruning based on the dynamic filters sent to it from the workers processing the build side of join. This only works if the query contains a join condition on a column that is partitioned.

With that basic understanding, let’s move on to the PR that implement dynamic partition pruning!

PR of the week: PR 1072 Implement dynamic partition pruning

In this week’s pull request https://github.com/trinodb/trino/pull/1072 we return with Raunaq Morarka and Karol Sobczak. This PR effectively brings in the second iteration of dynamic filtering, dynamic partition pruning, where instead of relying on local dynamic filtering we collect dynamic filters from the workers in the coordinator and prune out extra splits that aren’t needed with the partition layout of the probe side table. A query like this for example, seen in Raunaq’s blog about dynamic partition pruning shows that if we partition store_sales on ss_sold_date_sk we can take advantage of this information by sending it to the coordinator.

SELECT COUNT(*) FROM 
sales JOIN items ON sales.item_id = date_dim.items.id
WHERE items.price > 1000;

Below we show how the execution of this would look in a distributed manner if you partitioned the sales table on item_id. This is a visual reference for those listening in on the podcast:

1:
Query is sent to the coordinator to be parsed, analyzed, and planned.

2:
All workers get a subset of the items (build) table and each worker filters out items with price > 1000.

3:
All workers create dynamic filter for their item subset and send it to the coordinator.

4:
Coordinator uses dynamic filter list to prune out splits and partitions that do not overlap with the DF and submits splits to run on workers.

5:
Workers run splits over the sales (probe) table.

6:
Workers return final rows to be assembled into the final result on the coordinator.

PR Demo: PR 1072 Implement dynamic partition pruning

For this PR demo, we have set up one r5.4xlarge coordinator and four r5.4xlarge workers in a cluster. We have a sf100 size tpcds dataset. We will run some of the TPC-DS queries and perhaps a few others.

The first query we run through in the TPC-DS queries was query 54. With this query, we are using the hive catalog pointing to AWS S3 and AWS Glue as our metastore. We initially disable dynamic filtering then compare it to the times when dynamic filtering is enabled. With dynamic filtering we find the query to run at about 92 seconds, where with dynamic filtering it runs for 42 seconds. We see similar findings for the semijoin we execute below and discuss some implications of how the planner actually optimizes the semijoin into an inner join.

/* turn dynamic filtering on or off to compare */
SET SESSION enable_dynamic_filtering=false;

SELECT ss_sold_date_sk, COUNT(*) from store_sales WHERE ss_sold_date_sk IN (
  SELECT ws_sold_date_sk FROM (
    SELECT ws_sold_date_sk, COUNT(*) FROM web_sales GROUP BY 1 ORDER BY 2 LIMIT 100
  )
)
GROUP BY 1

Blogs

Upcoming events

Past Events

Latest training from David, Dain, and Martin(Now with timestamps!):

If you want to learn more about Trino, check out the definitive guide from OReilly. You can download the free PDF or buy the book online.

Music for the show is from the Megaman 6 Game Play album by Krzysztof Słowikowski.