Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1808,6 +1808,67 @@ fn test_filter_pushdown_through_union() {
);
}

#[test]
fn test_filter_pushdown_through_union_mixed_support() {
    // One union input can absorb pushed-down filters, the other cannot.
    // The optimizer should push the predicate into the supporting scan and
    // wrap only the non-supporting scan in a FilterExec, instead of leaving
    // a single FilterExec above the whole union.
    let supported_scan = TestScanBuilder::new(schema()).with_support(true).build();
    let unsupported_scan = TestScanBuilder::new(schema()).with_support(false).build();

    let union_exec = UnionExec::try_new(vec![supported_scan, unsupported_scan]).unwrap();

    // Build `a = 'foo'` on top of the union.
    let filter_expr = col_lit_predicate("a", "foo", &schema());
    let plan = Arc::new(FilterExec::try_new(filter_expr, union_exec).unwrap());

    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown::new(), true),
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo
- UnionExec
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
output:
Ok:
- UnionExec
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
"
    );
}

#[test]
fn test_filter_pushdown_through_union_does_not_support() {
// Test case where neither child supports filter pushdown: the filter is
// still pushed through the union, and each child gets its own FilterExec
// instead of a single FilterExec remaining above the UnionExec.
let scan1 = TestScanBuilder::new(schema()).with_support(false).build();
let scan2 = TestScanBuilder::new(schema()).with_support(false).build();

let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();

// Predicate `a = 'foo'` applied above the union.
let predicate = col_lit_predicate("a", "foo", &schema());
let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap());

insta::assert_snapshot!(
OptimizationTest::new(plan, FilterPushdown::new(), true),
@"
OptimizationTest:
input:
- FilterExec: a@0 = foo
- UnionExec
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
output:
Ok:
- UnionExec
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
"
);
}

/// Schema:
/// a: String
/// b: String
Expand Down
87 changes: 85 additions & 2 deletions datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ use crate::execution_plan::{
InvariantLevel, boundedness_from_children, check_default_invariants,
emission_type_from_children,
};
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
use crate::filter::FilterExec;
use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation, PushedDown,
};
use crate::metrics::BaselineMetrics;
use crate::projection::{ProjectionExec, make_with_child};
use crate::stream::ObservedStream;
Expand All @@ -49,7 +53,9 @@ use datafusion_common::{
Result, assert_or_internal_err, exec_err, internal_datafusion_err,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, calculate_union};
use datafusion_physical_expr::{
EquivalenceProperties, PhysicalExpr, calculate_union, conjunction,
};

use futures::Stream;
use itertools::Itertools;
Expand Down Expand Up @@ -370,6 +376,83 @@ impl ExecutionPlan for UnionExec {
) -> Result<FilterDescription> {
FilterDescription::from_children(parent_filters, &self.children())
}

fn handle_child_pushdown_result(
    &self,
    phase: FilterPushdownPhase,
    child_pushdown_result: ChildPushdownResult,
    _config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
    // Only the Pre phase gets special treatment here. In the Post phase we
    // fall back to the default behavior so that whoever created the filters
    // decides how to handle any that were not fully pushed down.
    if !matches!(phase, FilterPushdownPhase::Pre) {
        return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
    }

    // When children differ in pushdown support, the default handling would
    // leave a FilterExec *above* the union, re-filtering the output of every
    // child — including those that already absorbed the predicate. Instead,
    // wrap only the children that rejected a filter in their own FilterExec
    // and then report all parent filters as handled.
    //
    // Example with Child1 (no pushdown support) and Child2 (pushdown support):
    //   Default behavior:          This implementation:
    //     FilterExec                 UnionExec
    //       UnionExec                  FilterExec
    //         Child1                     Child1
    //         Child2(filter)           Child2(filter)

    // For each child, gather the parent filters it could not absorb.
    let mut rejected_per_child = vec![Vec::new(); self.inputs.len()];
    for parent_filter in child_pushdown_result.parent_filters.iter() {
        for (child_idx, child_result) in
            parent_filter.child_results.iter().enumerate()
        {
            if let PushedDown::No = *child_result {
                rejected_per_child[child_idx]
                    .push(Arc::clone(&parent_filter.filter));
            }
        }
    }

    // Rebuild the child list: a child with leftover filters is wrapped in a
    // FilterExec applying their conjunction; an untouched child keeps the
    // same Arc so modification can be detected by pointer equality below.
    let new_children = self
        .inputs
        .iter()
        .zip(rejected_per_child.iter())
        .map(|(child, leftover)| {
            if leftover.is_empty() {
                Ok(Arc::clone(child))
            } else {
                FilterExec::try_new(conjunction(leftover.clone()), Arc::clone(child))
                    .map(|filtered| Arc::new(filtered) as Arc<dyn ExecutionPlan>)
            }
        })
        .collect::<Result<Vec<_>>>()?;

    let any_child_changed = new_children
        .iter()
        .zip(self.inputs.iter())
        .any(|(new, old)| !Arc::ptr_eq(new, old));

    // Every parent filter is now applied on every child — either via
    // pushdown or via the FilterExec wrappers added above — so all of them
    // are reported as supported to the parent.
    let all_filters_pushed =
        vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()];
    let mut propagation =
        FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed);
    if any_child_changed {
        propagation = propagation.with_updated_node(UnionExec::try_new(new_children)?);
    }
    Ok(propagation)
}
}

/// Combines multiple input streams by interleaving them.
Expand Down
63 changes: 63 additions & 0 deletions datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,66 @@ logical_plan
physical_plan
01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=id@0 > 1 AND array_has(tags@1, rust), pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[]

###
# Test filter pushdown through UNION with mixed support
# This tests the case where one child supports filter pushdown (parquet) and one doesn't (memory table)
###

# enable filter pushdown
statement ok
set datafusion.execution.parquet.pushdown_filters = true;

statement ok
set datafusion.optimizer.max_passes = 0;

# Create memory table with matching schema (a: VARCHAR, b: BIGINT)
statement ok
CREATE TABLE t_union_mem(a VARCHAR, b BIGINT) AS VALUES ('qux', 4), ('quux', 5);

# Create parquet table with matching schema
statement ok
CREATE EXTERNAL TABLE t_union_parquet(a VARCHAR, b BIGINT) STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet';

# Query results combining memory table and Parquet with filter
query I rowsort
SELECT b FROM (
SELECT a, b FROM t_union_mem
UNION ALL
SELECT a, b FROM t_union_parquet
) WHERE b > 2;
----
3
4
5
50

# Explain the union query - filter should be pushed to parquet but not memory table
query TT
EXPLAIN SELECT b FROM (
SELECT a, b FROM t_union_mem
UNION ALL
SELECT a, b FROM t_union_parquet
) WHERE b > 2;
----
logical_plan
01)Projection: b
02)--Filter: b > Int64(2)
03)----Union
04)------Projection: t_union_mem.a, t_union_mem.b
05)--------TableScan: t_union_mem
06)------Projection: t_union_parquet.a, t_union_parquet.b
07)--------TableScan: t_union_parquet
physical_plan
01)UnionExec
02)--FilterExec: b@0 > 2
03)----DataSourceExec: partitions=1, partition_sizes=[1]
04)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet]]}, projection=[b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]

# Clean up union test tables
statement ok
DROP TABLE t_union_mem;

statement ok
DROP TABLE t_union_parquet;