refactor join dynamic filters to be more granular #20142

adriangb wants to merge 1 commit into apache:main
Conversation
Pull request overview
Refactors hash join dynamic filter pushdown to use per-partition DynamicFilterPhysicalExpr instances (and a CASE routing expression in partitioned mode), enabling more granular updates and avoiding cross-partition synchronization.
Changes:
- Replace the single "global" dynamic filter update with per-partition filters, pushed to scans via a `CASE hash_repartition % N ...` expression.
- Simplify build-side coordination by removing the barrier-based "wait for all partitions" mechanism and updating filters per partition as build data arrives.
- Update optimizer/executor tests and snapshots to reflect the new pushed-down predicate shape.
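For reference, the predicate pushed to the probe-side scan in partitioned mode now has this shape (matching the updated snapshots shown later in the thread):

```text
CASE hash_repartition % N
  WHEN 0   THEN DynamicFilter [ ... ]
  WHEN 1   THEN DynamicFilter [ ... ]
  ...
  WHEN N-1 THEN DynamicFilter [ ... ]
  ELSE false
END
```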
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| datafusion/physical-plan/src/joins/hash_join/stream.rs | Removes the extra wait state/future and reports build data synchronously before probing. |
| datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs | Reworks SharedBuildAccumulator to update per-partition dynamic filters directly (and dedup in CollectLeft). |
| datafusion/physical-plan/src/joins/hash_join/exec.rs | Introduces creation/storage of per-partition filters and pushes a CASE expression to the probe-side scan. |
| datafusion/core/tests/physical_optimizer/filter_pushdown.rs | Updates snapshots/expectations for the new CASE ... DynamicFilter[...] predicate form. |
```diff
 // Only enable dynamic filter pushdown if:
 // - The session config enables dynamic filter pushdown
-// - A dynamic filter exists
-// - At least one consumer is holding a reference to it, this avoids expensive filter
-//   computation when disabled or when no consumer will use it.
+// - A dynamic filter exists (it was pushed down successfully)
 let enable_dynamic_filter_pushdown = context
     .session_config()
     .options()
     .optimizer
     .enable_join_dynamic_filter_pushdown
-    && self
-        .dynamic_filter
-        .as_ref()
-        .map(|df| df.filter.is_used())
-        .unwrap_or(false);
+    && self.dynamic_filter.is_some();
```
`enable_dynamic_filter_pushdown` now checks only `self.dynamic_filter.is_some()`. However, `dynamic_filter` can be present even when the probe side doesn't actually hold a reference to the filters (e.g. pushdown unsupported), in which case the build side will still compute/update dynamic filters unnecessarily. Consider restoring the previous gating by checking whether any `partition_filters` are actually used (e.g. `df.partition_filters.iter().any(|f| f.is_used())`), or alternatively only setting `dynamic_filter` when the self-filter pushdown is reported as supported.
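A minimal sketch of that restored gating, assuming the dynamic filter struct introduced in this PR exposes the `partition_filters` vector:

```rust
// Hedged sketch: only compute/update build-side filters when at least one
// per-partition filter is actually referenced by a probe-side consumer.
let enable_dynamic_filter_pushdown = context
    .session_config()
    .options()
    .optimizer
    .enable_join_dynamic_filter_pushdown
    && self
        .dynamic_filter
        .as_ref()
        .map(|df| df.partition_filters.iter().any(|f| f.is_used()))
        .unwrap_or(false);
```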
```rust
// Check if our self-filter was pushed (pending_partition_filters was set in gather_filters_for_pushdown)
if let Some(partition_filters) = self.pending_partition_filters.lock().take() {
    if right_child_self_filters.first().is_some() {
        // Self-filter was pushed — create a new node with the partition filters
        let new_node = Arc::new(HashJoinExec {
```
`right_child_self_filters.first().is_some()` doesn't indicate the self-filter was accepted by the right child; it will be `Some` whenever HashJoin added a self-filter, even if the child marked it unsupported. This can cause the join to treat dynamic filter pushdown as successful and keep `partition_filters` even when they are not referenced by the probe side. Use the `PushedDownPredicate.discriminant` (e.g. require `PushedDown::Yes`) and/or match the specific predicate you pushed before creating `HashJoinExecDynamicFilter`.
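A hedged sketch of the stricter check, using the `PushedDownPredicate`/`PushedDown` names from the comment above (the surrounding context is assumed, not the PR's actual code):

```rust
// Only treat the self-filter as pushed if the child actually accepted it.
let self_filter_accepted = right_child_self_filters
    .first()
    .map(|pushed| matches!(pushed.discriminant, PushedDown::Yes))
    .unwrap_or(false);

if self_filter_accepted {
    if let Some(partition_filters) = self.pending_partition_filters.lock().take() {
        // Safe to keep the per-partition filters: the probe side references them.
        // ... build the new HashJoinExec node as before ...
    }
}
```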
```rust
let case_expr = Arc::new(
    CaseExpr::try_new(
        Some(modulo_expr),
        when_then_branches,
        Some(lit(false)),
    )
    .expect("Failed to create CASE expression for per-partition filters"),
) as Arc<dyn PhysicalExpr>;
```
`create_partition_filters` uses `expect(...)` on `CaseExpr::try_new(...)`, which will panic in release builds if expression construction fails (e.g. type mismatch, unexpected edge case). Prefer making `create_partition_filters` fallible and propagating the error (`Result<...>`) or converting it into a `DataFusionError` via `internal_err!`/`plan_err!` so the query fails gracefully instead of aborting.
Suggested change:

```diff
-let case_expr = Arc::new(
-    CaseExpr::try_new(
-        Some(modulo_expr),
-        when_then_branches,
-        Some(lit(false)),
-    )
-    .expect("Failed to create CASE expression for per-partition filters"),
-) as Arc<dyn PhysicalExpr>;
+let case_expr: Arc<dyn PhysicalExpr> =
+    match CaseExpr::try_new(
+        Some(modulo_expr),
+        when_then_branches,
+        Some(lit(false)),
+    ) {
+        Ok(expr) => Arc::new(expr),
+        // In case of an unexpected failure constructing the CASE expression,
+        // fall back to a literal `false` filter rather than panicking.
+        Err(_) => lit(false),
+    };
```
```rust
/// Set in gather_filters_for_pushdown, consumed in handle_child_pushdown_result.
/// Uses Mutex because gather_filters_for_pushdown takes &self and may be called
/// multiple times on the same node (e.g., when optimizations are re-run).
pending_partition_filters: Mutex<Option<Vec<Arc<DynamicFilterPhysicalExpr>>>>,
```
The alternative to this is to use the values passed into `handle_child_pushdown_result`, which do include the filters we are looking for, but that involves downcast matching through the CASE expression and then cloning the `DynamicFilterPhysicalExpr`s.
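For illustration, a rough sketch of that downcast matching (module paths and accessor names are assumptions, not the PR's actual code):

```rust
use std::sync::Arc;

use datafusion_physical_expr::PhysicalExpr;
// Assumed locations for these types:
use datafusion_physical_expr::expressions::{CaseExpr, DynamicFilterPhysicalExpr};

/// Recover the per-partition dynamic filters from the pushed-down expression
/// `CASE hash_repartition % N WHEN i THEN DynamicFilter_i ... ELSE false END`.
fn extract_branch_filters(
    pushed: &Arc<dyn PhysicalExpr>,
) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
    let case = pushed.as_any().downcast_ref::<CaseExpr>()?;
    case.when_then_expr()
        .iter()
        .map(|(_when, then)| {
            // Keep the branch only if it really is a dynamic filter.
            then.as_any()
                .downcast_ref::<DynamicFilterPhysicalExpr>()
                .map(|_| Arc::clone(then))
        })
        .collect()
}
```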
run benchmark tpch tpcds

Note to self: worth trying moving the bound (min/max) checks outside of the hash, e.g.

🤖: Benchmark completed

@LiaCastaneda @Dandandan wonder what you folks think of this?

🤖: Benchmark completed
```diff
 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
 - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ empty ] WHEN 1 THEN DynamicFilter [ empty ] WHEN 2 THEN DynamicFilter [ empty ] WHEN 3 THEN DynamicFilter [ empty ] WHEN 4 THEN DynamicFilter [ empty ] WHEN 5 THEN DynamicFilter [ empty ] WHEN 6 THEN DynamicFilter [ empty ] WHEN 7 THEN DynamicFilter [ empty ] WHEN 8 THEN DynamicFilter [ empty ] WHEN 9 THEN DynamicFilter [ empty ] WHEN 10 THEN DynamicFilter [ empty ] WHEN 11 THEN DynamicFilter [ empty ] ELSE false END
```
I wonder if the routing could be done better by some custom `PhysicalExpr` rather than the long CASE WHEN. It would at least allow for some easier optimizations (e.g. doing a direct lookup instead of expression evaluation); the downside is maybe that expression simplification won't work as automatically?
OTOH I guess this expression could simplify those inner expressions itself.
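For illustration, a minimal sketch of the direct-lookup idea, reduced to plain Arrow arrays so it stays self-contained (hypothetical names; a real version would live in a custom `PhysicalExpr::evaluate()` and would mask rows so each partition's filter only evaluates its own rows):

```rust
use arrow::array::{Array, BooleanArray, UInt64Array};

/// Route each row to the result of its partition's filter by indexing into a
/// slice of per-partition results, instead of evaluating an N-branch CASE WHEN.
fn route_by_partition(
    hashes: &UInt64Array,                   // `hash_repartition` value per row
    per_partition_results: &[BooleanArray], // each partition filter's output
) -> BooleanArray {
    let n = per_partition_results.len() as u64;
    (0..hashes.len())
        .map(|row| {
            let partition = (hashes.value(row) % n) as usize;
            // Direct lookup: one indexed read per row, no per-branch WHEN checks.
            Some(per_partition_results[partition].value(row))
        })
        .collect()
}
```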
One thing I want to be conscious of is keeping the expressions in a form that can still be used for stats pruning.
```diff
 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
 - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ]
+- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ false ] WHEN 1 THEN DynamicFilter [ false ] WHEN 2 THEN DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) ] WHEN 3 THEN DynamicFilter [ false ] WHEN 4 THEN DynamicFilter [ a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ] WHEN 5 THEN DynamicFilter [ false ] WHEN 6 THEN DynamicFilter [ false ] WHEN 7 THEN DynamicFilter [ false ] WHEN 8 THEN DynamicFilter [ false ] WHEN 9 THEN DynamicFilter [ false ] WHEN 10 THEN DynamicFilter [ false ] WHEN 11 THEN DynamicFilter [ false ] ELSE false END
```
`b@1 >= bb AND b@1 <= bb`: it seems this isn't simplified to `b@1 = bb` yet (there are quite a few examples of this).
Also, the `IN (SET)` could be removed in this case.
```diff
-dynamic_filter_clone.wait_complete().await;
+filter_to_wait.wait_complete().await;
```
I think this PR makes sense and believe datafusion-distributed could benefit from it a lot. My main concern is how custom leaf nodes (`ExecutionPlan`s) that sit under Partitioned HashJoins, fetch the build side in parallel, and need to know when all partitions have reported their data would adapt to the new structure. With this PR, they would need to:
- Detect the CASE expression
- Extract all N DynamicFilters from the branches
- Wait for all of them to complete
I wonder if it would be worth keeping a way to know when all partitions have reported their filters. Maybe wrapping the whole CASE expression in a `DynamicFilterPhysicalExpr`, with the nested per-branch `DynamicFilterPhysicalExpr`s inside, would work, although that's not the cleanest approach: how would we differentiate between the outermost `DynamicFilterPhysicalExpr` and a branch `DynamicFilterPhysicalExpr`? 🤔 Another option I could think of: what if we had two types of dynamic filter expression? I imagine `DynamicFilterPhysicalExpr` for CollectLeft and the partitioned branch dynamic filters, and then something like `PartitionedDynamicFilter` that holds a `Vec<Arc<DynamicFilterPhysicalExpr>>` (this is just a quick thought).
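For concreteness, a hedged sketch of what the last of those three steps could look like for such a leaf node, given the branch filters extracted from the CASE expression (`wait_complete()` is the API visible in the diff above; everything else is hypothetical):

```rust
// Block until every build partition has reported its filter, restoring the
// old "wait for all partitions" behaviour for a single-partition leaf node.
async fn wait_for_all_partitions(filters: &[Arc<DynamicFilterPhysicalExpr>]) {
    for filter in filters {
        // Resolves once this partition's build side has reported its bounds.
        filter.wait_complete().await;
    }
}
```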
> concern is how custom leaf nodes (ExecutionPlan) that have Partitioned HashJoins to fetch the build in parallel but need to know when all partitions have reported their data would adapt to the new structure
This is something you have internally, right? Could you help explain how this works / what the requirement is? At some point I wonder if it wouldn't be better to have an optimizer rule that links the join node to the custom scan node, if the custom scan node needs to behave in a specific way when it is the leaf of a join (as opposed to e.g. the current Parquet scan node, which doesn't care at all)?
Yeah, apart from Parquet we have some custom `ExecutionPlan` leaf nodes that don't evaluate filters in the leaf node itself but instead push query execution (along with the filters) to a remote data source. The key requirement is that we need to wait for all build partitions to report their filters before querying the remote source: we execute this leaf node in a single partition (even with a Partitioned join), so we need the complete filter information from all hash join build partitions to generate a correct predicate.
I was raising it in case other custom scans had similar patterns, but if it's too specific to our use case, we can handle it internally.