Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ fn create_table_provider(column_prefix: &str, num_columns: usize) -> Arc<MemTabl
.unwrap()
}

/// Create a table provider with a struct column: `id` (Int32) and `props` (Struct { value: Int32, label: Utf8 })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

fn create_struct_table_provider() -> Arc<MemTable> {
let struct_fields = Fields::from(vec![
Field::new("value", DataType::Int32, true),
Field::new("label", DataType::Utf8, true),
]);
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("props", DataType::Struct(struct_fields), true),
]));
MemTable::try_new(schema, vec![vec![]])
.map(Arc::new)
.unwrap()
}

fn create_context() -> SessionContext {
let ctx = SessionContext::new();
ctx.register_table("t1", create_table_provider("a", 200))
Expand All @@ -88,6 +103,10 @@ fn create_context() -> SessionContext {
.unwrap();
ctx.register_table("t1000", create_table_provider("d", 1000))
.unwrap();
ctx.register_table("struct_t1", create_struct_table_provider())
.unwrap();
ctx.register_table("struct_t2", create_struct_table_provider())
.unwrap();
ctx
}

Expand Down Expand Up @@ -424,6 +443,25 @@ fn criterion_benchmark(c: &mut Criterion) {
});
});

let struct_agg_sort_query = "SELECT \
struct_t1.props['label'], \
SUM(struct_t1.props['value']), \
MAX(struct_t2.props['value']), \
COUNT(*) \
FROM struct_t1 \
JOIN struct_t2 ON struct_t1.id = struct_t2.id \
WHERE struct_t1.props['value'] > 50 \
GROUP BY struct_t1.props['label'] \
ORDER BY SUM(struct_t1.props['value']) DESC";

// -- Struct column benchmarks --
c.bench_function("logical_plan_struct_join_agg_sort", |b| {
b.iter(|| logical_plan(&ctx, &rt, struct_agg_sort_query))
});
c.bench_function("physical_plan_struct_join_agg_sort", |b| {
b.iter(|| physical_plan(&ctx, &rt, struct_agg_sort_query))
});

// -- Sorted Queries --
// 100, 200 && 300 is taking too long - https://github.com/apache/datafusion/issues/18366
// Logical Plan for datatype Int64 and UInt64 differs, UInt64 Logical Plan's Union are wrapped
Expand Down
256 changes: 255 additions & 1 deletion datafusion/sqllogictest/test_files/projection_pushdown.slt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,24 @@ SELECT id, s['value'] FROM simple_struct ORDER BY id;
4 300
5 250

query TT
EXPLAIN SELECT s['label'] FROM simple_struct;
----
logical_plan
01)Projection: get_field(simple_struct.s, Utf8("label"))
02)--TableScan: simple_struct projection=[s]
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, label) as simple_struct.s[label]], file_type=parquet
Copy link
Contributor

@comphead comphead Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so projection=[get_field(s@1, label) means the we extract label field from s structure column as earlier as possible, on the scan level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep! In this case it "just works" because there's no filter, etc. in the way


# Verify correctness
query T
SELECT s['label'] FROM simple_struct ORDER BY s['label'];
----
alpha
beta
delta
epsilon
gamma

###
# Test 2.2: Multiple get_field expressions
###
Expand Down Expand Up @@ -1339,7 +1357,240 @@ SELECT id, s['value'] FROM simple_struct ORDER BY id, s['value'];
5 250

#####################
# Section 12: Cleanup
# Section 12: Join Tests - get_field Extraction from Join Nodes
#####################

# Create a second table for join tests
statement ok
COPY (
SELECT
column1 as id,
column2 as s
FROM VALUES
(1, {role: 'admin', level: 10}),
(2, {role: 'user', level: 5}),
(3, {role: 'guest', level: 1}),
(4, {role: 'admin', level: 8}),
(5, {role: 'user', level: 3})
) TO 'test_files/scratch/projection_pushdown/join_right.parquet'
STORED AS PARQUET;

statement ok
CREATE EXTERNAL TABLE join_right STORED AS PARQUET
LOCATION 'test_files/scratch/projection_pushdown/join_right.parquet';

###
# Test 12.1: Join with get_field in equijoin condition
# Tests extraction from join ON clause - get_field on each side routed appropriately
###

query TT
EXPLAIN SELECT simple_struct.id, join_right.id
FROM simple_struct
INNER JOIN join_right ON simple_struct.s['value'] = join_right.s['level'] * 10;
----
logical_plan
01)Projection: simple_struct.id, join_right.id
02)--Inner Join: get_field(simple_struct.s, Utf8("value")) = get_field(join_right.s, Utf8("level")) * Int64(10)
03)----TableScan: simple_struct projection=[id, s]
04)----TableScan: join_right projection=[id, s]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(simple_struct.s[value]@2, join_right.s[level] * Int64(10)@2)], projection=[id@0, id@3]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s, get_field(s@1, value) as simple_struct.s[value]], file_type=parquet
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id, s, get_field(s@1, level) * 10 as join_right.s[level] * Int64(10)], file_type=parquet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 its outside of this PR but

