From 6b948bd7a39f959566862f2c636f1e80a4e6404d Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Tue, 3 Feb 2026 20:33:45 -0500
Subject: [PATCH] refactor join dynamic filters to be more granular

---
 .../physical_optimizer/filter_pushdown.rs     |  28 +-
 .../physical-plan/src/joins/hash_join/exec.rs | 202 +++++---
 .../src/joins/hash_join/shared_bounds.rs      | 440 +++++-------
 .../src/joins/hash_join/stream.rs             |  35 +-
 4 files changed, 281 insertions(+), 424 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
index 3a0015068567b..bad793a0abc9c 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -1213,7 +1213,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
     -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ empty ] WHEN 1 THEN DynamicFilter [ empty ] WHEN 2 THEN DynamicFilter [ empty ] WHEN 3 THEN DynamicFilter [ empty ] WHEN 4 THEN DynamicFilter [ empty ] WHEN 5 THEN DynamicFilter [ empty ] WHEN 6 THEN DynamicFilter [ empty ] WHEN 7 THEN DynamicFilter [ empty ] WHEN 8 THEN DynamicFilter [ empty ] WHEN 9 THEN DynamicFilter [ empty ] WHEN 10 THEN DynamicFilter [ empty ] WHEN 11 THEN DynamicFilter [ empty ] ELSE false END
     "
     );
@@ -1247,14 +1247,12 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
     -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ false ] WHEN 1 THEN DynamicFilter [ false ] WHEN 2 THEN DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) ] WHEN 3 THEN DynamicFilter [ false ] WHEN 4 THEN DynamicFilter [ a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ] WHEN 5 THEN DynamicFilter [ false ] WHEN 6 THEN DynamicFilter [ false ] WHEN 7 THEN DynamicFilter [ false ] WHEN 8 THEN DynamicFilter [ false ] WHEN 9 THEN DynamicFilter [ false ] WHEN 10 THEN DynamicFilter [ false ] WHEN 11 THEN DynamicFilter [ false ] ELSE false END
     "
     );
-    // When hash collisions force all data into a single partition, we optimize away the CASE expression.
-    // This avoids calling create_hashes() for every row on the probe side, since hash % 1 == 0 always,
-    // meaning the WHEN 0 branch would always match. This optimization is also important for primary key
-    // joins or any scenario where all build-side data naturally lands in one partition.
+    // When hash collisions force all data into a single partition, there's still a CASE expression
+    // because partition count is determined at plan time. Per-partition DynamicFilters show their data.
    #[cfg(feature = "force_hash_collisions")]
    insta::assert_snapshot!(
        format!("{}", format_plan_for_test(&plan)),
@@ -1265,7 +1263,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
     -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] ELSE false END
     "
     );
@@ -3029,7 +3027,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
     - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
     -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
     - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
-    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 4 WHEN 0 THEN DynamicFilter [ empty ] WHEN 1 THEN DynamicFilter [ empty ] WHEN 2 THEN DynamicFilter [ empty ] WHEN 3 THEN DynamicFilter [ empty ] ELSE false END
     "
     );
@@ -3054,7 +3052,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
     - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
     -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
     - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
-    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 4 WHEN 0 THEN DynamicFilter [ false ] WHEN 1 THEN DynamicFilter [ false ] WHEN 2 THEN DynamicFilter [ false ] WHEN 3 THEN DynamicFilter [ false ] ELSE false END
     "
     );
 }
@@ -3705,17 +3703,17 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
         .downcast_ref::<HashJoinExec>()
         .expect("Plan should be HashJoinExec");
 
-    // Verify that a dynamic filter was created
-    let dynamic_filter = hash_join
-        .dynamic_filter_for_test()
-        .expect("Dynamic filter should be created");
+    // Verify that partition filters were created
+    let partition_filters = hash_join
+        .partition_filters_for_test()
+        .expect("Partition filters should be created");
 
     // Verify that is_used() returns the expected value based on probe side support.
     // When probe_supports_pushdown=false: no consumer holds a reference (is_used=false)
     // When probe_supports_pushdown=true: probe side holds a reference (is_used=true)
+    let any_used = partition_filters.iter().any(|f| f.is_used());
     assert_eq!(
-        dynamic_filter.is_used(),
-        expected_is_used,
+        any_used, expected_is_used,
         "is_used() should return {expected_is_used} when probe side support is {probe_supports_pushdown}"
     );
 }
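The snapshots above spell out the new shape of the pushed-down predicate: one DynamicFilter per build partition, selected by hashing the join keys and taking the result modulo the partition count. A minimal, dependency-free sketch of those selection semantics (`routed_partition` and the string table below are illustrative stand-ins, not DataFusion APIs):

    // A probe row is checked only against the filter of the build partition
    // that RepartitionExec would route it to; all other branches are skipped.
    fn routed_partition(row_hash: u64, num_build_partitions: u64) -> u64 {
        // Same arithmetic as the `hash_repartition % 12` base of the CASE above.
        row_hash % num_build_partitions
    }

    fn main() {
        let filters = ["DynamicFilter[p0]", "DynamicFilter[p1]", "DynamicFilter[p2]"];
        for row_hash in [7u64, 12, 23] {
            let branch = routed_partition(row_hash, filters.len() as u64);
            // WHEN <branch> THEN <that partition's filter>; ELSE false.
            println!("row hash {row_hash} -> WHEN {branch} THEN {}", filters[branch as usize]);
        }
    }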
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index c249dfb10aacf..53581106121ba 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -70,17 +70,20 @@ use arrow_schema::DataType;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::utils::memory::estimate_memory_size;
 use datafusion_common::{
-    JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err,
-    plan_err, project_schema,
+    JoinSide, JoinType, NullEquality, Result, ScalarValue, assert_or_internal_err,
+    internal_err, plan_err, project_schema,
 };
 use datafusion_execution::TaskContext;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_expr::Accumulator;
+use datafusion_expr::Operator;
 use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::equivalence::{
     ProjectionMapping, join_equivalence_properties,
 };
-use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
+use datafusion_physical_expr::expressions::{
+    BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, lit,
+};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
 
 use ahash::RandomState;
@@ -89,7 +92,7 @@ use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
 use futures::TryStreamExt;
 use parking_lot::Mutex;
 
-use super::partitioned_hash_eval::SeededRandomState;
+use super::partitioned_hash_eval::{HashExpr, SeededRandomState};
 
 /// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
 pub(crate) const HASH_JOIN_SEED: SeededRandomState =
@@ -479,12 +482,18 @@ pub struct HashJoinExec {
     /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
     /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
     dynamic_filter: Option<HashJoinExecDynamicFilter>,
+    /// Temporary storage for per-partition DynamicFilters during filter pushdown.
+    /// Set in gather_filters_for_pushdown, consumed in handle_child_pushdown_result.
+    /// Uses Mutex because gather_filters_for_pushdown takes &self and may be called
+    /// multiple times on the same node (e.g., when optimizations are re-run).
+    pending_partition_filters: Mutex<Option<Vec<Arc<DynamicFilterPhysicalExpr>>>>,
 }
 
 #[derive(Clone)]
 struct HashJoinExecDynamicFilter {
-    /// Dynamic filter that we'll update with the results of the build side once that is done.
-    filter: Arc<DynamicFilterPhysicalExpr>,
+    /// Per-partition DynamicFilters that will be updated during execution.
+    /// CollectLeft: single filter. Partitioned: N filters.
+    partition_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
     /// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
     /// It is lazily initialized during execution to make sure we use the actual execution-time partition counts.
     build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
 }
@@ -598,15 +607,83 @@ impl HashJoinExec {
             null_aware,
             cache,
             dynamic_filter: None,
+            pending_partition_filters: Mutex::new(None),
         })
     }
 
-    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
-        // Extract the right-side keys (probe side keys) from the `on` clauses
-        // Dynamic filter will be created from build side values (left side) and applied to probe side (right side)
+    /// Creates per-partition DynamicFilters and the expression to push down to the scan.
+    ///
+    /// Returns `(expression_to_push_down, partition_filters)`:
+    /// - CollectLeft or single partition: the same single DynamicFilter serves as both
+    /// - Partitioned with N > 1: creates N DynamicFilters, builds a CASE expression,
+    ///   returns (case_expr, vec_of_filters)
+    fn create_partition_filters(
+        on: &JoinOn,
+        mode: PartitionMode,
+        num_build_partitions: usize,
+        repartition_random_state: SeededRandomState,
+    ) -> (Arc<dyn PhysicalExpr>, Vec<Arc<DynamicFilterPhysicalExpr>>) {
         let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
-        // Initialize with a placeholder expression (true) that will be updated when the hash table is built
-        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
+
+        if mode == PartitionMode::CollectLeft || num_build_partitions == 1 {
+            // Single filter: used directly as the pushed-down expression
+            let filter = Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)));
+            let push_expr = Arc::clone(&filter) as Arc<dyn PhysicalExpr>;
+            (push_expr, vec![filter])
+        } else {
+            // Create N per-partition DynamicFilterPhysicalExpr instances,
+            // each starting with lit(true) as a pass-through.
+            let partition_filters: Vec<Arc<DynamicFilterPhysicalExpr>> =
+                (0..num_build_partitions)
+                    .map(|_| {
+                        Arc::new(DynamicFilterPhysicalExpr::new(
+                            right_keys.clone(),
+                            lit(true),
+                        ))
+                    })
+                    .collect();
+
+            // Build the static CASE expression:
+            //   CASE (hash_repartition(join_keys) % N)
+            //     WHEN 0 THEN partition_filters[0]
+            //     WHEN 1 THEN partition_filters[1]
+            //     ...
+            //     ELSE false
+            //   END
+            let routing_hash_expr = Arc::new(HashExpr::new(
+                right_keys,
+                repartition_random_state,
+                "hash_repartition".to_string(),
+            )) as Arc<dyn PhysicalExpr>;
+
+            let modulo_expr = Arc::new(BinaryExpr::new(
+                routing_hash_expr,
+                Operator::Modulo,
+                lit(ScalarValue::UInt64(Some(num_build_partitions as u64))),
+            )) as Arc<dyn PhysicalExpr>;
+
+            let when_then_branches: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
+                partition_filters
+                    .iter()
+                    .enumerate()
+                    .map(|(i, pf)| {
+                        let when_expr = lit(ScalarValue::UInt64(Some(i as u64)));
+                        let then_expr = Arc::clone(pf) as Arc<dyn PhysicalExpr>;
+                        (when_expr, then_expr)
+                    })
+                    .collect();
+
+            let case_expr = Arc::new(
+                CaseExpr::try_new(
+                    Some(modulo_expr),
+                    when_then_branches,
+                    Some(lit(false)),
+                )
+                .expect("Failed to create CASE expression for per-partition filters"),
+            ) as Arc<dyn PhysicalExpr>;
+
+            (case_expr, partition_filters)
+        }
     }
 
     /// left (build) side which gets hashed
@@ -650,13 +727,15 @@ impl HashJoinExec {
         self.null_equality
     }
 
-    /// Get the dynamic filter expression for testing purposes.
+    /// Get the partition filters for testing purposes.
     /// Returns `None` if no dynamic filter has been set.
     ///
     /// This method is intended for testing only and should not be used in production code.
     #[doc(hidden)]
-    pub fn dynamic_filter_for_test(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
-        self.dynamic_filter.as_ref().map(|df| &df.filter)
+    pub fn partition_filters_for_test(
+        &self,
+    ) -> Option<&Vec<Arc<DynamicFilterPhysicalExpr>>> {
+        self.dynamic_filter.as_ref().map(|df| &df.partition_filters)
     }
 
     /// Calculate order preservation flags for this hash join.
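Every branch that create_partition_filters wires up starts life as the placeholder `lit(true)` and is only tightened once its build partition reports. A simplified model of that lifecycle (ModelFilter is a hypothetical stand-in, not the real DynamicFilterPhysicalExpr):

    use std::sync::{Arc, Mutex};

    // Simplified model of the per-partition filter lifecycle: pass-through
    // placeholder, then a one-time tightening, then completion.
    struct ModelFilter {
        predicate: Mutex<String>, // stands in for the wrapped physical expression
        complete: Mutex<bool>,
    }

    impl ModelFilter {
        fn new() -> Self {
            Self {
                predicate: Mutex::new("true".to_string()), // lit(true) placeholder
                complete: Mutex::new(false),
            }
        }
        fn update(&self, expr: &str) {
            *self.predicate.lock().unwrap() = expr.to_string();
        }
        fn mark_complete(&self) {
            *self.complete.lock().unwrap() = true;
        }
    }

    fn main() {
        // N = 3 build partitions, so 3 independent filters behind one CASE.
        let filters: Vec<Arc<ModelFilter>> =
            (0..3).map(|_| Arc::new(ModelFilter::new())).collect();
        filters[1].update("a >= 10 AND a <= 42"); // partition 1 finished its build
        filters[1].mark_complete();
        // Partitions 0 and 2 still pass everything through until they report.
        for (i, f) in filters.iter().enumerate() {
            println!("partition {i}: {}", f.predicate.lock().unwrap());
        }
    }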
@@ -1024,6 +1103,10 @@ impl ExecutionPlan for HashJoinExec {
             )?,
             // Keep the dynamic filter, bounds accumulator will be reset
             dynamic_filter: self.dynamic_filter.clone(),
+            // Propagate pending partition filters so handle_child_pushdown_result can find them
+            pending_partition_filters: Mutex::new(
+                self.pending_partition_filters.lock().clone(),
+            ),
         }))
     }
 
@@ -1047,6 +1130,7 @@ impl ExecutionPlan for HashJoinExec {
             cache: self.cache.clone(),
             // Reset dynamic filter and bounds accumulator to initial state
             dynamic_filter: None,
+            pending_partition_filters: Mutex::new(None),
         }))
     }
 
@@ -1078,20 +1162,13 @@ impl ExecutionPlan for HashJoinExec {
 
         // Only enable dynamic filter pushdown if:
         // - The session config enables dynamic filter pushdown
-        // - A dynamic filter exists
-        // - At least one consumer is holding a reference to it, this avoids expensive filter
-        //   computation when disabled or when no consumer will use it.
+        // - A dynamic filter exists (it was pushed down successfully)
         let enable_dynamic_filter_pushdown = context
             .session_config()
             .options()
             .optimizer
             .enable_join_dynamic_filter_pushdown
-            && self
-                .dynamic_filter
-                .as_ref()
-                .map(|df| df.filter.is_used())
-                .unwrap_or(false);
-
+            && self.dynamic_filter.is_some();
         let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
 
         let array_map_created_count = MetricBuilder::new(&self.metrics)
@@ -1150,25 +1227,20 @@ impl ExecutionPlan for HashJoinExec {
         let batch_size = context.session_config().batch_size();
 
         // Initialize build_accumulator lazily with runtime partition counts (only if enabled)
-        // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
-        let repartition_random_state = REPARTITION_RANDOM_STATE;
         let build_accumulator = enable_dynamic_filter_pushdown
             .then(|| {
                 self.dynamic_filter.as_ref().map(|df| {
-                    let filter = Arc::clone(&df.filter);
                     let on_right = self
                         .on
                         .iter()
                        .map(|(_, right_expr)| Arc::clone(right_expr))
                        .collect::<Vec<_>>();
                     Some(Arc::clone(df.build_accumulator.get_or_init(|| {
-                        Arc::new(SharedBuildAccumulator::new_from_partition_mode(
+                        Arc::new(SharedBuildAccumulator::new(
                             self.mode,
-                            self.left.as_ref(),
-                            self.right.as_ref(),
-                            filter,
+                            df.partition_filters.clone(),
                             on_right,
-                            repartition_random_state,
+                            self.right.schema(),
                         ))
                     })))
                 })
@@ -1317,9 +1389,20 @@ impl ExecutionPlan for HashJoinExec {
         if matches!(phase, FilterPushdownPhase::Post)
             && config.optimizer.enable_join_dynamic_filter_pushdown
         {
-            // Add actual dynamic filter to right side (probe side)
-            let dynamic_filter = Self::create_dynamic_filter(&self.on);
-            right_child = right_child.with_self_filter(dynamic_filter);
+            let num_build_partitions = match self.mode {
+                PartitionMode::Partitioned => {
+                    self.left.output_partitioning().partition_count()
+                }
+                _ => 1,
+            };
+            let (push_expr, partition_filters) = Self::create_partition_filters(
+                &self.on,
+                self.mode,
+                num_build_partitions,
+                REPARTITION_RANDOM_STATE,
+            );
+            *self.pending_partition_filters.lock() = Some(partition_filters);
+            right_child = right_child.with_self_filter(push_expr);
         }
 
         Ok(FilterDescription::new()
@@ -1349,15 +1432,11 @@ impl ExecutionPlan for HashJoinExec {
         let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
         assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
         let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
-        // We expect 0 or 1 self filters
-        if let Some(filter) = right_child_self_filters.first() {
-            // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said
-            // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
-            let predicate = Arc::clone(&filter.predicate);
-            if let Ok(dynamic_filter) =
-                Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
-            {
-                // We successfully pushed down our self filter - we need to make a new node with the dynamic filter
+
+        // Check if our self-filter was pushed (pending_partition_filters was set in gather_filters_for_pushdown)
+        if let Some(partition_filters) = self.pending_partition_filters.lock().take() {
+            if right_child_self_filters.first().is_some() {
+                // Self-filter was pushed — create a new node with the partition filters
                 let new_node = Arc::new(HashJoinExec {
                     left: Arc::clone(&self.left),
                     right: Arc::clone(&self.right),
@@ -1375,9 +1454,10 @@ impl ExecutionPlan for HashJoinExec {
                     null_aware: self.null_aware,
                     cache: self.cache.clone(),
                     dynamic_filter: Some(HashJoinExecDynamicFilter {
-                        filter: dynamic_filter,
+                        partition_filters,
                         build_accumulator: OnceLock::new(),
                     }),
+                    pending_partition_filters: Mutex::new(None),
                 });
                 result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
             }
         }
@@ -5074,9 +5154,14 @@ mod tests {
             Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
         )];
 
-        // Create a dynamic filter manually
-        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
-        let dynamic_filter_clone = Arc::clone(&dynamic_filter);
+        // Create partition filters manually using create_partition_filters
+        let (_push_expr, partition_filters) = HashJoinExec::create_partition_filters(
+            &on,
+            PartitionMode::CollectLeft,
+            1,
+            REPARTITION_RANDOM_STATE,
+        );
+        let filter_to_wait = Arc::clone(&partition_filters[0]);
 
         // Create HashJoinExec with the dynamic filter
         let mut join = HashJoinExec::try_new(
@@ -5091,7 +5176,7 @@ mod tests {
             false,
         )?;
         join.dynamic_filter = Some(HashJoinExecDynamicFilter {
-            filter: dynamic_filter,
+            partition_filters,
             build_accumulator: OnceLock::new(),
         });
 
@@ -5099,9 +5184,9 @@ mod tests {
         let stream = join.execute(0, task_ctx)?;
         let _batches = common::collect(stream).await?;
 
-        // After the join completes, the dynamic filter should be marked as complete
+        // After the join completes, the partition filter should be marked as complete
         // wait_complete() should return immediately
-        dynamic_filter_clone.wait_complete().await;
+        filter_to_wait.wait_complete().await;
 
         Ok(())
     }
@@ -5123,9 +5208,14 @@ mod tests {
             Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
         )];
 
-        // Create a dynamic filter manually
-        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
-        let dynamic_filter_clone = Arc::clone(&dynamic_filter);
+        // Create partition filters manually using create_partition_filters
+        let (_push_expr, partition_filters) = HashJoinExec::create_partition_filters(
+            &on,
+            PartitionMode::CollectLeft,
+            1,
+            REPARTITION_RANDOM_STATE,
+        );
+        let filter_to_wait = Arc::clone(&partition_filters[0]);
 
         // Create HashJoinExec with the dynamic filter
         let mut join = HashJoinExec::try_new(
@@ -5140,7 +5230,7 @@ mod tests {
             false,
         )?;
         join.dynamic_filter = Some(HashJoinExecDynamicFilter {
-            filter: dynamic_filter,
+            partition_filters,
             build_accumulator: OnceLock::new(),
         });
 
@@ -5148,9 +5238,9 @@ mod tests {
         let stream = join.execute(0, task_ctx)?;
         let _batches = common::collect(stream).await?;
 
-        // Even with empty build side, the dynamic filter should be marked as complete
+        // Even with empty build side, the partition filter should be marked as complete
        // wait_complete() should return immediately
-        dynamic_filter_clone.wait_complete().await;
+        filter_to_wait.wait_complete().await;
 
         Ok(())
     }
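The hunks above hand the freshly created filters from gather_filters_for_pushdown to handle_child_pushdown_result through the `pending_partition_filters` mutex. A small sketch of that stash-then-take pattern, under illustrative names rather than the DataFusion signatures:

    use std::sync::Mutex;

    // `gather` runs with &self, so the produced filters are stashed in a
    // Mutex<Option<...>>; `handle_result` later takes ownership with .take(),
    // leaving None behind so a repeated optimizer pass starts fresh.
    struct Node {
        pending: Mutex<Option<Vec<String>>>, // String stands in for a filter handle
    }

    impl Node {
        fn gather(&self) {
            // May be called multiple times; each call overwrites the stash.
            *self.pending.lock().unwrap() = Some(vec!["filter_p0".into(), "filter_p1".into()]);
        }
        fn handle_result(&self, pushdown_succeeded: bool) -> Option<Vec<String>> {
            // take() consumes the stash exactly once.
            let filters = self.pending.lock().unwrap().take()?;
            pushdown_succeeded.then_some(filters)
        }
    }

    fn main() {
        let node = Node { pending: Mutex::new(None) };
        node.gather();
        assert_eq!(node.handle_result(true).map(|f| f.len()), Some(2));
        // A second call sees an empty stash: the filters were already consumed.
        assert!(node.handle_result(true).is_none());
    }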
diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
index f32dc7fa80268..2fb4a69d25f8d 100644
--- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -21,14 +21,12 @@
 use std::fmt;
 use std::sync::Arc;
 
-use crate::ExecutionPlan;
-use crate::ExecutionPlanProperties;
 use crate::joins::Map;
 use crate::joins::PartitionMode;
 use crate::joins::hash_join::exec::HASH_JOIN_SEED;
 use crate::joins::hash_join::inlist_builder::build_struct_fields;
 use crate::joins::hash_join::partitioned_hash_eval::{
-    HashExpr, HashTableLookupExpr, SeededRandomState,
+    HashTableLookupExpr, SeededRandomState,
 };
 use arrow::array::ArrayRef;
 use arrow::datatypes::{DataType, Field, Schema};
@@ -37,12 +35,11 @@ use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Operator;
 use datafusion_functions::core::r#struct as struct_func;
 use datafusion_physical_expr::expressions::{
-    BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, lit,
+    BinaryExpr, DynamicFilterPhysicalExpr, InListExpr, lit,
 };
 use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr};
 
 use parking_lot::Mutex;
-use tokio::sync::Barrier;
 
 /// Represents the minimum and maximum values for a specific column.
 /// Used in dynamic filter pushdown to establish value boundaries.
@@ -50,7 +47,7 @@
 pub(crate) struct ColumnBounds {
     /// The minimum value observed for this column
     pub(crate) min: ScalarValue,
-    /// The maximum value observed for this column 
+    /// The maximum value observed for this column
     pub(crate) max: ScalarValue,
 }
@@ -183,49 +180,61 @@ fn create_bounds_predicate(
     }
 }
 
-/// Coordinates build-side information collection across multiple partitions
+/// Combines membership and bounds expressions into a single filter expression.
+///
+/// Returns `None` if neither membership nor bounds are available (e.g., empty build side).
+fn combine_membership_and_bounds(
+    membership_expr: Option<Arc<dyn PhysicalExpr>>,
+    bounds_expr: Option<Arc<dyn PhysicalExpr>>,
+) -> Option<Arc<dyn PhysicalExpr>> {
+    match (membership_expr, bounds_expr) {
+        (Some(membership), Some(bounds)) => {
+            // Both available: combine with AND
+            Some(Arc::new(BinaryExpr::new(bounds, Operator::And, membership))
+                as Arc<dyn PhysicalExpr>)
+        }
+        (Some(membership), None) => Some(membership),
+        (None, Some(bounds)) => Some(bounds),
+        (None, None) => None,
+    }
+}
+
+/// Coordinates build-side information collection across multiple partitions.
 ///
 /// This structure collects information from the build side (hash tables and/or bounds) and
-/// ensures that dynamic filters are built with complete information from all relevant
-/// partitions before being applied to probe-side scans. Incomplete filters would
-/// incorrectly eliminate valid join results.
+/// ensures that dynamic filters are updated correctly as partitions complete.
 ///
 /// ## Synchronization Strategy
 ///
-/// 1. Each partition computes information from its build-side data (hash maps and/or bounds)
-/// 2. Information is stored in the shared state
-/// 3. A barrier tracks how many partitions have reported
-/// 4. When the last partition reports, information is merged and the filter is updated exactly once
+/// **Partitioned mode:** Each partition independently updates its own DynamicFilter.
+/// No shared mutable state per partition — zero contention.
 ///
-/// ## Hash Map vs Bounds
+/// **CollectLeft mode:** Uses a mutex for deduplication:
+/// 1. The first partition to report updates `partition_filters[0]` and marks it complete
+/// 2. Subsequent partitions skip the update (dedup)
 ///
-/// - **Hash Maps (Partitioned mode)**: Collects Arc references to hash tables from each partition.
-///   Creates a `PartitionedHashLookupPhysicalExpr` that routes rows to the correct partition's hash table.
-/// - **Bounds (CollectLeft mode)**: Collects min/max bounds and creates range predicates.
+/// ## Architecture
 ///
-/// ## Partition Counting
-///
-/// The `total_partitions` count represents how many times `collect_build_side` will be called:
-/// - **CollectLeft**: Number of output partitions (each accesses shared build data)
-/// - **Partitioned**: Number of input partitions (each builds independently)
+/// The per-partition DynamicFilters are pushed directly to the scan (via a CASE
+/// expression or a single DynamicFilter). There is no top-level DynamicFilter
+/// wrapper — the expression tree is directly traversable, which allows the
+/// `snapshot_generation()` function to detect per-partition updates for file pruning.
 ///
 /// ## Thread Safety
 ///
-/// All fields use a single mutex to ensure correct coordination between concurrent
-/// partition executions.
+/// **Partitioned mode:** Each partition updates its own DynamicFilter — no shared state.
+///
+/// **CollectLeft mode:** Mutex dedup ensures only the first report updates the filter.
 pub(crate) struct SharedBuildAccumulator {
-    /// Build-side data protected by a single mutex to avoid ordering concerns
+    /// Build-side data protected by a single mutex (used only for CollectLeft dedup)
     inner: Mutex<AccumulatedBuildData>,
-    barrier: Barrier,
-    /// Dynamic filter for pushdown to probe side
-    dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
     /// Right side join expressions needed for creating filter expressions
     on_right: Vec<PhysicalExprRef>,
-    /// Random state for partitioning (RepartitionExec's hash function with 0,0,0,0 seeds)
-    /// Used for PartitionedHashLookupPhysicalExpr
-    repartition_random_state: SeededRandomState,
     /// Schema of the probe (right) side for evaluating filter expressions
     probe_schema: Arc<Schema>,
+    /// Per-partition DynamicFilters that are pushed down to the scan.
+    /// CollectLeft: single filter. Partitioned: N filters.
+    partition_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
 }
 
 /// Strategy for filter pushdown (decided at collection time)
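combine_membership_and_bounds above is a four-case table. A dependency-free sketch of the same combination logic, with String standing in for Arc<dyn PhysicalExpr>:

    fn combine(membership: Option<String>, bounds: Option<String>) -> Option<String> {
        match (membership, bounds) {
            (Some(m), Some(b)) => Some(format!("({b}) AND ({m})")),
            (Some(m), None) => Some(m), // bounds unavailable (e.g., unsupported types)
            (None, Some(b)) => Some(b), // defensive: membership should always exist
            (None, None) => None,       // empty build side: nothing to push down
        }
    }

    fn main() {
        assert_eq!(
            combine(Some("a IN (1, 4)".into()), Some("a >= 1 AND a <= 4".into())).as_deref(),
            Some("(a >= 1 AND a <= 4) AND (a IN (1, 4))")
        );
        assert_eq!(combine(None, None), None);
    }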
@@ -252,83 +261,31 @@ pub(crate) enum PartitionBuildData {
     },
 }
 
-/// Per-partition accumulated data (Partitioned mode)
-#[derive(Clone)]
-struct PartitionData {
-    bounds: PartitionBounds,
-    pushdown: PushdownStrategy,
-}
-
-/// Build-side data organized by partition mode
+/// Build-side data organized by partition mode (used for CollectLeft dedup)
 enum AccumulatedBuildData {
-    Partitioned {
-        partitions: Vec<Option<PartitionData>>,
-    },
+    Partitioned,
     CollectLeft {
-        data: Option<PartitionData>,
+        /// Whether the first report has already been processed
+        reported: bool,
     },
 }
 
 impl SharedBuildAccumulator {
-    /// Creates a new SharedBuildAccumulator configured for the given partition mode
-    ///
-    /// This method calculates how many times `collect_build_side` will be called based on the
-    /// partition mode's execution pattern. This count is critical for determining when we have
-    /// complete information from all partitions to build the dynamic filter.
+    /// Creates a new SharedBuildAccumulator.
     ///
-    /// ## Partition Mode Execution Patterns
-    ///
-    /// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut`
-    ///   across all output partitions. Each output partition calls `collect_build_side` to access the shared build data.
-    ///   Although this results in multiple invocations, the `report_partition_bounds` function contains deduplication logic to handle them safely.
-    ///   Expected calls = number of output partitions.
-    ///
-    ///
-    /// - **Partitioned**: Each partition independently builds its own hash table by calling
-    ///   `collect_build_side` once. Expected calls = number of build partitions.
-    ///
-    /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as safe default since
-    ///   the actual mode will be determined and a new accumulator created before execution.
-    ///
-    /// ## Why This Matters
-    ///
-    /// We cannot build a partial filter from some partitions - it would incorrectly eliminate
-    /// valid join results. We must wait until we have complete information from ALL
-    /// relevant partitions before updating the dynamic filter.
-    pub(crate) fn new_from_partition_mode(
-        partition_mode: PartitionMode,
-        left_child: &dyn ExecutionPlan,
-        right_child: &dyn ExecutionPlan,
-        dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
+    /// The `partition_filters` are the per-partition DynamicFilter instances that
+    /// have already been pushed down to the scan (either directly or via a CASE expression).
+    /// This constructor simply stores them for later updates during execution.
+    pub(crate) fn new(
+        mode: PartitionMode,
+        partition_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
         on_right: Vec<PhysicalExprRef>,
-        repartition_random_state: SeededRandomState,
+        probe_schema: Arc<Schema>,
     ) -> Self {
-        // Troubleshooting: If partition counts are incorrect, verify this logic matches
-        // the actual execution pattern in collect_build_side()
-        let expected_calls = match partition_mode {
-            // Each output partition accesses shared build data
+        let mode_data = match mode {
+            PartitionMode::Partitioned => AccumulatedBuildData::Partitioned,
             PartitionMode::CollectLeft => {
-                right_child.output_partitioning().partition_count()
-            }
-            // Each partition builds its own data
-            PartitionMode::Partitioned => {
-                left_child.output_partitioning().partition_count()
-            }
-            // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two)
-            PartitionMode::Auto => unreachable!(
-                "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"
-            ),
-        };
-
-        let mode_data = match partition_mode {
-            PartitionMode::Partitioned => AccumulatedBuildData::Partitioned {
-                partitions: vec![
-                    None;
-                    left_child.output_partitioning().partition_count()
-                ],
-            },
-            PartitionMode::CollectLeft => {
-                AccumulatedBuildData::CollectLeft { data: None }
+                AccumulatedBuildData::CollectLeft { reported: false }
             }
             PartitionMode::Auto => unreachable!(
                 "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"
@@ -337,250 +294,93 @@ impl SharedBuildAccumulator {
 
         Self {
             inner: Mutex::new(mode_data),
-            barrier: Barrier::new(expected_calls),
-            dynamic_filter,
             on_right,
-            repartition_random_state,
-            probe_schema: right_child.schema(),
+            probe_schema,
+            partition_filters,
         }
     }
 
-    /// Report build-side data from a partition
+    /// Report build-side data from a partition (synchronous).
     ///
-    /// This unified method handles both CollectLeft and Partitioned modes. When all partitions
-    /// have reported (barrier wait), the leader builds the appropriate filter expression:
-    /// - CollectLeft: Simple conjunction of bounds and membership check
-    /// - Partitioned: CASE expression routing to per-partition filters
+    /// **Partitioned mode:** Each partition independently builds its filter expression
+    /// and updates its own per-partition DynamicFilter. No mutex contention.
     ///
-    /// # Arguments
-    /// * `data` - Build data including hash map, pushdown strategy, and bounds
+    /// **CollectLeft mode:** The first partition to report updates `partition_filters[0]`.
+    /// Subsequent reports are no-ops (dedup via mutex).
     ///
-    /// # Returns
-    /// * `Result<()>` - Ok if successful, Err if filter update failed or mode mismatch
-    pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> Result<()> {
-        // Store data in the accumulator
-        {
-            let mut guard = self.inner.lock();
-
-            match (data, &mut *guard) {
-                // Partitioned mode
-                (
-                    PartitionBuildData::Partitioned {
-                        partition_id,
-                        pushdown,
-                        bounds,
-                    },
-                    AccumulatedBuildData::Partitioned { partitions },
-                ) => {
-                    partitions[partition_id] = Some(PartitionData { pushdown, bounds });
-                }
-                // CollectLeft mode (store once, deduplicate across partitions)
-                (
-                    PartitionBuildData::CollectLeft { pushdown, bounds },
-                    AccumulatedBuildData::CollectLeft { data },
-                ) => {
-                    // Deduplicate - all partitions report the same data in CollectLeft
-                    if data.is_none() {
-                        *data = Some(PartitionData { pushdown, bounds });
+    /// Each filter is marked complete independently — no cross-partition coordination needed.
+    pub(crate) fn report_build_data(&self, data: PartitionBuildData) -> Result<()> {
+        match data {
+            PartitionBuildData::Partitioned {
+                partition_id,
+                pushdown,
+                bounds,
+            } => {
+                match &pushdown {
+                    PushdownStrategy::Empty => {
+                        // Empty partition: update per-partition filter to lit(false)
+                        self.partition_filters[partition_id].update(lit(false))?;
                     }
-                }
-                // Mismatched modes - should never happen
-                _ => {
-                    return datafusion_common::internal_err!(
-                        "Build data mode mismatch in report_build_data"
-                    );
-                }
-            }
-        }
-
-        // Wait for all partitions to report
-        if self.barrier.wait().await.is_leader() {
-            // All partitions have reported, so we can create and update the filter
-            let inner = self.inner.lock();
-
-            match &*inner {
-                // CollectLeft: Simple conjunction of bounds and membership check
-                AccumulatedBuildData::CollectLeft { data } => {
-                    if let Some(partition_data) = data {
-                        // Create membership predicate (InList for small build sides, hash lookup otherwise)
+                    _ => {
+                        // Build the bounds AND membership expression for this partition
                         let membership_expr = create_membership_predicate(
                             &self.on_right,
-                            partition_data.pushdown.clone(),
+                            pushdown,
                             &HASH_JOIN_SEED,
                             self.probe_schema.as_ref(),
                         )?;
+                        let bounds_expr =
+                            create_bounds_predicate(&self.on_right, &bounds);
 
-                        // Create bounds check expression (if bounds available)
-                        let bounds_expr = create_bounds_predicate(
-                            &self.on_right,
-                            &partition_data.bounds,
-                        );
-
-                        // Combine membership and bounds expressions for multi-layer optimization:
-                        // - Bounds (min/max): Enable statistics-based pruning (Parquet row group/file skipping)
-                        // - Membership (InList/hash lookup): Enables:
-                        //   * Precise filtering (exact value matching)
-                        //   * Bloom filter utilization (if present in Parquet files)
-                        //   * Better pruning for data types where min/max isn't effective (e.g., UUIDs)
-                        // Together, they provide complementary benefits and maximize data skipping.
-                        // Only update the filter if we have something to push down
-                        if let Some(filter_expr) = match (membership_expr, bounds_expr) {
-                            (Some(membership), Some(bounds)) => {
-                                // Both available: combine with AND
-                                Some(Arc::new(BinaryExpr::new(
-                                    bounds,
-                                    Operator::And,
-                                    membership,
-                                ))
-                                    as Arc<dyn PhysicalExpr>)
-                            }
-                            (Some(membership), None) => {
-                                // Membership available but no bounds
-                                // This is reachable when we have data but bounds aren't available
-                                // (e.g., unsupported data types or no columns with bounds)
-                                Some(membership)
-                            }
-                            (None, Some(bounds)) => {
-                                // Bounds available but no membership.
-                                // This should be unreachable in practice: we can always push down a reference
-                                // to the hash table.
-                                // But it seems safer to handle it defensively.
-                                Some(bounds)
-                            }
-                            (None, None) => {
-                                // No filter available (e.g., empty build side)
-                                // Don't update the filter, but continue to mark complete
-                                None
-                            }
-                        } {
-                            self.dynamic_filter.update(filter_expr)?;
+                        if let Some(filter_expr) =
+                            combine_membership_and_bounds(membership_expr, bounds_expr)
+                        {
+                            self.partition_filters[partition_id].update(filter_expr)?;
                         }
                     }
                 }
-                // Partitioned: CASE expression routing to per-partition filters
-                AccumulatedBuildData::Partitioned { partitions } => {
-                    // Collect all partition data (should all be Some at this point)
-                    let partition_data: Vec<_> =
-                        partitions.iter().filter_map(|p| p.as_ref()).collect();
-
-                    if !partition_data.is_empty() {
-                        // Build a CASE expression that combines range checks AND membership checks
-                        // CASE (hash_repartition(join_keys) % num_partitions)
-                        //   WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...) AND membership_check_0
-                        //   WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...) AND membership_check_1
-                        //   ...
-                        //   ELSE false
-                        // END
-
-                        let num_partitions = partition_data.len();
-
-                        // Create base expression: hash_repartition(join_keys) % num_partitions
-                        let routing_hash_expr = Arc::new(HashExpr::new(
-                            self.on_right.clone(),
-                            self.repartition_random_state.clone(),
-                            "hash_repartition".to_string(),
-                        ))
-                            as Arc<dyn PhysicalExpr>;
-
-                        let modulo_expr = Arc::new(BinaryExpr::new(
-                            routing_hash_expr,
-                            Operator::Modulo,
-                            lit(ScalarValue::UInt64(Some(num_partitions as u64))),
-                        ))
-                            as Arc<dyn PhysicalExpr>;
-
-                        // Create WHEN branches for each partition
-                        let when_then_branches: Vec<(
-                            Arc<dyn PhysicalExpr>,
-                            Arc<dyn PhysicalExpr>,
-                        )> = partitions
-                            .iter()
-                            .enumerate()
-                            .filter_map(|(partition_id, partition_opt)| {
-                                partition_opt.as_ref().and_then(|partition| {
-                                    // Skip empty partitions - they would always return false anyway
-                                    match &partition.pushdown {
-                                        PushdownStrategy::Empty => None,
-                                        _ => Some((partition_id, partition)),
-                                    }
-                                })
-                            })
-                            .map(|(partition_id, partition)| -> Result<_> {
-                                // WHEN partition_id
-                                let when_expr =
-                                    lit(ScalarValue::UInt64(Some(partition_id as u64)));
-
-                                // THEN: Combine bounds check AND membership predicate
-                                // 1. Create membership predicate (InList for small build sides, hash lookup otherwise)
-                                let membership_expr = create_membership_predicate(
-                                    &self.on_right,
-                                    partition.pushdown.clone(),
-                                    &HASH_JOIN_SEED,
-                                    self.probe_schema.as_ref(),
-                                )?;
-
-                                // 2. Create bounds check expression for this partition (if bounds available)
-                                let bounds_expr = create_bounds_predicate(
-                                    &self.on_right,
-                                    &partition.bounds,
-                                );
-
-                                // 3. Combine membership and bounds expressions
-                                let then_expr = match (membership_expr, bounds_expr) {
-                                    (Some(membership), Some(bounds)) => {
-                                        // Both available: combine with AND
-                                        Arc::new(BinaryExpr::new(
-                                            bounds,
-                                            Operator::And,
-                                            membership,
-                                        ))
-                                            as Arc<dyn PhysicalExpr>
-                                    }
-                                    (Some(membership), None) => {
-                                        // Membership available but no bounds (e.g., unsupported data types)
-                                        membership
-                                    }
-                                    (None, Some(bounds)) => {
-                                        // Bounds available but no membership.
-                                        // This should be unreachable in practice: we can always push down a reference
-                                        // to the hash table.
-                                        // But it seems safer to handle it defensively.
-                                        bounds
-                                    }
-                                    (None, None) => {
-                                        // No filter for this partition - should not happen due to filter_map above
-                                        // but handle defensively by returning a "true" literal
-                                        lit(true)
-                                    }
-                                };
-
-                                Ok((when_expr, then_expr))
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-
-                        // Optimize for single partition: skip CASE expression entirely
-                        let filter_expr = if when_then_branches.is_empty() {
-                            // All partitions are empty: no rows can match
-                            lit(false)
-                        } else if when_then_branches.len() == 1 {
-                            // Single partition: just use the condition directly
-                            // since hash % 1 == 0 always, the WHEN 0 branch will always match
-                            Arc::clone(&when_then_branches[0].1)
-                        } else {
-                            // Multiple partitions: create CASE expression
-                            Arc::new(CaseExpr::try_new(
-                                Some(modulo_expr),
-                                when_then_branches,
-                                Some(lit(false)), // ELSE false
-                            )?)
-                                as Arc<dyn PhysicalExpr>
-                        };
 
-                        self.dynamic_filter.update(filter_expr)?;
-                    }
+                // Mark this partition's filter as complete
+                self.partition_filters[partition_id].mark_complete();
+            }
+            PartitionBuildData::CollectLeft { pushdown, bounds } => {
+                // Use mutex for deduplication: only the first report updates the filter
+                {
+                    let mut guard = self.inner.lock();
+                    match &mut *guard {
+                        AccumulatedBuildData::CollectLeft { reported } => {
+                            if *reported {
+                                // Already reported by another partition, skip
+                                return Ok(());
+                            }
+                            *reported = true;
+                        }
+                        AccumulatedBuildData::Partitioned => {
+                            return datafusion_common::internal_err!(
+                                "Build data mode mismatch: expected CollectLeft, got Partitioned"
+                            );
+                        }
+                    }
                 }
+
+                // Lock released; build the filter expression outside the lock
+                let membership_expr = create_membership_predicate(
+                    &self.on_right,
+                    pushdown,
+                    &HASH_JOIN_SEED,
+                    self.probe_schema.as_ref(),
+                )?;
+                let bounds_expr = create_bounds_predicate(&self.on_right, &bounds);
+
+                if let Some(filter_expr) =
+                    combine_membership_and_bounds(membership_expr, bounds_expr)
+                {
+                    self.partition_filters[0].update(filter_expr)?;
+                }
+
+                // Mark the single filter as complete
+                self.partition_filters[0].mark_complete();
             }
-            self.dynamic_filter.mark_complete();
         }
 
         Ok(())
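The CollectLeft arm above relies on a first-reporter-wins flag: the lock only guards the boolean, and the expression is built after the guard is dropped. A minimal sketch of that dedup pattern (illustrative types, std Mutex in place of parking_lot):

    use std::sync::Mutex;

    struct CollectLeftDedup {
        reported: Mutex<bool>,
    }

    impl CollectLeftDedup {
        fn report(&self, build_filter: impl FnOnce() -> String) -> Option<String> {
            {
                let mut reported = self.reported.lock().unwrap();
                if *reported {
                    return None; // another partition already reported
                }
                *reported = true;
            } // lock released here, mirroring the patch's scoped guard
            Some(build_filter())
        }
    }

    fn main() {
        let dedup = CollectLeftDedup { reported: Mutex::new(false) };
        assert_eq!(
            dedup.report(|| "bounds AND membership".into()).as_deref(),
            Some("bounds AND membership")
        );
        assert_eq!(dedup.report(|| unreachable!("second report is a no-op")), None);
    }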
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index 54e620f99de7a..06a10c3604788 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -125,8 +125,6 @@ impl BuildSide {
 pub(super) enum HashJoinStreamState {
     /// Initial state for HashJoinStream indicating that build-side data not collected yet
     WaitBuildSide,
-    /// Waiting for bounds to be reported by all partitions
-    WaitPartitionBoundsReport,
     /// Indicates that build-side has been collected, and stream is ready for fetching probe-side
     FetchProbeBatch,
     /// Indicates that non-empty batch has been fetched from probe-side, and is ready to be processed
@@ -216,9 +214,6 @@ pub(super) struct HashJoinStream {
     right_side_ordered: bool,
     /// Shared build accumulator for coordinating dynamic filter updates (collects hash maps and/or bounds, optional)
     build_accumulator: Option<Arc<SharedBuildAccumulator>>,
-    /// Optional future to signal when build information has been reported by all partitions
-    /// and the dynamic filter has been updated
-    build_waiter: Option<OnceFut<()>>,
     /// Partitioning mode to use
     mode: PartitionMode,
     /// Output buffer for coalescing small batches into larger ones.
@@ -403,7 +398,6 @@ impl HashJoinStream {
             build_indices_buffer: Vec::with_capacity(batch_size),
             right_side_ordered,
             build_accumulator,
-            build_waiter: None,
             mode,
             output_buffer,
             null_aware,
@@ -429,9 +423,6 @@ impl HashJoinStream {
                 HashJoinStreamState::WaitBuildSide => {
                     handle_state!(ready!(self.collect_build_side(cx)))
                 }
-                HashJoinStreamState::WaitPartitionBoundsReport => {
-                    handle_state!(ready!(self.wait_for_partition_bounds_report(cx)))
-                }
                 HashJoinStreamState::FetchProbeBatch => {
                     handle_state!(ready!(self.fetch_probe_batch(cx)))
                 }
@@ -452,26 +443,6 @@ impl HashJoinStream {
         }
     }
 
-    /// Optional step to wait until build-side information (hash maps or bounds) has been reported by all partitions.
-    /// This state is only entered if a build accumulator is present.
-    ///
-    /// ## Why wait?
-    ///
-    /// The dynamic filter is only built once all partitions have reported their information (hash maps or bounds).
-    /// If we do not wait here, the probe-side scan may start before the filter is ready.
-    /// This can lead to the probe-side scan missing the opportunity to apply the filter
-    /// and skip reading unnecessary data.
-    fn wait_for_partition_bounds_report(
-        &mut self,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
-        if let Some(ref mut fut) = self.build_waiter {
-            ready!(fut.get_shared(cx))?;
-        }
-        self.state = HashJoinStreamState::FetchProbeBatch;
-        Poll::Ready(Ok(StatefulStreamResult::Continue))
-    }
-
     /// Collects build-side data by polling `OnceFut` future from initialized build-side
     ///
     /// Updates build-side to `Ready`, and state to `FetchProbeSide`
@@ -534,10 +505,8 @@ impl HashJoinStream {
                 ),
             };
 
-            self.build_waiter = Some(OnceFut::new(async move {
-                build_accumulator.report_build_data(build_data).await
-            }));
-            self.state = HashJoinStreamState::WaitPartitionBoundsReport;
+            build_accumulator.report_build_data(build_data)?;
+            self.state = HashJoinStreamState::FetchProbeBatch;
         } else {
             self.state = HashJoinStreamState::FetchProbeBatch;
         }
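For contrast, the coordination removed above made every partition block on a barrier before any filter became visible; the new code reports synchronously and moves straight to FetchProbeBatch. A sketch of the retired protocol using std::sync::Barrier in place of the removed tokio::sync::Barrier (illustrative, not the original implementation):

    use std::sync::{Arc, Barrier};
    use std::thread;

    // Old protocol: all partitions park until the last one arrives, then one
    // leader merges everything into a single monolithic filter update.
    fn old_barrier_protocol(num_partitions: usize) {
        let barrier = Arc::new(Barrier::new(num_partitions));
        let handles: Vec<_> = (0..num_partitions)
            .map(|i| {
                let b = Arc::clone(&barrier);
                thread::spawn(move || {
                    // ... partition i finishes its build side ...
                    if b.wait().is_leader() {
                        println!("leader merges all {num_partitions} partitions, updates one filter");
                    } else {
                        println!("partition {i} was blocked until the leader finished");
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
    }

    fn main() {
        // The new code needs none of this: each partition calls
        // report_build_data synchronously and updates only its own filter.
        old_barrier_protocol(4);
    }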