
refactor join dynamic filters to be more granular #20142

Draft

adriangb wants to merge 1 commit into apache:main from pydantic:remove-partition-sync

Conversation

@adriangb (Contributor) commented Feb 4, 2026

No description provided.

@github-actions bot added the core (Core DataFusion crate) and physical-plan (Changes to the physical-plan crate) labels on Feb 4, 2026
@adriangb adriangb requested a review from Copilot February 4, 2026 01:36
Copilot AI (Contributor) left a comment

Pull request overview

Refactors hash join dynamic filter pushdown to use per-partition DynamicFilterPhysicalExpr instances (and a CASE routing expression in partitioned mode), enabling more granular updates and avoiding cross-partition synchronization.

Changes:

  • Replace single “global” dynamic filter update with per-partition filters, pushed to scans via a CASE hash_repartition % N ... expression.
  • Simplify build-side coordination by removing the barrier-based “wait for all partitions” mechanism and updating filters per partition as build data arrives.
  • Update optimizer/executor tests and snapshots to reflect the new pushed-down predicate shape.
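The per-partition routing described above can be sketched in plain Rust. This is an illustrative model, not DataFusion's actual API: `partition_of` stands in for the repartition hash (the real plan reuses RepartitionExec's hashing so routing matches the join's partitioning), and the closures stand in for `DynamicFilterPhysicalExpr` instances.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-in for the repartition hash; illustrative only.
fn partition_of(key: i64, n_partitions: u64) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n_partitions) as usize
}

// Behaves like the pushed-down predicate:
//   CASE hash(key) % N WHEN 0 THEN filter_0 ... WHEN N-1 THEN filter_{N-1} ELSE false END
// Each row consults only the filter for the partition its key hashes to.
fn routed_filter(key: i64, filters: &[Box<dyn Fn(i64) -> bool>]) -> bool {
    filters[partition_of(key, filters.len() as u64)](key)
}
```

A row only ever pays for the one branch its key routes to, which is what lets each build partition update its own filter independently of the others.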

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Reviewed files:

  datafusion/physical-plan/src/joins/hash_join/stream.rs
    Removes the extra wait state/future and reports build data synchronously before probing.
  datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
    Reworks SharedBuildAccumulator to update per-partition dynamic filters directly (and dedup in CollectLeft).
  datafusion/physical-plan/src/joins/hash_join/exec.rs
    Introduces creation/storage of per-partition filters and pushes a CASE expression to the probe-side scan.
  datafusion/core/tests/physical_optimizer/filter_pushdown.rs
    Updates snapshots/expectations for the new CASE ... DynamicFilter[...] predicate form.


Comment on lines 1163 to +1171

    // Only enable dynamic filter pushdown if:
    // - The session config enables dynamic filter pushdown
-   // - A dynamic filter exists
-   // - At least one consumer is holding a reference to it, this avoids expensive filter
-   //   computation when disabled or when no consumer will use it.
+   // - A dynamic filter exists (it was pushed down successfully)
    let enable_dynamic_filter_pushdown = context
        .session_config()
        .options()
        .optimizer
        .enable_join_dynamic_filter_pushdown
-       && self
-           .dynamic_filter
-           .as_ref()
-           .map(|df| df.filter.is_used())
-           .unwrap_or(false);
+       && self.dynamic_filter.is_some();
Copilot AI commented Feb 4, 2026
enable_dynamic_filter_pushdown now checks only self.dynamic_filter.is_some(). However dynamic_filter can be present even when the probe side doesn't actually hold a reference to the filters (e.g. pushdown unsupported), in which case the build-side will still compute/update dynamic filters unnecessarily. Consider restoring the previous gating by checking whether any partition_filters are actually used (e.g. df.partition_filters.iter().any(|f| f.is_used())), or alternatively only setting dynamic_filter when the self-filter pushdown is reported as supported.
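The gating this comment suggests restoring can be modeled with `Arc` reference counts. This is a hypothetical stand-in for `is_used()`, not DataFusion's implementation: here a filter counts as "used" while some consumer still holds a clone of its inner `Arc`.

```rust
use std::sync::Arc;

// Hypothetical model of a per-partition dynamic filter.
struct PartitionFilter(Arc<()>);

impl PartitionFilter {
    // A probe-side consumer would hold one of these handles.
    fn consumer_handle(&self) -> Arc<()> {
        Arc::clone(&self.0)
    }
    // "Used" while at least one consumer handle is alive.
    fn is_used(&self) -> bool {
        Arc::strong_count(&self.0) > 1
    }
}

// Only compute build-side bounds when the feature is on AND at least one
// partition filter actually has a consumer, per the review suggestion.
fn should_compute_filters(enabled: bool, filters: &[PartitionFilter]) -> bool {
    enabled && filters.iter().any(|f| f.is_used())
}
```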

Comment on lines +1436 to 1440
// Check if our self-filter was pushed (pending_partition_filters was set in gather_filters_for_pushdown)
if let Some(partition_filters) = self.pending_partition_filters.lock().take() {
if right_child_self_filters.first().is_some() {
// Self-filter was pushed — create a new node with the partition filters
let new_node = Arc::new(HashJoinExec {
Copilot AI commented Feb 4, 2026

right_child_self_filters.first().is_some() doesn't indicate the self-filter was accepted by the right child; it will be Some whenever HashJoin added a self-filter, even if the child marked it unsupported. This can cause the join to treat dynamic filter pushdown as successful and keep partition_filters even when they are not referenced by the probe side. Use the PushedDownPredicate.discriminant (e.g. require PushedDown::Yes) and/or match the specific predicate you pushed before creating HashJoinExecDynamicFilter.

Comment on lines +676 to +683
let case_expr = Arc::new(
CaseExpr::try_new(
Some(modulo_expr),
when_then_branches,
Some(lit(false)),
)
.expect("Failed to create CASE expression for per-partition filters"),
) as Arc<dyn PhysicalExpr>;
Copilot AI commented Feb 4, 2026

create_partition_filters uses expect(...) on CaseExpr::try_new(...), which will panic in release builds if expression construction fails (e.g. type mismatch, unexpected edge case). Prefer making create_partition_filters fallible and propagating the error (Result<...>) or converting it into a DataFusionError via internal_err!/plan_err! so the query fails gracefully instead of aborting.

Suggested change, replacing:

    let case_expr = Arc::new(
        CaseExpr::try_new(
            Some(modulo_expr),
            when_then_branches,
            Some(lit(false)),
        )
        .expect("Failed to create CASE expression for per-partition filters"),
    ) as Arc<dyn PhysicalExpr>;

with:

    let case_expr: Arc<dyn PhysicalExpr> =
        match CaseExpr::try_new(
            Some(modulo_expr),
            when_then_branches,
            Some(lit(false)),
        ) {
            Ok(expr) => Arc::new(expr),
            // In case of an unexpected failure constructing the CASE expression,
            // fall back to a literal `false` filter rather than panicking.
            Err(_) => lit(false),
        };

/// Set in gather_filters_for_pushdown, consumed in handle_child_pushdown_result.
/// Uses Mutex because gather_filters_for_pushdown takes &self and may be called
/// multiple times on the same node (e.g., when optimizations are re-run).
pending_partition_filters: Mutex<Option<Vec<Arc<DynamicFilterPhysicalExpr>>>>,
adriangb (Contributor, Author) commented:

The alternative to this is to use the values passed into handle_child_pushdown_result, which do include the filters we are looking for, but that involves downcast matching through the CASE expression and then cloning each DynamicFilterPhysicalExpr.

@adriangb (Contributor, Author) commented Feb 4, 2026

run benchmark tpch tpcds

@alamb-ghbot

🤖 ./gh_compare_branch.sh gh_compare_branch.sh Running
Linux aal-dev 6.14.0-1018-gcp #19~24.04.1-Ubuntu SMP Wed Sep 24 23:23:09 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing remove-partition-sync (6b948bd) to 81f7a87 diff using: tpch
Results will be posted here when complete

@adriangb (Contributor, Author) commented Feb 4, 2026

Note to self: worth trying moving the bound (min/max) checks outside of the hash, e.g. col > min AND col < max AND hash(col) % n = 3
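The two predicate shapes in that note can be sketched side by side (illustrative types only; `hash_mod` stands in for `hash_repartition % n`). For a single partition's bounds the two forms are logically equivalent, but the hoisted form exposes the min/max conjuncts at the top level where stats pruning can see them.

```rust
// Current shape: bounds nested inside the hash dispatch.
//   CASE hash % n WHEN p THEN col >= min AND col <= max ELSE false END
fn case_shape(col: i64, min: i64, max: i64, hash_mod: u64, partition: u64) -> bool {
    if hash_mod == partition { col >= min && col <= max } else { false }
}

// Hoisted shape suggested in the note:
//   col >= min AND col <= max AND hash % n = p
fn hoisted_shape(col: i64, min: i64, max: i64, hash_mod: u64, partition: u64) -> bool {
    col >= min && col <= max && hash_mod == partition
}
```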

@alamb-ghbot

🤖: Benchmark completed

Details

Comparing HEAD and remove-partition-sync
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query     ┃      HEAD ┃ remove-partition-sync ┃    Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 1  │ 178.08 ms │             178.92 ms │ no change │
│ QQuery 2  │  88.20 ms │              86.18 ms │ no change │
│ QQuery 3  │ 119.26 ms │             124.26 ms │ no change │
│ QQuery 4  │  78.36 ms │              78.74 ms │ no change │
│ QQuery 5  │ 172.62 ms │             174.68 ms │ no change │
│ QQuery 6  │  69.46 ms │              68.71 ms │ no change │
│ QQuery 7  │ 206.80 ms │             212.64 ms │ no change │
│ QQuery 8  │ 167.54 ms │             165.31 ms │ no change │
│ QQuery 9  │ 225.20 ms │             228.05 ms │ no change │
│ QQuery 10 │ 186.03 ms │             185.50 ms │ no change │
│ QQuery 11 │  62.03 ms │              60.59 ms │ no change │
│ QQuery 12 │ 118.19 ms │             118.45 ms │ no change │
│ QQuery 13 │ 218.12 ms │             222.84 ms │ no change │
│ QQuery 14 │  86.18 ms │              85.79 ms │ no change │
│ QQuery 15 │ 127.24 ms │             123.31 ms │ no change │
│ QQuery 16 │  60.54 ms │              59.65 ms │ no change │
│ QQuery 17 │ 255.75 ms │             263.53 ms │ no change │
│ QQuery 18 │ 306.91 ms │             309.43 ms │ no change │
│ QQuery 19 │ 131.24 ms │             134.50 ms │ no change │
│ QQuery 20 │ 127.85 ms │             128.78 ms │ no change │
│ QQuery 21 │ 259.38 ms │             260.41 ms │ no change │
│ QQuery 22 │  41.81 ms │              41.02 ms │ no change │
└───────────┴───────────┴───────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                    │ 3286.79ms │
│ Total Time (remove-partition-sync)   │ 3311.28ms │
│ Average Time (HEAD)                  │  149.40ms │
│ Average Time (remove-partition-sync) │  150.51ms │
│ Queries Faster                       │         0 │
│ Queries Slower                       │         0 │
│ Queries with No Change               │        22 │
│ Queries with Failure                 │         0 │
└──────────────────────────────────────┴───────────┘

@alamb-ghbot

🤖 ./gh_compare_branch.sh gh_compare_branch.sh Running
Linux aal-dev 6.14.0-1018-gcp #19~24.04.1-Ubuntu SMP Wed Sep 24 23:23:09 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing remove-partition-sync (6b948bd) to 81f7a87 diff using: tpcds
Results will be posted here when complete

@adriangb (Contributor, Author) commented Feb 4, 2026

@LiaCastaneda @Dandandan wonder what you folks think of this?

@alamb-ghbot

🤖: Benchmark completed

Details

Comparing HEAD and remove-partition-sync
--------------------
Benchmark tpcds_sf1.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃        HEAD ┃ remove-partition-sync ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1  │    75.66 ms │              73.70 ms │     no change │
│ QQuery 2  │   218.39 ms │             211.45 ms │     no change │
│ QQuery 3  │   157.48 ms │             156.32 ms │     no change │
│ QQuery 4  │  1976.51 ms │            1926.46 ms │     no change │
│ QQuery 5  │   265.29 ms │             263.97 ms │     no change │
│ QQuery 6  │  1468.72 ms │            1465.84 ms │     no change │
│ QQuery 7  │   524.92 ms │             523.79 ms │     no change │
│ QQuery 8  │   170.01 ms │             171.71 ms │     no change │
│ QQuery 9  │   294.66 ms │             314.46 ms │  1.07x slower │
│ QQuery 10 │   174.26 ms │             174.27 ms │     no change │
│ QQuery 11 │  1330.37 ms │            1322.60 ms │     no change │
│ QQuery 12 │    70.17 ms │              68.56 ms │     no change │
│ QQuery 13 │   547.87 ms │             546.08 ms │     no change │
│ QQuery 14 │  1896.83 ms │            1861.62 ms │     no change │
│ QQuery 15 │    28.00 ms │              26.49 ms │ +1.06x faster │
│ QQuery 16 │    65.55 ms │              64.99 ms │     no change │
│ QQuery 17 │   366.68 ms │             359.47 ms │     no change │
│ QQuery 18 │   195.75 ms │             190.81 ms │     no change │
│ QQuery 19 │   219.40 ms │             215.71 ms │     no change │
│ QQuery 20 │    26.18 ms │              24.30 ms │ +1.08x faster │
│ QQuery 21 │    37.03 ms │              36.42 ms │     no change │
│ QQuery 22 │   701.56 ms │             741.81 ms │  1.06x slower │
│ QQuery 23 │  1793.82 ms │            1779.24 ms │     no change │
│ QQuery 24 │   692.37 ms │             681.06 ms │     no change │
│ QQuery 25 │   533.75 ms │             516.63 ms │     no change │
│ QQuery 26 │   129.52 ms │             126.94 ms │     no change │
│ QQuery 27 │   518.90 ms │             514.48 ms │     no change │
│ QQuery 28 │   299.26 ms │             299.93 ms │     no change │
│ QQuery 29 │   451.06 ms │             452.60 ms │     no change │
│ QQuery 30 │    77.06 ms │              75.65 ms │     no change │
│ QQuery 31 │   311.32 ms │             304.89 ms │     no change │
│ QQuery 32 │    81.64 ms │              81.45 ms │     no change │
│ QQuery 33 │   208.35 ms │             205.51 ms │     no change │
│ QQuery 34 │   161.07 ms │             160.04 ms │     no change │
│ QQuery 35 │   183.57 ms │             175.26 ms │     no change │
│ QQuery 36 │   288.23 ms │             287.71 ms │     no change │
│ QQuery 37 │   260.83 ms │             255.52 ms │     no change │
│ QQuery 38 │   161.54 ms │             153.56 ms │     no change │
│ QQuery 39 │   205.13 ms │             196.78 ms │     no change │
│ QQuery 40 │   180.23 ms │             184.17 ms │     no change │
│ QQuery 41 │    25.71 ms │              25.88 ms │     no change │
│ QQuery 42 │   145.72 ms │             143.16 ms │     no change │
│ QQuery 43 │   127.71 ms │             127.54 ms │     no change │
│ QQuery 44 │    29.17 ms │              28.84 ms │     no change │
│ QQuery 45 │    85.39 ms │              84.84 ms │     no change │
│ QQuery 46 │   323.66 ms │             323.39 ms │     no change │
│ QQuery 47 │  1030.34 ms │            1041.27 ms │     no change │
│ QQuery 48 │   406.14 ms │             408.25 ms │     no change │
│ QQuery 49 │   383.50 ms │             381.52 ms │     no change │
│ QQuery 50 │   330.10 ms │             326.48 ms │     no change │
│ QQuery 51 │   309.26 ms │             303.82 ms │     no change │
│ QQuery 52 │   146.79 ms │             145.49 ms │     no change │
│ QQuery 53 │   144.94 ms │             147.01 ms │     no change │
│ QQuery 54 │   204.94 ms │             202.63 ms │     no change │
│ QQuery 55 │   145.51 ms │             144.83 ms │     no change │
│ QQuery 56 │   206.54 ms │             205.21 ms │     no change │
│ QQuery 57 │   290.83 ms │             290.71 ms │     no change │
│ QQuery 58 │   496.29 ms │             490.44 ms │     no change │
│ QQuery 59 │   299.67 ms │             285.23 ms │     no change │
│ QQuery 60 │   213.55 ms │             210.25 ms │     no change │
│ QQuery 61 │   248.68 ms │             245.38 ms │     no change │
│ QQuery 62 │  1272.64 ms │            1301.13 ms │     no change │
│ QQuery 63 │   147.17 ms │             146.80 ms │     no change │
│ QQuery 64 │  1154.36 ms │            1174.16 ms │     no change │
│ QQuery 65 │   356.67 ms │             352.59 ms │     no change │
│ QQuery 66 │   396.88 ms │             394.95 ms │     no change │
│ QQuery 67 │   529.54 ms │             549.35 ms │     no change │
│ QQuery 68 │   375.14 ms │             374.55 ms │     no change │
│ QQuery 69 │   172.39 ms │             169.51 ms │     no change │
│ QQuery 70 │   495.28 ms │             495.15 ms │     no change │
│ QQuery 71 │   186.94 ms │             183.39 ms │     no change │
│ QQuery 72 │  2103.30 ms │            2114.78 ms │     no change │
│ QQuery 73 │   154.89 ms │             153.48 ms │     no change │
│ QQuery 74 │   823.54 ms │             818.41 ms │     no change │
│ QQuery 75 │   417.42 ms │             414.24 ms │     no change │
│ QQuery 76 │   186.48 ms │             184.51 ms │     no change │
│ QQuery 77 │   281.42 ms │             278.82 ms │     no change │
│ QQuery 78 │   683.21 ms │             680.35 ms │     no change │
│ QQuery 79 │   329.96 ms │             325.68 ms │     no change │
│ QQuery 80 │   522.42 ms │             531.32 ms │     no change │
│ QQuery 81 │    51.21 ms │              54.50 ms │  1.06x slower │
│ QQuery 82 │   278.61 ms │             281.34 ms │     no change │
│ QQuery 83 │    79.24 ms │              79.36 ms │     no change │
│ QQuery 84 │    71.85 ms │              69.03 ms │     no change │
│ QQuery 85 │   225.92 ms │             224.50 ms │     no change │
│ QQuery 86 │    59.54 ms │              58.38 ms │     no change │
│ QQuery 87 │   151.96 ms │             157.63 ms │     no change │
│ QQuery 88 │   259.24 ms │             256.79 ms │     no change │
│ QQuery 89 │   163.22 ms │             168.53 ms │     no change │
│ QQuery 90 │    45.15 ms │              45.03 ms │     no change │
│ QQuery 91 │    98.50 ms │              98.01 ms │     no change │
│ QQuery 92 │    82.04 ms │              80.61 ms │     no change │
│ QQuery 93 │   280.43 ms │             287.26 ms │     no change │
│ QQuery 94 │    90.75 ms │              88.58 ms │     no change │
│ QQuery 95 │   243.69 ms │             246.48 ms │     no change │
│ QQuery 96 │   112.03 ms │             115.40 ms │     no change │
│ QQuery 97 │   193.46 ms │             192.93 ms │     no change │
│ QQuery 98 │   221.58 ms │             219.11 ms │     no change │
│ QQuery 99 │ 14202.63 ms │           14213.60 ms │     no change │
└───────────┴─────────────┴───────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                    │ 50444.06ms │
│ Total Time (remove-partition-sync)   │ 50334.64ms │
│ Average Time (HEAD)                  │   509.54ms │
│ Average Time (remove-partition-sync) │   508.43ms │
│ Queries Faster                       │          2 │
│ Queries Slower                       │          3 │
│ Queries with No Change               │         94 │
│ Queries with Failure                 │          0 │
└──────────────────────────────────────┴────────────┘

- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ empty ] WHEN 1 THEN DynamicFilter [ empty ] WHEN 2 THEN DynamicFilter [ empty ] WHEN 3 THEN DynamicFilter [ empty ] WHEN 4 THEN DynamicFilter [ empty ] WHEN 5 THEN DynamicFilter [ empty ] WHEN 6 THEN DynamicFilter [ empty ] WHEN 7 THEN DynamicFilter [ empty ] WHEN 8 THEN DynamicFilter [ empty ] WHEN 9 THEN DynamicFilter [ empty ] WHEN 10 THEN DynamicFilter [ empty ] WHEN 11 THEN DynamicFilter [ empty ] ELSE false END
Contributor:

I wonder if the routing could be done better by some custom PhysicalExpr rather than the long CASE WHEN. It would at least allow for some easier optimization (e.g. doing a direct lookup instead of expression evaluation); the downside is maybe that expression simplification won't work as automatically?

Contributor:

OTOH I guess this expression could simplify those inner expressions itself.

adriangb (Contributor, Author):

One thing I want to be conscious of is keeping the expressions in a form that can still be used for stats pruning.

- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ false ] WHEN 1 THEN DynamicFilter [ false ] WHEN 2 THEN DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) ] WHEN 3 THEN DynamicFilter [ false ] WHEN 4 THEN DynamicFilter [ a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ] WHEN 5 THEN DynamicFilter [ false ] WHEN 6 THEN DynamicFilter [ false ] WHEN 7 THEN DynamicFilter [ false ] WHEN 8 THEN DynamicFilter [ false ] WHEN 9 THEN DynamicFilter [ false ] WHEN 10 THEN DynamicFilter [ false ] WHEN 11 THEN DynamicFilter [ false ] ELSE false END
Contributor:

> b@1 >= bb AND b@1 <= bb

It seems this isn't simplified yet to b@1 = bb (there are quite a few examples of this).

Contributor:

Also, the IN (SET) could be removed in this case.

Comment on lines -5104 to +5189

-   dynamic_filter_clone.wait_complete().await;
+   filter_to_wait.wait_complete().await;
@LiaCastaneda (Contributor) commented Feb 5, 2026

I think this PR makes sense and believe datafusion-distributed could benefit from it a lot. My main concern is how custom leaf nodes (ExecutionPlans) under Partitioned HashJoins, which fetch the build side in parallel but need to know when all partitions have reported their data, would adapt to the new structure. With this PR, they would need to:

  1. Detect the CASE expression
  2. Extract all N DynamicFilters from the branches
  3. Wait for all of them to complete

I wonder if it would be worth keeping a way to know when all partitions have reported their filters. Maybe keeping the whole CASE expression wrapped in a DynamicFilterPhysicalExpr, with the nested per-branch DynamicFilterPhysicalExprs inside, would work, although that's probably not the cleanest approach, because how would we differentiate between the outermost DynamicFilterPhysicalExpr and a branch DynamicFilterPhysicalExpr? 🤔 Another option I could think of: what if we had two types of dynamic filter expression? DynamicFilterPhysicalExpr for CollectLeft and for partitioned branch filters, plus something like a PartitionedDynamicFilter holding a Vec<Arc<DynamicFilterPhysicalExpr>> (this is just a quick thought).
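The "wait for all partitions" requirement could also be met with an ordinary countdown barrier. This is only a sketch of what a custom leaf might do after extracting the N branch filters, not an existing DataFusion API; all names here are hypothetical.

```rust
use std::sync::{Arc, Condvar, Mutex};

// Countdown barrier: each build partition calls report() once its filter is
// final; wait_complete() blocks until all partitions have reported.
struct AllPartitionsReported {
    remaining: Mutex<usize>,
    done: Condvar,
}

impl AllPartitionsReported {
    fn new(n_partitions: usize) -> Arc<Self> {
        Arc::new(Self {
            remaining: Mutex::new(n_partitions),
            done: Condvar::new(),
        })
    }

    fn report(&self) {
        let mut remaining = self.remaining.lock().unwrap();
        *remaining -= 1;
        if *remaining == 0 {
            self.done.notify_all();
        }
    }

    fn wait_complete(&self) {
        let mut remaining = self.remaining.lock().unwrap();
        // Loop guards against spurious wakeups.
        while *remaining > 0 {
            remaining = self.done.wait(remaining).unwrap();
        }
    }
}
```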

adriangb (Contributor, Author) replied:

> concern is how custom leaf nodes (ExecutionPlan) that have Partitioned HashJoins to fetch the build in parallel but need to know when all partitions have reported their data would adapt to the new structure

This is something you have internally, right? Could you help explain how this works / what the requirement is? At some point I wonder if it wouldn't be better to have an optimizer rule that links the join node to the custom scan node, if the custom scan node needs to behave in a specific way when it is the leaf of a join (as opposed to e.g. the current Parquet scan node, which doesn't care at all)?

Contributor:

Yeah, apart from Parquet, we have some custom ExecutionPlan leaf nodes that don't evaluate filters in the leaf node itself but instead push query execution (along with the filters) to a remote data source. The key requirement is that we need to wait for all build partitions to report their filters before querying the remote source because we execute this leaf node in a single partition (even if we have a Partitioned join), so we need the complete filter information from all hash join build partitions to generate a correct predicate.

I was raising it in case other custom scans had similar patterns, but if it's too specific to our use case, we can handle it internally.


Labels

core (Core DataFusion crate), physical-plan (Changes to the physical-plan crate)


4 participants