From 683a5516e28509ff6ee841fbfd1b7eedd8729a81 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Wed, 4 Feb 2026 15:02:14 +0800 Subject: [PATCH 1/5] feat: enable UnionExec filter pushdown with heterogeneous children --- .../physical_optimizer/filter_pushdown.rs | 61 ++++++++++++++++ datafusion/physical-plan/src/union.rs | 70 ++++++++++++++++++- 2 files changed, 129 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 3a0015068567b..b8024319fc1f3 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -1808,6 +1808,67 @@ fn test_filter_pushdown_through_union() { ); } +#[test] +fn test_filter_pushdown_through_union_mixed_support() { + // Test case where one child supports filter pushdown and one doesn't + let scan1 = TestScanBuilder::new(schema()).with_support(true).build(); + let scan2 = TestScanBuilder::new(schema()).with_support(false).build(); + + let union = UnionExec::try_new(vec![scan1, scan2]).unwrap(); + + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - UnionExec + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + output: + Ok: + - UnionExec + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); 
+} + +#[test] +fn test_filter_pushdown_through_union_does_not_support() { + // Test case where neither child supports filter pushdown + let scan1 = TestScanBuilder::new(schema()).with_support(false).build(); + let scan2 = TestScanBuilder::new(schema()).with_support(false).build(); + + let union = UnionExec::try_new(vec![scan1, scan2]).unwrap(); + + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - UnionExec + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + output: + Ok: + - UnionExec + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} + /// Schema: /// a: String /// b: String diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index b6f943886e309..cffa17aabcf52 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -36,7 +36,11 @@ use crate::execution_plan::{ InvariantLevel, boundedness_from_children, check_default_invariants, emission_type_from_children, }; -use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; +use crate::filter::FilterExec; +use crate::filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, PushedDown, +}; use crate::metrics::BaselineMetrics; use crate::projection::{ProjectionExec, make_with_child}; use
crate::stream::ObservedStream; @@ -49,7 +53,9 @@ use datafusion_common::{ Result, assert_or_internal_err, exec_err, internal_datafusion_err, }; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, calculate_union}; +use datafusion_physical_expr::{ + EquivalenceProperties, PhysicalExpr, calculate_union, conjunction, +}; use futures::Stream; use itertools::Itertools; @@ -370,6 +376,66 @@ impl ExecutionPlan for UnionExec { ) -> Result<FilterDescription> { FilterDescription::from_children(parent_filters, &self.children()) } + + fn handle_child_pushdown_result( + &self, + phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> { + // For non-Pre phase, use default behavior + if !matches!(phase, FilterPushdownPhase::Pre) { + return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); + } + + // Collect unsupported filters for each child + let mut unsupported_filters_per_child = vec![Vec::new(); self.inputs.len()]; + + for parent_filter_result in child_pushdown_result.parent_filters.iter() { + for (child_idx, &child_result) in + parent_filter_result.child_results.iter().enumerate() + { + if matches!(child_result, PushedDown::No) { + unsupported_filters_per_child[child_idx] + .push(Arc::clone(&parent_filter_result.filter)); + } + } + } + + // Wrap children that have unsupported filters with FilterExec + let mut new_children = self.inputs.clone(); + for (child_idx, unsupported_filters) in + unsupported_filters_per_child.iter().enumerate() + { + if !unsupported_filters.is_empty() { + let combined_filter = conjunction(unsupported_filters.clone()); + new_children[child_idx] = Arc::new(FilterExec::try_new( + combined_filter, + Arc::clone(&self.inputs[child_idx]), + )?); + } + } + + // Check if any children were modified + let children_modified = new_children + .iter() + .zip(self.inputs.iter()) + .any(|(new, old)| !Arc::ptr_eq(new, old)); + + let all_filters_pushed =
vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()]; + let propagation = if children_modified { + let updated_node = UnionExec::try_new(new_children)?; + FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed) + .with_updated_node(updated_node) + } else { + FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed) + }; + + // Report all parent filters as supported since we've ensured they're applied + // on all children (either pushed down or via FilterExec) + Ok(propagation) + } } /// Combines multiple input streams by interleaving them. From e32303f988b5ff526568bfc10e3d8b1440cd3063 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Wed, 4 Feb 2026 23:15:32 +0800 Subject: [PATCH 2/5] add slt test case --- .../test_files/parquet_filter_pushdown.slt | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index aa94e2e2f2c04..5e847428d3d80 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -674,3 +674,61 @@ logical_plan physical_plan 01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=id@0 > 1 AND array_has(tags@1, rust), pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[] + +### +# Test filter pushdown through UNION with mixed support +# This tests the case where one child supports filter pushdown (parquet) and one doesn't (memory table) +### + +# enable filter pushdown +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +# Create memory table with matching schema (a: VARCHAR, b: BIGINT) +statement ok 
+CREATE TABLE t_union_mem(a VARCHAR, b BIGINT) AS VALUES ('qux', 4), ('quux', 5); + +# Create parquet table with matching schema +statement ok +CREATE EXTERNAL TABLE t_union_parquet(a VARCHAR, b BIGINT) STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet'; + +# Query results combining memory table and Parquet with filter +query I rowsort +SELECT b FROM ( + SELECT a, b FROM t_union_mem + UNION ALL + SELECT a, b FROM t_union_parquet +) WHERE b > 2; +---- +3 +4 +5 +50 + +# Explain the union query - filter should be pushed to parquet but not memory table +query TT +EXPLAIN SELECT b FROM ( + SELECT a, b FROM t_union_mem + UNION ALL + SELECT a, b FROM t_union_parquet +) WHERE b > 2; +---- +logical_plan +01)Union +02)--Filter: t_union_mem.b > Int64(2) +03)----TableScan: t_union_mem projection=[b] +04)--Filter: t_union_parquet.b > Int64(2) +05)----TableScan: t_union_parquet projection=[b], partial_filters=[t_union_parquet.b > Int64(2)] +physical_plan +01)UnionExec +02)--FilterExec: b@0 > 2 +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet]]}, projection=[b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] + +# Clean up union test tables +statement ok +DROP TABLE t_union_mem; + +statement ok +DROP TABLE t_union_parquet; From 4694935eb6a45bbe827ff42764f60461a50ccfbb Mon Sep 17 00:00:00 2001 From: Huaijin Date: Wed, 4 Feb 2026 23:20:09 +0800 Subject: [PATCH 3/5] update --- .../test_files/parquet_filter_pushdown.slt | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 5e847428d3d80..e2473ee328e51 100644 --- 
a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -684,6 +684,9 @@ physical_plan statement ok set datafusion.execution.parquet.pushdown_filters = true; +statement ok +set datafusion.optimizer.max_passes = 0; + # Create memory table with matching schema (a: VARCHAR, b: BIGINT) statement ok CREATE TABLE t_union_mem(a VARCHAR, b BIGINT) AS VALUES ('qux', 4), ('quux', 5); @@ -715,11 +718,13 @@ EXPLAIN SELECT b FROM ( ) WHERE b > 2; ---- logical_plan -01)Union -02)--Filter: t_union_mem.b > Int64(2) -03)----TableScan: t_union_mem projection=[b] -04)--Filter: t_union_parquet.b > Int64(2) -05)----TableScan: t_union_parquet projection=[b], partial_filters=[t_union_parquet.b > Int64(2)] +01)Projection: b +02)--Filter: b > Int64(2) +03)----Union +04)------Projection: t_union_mem.a, t_union_mem.b +05)--------TableScan: t_union_mem +06)------Projection: t_union_parquet.a, t_union_parquet.b +07)--------TableScan: t_union_parquet physical_plan 01)UnionExec 02)--FilterExec: b@0 > 2 From ded7d9bf1beaafc7f1e1e419b10490bc48659d28 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Thu, 5 Feb 2026 10:02:08 +0800 Subject: [PATCH 4/5] add comment --- datafusion/physical-plan/src/union.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index cffa17aabcf52..25eb5fd9895d5 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -388,9 +388,23 @@ impl ExecutionPlan for UnionExec { return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); } + // UnionExec needs specialized filter pushdown handling when children have + // heterogeneous pushdown support. 
Without this, when some children support + // pushdown and others don't, the default behavior would leave FilterExec + // above UnionExec, re-applying filters to outputs of all children—including + // those that already applied the filters via pushdown. This specialized + // implementation adds FilterExec only to children that don't support + // pushdown, avoiding redundant filtering and improving performance. + // + // Example: Given Child1 (no pushdown support) and Child2 (has pushdown support) + // Default behavior: This implementation: + // FilterExec UnionExec + // UnionExec FilterExec + // Child1 Child1 + // Child2(filter) Child2(filter) + // Collect unsupported filters for each child let mut unsupported_filters_per_child = vec![Vec::new(); self.inputs.len()]; - for parent_filter_result in child_pushdown_result.parent_filters.iter() { for (child_idx, &child_result) in parent_filter_result.child_results.iter().enumerate() From 8afe4710a2a901f6d2f032dfeac129237594c68d Mon Sep 17 00:00:00 2001 From: Huaijin Date: Thu, 5 Feb 2026 13:48:44 +0800 Subject: [PATCH 5/5] update comment --- datafusion/physical-plan/src/union.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 25eb5fd9895d5..4ebb8910faea1 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -383,7 +383,10 @@ impl ExecutionPlan for UnionExec { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> { - // For non-Pre phase, use default behavior + // Pre phase: handle heterogeneous pushdown by wrapping individual + // children with FilterExec and reporting all filters as handled. + // Post phase: use default behavior to let the filter creator decide how to handle + // filters that weren't fully pushed down. if !matches!(phase, FilterPushdownPhase::Pre) { return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); }