The star schema is one of the most widely used data mart patterns.
It consists of fact tables (usually partitioned) and dimension tables,
which are used to filter rows from the fact tables.
Consider the following query, which captures a common pattern: a fact table, store_sales, partitioned by the column
ss_sold_date_sk and joined with a filtered dimension table:
SELECT COUNT(*) FROM store_sales JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk WHERE d_following_holiday='Y' AND d_year = 2000;
Without dynamic filtering, Presto pushes the predicates for the dimension table down to the table scan on date_dim,
but it scans all the data in the fact table, because the query has no filters on store_sales.
The join operator then throws away most of the probe-side rows, because the join criteria are highly selective.
The current implementation of dynamic filtering improves
on this; however, it is limited to broadcast joins on tables stored in ORC or Parquet format.
Additionally, it does not take advantage of the layout of partitioned Hive tables.
With dynamic partition pruning, which extends the current implementation of dynamic filtering, every worker node collects
the values eligible for the join from the date_dim.d_date_sk column and passes them to the coordinator.
The coordinator can then skip processing the partitions of store_sales that don't meet the join criteria.
This greatly reduces the amount of data that worker nodes scan from the store_sales table.
This optimization is applicable to any storage format and to both broadcast and partitioned joins.
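To make the flow concrete, here is a minimal Python sketch of the idea applied to the example query. It is not Presto's implementation: the toy rows, the partition map, and the function names (collect_build_values, merge_worker_values, prune_partitions) are hypothetical stand-ins. Each worker applies the date_dim predicates and collects the surviving d_date_sk values; the coordinator unions the per-worker sets and keeps only the store_sales partitions whose ss_sold_date_sk key appears in the union.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Set


@dataclass(frozen=True)
class DateDimRow:
    # Hypothetical stand-in for a date_dim row, with only the columns the query uses.
    d_date_sk: int
    d_year: int
    d_following_holiday: str


def collect_build_values(rows: Iterable[DateDimRow]) -> Set[int]:
    """Worker side: apply the date_dim predicates and collect eligible join keys."""
    return {
        row.d_date_sk
        for row in rows
        if row.d_following_holiday == "Y" and row.d_year == 2000
    }


def merge_worker_values(per_worker: Iterable[Set[int]]) -> Set[int]:
    """Coordinator side: union the values reported by every worker."""
    merged: Set[int] = set()
    for values in per_worker:
        merged |= values
    return merged


def prune_partitions(
    partitions: Dict[int, List[str]], eligible: Set[int]
) -> Dict[int, List[str]]:
    """Keep only store_sales partitions whose ss_sold_date_sk key can match."""
    return {key: files for key, files in partitions.items() if key in eligible}


# Toy data: two workers each scan part of date_dim.
date_dim = [
    DateDimRow(1, 2000, "Y"),
    DateDimRow(2, 2000, "N"),
    DateDimRow(3, 1999, "Y"),
]
store_sales_partitions = {1: ["part-1.orc"], 2: ["part-2.orc"], 3: ["part-3.orc"]}

worker_a = collect_build_values(date_dim[:2])  # {1}
worker_b = collect_build_values(date_dim[2:])  # empty: 1999 is filtered out
eligible = merge_worker_values([worker_a, worker_b])
pruned = prune_partitions(store_sales_partitions, eligible)
```

Because only partition keys are compared, the check never looks inside the files, which is why this kind of pruning does not depend on the storage format.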
Design considerations
This optimization requires dynamic filters collected by worker nodes to be communicated to the coordinator over the network. We needed to ensure that this additional communication overhead does not overload the coordinator. We achieved this by packing dynamic filters into Presto's existing framework for sending status updates from workers to the coordinator.
A new service was added on the coordinator node to collect dynamic filters asynchronously.
Queries registered with this service can request dynamic filters while scheduling splits, without blocking any operations.
The service is also responsible for ensuring that all the build-side tasks of a join stage have completed execution before
it constructs the dynamic filters that the coordinator uses to schedule probe-side table scans.
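The completeness requirement matters for correctness: if the coordinator pruned partitions using values from only some of the build-side tasks, it could skip partitions that a still-running task would have matched. The sketch below uses a hypothetical DynamicFilterCollector class (it does not correspond to a specific Presto class) to show a non-blocking collector that returns nothing until every expected build task has reported.

```python
from typing import Dict, FrozenSet, Optional, Set


class DynamicFilterCollector:
    """Hypothetical coordinator-side collector for one dynamic filter."""

    def __init__(self, expected_build_tasks: int) -> None:
        self._expected = expected_build_tasks
        self._reported: Dict[str, FrozenSet[int]] = {}

    def report(self, task_id: str, values: Set[int]) -> None:
        # A repeated status update from the same task simply overwrites.
        self._reported[task_id] = frozenset(values)

    def get_filter(self) -> Optional[FrozenSet[int]]:
        """Non-blocking: returns None until all build tasks have reported."""
        if len(self._reported) < self._expected:
            return None
        merged: Set[int] = set()
        for values in self._reported.values():
            merged |= values
        return frozenset(merged)


collector = DynamicFilterCollector(expected_build_tasks=2)
collector.report("build-task-0", {1, 5})
before = collector.get_filter()  # one task still running, so no filter yet
collector.report("build-task-1", {7})
after = collector.get_filter()
```

A scheduler would call get_filter() each time it enumerates splits and simply proceed without pruning while it returns None.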
To identify opportunities for dynamic filtering in the logical plan, we rely on the implementation added in
#91. Dynamic filters are modeled as
FunctionCall expressions that
evaluate to a boolean value. They are created in the
PredicatePushDown optimizer rule from the equi-join clauses of inner join
nodes and pushed down the plan along with other predicates. Dynamic filters are added to the plan after the cost-based
optimization rules have run. This ensures that dynamic filters do not interfere with cost estimation and join reordering.
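As an illustration of modeling dynamic filters as boolean expressions, the sketch below turns each equi-join clause into a placeholder function call on the probe-side column and records which build-side column feeds it. The FunctionCall dataclass here is a toy, not Presto's class, and the function name dynamic_filter, the id scheme, and the rule that an unresolved filter evaluates to true (so it can never drop rows before the build side is known) are assumptions made for this example.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List, Mapping, Tuple


@dataclass(frozen=True)
class FunctionCall:
    # Hypothetical expression node, not Presto's FunctionCall class.
    name: str
    args: Tuple[str, ...]


def create_dynamic_filters(
    join_id: str, equi_clauses: List[Tuple[str, str]]
) -> Tuple[List[FunctionCall], Dict[str, str]]:
    """For each `probe_col = build_col` clause of an inner join, build a boolean
    placeholder on the probe column and remember which build column feeds it."""
    predicates: List[FunctionCall] = []
    build_columns: Dict[str, str] = {}
    for index, (probe_col, build_col) in enumerate(equi_clauses):
        filter_id = f"{join_id}_df{index}"  # unique as long as join_id is unique
        predicates.append(FunctionCall("dynamic_filter", (probe_col, filter_id)))
        build_columns[filter_id] = build_col
    return predicates, build_columns


def evaluate(
    call: FunctionCall, row: Mapping[str, int], resolved: Mapping[str, FrozenSet[int]]
) -> bool:
    column, filter_id = call.args
    allowed = resolved.get(filter_id)
    # An unresolved dynamic filter must never drop rows, so it evaluates to True.
    return allowed is None or row[column] in allowed


predicates, build_columns = create_dynamic_filters(
    "join1", [("ss_sold_date_sk", "d_date_sk")]
)
row = {"ss_sold_date_sk": 3}
before_ready = evaluate(predicates[0], row, {})
after_ready = evaluate(predicates[0], row, {"join1_df0": frozenset({1})})
```

Treating the filter as an ordinary boolean predicate is what lets it travel through predicate pushdown alongside the other conjuncts.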
The PredicatePushDown rule can end up pushing dynamic filters to unsupported places in the plan via inference.
This was solved by adding an
optimizer rule that is responsible for ensuring that:
- Dynamic filters are present only directly above a
TableScan node, and only if the subtree is on the probe side of some downstream join node.
- Dynamic filters are removed from a
JoinNode if there is no consumer for them in its probe-side subtree.
We also run a check
at the end of the planning phase to ensure that the optimized plan satisfies the above conditions.
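The two conditions above can be expressed as a single top-down rewrite over a simplified plan tree. In this hypothetical model (TableScan, Filter, and Join are toy dataclasses, not Presto's plan node classes), a join's dynamic filters are visible only in its probe subtree, a Filter keeps dynamic filters only when it sits directly above a TableScan, and a join drops any filter that nothing on its probe side consumes. Running the rewrite a second time should be a no-op, which is one simple way to express an end-of-planning check.

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple, Union


@dataclass(frozen=True)
class TableScan:
    table: str


@dataclass(frozen=True)
class Filter:
    source: "PlanNode"
    predicates: Tuple[str, ...]  # ordinary predicates, kept as-is
    dynamic_filter_ids: FrozenSet[str]  # dynamic-filter placeholders


@dataclass(frozen=True)
class Join:
    probe: "PlanNode"
    build: "PlanNode"
    dynamic_filter_ids: FrozenSet[str]  # filters produced from the build side


PlanNode = Union[TableScan, Filter, Join]


def rewrite(
    node: PlanNode, available: FrozenSet[str] = frozenset()
) -> Tuple[PlanNode, FrozenSet[str]]:
    """Return the rewritten node and the ids of ancestor-produced dynamic
    filters that are consumed somewhere inside it."""
    if isinstance(node, TableScan):
        return node, frozenset()
    if isinstance(node, Filter):
        source, consumed = rewrite(node.source, available)
        # Keep dynamic filters only directly above a scan, and only if an
        # ancestor join (on whose probe side we are) produces them.
        if isinstance(source, TableScan):
            kept = node.dynamic_filter_ids & available
        else:
            kept = frozenset()
        if not kept and not node.predicates:
            return source, consumed  # nothing left to filter on
        return Filter(source, node.predicates, kept), consumed | kept
    # Join: its own filters are visible only inside its probe subtree.
    probe, probe_consumed = rewrite(node.probe, available | node.dynamic_filter_ids)
    build, build_consumed = rewrite(node.build, available)
    produced = node.dynamic_filter_ids & probe_consumed  # drop unconsumed filters
    return Join(probe, build, produced), (probe_consumed | build_consumed) - produced


plan = Join(
    probe=Filter(TableScan("store_sales"), (), frozenset({"df0"})),
    # Inference wrongly copied df0 onto the build side as well.
    build=Filter(TableScan("date_dim"), ("d_year = 2000",), frozenset({"df0"})),
    dynamic_filter_ids=frozenset({"df0", "df1"}),  # df1 has no consumer
)
fixed, _ = rewrite(plan)
```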
We reuse the existing
LocalExecutionPlanner to collect build-side values from each inner join on each worker node. In addition to passing the collected
values within the same worker node for use in broadcast-join probe-side scans, we also pass them to
TaskContext to populate the task
status updates sent to the coordinator.
ContinuousTaskStatusFetcher on the coordinator node pulls task status updates from all worker nodes at least every
task.status-refresh-max-wait (1 second by default), or sooner if the task status changes.
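A rough sketch of how build-side values can ride along on existing task status updates, assuming a simplified worker task that attaches its filter only once its build side has finished. WorkerTask, TaskStatus, and poll are hypothetical names, not Presto classes; a real status fetcher would repeat the polling round on the task.status-refresh-max-wait schedule rather than on demand.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, Iterable, List, Set


@dataclass(frozen=True)
class TaskStatus:
    # Hypothetical status payload; dynamic filters ride along with the state.
    task_id: str
    state: str
    dynamic_filters: Dict[str, FrozenSet[int]]


class WorkerTask:
    """Toy worker task that collects build-side join keys for one filter."""

    def __init__(self, task_id: str, filter_id: str) -> None:
        self.task_id = task_id
        self.filter_id = filter_id
        self._values: Set[int] = set()
        self._finished = False

    def add_build_page(self, join_keys: Iterable[int]) -> None:
        # Stand-in for the build operator consuming one page of build rows.
        self._values.update(join_keys)

    def finish(self) -> None:
        self._finished = True

    def status(self) -> TaskStatus:
        # Attach the filter only once this task's build side is complete
        # (an assumption of this sketch), so no partial values are sent.
        filters = {self.filter_id: frozenset(self._values)} if self._finished else {}
        state = "FINISHED" if self._finished else "RUNNING"
        return TaskStatus(self.task_id, state, filters)


def poll(tasks: List[WorkerTask]) -> Dict[str, Dict[str, FrozenSet[int]]]:
    """One coordinator polling round over every task's latest status."""
    return {task.task_id: task.status().dynamic_filters for task in tasks}


t0 = WorkerTask("task-0", "df0")
t1 = WorkerTask("task-1", "df0")
t0.add_build_page([1, 5])
t0.finish()
t1.add_build_page([7])
first_round = poll([t0, t1])
t1.finish()
second_round = poll([t0, t1])
```

Reusing the status channel means no extra round trips: the filters arrive with updates the coordinator was already fetching.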
The dynamic filter service on the coordinator regularly polls for dynamic filters from task status updates through
SqlQueryExecution and provides
an interface to supply dynamic filters once they are ready. The
ConnectorSplitManager#getSplits API has been updated to
optionally use the dynamic filters supplied by the coordinator.
In the Hive connector,
BackgroundHiveSplitLoader can apply dynamic filtering in two ways: by skipping the listing
of files within a partition entirely, or, because splits are enumerated lazily, by avoiding the creation of splits within an already-loaded partition when the dynamic filters
become available in
InternalHiveSplitFactory#createInternalHiveSplit.
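The two pruning opportunities in the Hive connector can be illustrated with a lazy split generator that consults an optional dynamic-filter supplier. This is a hypothetical sketch, not the ConnectorSplitManager or BackgroundHiveSplitLoader API: the supplier returns None while the filter is not ready, a partition whose key is already excluded is skipped before its files are used, and a partition that was being enumerated when the filter arrived stops producing splits.

```python
from typing import Callable, Dict, FrozenSet, Iterator, List, Optional, Tuple

# Hypothetical supplier type: returns None while the dynamic filter is not ready.
FilterSupplier = Callable[[], Optional[FrozenSet[int]]]


def get_splits(
    partitions: Dict[int, List[str]], dynamic_filter: FilterSupplier
) -> Iterator[Tuple[int, str]]:
    """Lazily yield (partition key, file) splits, consulting the filter as late as possible."""
    for key, files in partitions.items():
        allowed = dynamic_filter()
        if allowed is not None and key not in allowed:
            continue  # skip the partition; a real loader would not list its files
        for path in files:
            allowed = dynamic_filter()
            if allowed is not None and key not in allowed:
                break  # filter arrived mid-partition: stop creating its splits
            yield key, path


state: Dict[str, Optional[FrozenSet[int]]] = {"filter": None}
partitions = {1: ["p1-a", "p1-b"], 2: ["p2-a", "p2-b"], 3: ["p3-a"]}
splits = get_splits(partitions, lambda: state["filter"])

first = next(splits)  # no filter yet, so nothing is pruned
state["filter"] = frozenset({3})  # the build side finishes
rest = list(splits)  # partition 1 stops early, 2 is skipped, 3 is kept
```

Because the generator re-checks the supplier on every step, a filter that arrives late still saves work for the splits that have not been created yet.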
We ran TPC-DS queries on a cluster of 5 r4.8xlarge worker nodes, using data stored in ORC format. TPC-DS tables were partitioned as:
The following queries ran more than 20% faster with dynamic partition pruning (elapsed time is measured in seconds, CPU time in minutes, and data read in MB).
| Query | Baseline elapsed (s) | Dynamic partition pruning elapsed (s) | Baseline CPU (min) | Dynamic partition pruning CPU (min) | Baseline data read (MB) | Dynamic partition pruning data read (MB) |
|---|---|---|---|---|---|---|
- 18 TPC-DS queries improved runtime by over 50% while decreasing CPU usage by an average of 64%. Data read decreased by 66%.
- 7 TPC-DS queries improved by 30% to 50% while decreasing CPU usage by an average of 47%. Data read decreased by 54%.
- 29 TPC-DS queries improved by 10% to 30% while decreasing CPU usage by an average of 20%. Data read decreased by 27%.
Note that the baseline here includes the improvements from the existing node-local dynamic filtering implementation.
For dynamic filtering to work, the smaller dimension table needs to be chosen as the join's build side. The cost-based optimizer (CBO) can do this automatically using table statistics from the metastore. Therefore, we generated table statistics before running this benchmark and relied on the CBO to choose the smaller table as the build side of the join.
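A greatly simplified view of that decision: given row-count estimates, put the smaller input on the build side. The TableStats type and the fallback of keeping the query's written order when statistics are missing are assumptions for this sketch; Presto's cost-based optimizer weighs more factors than row counts alone. The row counts in the usage example are made up for illustration, not benchmark data.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class TableStats:
    name: str
    row_count: Optional[float]  # estimated rows after filters; None if no stats


def choose_sides(left: TableStats, right: TableStats) -> Tuple[TableStats, TableStats]:
    """Return (probe, build), placing the smaller estimated input on the build side."""
    if left.row_count is None or right.row_count is None:
        # Without statistics there is nothing to compare; keep the written order.
        return left, right
    if left.row_count >= right.row_count:
        return left, right
    return right, left


# Made-up estimates for illustration only.
sales = TableStats("store_sales", 1_000_000.0)
dates = TableStats("date_dim", 365.0)
probe, build = choose_sides(dates, sales)
```

The no-statistics branch is why generating statistics beforehand matters: without them, the sketch (like any optimizer lacking estimates) has no basis for swapping the inputs.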
It is quite common for large fact tables to be partitioned by dimensions like time.
Queries joining such tables with filtered dimension tables benefit significantly from dynamic partition pruning.
This optimization is applicable to partitioned Hive tables stored in any data format.
It also works with both broadcast and partitioned joins. Other connectors can easily take advantage of dynamic filters
by implementing the new
ConnectorSplitManager#getSplits API, which supplies dynamic filters to the connector.
Future work
- Support for using a min-max range in DynamicFilterSourceOperator when the build side contains too many values.
- Passing dynamic filters from the coordinator back to the worker nodes, to allow ORC and Parquet readers to use dynamic filters with partitioned joins.
- Allowing connectors to block the probe-side scan until dynamic filters are ready.
- Support for dynamic filtering with inequality operators.
- Support for semi-joins.
- Taking advantage of dynamic filters in connectors other than Hive.
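For the first item, one possible shape is sketched below: keep the exact set of build-side values while it stays small, and fall back to a [min, max] range once the number of distinct values exceeds a threshold. The threshold, the ValueSet and Range classes, and summarize_build_side are hypothetical. The range is a coarser filter than the value set (it may admit values that never appeared on the build side), but it can still rule out partitions whose keys fall outside it.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, Optional, Set, Union


@dataclass(frozen=True)
class ValueSet:
    values: FrozenSet[int]

    def contains(self, value: int) -> bool:
        return value in self.values


@dataclass(frozen=True)
class Range:
    low: int
    high: int

    def contains(self, value: int) -> bool:
        return self.low <= value <= self.high


def summarize_build_side(values: Iterable[int], max_distinct: int) -> Union[ValueSet, Range]:
    """Exact value set while small; [min, max] range once it grows too large."""
    distinct: Set[int] = set()
    low: Optional[int] = None
    high: Optional[int] = None
    for value in values:
        low = value if low is None else min(low, value)
        high = value if high is None else max(high, value)
        if len(distinct) <= max_distinct:
            distinct.add(value)  # stop growing the set once over the limit
    if low is None or high is None:
        return ValueSet(frozenset())  # empty build side: no probe row can match
    if len(distinct) > max_distinct:
        return Range(low, high)
    return ValueSet(frozenset(distinct))


small = summarize_build_side([3, 9, 4], max_distinct=5)
large = summarize_build_side(range(100), max_distinct=10)
```

Tracking min and max on every value costs almost nothing, so the fallback is always available even after the exact set has been abandoned.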