-
Notifications
You must be signed in to change notification settings - Fork 1.9k
refactor join dynamic filters to be more granular #20142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1213,7 +1213,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { | |
| - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true | ||
| - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ empty ] WHEN 1 THEN DynamicFilter [ empty ] WHEN 2 THEN DynamicFilter [ empty ] WHEN 3 THEN DynamicFilter [ empty ] WHEN 4 THEN DynamicFilter [ empty ] WHEN 5 THEN DynamicFilter [ empty ] WHEN 6 THEN DynamicFilter [ empty ] WHEN 7 THEN DynamicFilter [ empty ] WHEN 8 THEN DynamicFilter [ empty ] WHEN 9 THEN DynamicFilter [ empty ] WHEN 10 THEN DynamicFilter [ empty ] WHEN 11 THEN DynamicFilter [ empty ] ELSE false END | ||
| " | ||
| ); | ||
|
|
||
|
|
@@ -1247,14 +1247,12 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { | |
| - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true | ||
| - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ] | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ false ] WHEN 1 THEN DynamicFilter [ false ] WHEN 2 THEN DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) ] WHEN 3 THEN DynamicFilter [ false ] WHEN 4 THEN DynamicFilter [ a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ] WHEN 5 THEN DynamicFilter [ false ] WHEN 6 THEN DynamicFilter [ false ] WHEN 7 THEN DynamicFilter [ false ] WHEN 8 THEN DynamicFilter [ false ] WHEN 9 THEN DynamicFilter [ false ] WHEN 10 THEN DynamicFilter [ false ] WHEN 11 THEN DynamicFilter [ false ] ELSE false END | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also the the |
||
| " | ||
| ); | ||
|
|
||
| // When hash collisions force all data into a single partition, we optimize away the CASE expression. | ||
| // This avoids calling create_hashes() for every row on the probe side, since hash % 1 == 0 always, | ||
| // meaning the WHEN 0 branch would always match. This optimization is also important for primary key | ||
| // joins or any scenario where all build-side data naturally lands in one partition. | ||
| // When hash collisions force all data into a single partition, there's still a CASE expression | ||
| // because partition count is determined at plan time. Per-partition DynamicFilters show their data. | ||
| #[cfg(feature = "force_hash_collisions")] | ||
| insta::assert_snapshot!( | ||
| format!("{}", format_plan_for_test(&plan)), | ||
|
|
@@ -1265,7 +1263,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { | |
| - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true | ||
| - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] ELSE false END | ||
| " | ||
| ); | ||
|
|
||
|
|
@@ -3029,7 +3027,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { | |
| - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true | ||
| - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 4 WHEN 0 THEN DynamicFilter [ empty ] WHEN 1 THEN DynamicFilter [ empty ] WHEN 2 THEN DynamicFilter [ empty ] WHEN 3 THEN DynamicFilter [ empty ] ELSE false END | ||
| " | ||
| ); | ||
|
|
||
|
|
@@ -3054,7 +3052,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { | |
| - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true | ||
| - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ] | ||
| - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 4 WHEN 0 THEN DynamicFilter [ false ] WHEN 1 THEN DynamicFilter [ false ] WHEN 2 THEN DynamicFilter [ false ] WHEN 3 THEN DynamicFilter [ false ] ELSE false END | ||
| " | ||
| ); | ||
| } | ||
|
|
@@ -3705,17 +3703,17 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() { | |
| .downcast_ref::<HashJoinExec>() | ||
| .expect("Plan should be HashJoinExec"); | ||
|
|
||
| // Verify that a dynamic filter was created | ||
| let dynamic_filter = hash_join | ||
| .dynamic_filter_for_test() | ||
| .expect("Dynamic filter should be created"); | ||
| // Verify that partition filters were created | ||
| let partition_filters = hash_join | ||
| .partition_filters_for_test() | ||
| .expect("Partition filters should be created"); | ||
|
|
||
| // Verify that is_used() returns the expected value based on probe side support. | ||
| // When probe_supports_pushdown=false: no consumer holds a reference (is_used=false) | ||
| // When probe_supports_pushdown=true: probe side holds a reference (is_used=true) | ||
| let any_used = partition_filters.iter().any(|f| f.is_used()); | ||
| assert_eq!( | ||
| dynamic_filter.is_used(), | ||
| expected_is_used, | ||
| any_used, expected_is_used, | ||
| "is_used() should return {expected_is_used} when probe side support is {probe_supports_pushdown}" | ||
| ); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the routing can be better done by some custom
PhysicalExprrather than the longCASE WHEN. It at least allows for some easier optimization (e.g. doing direct lookup instead of expression evaluation), the downside is maybe that expression simplification won't work as automatic?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OTOH I guess this expression can simplify those inner expression itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing I want to be contentious of is to try to keep the expressions such that it can be used for stats pruning