get_field(s@1, level) * 10 as join_right.s[level] * Int64(10)

looks confusing, I would expect just

get_field(s@1, level) * 10

or we wanna preserve projection column generated names then

get_field(s@1, level) * 10 as [join_right.s[level] * Int64(10)]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually surprised this is getting pushed down into the scan here. I'm not sure what would cause that. It's not a bad thing but maybe we can evaluate if we should have the aliases there or not when we change this next time.


# Verify correctness - value = level * 10
# simple_struct: (1,100), (2,200), (3,150), (4,300), (5,250)
# join_right: (1,10), (2,5), (3,1), (4,8), (5,3)
# Matches: simple_struct.value=100 matches join_right.level*10=100 (level=10, id=1)
query II
SELECT simple_struct.id, join_right.id
FROM simple_struct
INNER JOIN join_right ON simple_struct.s['value'] = join_right.s['level'] * 10
ORDER BY simple_struct.id;
----
1 1

###
# Test 12.2: Join with get_field in non-equi filter
# Tests extraction from join filter expression - left side only
###

query TT
EXPLAIN SELECT simple_struct.id, join_right.id
FROM simple_struct
INNER JOIN join_right ON simple_struct.id = join_right.id
WHERE simple_struct.s['value'] > 150;
----
logical_plan
01)Inner Join: simple_struct.id = join_right.id
02)--Projection: simple_struct.id
03)----Filter: get_field(simple_struct.s, Utf8("value")) > Int64(150)
04)------TableScan: simple_struct projection=[id, s], partial_filters=[get_field(simple_struct.s, Utf8("value")) > Int64(150)]
05)--TableScan: join_right projection=[id]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)]
02)--FilterExec: get_field(s@1, value) > 150, projection=[id@0]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet
04)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]

# Verify correctness - id matches and value > 150
query II
SELECT simple_struct.id, join_right.id
FROM simple_struct
INNER JOIN join_right ON simple_struct.id = join_right.id
WHERE simple_struct.s['value'] > 150
ORDER BY simple_struct.id;
----
2 2
4 4
5 5

###
# Test 12.3: Join with get_field from both sides in filter
# Tests extraction routing to both left and right inputs
###

query TT
EXPLAIN SELECT simple_struct.id, join_right.id
FROM simple_struct
INNER JOIN join_right ON simple_struct.id = join_right.id
WHERE simple_struct.s['value'] > 100 AND join_right.s['level'] > 3;
----
logical_plan
01)Inner Join: simple_struct.id = join_right.id
02)--Projection: simple_struct.id
03)----Filter: get_field(simple_struct.s, Utf8("value")) > Int64(100)
04)------TableScan: simple_struct projection=[id, s], partial_filters=[get_field(simple_struct.s, Utf8("value")) > Int64(100)]
05)--Projection: join_right.id
06)----Filter: get_field(join_right.s, Utf8("level")) > Int64(3)
07)------TableScan: join_right projection=[id, s], partial_filters=[get_field(join_right.s, Utf8("level")) > Int64(3)]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)]
02)--FilterExec: get_field(s@1, value) > 100, projection=[id@0]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet
04)--FilterExec: get_field(s@1, level) > 3, projection=[id@0]
05)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id, s], file_type=parquet, predicate=DynamicFilter [ empty ]

