diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c249dfb10aacf..015cbcd780984 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -609,6 +609,26 @@ impl HashJoinExec { Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true))) } + fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool { + if !config.optimizer.enable_join_dynamic_filter_pushdown { + return false; + } + + // `preserve_file_partitions` can report Hash partitioning for Hive-style + // file groups, but those partitions are not actually hash-distributed. + // Partitioned dynamic filters rely on hash routing, so disable them in + // this mode to avoid incorrect results. Follow-up work: enable dynamic + // filtering for preserve_file_partitioned scans (issue #20195). + // https://github.com/apache/datafusion/issues/20195 + if config.optimizer.preserve_file_partitions > 0 + && self.mode == PartitionMode::Partitioned + { + return false; + } + + true + } + /// left (build) side which gets hashed pub fn left(&self) -> &Arc { &self.left @@ -1081,11 +1101,8 @@ impl ExecutionPlan for HashJoinExec { // - A dynamic filter exists // - At least one consumer is holding a reference to it, this avoids expensive filter // computation when disabled or when no consumer will use it. - let enable_dynamic_filter_pushdown = context - .session_config() - .options() - .optimizer - .enable_join_dynamic_filter_pushdown + let enable_dynamic_filter_pushdown = self + .allow_join_dynamic_filter_pushdown(context.session_config().options()) && self .dynamic_filter .as_ref() @@ -1315,7 +1332,7 @@ impl ExecutionPlan for HashJoinExec { // Add dynamic filters in Post phase if enabled if matches!(phase, FilterPushdownPhase::Post) - && config.optimizer.enable_join_dynamic_filter_pushdown + && self.allow_join_dynamic_filter_pushdown(config) { // Add actual dynamic filter to right side (probe side) let dynamic_filter = Self::create_dynamic_filter(&self.on); diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index 34c5fd97b51f3..297094fab16e7 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -101,6 +101,29 @@ STORED AS PARQUET; ---- 4 +# Create hive-partitioned dimension table (3 partitions matching fact_table) +# For testing Partitioned joins with matching partition counts +query I +COPY (SELECT 'dev' as env, 'log' as service) +TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet' +STORED AS PARQUET; +---- +1 + +query I +COPY (SELECT 'prod' as env, 'log' as service) +TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet' +STORED AS PARQUET; +---- +1 + +query I +COPY (SELECT 'prod' as env, 'log' as service) +TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet' +STORED AS PARQUET; +---- +1 + # Create high-cardinality fact table (5 partitions > 3 target_partitions) # For testing partition merging with consistent hashing query I @@ -173,6 +196,13 @@ CREATE EXTERNAL TABLE dimension_table (d_dkey STRING, env STRING, service STRING STORED AS PARQUET LOCATION 'test_files/scratch/preserve_file_partitioning/dimension/'; +# Hive-partitioned dimension table (3 partitions matching fact_table for Partitioned join tests) +statement ok +CREATE EXTERNAL TABLE dimension_table_partitioned (env STRING, service STRING) +STORED AS PARQUET +PARTITIONED BY (d_dkey STRING) +LOCATION 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/'; + # 'High'-cardinality fact table (5 partitions > 3 target_partitions) statement ok CREATE EXTERNAL TABLE high_cardinality_table (timestamp TIMESTAMP, value DOUBLE) @@ -579,6 +609,101 @@ C 1 300 D 1 400 E 1 500 +########## +# TEST 11: Partitioned Join with Matching Partition Counts - Without Optimization +# fact_table (3 partitions) joins dimension_table_partitioned (3 partitions) +# Shows RepartitionExec added when preserve_file_partitions is disabled +########## + +statement ok +set datafusion.optimizer.preserve_file_partitions = 0; + +# Force Partitioned join mode (not CollectLeft) +statement ok +set datafusion.optimizer.hash_join_single_partition_threshold = 0; + +statement ok +set datafusion.optimizer.hash_join_single_partition_threshold_rows = 0; + +query TT +EXPLAIN SELECT f.f_dkey, d.env, sum(f.value) +FROM fact_table f +INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey +GROUP BY f.f_dkey, d.env; +---- +logical_plan +01)Aggregate: groupBy=[[f.f_dkey, d.env]], aggr=[[sum(f.value)]] +02)--Projection: f.value, f.f_dkey, d.env +03)----Inner Join: f.f_dkey = d.d_dkey +04)------SubqueryAlias: f +05)--------TableScan: fact_table projection=[value, f_dkey] +06)------SubqueryAlias: d +07)--------TableScan: dimension_table_partitioned projection=[env, d_dkey] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, env@1 as env], aggr=[sum(f.value)] +02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3 +03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)] +04)------ProjectionExec: expr=[value@1 as value, f_dkey@2 as f_dkey, env@0 as env] +05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[env@0, value@2, f_dkey@3] +06)----------RepartitionExec: partitioning=Hash([d_dkey@1], 3), input_partitions=3 +07)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet +08)----------RepartitionExec: partitioning=Hash([f_dkey@1], 3), input_partitions=3 +09)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ] + +query TTR rowsort +SELECT f.f_dkey, d.env, sum(f.value) +FROM fact_table f +INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey +GROUP BY f.f_dkey, d.env; +---- +A dev 772.4 +B prod 614.4 +C prod 2017.6 + +########## +# TEST 12: Partitioned Join with Matching Partition Counts - With Optimization +# Both tables have 3 partitions matching target_partitions=3 +# No RepartitionExec needed for join - partitions already satisfy the requirement +# Dynamic filter pushdown is disabled in this mode because preserve_file_partitions +# reports Hash partitioning for Hive-style file groups, which are not hash-routed. +########## + +statement ok +set datafusion.optimizer.preserve_file_partitions = 1; + +query TT +EXPLAIN SELECT f.f_dkey, d.env, sum(f.value) +FROM fact_table f +INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey +GROUP BY f.f_dkey, d.env; +---- +logical_plan +01)Aggregate: groupBy=[[f.f_dkey, d.env]], aggr=[[sum(f.value)]] +02)--Projection: f.value, f.f_dkey, d.env +03)----Inner Join: f.f_dkey = d.d_dkey +04)------SubqueryAlias: f +05)--------TableScan: fact_table projection=[value, f_dkey] +06)------SubqueryAlias: d +07)--------TableScan: dimension_table_partitioned projection=[env, d_dkey] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, env@1 as env], aggr=[sum(f.value)] +02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3 +03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)] +04)------ProjectionExec: expr=[value@1 as value, f_dkey@2 as f_dkey, env@0 as env] +05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[env@0, value@2, f_dkey@3] +06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet +07)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet + +query TTR rowsort +SELECT f.f_dkey, d.env, sum(f.value) +FROM fact_table f +INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey +GROUP BY f.f_dkey, d.env; +---- +A dev 772.4 +B prod 614.4 +C prod 2017.6 + ########## # CLEANUP ########## @@ -592,5 +717,8 @@ DROP TABLE fact_table_ordered; statement ok DROP TABLE dimension_table; +statement ok +DROP TABLE dimension_table_partitioned; + statement ok DROP TABLE high_cardinality_table;