diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 31a21274ad62..b3ed8d9653fe 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -1808,6 +1808,67 @@ fn test_filter_pushdown_through_union() { ); } +#[test] +fn test_filter_pushdown_through_union_mixed_support() { + // Test case where one child supports filter pushdown and one doesn't + let scan1 = TestScanBuilder::new(schema()).with_support(true).build(); + let scan2 = TestScanBuilder::new(schema()).with_support(false).build(); + + let union = UnionExec::try_new(vec![scan1, scan2]).unwrap(); + + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - UnionExec + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + output: + Ok: + - UnionExec + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} + +#[test] +fn test_filter_pushdown_through_union_does_not_support() { + // Test case where neither child supports filter pushdown + let scan1 = TestScanBuilder::new(schema()).with_support(false).build(); + let scan2 = TestScanBuilder::new(schema()).with_support(false).build(); + + let union = UnionExec::try_new(vec![scan1, scan2]).unwrap(); + + let predicate = 
col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - UnionExec + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + output: + Ok: + - UnionExec + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} + /// Schema: /// a: String /// b: String diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index b6f943886e30..4ebb8910faea 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -36,7 +36,11 @@ use crate::execution_plan::{ InvariantLevel, boundedness_from_children, check_default_invariants, emission_type_from_children, }; -use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; +use crate::filter::FilterExec; +use crate::filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, PushedDown, +}; use crate::metrics::BaselineMetrics; use crate::projection::{ProjectionExec, make_with_child}; use crate::stream::ObservedStream; @@ -49,7 +53,9 @@ use datafusion_common::{ Result, assert_or_internal_err, exec_err, internal_datafusion_err, }; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, calculate_union}; +use datafusion_physical_expr::{ + EquivalenceProperties, PhysicalExpr, calculate_union, conjunction, +}; use 
futures::Stream; use itertools::Itertools; @@ -370,6 +376,83 @@ impl ExecutionPlan for UnionExec { ) -> Result { FilterDescription::from_children(parent_filters, &self.children()) } + + fn handle_child_pushdown_result( + &self, + phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + // Pre phase: handle heterogeneous pushdown by wrapping individual + // children with FilterExec and reporting all filters as handled. + // Post phase: use default behavior to let the filter creator decide how to handle + // filters that weren't fully pushed down. + if !matches!(phase, FilterPushdownPhase::Pre) { + return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); + } + + // UnionExec needs specialized filter pushdown handling when children have + // heterogeneous pushdown support. Without this, when some children support + // pushdown and others don't, the default behavior would leave FilterExec + // above UnionExec, re-applying filters to outputs of all children—including + // those that already applied the filters via pushdown. This specialized + // implementation adds FilterExec only to children that don't support + // pushdown, avoiding redundant filtering and improving performance. 
+ // + // Example: Given Child1 (no pushdown support) and Child2 (has pushdown support) + // Default behavior: This implementation: + // FilterExec UnionExec + // UnionExec FilterExec + // Child1 Child1 + // Child2(filter) Child2(filter) + + // Collect unsupported filters for each child + let mut unsupported_filters_per_child = vec![Vec::new(); self.inputs.len()]; + for parent_filter_result in child_pushdown_result.parent_filters.iter() { + for (child_idx, &child_result) in + parent_filter_result.child_results.iter().enumerate() + { + if matches!(child_result, PushedDown::No) { + unsupported_filters_per_child[child_idx] + .push(Arc::clone(&parent_filter_result.filter)); + } + } + } + + // Wrap children that have unsupported filters with FilterExec + let mut new_children = self.inputs.clone(); + for (child_idx, unsupported_filters) in + unsupported_filters_per_child.iter().enumerate() + { + if !unsupported_filters.is_empty() { + let combined_filter = conjunction(unsupported_filters.clone()); + new_children[child_idx] = Arc::new(FilterExec::try_new( + combined_filter, + Arc::clone(&self.inputs[child_idx]), + )?); + } + } + + // Check if any children were modified + let children_modified = new_children + .iter() + .zip(self.inputs.iter()) + .any(|(new, old)| !Arc::ptr_eq(new, old)); + + let all_filters_pushed = + vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()]; + let propagation = if children_modified { + let updated_node = UnionExec::try_new(new_children)?; + FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed) + .with_updated_node(updated_node) + } else { + FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed) + }; + + // Report all parent filters as supported since we've ensured they're applied + // on all children (either pushed down or via FilterExec) + Ok(propagation) + } } /// Combines multiple input streams by interleaving them. 
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index aa94e2e2f2c0..e2473ee328e5 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -674,3 +674,66 @@ logical_plan physical_plan 01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=id@0 > 1 AND array_has(tags@1, rust), pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[] + +### +# Test filter pushdown through UNION with mixed support +# This tests the case where one child supports filter pushdown (parquet) and one doesn't (memory table) +### + +# enable filter pushdown +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +statement ok +set datafusion.optimizer.max_passes = 0; + +# Create memory table with matching schema (a: VARCHAR, b: BIGINT) +statement ok +CREATE TABLE t_union_mem(a VARCHAR, b BIGINT) AS VALUES ('qux', 4), ('quux', 5); + +# Create parquet table with matching schema +statement ok +CREATE EXTERNAL TABLE t_union_parquet(a VARCHAR, b BIGINT) STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet'; + +# Query results combining memory table and Parquet with filter +query I rowsort +SELECT b FROM ( + SELECT a, b FROM t_union_mem + UNION ALL + SELECT a, b FROM t_union_parquet +) WHERE b > 2; +---- +3 +4 +5 +50 + +# Explain the union query - filter should be pushed to parquet but not memory table +query TT +EXPLAIN SELECT b FROM ( + SELECT a, b FROM t_union_mem + UNION ALL + SELECT a, b FROM t_union_parquet +) WHERE b > 2; +---- +logical_plan +01)Projection: b +02)--Filter: b > Int64(2) +03)----Union 
+04)------Projection: t_union_mem.a, t_union_mem.b +05)--------TableScan: t_union_mem +06)------Projection: t_union_parquet.a, t_union_parquet.b +07)--------TableScan: t_union_parquet +physical_plan +01)UnionExec +02)--FilterExec: b@0 > 2 +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet]]}, projection=[b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] + +# Clean up union test tables +statement ok +DROP TABLE t_union_mem; + +statement ok +DROP TABLE t_union_parquet;