# Verify correctness - id matches, value > 100, and level > 3
# Matching ids where value > 100: 2(200), 3(150), 4(300), 5(250)
# Of those, level > 3: 2(5), 4(8), 5(3) -> only 2 and 4
query II
SELECT simple_struct.id, join_right.id
FROM simple_struct
INNER JOIN join_right ON simple_struct.id = join_right.id
WHERE simple_struct.s['value'] > 100 AND join_right.s['level'] > 3
ORDER BY simple_struct.id;
----
2 2
4 4

###
# Test 12.4: Join with get_field in SELECT projection
# Tests that get_field in output columns pushes down through the join
###

query TT
EXPLAIN SELECT simple_struct.id, simple_struct.s['label'], join_right.s['role']
FROM simple_struct
INNER JOIN join_right ON simple_struct.id = join_right.id;
----
logical_plan
01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("label")), get_field(join_right.s, Utf8("role"))
02)--Inner Join: simple_struct.id = join_right.id
03)----TableScan: simple_struct projection=[id, s]
04)----TableScan: join_right projection=[id, s]
physical_plan
01)ProjectionExec: expr=[id@0 as id, get_field(s@1, label) as simple_struct.s[label], get_field(s@2, role) as join_right.s[role]]
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[id@0, s@1, s@3]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id, s], file_type=parquet, predicate=DynamicFilter [ empty ]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we expect get_field pushdown here? or it is cheaper to bring the entire structure if many columns requested?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we want pushdown here, it just doesn't work with the current status quo


# Verify correctness
query ITT
SELECT simple_struct.id, simple_struct.s['label'], join_right.s['role']
FROM simple_struct
INNER JOIN join_right ON simple_struct.id = join_right.id
ORDER BY simple_struct.id;
----
1 alpha admin
2 beta user
3 gamma guest
4 delta admin
5 epsilon user

###
# Test 12.5: Join without get_field (baseline - no extraction needed)
# Verifies no unnecessary projections are added when there's nothing to extract
###

query TT
EXPLAIN SELECT simple_struct.id, join_right.id
FROM simple_struct
INNER JOIN join_right ON simple_struct.id = join_right.id;
----
logical_plan
01)Inner Join: simple_struct.id = join_right.id
02)--TableScan: simple_struct projection=[id]
03)--TableScan: join_right projection=[id]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id], file_type=parquet
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]

# Verify correctness
query II
SELECT simple_struct.id, join_right.id
FROM simple_struct
INNER JOIN join_right ON simple_struct.id = join_right.id
ORDER BY simple_struct.id;
----
1 1
2 2
3 3
4 4
5 5

###
# Test 12.6: Left Join with get_field extraction
# Tests extraction works correctly with outer joins
###

query TT
EXPLAIN SELECT simple_struct.id, simple_struct.s['value'], join_right.s['level']
FROM simple_struct
LEFT JOIN join_right ON simple_struct.id = join_right.id AND join_right.s['level'] > 5;
----
logical_plan
01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")), get_field(join_right.s, Utf8("level"))
02)--Left Join: simple_struct.id = join_right.id
03)----TableScan: simple_struct projection=[id, s]
04)----Filter: get_field(join_right.s, Utf8("level")) > Int64(5)
05)------TableScan: join_right projection=[id, s], partial_filters=[get_field(join_right.s, Utf8("level")) > Int64(5)]
physical_plan
01)ProjectionExec: expr=[id@1 as id, get_field(s@2, value) as simple_struct.s[value], get_field(s@0, level) as join_right.s[level]]
02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(id@0, id@0)], projection=[s@1, id@2, s@3]
03)----FilterExec: get_field(s@1, level) > 5
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id, s], file_type=parquet
05)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet

# Verify correctness - left join with level > 5 condition
# Only join_right rows with level > 5 are matched: id=1 (level=10), id=4 (level=8)
query III
SELECT simple_struct.id, simple_struct.s['value'], join_right.s['level']
FROM simple_struct
LEFT JOIN join_right ON simple_struct.id = join_right.id AND join_right.s['level'] > 5
ORDER BY simple_struct.id;
----
1 100 10
2 200 NULL
3 150 NULL
4 300 8
5 250 NULL


#####################
# Section 13: Cleanup
#####################

statement ok
Expand All @@ -1353,3 +1604,6 @@ DROP TABLE nullable_struct;

statement ok
DROP TABLE multi_struct;

statement ok
DROP TABLE join_right;