29 changes: 23 additions & 6 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -609,6 +609,26 @@ impl HashJoinExec {
        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
    }

    fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
        if !config.optimizer.enable_join_dynamic_filter_pushdown {
            return false;
        }

        // `preserve_file_partitions` can report Hash partitioning for Hive-style
        // file groups, but those partitions are not actually hash-distributed.
        // Partitioned dynamic filters rely on hash routing, so disable them in
        // this mode to avoid incorrect results. Follow-up work: enable dynamic
        // filtering for scans that use preserve_file_partitions, tracked at
        // https://github.com/apache/datafusion/issues/20195
        if config.optimizer.preserve_file_partitions > 0
            && self.mode == PartitionMode::Partitioned
        {
            return false;
        }

        true
    }
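    // Editorial note, not part of this diff: a compact truth table for the
    // gate above. The two options are the ones referenced in this function;
    // `PartitionMode::CollectLeft` is the non-Partitioned mode the tests
    // later in this PR contrast against.
    //
    //   enable_join_dynamic_filter_pushdown  preserve_file_partitions  mode         -> allowed?
    //   false                                any                       any          -> false
    //   true                                 0                         any          -> true
    //   true                                 > 0                       Partitioned  -> false
    //   true                                 > 0                       CollectLeft  -> true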

    /// left (build) side which gets hashed
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
@@ -1081,11 +1101,8 @@ impl ExecutionPlan for HashJoinExec {
        // - A dynamic filter exists
        // - At least one consumer is holding a reference to it; this avoids expensive
        //   filter computation when disabled or when no consumer will use it.
        let enable_dynamic_filter_pushdown = context
            .session_config()
            .options()
            .optimizer
            .enable_join_dynamic_filter_pushdown
        let enable_dynamic_filter_pushdown = self
            .allow_join_dynamic_filter_pushdown(context.session_config().options())
            && self
                .dynamic_filter
                .as_ref()
@@ -1315,7 +1332,7 @@ impl ExecutionPlan for HashJoinExec {

        // Add dynamic filters in Post phase if enabled
        if matches!(phase, FilterPushdownPhase::Post)
            && config.optimizer.enable_join_dynamic_filter_pushdown
            && self.allow_join_dynamic_filter_pushdown(config)
        {
            // Add actual dynamic filter to right side (probe side)
            let dynamic_filter = Self::create_dynamic_filter(&self.on);
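A note on the second hunk above: the diff collapses the tail of the
`enable_dynamic_filter_pushdown` expression after `.as_ref()`, so the exact
"at least one consumer is holding a reference" check is not visible here. A
minimal sketch of the usual Rust idiom for that kind of guard, with a
hypothetical `has_external_consumer` helper (not a DataFusion API):

```rust
use std::sync::Arc;

/// Hypothetical helper: true when someone besides the owner still holds a
/// clone of `filter`, i.e. a downstream consumer may actually read it.
fn has_external_consumer<T>(filter: &Arc<T>) -> bool {
    Arc::strong_count(filter) > 1
}

fn main() {
    let filter = Arc::new("dynamic filter placeholder");
    assert!(!has_external_consumer(&filter)); // owner only: skip the expensive work

    let consumer = Arc::clone(&filter);
    assert!(has_external_consumer(&filter)); // a consumer exists: build the filter
    drop(consumer);
}
```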
128 changes: 128 additions & 0 deletions datafusion/sqllogictest/test_files/preserve_file_partitioning.slt
@@ -101,6 +101,29 @@ STORED AS PARQUET;
----
4

# Create hive-partitioned dimension table (3 partitions matching fact_table)
# For testing Partitioned joins with matching partition counts
query I
COPY (SELECT 'dev' as env, 'log' as service)
TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet'
STORED AS PARQUET;
----
1

query I
COPY (SELECT 'prod' as env, 'log' as service)
TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet'
STORED AS PARQUET;
----
1

query I
COPY (SELECT 'prod' as env, 'log' as service)
TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet'
STORED AS PARQUET;
----
1

# Create high-cardinality fact table (5 partitions > 3 target_partitions)
# For testing partition merging with consistent hashing
query I
@@ -173,6 +196,13 @@ CREATE EXTERNAL TABLE dimension_table (d_dkey STRING, env STRING, service STRING)
STORED AS PARQUET
LOCATION 'test_files/scratch/preserve_file_partitioning/dimension/';

# Hive-partitioned dimension table (3 partitions matching fact_table for Partitioned join tests)
statement ok
CREATE EXTERNAL TABLE dimension_table_partitioned (env STRING, service STRING)
STORED AS PARQUET
PARTITIONED BY (d_dkey STRING)
LOCATION 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/';

# 'High'-cardinality fact table (5 partitions > 3 target_partitions)
statement ok
CREATE EXTERNAL TABLE high_cardinality_table (timestamp TIMESTAMP, value DOUBLE)
@@ -579,6 +609,101 @@ C 1 300
D 1 400
E 1 500

##########
# TEST 11: Partitioned Join with Matching Partition Counts - Without Optimization
# fact_table (3 partitions) joins dimension_table_partitioned (3 partitions)
# Shows RepartitionExec added when preserve_file_partitions is disabled
##########

statement ok
set datafusion.optimizer.preserve_file_partitions = 0;

# Force Partitioned join mode (not CollectLeft)
statement ok
set datafusion.optimizer.hash_join_single_partition_threshold = 0;

statement ok
set datafusion.optimizer.hash_join_single_partition_threshold_rows = 0;

query TT
EXPLAIN SELECT f.f_dkey, d.env, sum(f.value)
FROM fact_table f
INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey
GROUP BY f.f_dkey, d.env;
----
logical_plan
01)Aggregate: groupBy=[[f.f_dkey, d.env]], aggr=[[sum(f.value)]]
02)--Projection: f.value, f.f_dkey, d.env
03)----Inner Join: f.f_dkey = d.d_dkey
04)------SubqueryAlias: f
05)--------TableScan: fact_table projection=[value, f_dkey]
06)------SubqueryAlias: d
07)--------TableScan: dimension_table_partitioned projection=[env, d_dkey]
physical_plan
01)AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, env@1 as env], aggr=[sum(f.value)]
02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3
03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)]
04)------ProjectionExec: expr=[value@1 as value, f_dkey@2 as f_dkey, env@0 as env]
05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[env@0, value@2, f_dkey@3]
06)----------RepartitionExec: partitioning=Hash([d_dkey@1], 3), input_partitions=3
07)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet
08)----------RepartitionExec: partitioning=Hash([f_dkey@1], 3), input_partitions=3
09)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ]

query TTR rowsort
SELECT f.f_dkey, d.env, sum(f.value)
FROM fact_table f
INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey
GROUP BY f.f_dkey, d.env;
----
A dev 772.4
B prod 614.4
C prod 2017.6

##########
# TEST 12: Partitioned Join with Matching Partition Counts - With Optimization
# Both tables have 3 partitions matching target_partitions=3
# No RepartitionExec needed for join - partitions already satisfy the requirement
# Dynamic filter pushdown is disabled in this mode because preserve_file_partitions
# reports Hash partitioning for Hive-style file groups, which are not hash-routed.
##########

statement ok
set datafusion.optimizer.preserve_file_partitions = 1;

query TT
EXPLAIN SELECT f.f_dkey, d.env, sum(f.value)
FROM fact_table f
INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey
GROUP BY f.f_dkey, d.env;
----
logical_plan
01)Aggregate: groupBy=[[f.f_dkey, d.env]], aggr=[[sum(f.value)]]
02)--Projection: f.value, f.f_dkey, d.env
03)----Inner Join: f.f_dkey = d.d_dkey
04)------SubqueryAlias: f
05)--------TableScan: fact_table projection=[value, f_dkey]
06)------SubqueryAlias: d
07)--------TableScan: dimension_table_partitioned projection=[env, d_dkey]
physical_plan
01)AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, env@1 as env], aggr=[sum(f.value)]
02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3
03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)]
04)------ProjectionExec: expr=[value@1 as value, f_dkey@2 as f_dkey, env@0 as env]
05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[env@0, value@2, f_dkey@3]
06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet
07)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet

query TTR rowsort
SELECT f.f_dkey, d.env, sum(f.value)
FROM fact_table f
INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey
GROUP BY f.f_dkey, d.env;
----
A dev 772.4
B prod 614.4
C prod 2017.6
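
Why TEST 12's gate matters: with preserve_file_partitions the three file
groups are keyed by directory (d_dkey=A, B, C), while partitioned dynamic
filters assume rows were hash-routed, i.e. key k lives in partition
hash(k) % partition_count. Whenever those two indexes disagree, a filter
built from build-side bucket i would prune the wrong file group. Below is a
self-contained sketch of the mismatch; std's DefaultHasher stands in for
whatever hash function the engine actually uses, so the printed buckets are
illustrative only:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// The bucket a key would be hash-routed to with `n` partitions.
fn hash_bucket(key: &str, n: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % n
}

fn main() {
    // Hive-style layout: directory order defines the file group index.
    for (dir_group, key) in ["A", "B", "C"].into_iter().enumerate() {
        println!(
            "d_dkey={key}: file group {dir_group}, hash bucket {}",
            hash_bucket(key, 3)
        );
    }
    // Wherever the two indexes disagree, a per-partition dynamic filter
    // built from hash bucket i would be applied to file group i, which can
    // hold entirely different keys -- hence the gate in this PR.
}
```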

##########
# CLEANUP
##########
@@ -592,5 +717,8 @@ DROP TABLE fact_table_ordered;
statement ok
DROP TABLE dimension_table;

statement ok
DROP TABLE dimension_table_partitioned;

statement ok
DROP TABLE high_cardinality_table;