From 0fd0fa215b5323ca28f3763199cb7c611dbe7fd5 Mon Sep 17 00:00:00 2001 From: Denis Semenov Date: Thu, 5 Feb 2026 15:00:38 +0300 Subject: [PATCH] feat: support placeholders in execution plans Introduces `PlaceholderExpr`, allowing placeholder parameters to be preserved in the physical plan. Previously, placeholders had to be resolved to literals before physical planning. --- .../physical-expr/src/expressions/mod.rs | 2 + .../src/expressions/placeholder.rs | 126 +++++ datafusion/physical-expr/src/planner.rs | 46 +- .../src/simplifier/const_evaluator.rs | 7 +- datafusion/proto/proto/datafusion.proto | 7 + datafusion/proto/src/generated/pbjson.rs | 122 +++++ datafusion/proto/src/generated/prost.rs | 11 +- .../proto/src/physical_plan/from_proto.rs | 11 +- .../proto/src/physical_plan/to_proto.rs | 17 +- .../sqllogictest/test_files/placeholders.slt | 506 ++++++++++++++++++ 10 files changed, 835 insertions(+), 20 deletions(-) create mode 100644 datafusion/physical-expr/src/expressions/placeholder.rs create mode 100644 datafusion/sqllogictest/test_files/placeholders.slt diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index c9e02708d6c28..34314f1c80d5f 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -32,6 +32,7 @@ mod literal; mod negative; mod no_op; mod not; +mod placeholder; mod try_cast; mod unknown_column; @@ -54,5 +55,6 @@ pub use literal::{Literal, lit}; pub use negative::{NegativeExpr, negative}; pub use no_op::NoOp; pub use not::{NotExpr, not}; +pub use placeholder::{PlaceholderExpr, has_placeholders, placeholder}; pub use try_cast::{TryCastExpr, try_cast}; pub use unknown_column::UnKnownColumn; diff --git a/datafusion/physical-expr/src/expressions/placeholder.rs b/datafusion/physical-expr/src/expressions/placeholder.rs new file mode 100644 index 0000000000000..9149517a0a57a --- /dev/null +++ b/datafusion/physical-expr/src/expressions/placeholder.rs @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Placeholder expression. + +use std::{ + any::Any, + fmt::{self, Formatter}, + sync::Arc, +}; + +use arrow::{ + array::RecordBatch, + datatypes::{DataType, Field, FieldRef, Schema}, +}; +use datafusion_common::{ + DataFusionError, Result, exec_datafusion_err, tree_node::TreeNode, +}; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use std::hash::Hash; + +/// Physical expression representing a placeholder parameter (e.g., $1, $2, or named parameters) in +/// the physical plan. +/// +/// This expression serves as a placeholder that will be resolved to a literal value during +/// execution. It should not be evaluated directly. 
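+///
+/// Illustrative sketch (assumes only the `placeholder` helper defined below): a typed
+/// placeholder displays as its id and returns an execution error if evaluated before a
+/// value is bound.
+///
+/// ```ignore
+/// use arrow::datatypes::DataType;
+///
+/// // Build `$1` as an Int32 placeholder.
+/// let expr = placeholder("$1", DataType::Int32);
+/// assert_eq!(expr.to_string(), "$1");
+/// // `expr.evaluate(&batch)` errors until the placeholder is
+/// // replaced with a literal value.
+/// ```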
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct PlaceholderExpr {
+    /// Placeholder id, e.g. $1 or $a.
+    pub id: String,
+    /// Field derived from the expression in which the placeholder appears.
+    pub field: Option<FieldRef>,
+}
+
+impl PlaceholderExpr {
+    /// Create a new placeholder expression.
+    pub fn new(id: String, data_type: DataType) -> Self {
+        let field = Arc::new(Field::new("", data_type, true));
+        Self::new_with_field(id, field)
+    }
+
+    /// Create a new placeholder expression from a field.
+    pub fn new_with_field(id: String, field: FieldRef) -> Self {
+        Self {
+            id,
+            field: Some(field),
+        }
+    }
+
+    /// Create a new placeholder expression without a specified data type.
+    pub fn new_without_data_type(id: String) -> Self {
+        Self { id, field: None }
+    }
+
+    fn execution_error(&self) -> DataFusionError {
+        exec_datafusion_err!(
+            "Placeholder '{}' was not provided a value for execution.",
+            self.id
+        )
+    }
+}
+
+/// Create a placeholder expression.
+pub fn placeholder<I: Into<String>>(id: I, data_type: DataType) -> Arc<dyn PhysicalExpr> {
+    Arc::new(PlaceholderExpr::new(id.into(), data_type))
+}
+
+/// Returns `true` if the expression contains placeholders.
+pub fn has_placeholders(expr: &Arc<dyn PhysicalExpr>) -> bool {
+    expr.exists(|e| Ok(e.as_any().is::<PlaceholderExpr>()))
+        .expect("closure never returns an error")
+}
+
+impl fmt::Display for PlaceholderExpr {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(f, "{}", self.id)
+    }
+}
+
+impl PhysicalExpr for PlaceholderExpr {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
+        self.field
+            .as_ref()
+            .map(Arc::clone)
+            .ok_or_else(|| self.execution_error())
+    }
+
+    fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
+        Err(self.execution_error())
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        assert!(children.is_empty());
+        Ok(self)
+    }
+
+    fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        fmt::Display::fmt(self, f)
+    }
+}
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index 84a6aa4309872..cf0143e863ed2 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -20,10 +20,10 @@ use std::sync::Arc;
 use crate::ScalarFunctionExpr;
 use crate::{
     PhysicalExpr,
-    expressions::{self, Column, Literal, binary, like, similar_to},
+    expressions::{self, Column, Literal, PlaceholderExpr, binary, like, similar_to},
 };
 
-use arrow::datatypes::Schema;
+use arrow::datatypes::{DataType, Schema};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::metadata::FieldMetadata;
 use datafusion_common::{
@@ -288,16 +288,28 @@ pub fn create_physical_expr(
             };
             Ok(expressions::case(expr, when_then_expr, else_expr)?)
         }
-        Expr::Cast(Cast { expr, data_type }) => expressions::cast(
-            create_physical_expr(expr, input_dfschema, execution_props)?,
-            input_schema,
-            data_type.clone(),
-        ),
-        Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
-            create_physical_expr(expr, input_dfschema, execution_props)?,
-            input_schema,
-            data_type.clone(),
-        ),
+        Expr::Cast(Cast { expr, data_type }) => {
+            let mut expr = create_physical_expr(expr, input_dfschema, execution_props)?;
+            if let Some(placeholder) = expr.as_any().downcast_ref::<PlaceholderExpr>()
+                && placeholder.field.is_none()
+            {
+                expr = expressions::placeholder(&placeholder.id, data_type.clone());
+            }
+
+            expressions::cast(expr, input_schema, data_type.clone())
+        }
+        Expr::TryCast(TryCast { expr, data_type }) => {
+            let mut expr = create_physical_expr(expr, input_dfschema, execution_props)?;
+            if let Some(placeholder) = expr.as_any().downcast_ref::<PlaceholderExpr>()
+                && placeholder.field.is_none()
+            {
+                // To maintain try_cast behavior, we initially resolve the placeholder with the
+                // Utf8 data type.
+                expr = expressions::placeholder(&placeholder.id, DataType::Utf8);
+            }
+
+            expressions::try_cast(expr, input_schema, data_type.clone())
+        }
         Expr::Not(expr) => {
             expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
         }
@@ -381,9 +393,13 @@ pub fn create_physical_expr(
                 expressions::in_list(value_expr, list_exprs, negated, input_schema)
             }
         },
-        Expr::Placeholder(Placeholder { id, .. }) => {
-            exec_err!("Placeholder '{id}' was not provided a value for execution.")
-        }
+        Expr::Placeholder(Placeholder { id, field }) => match field {
+            Some(field) => Ok(Arc::new(PlaceholderExpr::new_with_field(
+                id.clone(),
+                Arc::clone(field),
+            ))),
+            None => Ok(Arc::new(PlaceholderExpr::new_without_data_type(id.clone()))),
+        },
         other => {
             not_impl_err!("Physical plan does not support logical expression {other:?}")
         }
diff --git a/datafusion/physical-expr/src/simplifier/const_evaluator.rs b/datafusion/physical-expr/src/simplifier/const_evaluator.rs
index 1e62e47ce2066..f216cd6d4c675 100644
--- a/datafusion/physical-expr/src/simplifier/const_evaluator.rs
+++ b/datafusion/physical-expr/src/simplifier/const_evaluator.rs
@@ -27,7 +27,7 @@ use datafusion_common::{Result, ScalarValue};
 use datafusion_expr_common::columnar_value::ColumnarValue;
 
 use crate::PhysicalExpr;
-use crate::expressions::{Column, Literal};
+use crate::expressions::{Column, Literal, PlaceholderExpr};
 
 /// Simplify expressions that consist only of literals by evaluating them.
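+/// Expressions containing placeholders are also skipped, since a placeholder
+/// has no value until it is bound at execution time.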
 ///
@@ -81,7 +81,10 @@ fn can_evaluate_as_constant(expr: &Arc<dyn PhysicalExpr>) -> bool {
     let mut can_evaluate = true;
 
     expr.apply(|e| {
-        if e.as_any().is::<Column>() || e.is_volatile_node() {
+        if e.as_any().is::<Column>()
+            || e.is_volatile_node()
+            || e.as_any().is::<PlaceholderExpr>()
+        {
             can_evaluate = false;
             Ok(TreeNodeRecursion::Stop)
         } else {
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 67c6d5ae1671c..4ace20fafb392 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -896,6 +896,8 @@ message PhysicalExprNode {
     UnknownColumn unknown_column = 20;
 
     PhysicalHashExprNode hash_expr = 21;
+
+    PhysicalPlaceholderNode placeholder_expr = 22;
   }
 }
 
@@ -1023,6 +1025,11 @@ message PhysicalHashExprNode {
   string description = 6;
 }
 
+message PhysicalPlaceholderNode {
+  string id = 1;
+  optional datafusion_common.Field field = 2;
+}
+
 message FilterExecNode {
   PhysicalPlanNode input = 1;
   PhysicalExprNode expr = 2;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index b77060394feba..ef2eb3a56d8d3 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -16212,6 +16212,9 @@ impl serde::Serialize for PhysicalExprNode {
                 physical_expr_node::ExprType::HashExpr(v) => {
                     struct_ser.serialize_field("hashExpr", v)?;
                 }
+                physical_expr_node::ExprType::PlaceholderExpr(v) => {
+                    struct_ser.serialize_field("placeholderExpr", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -16258,6 +16261,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
             "unknownColumn",
             "hash_expr",
             "hashExpr",
+            "placeholder_expr",
+            "placeholderExpr",
         ];

        #[allow(clippy::enum_variant_names)]
        enum GeneratedField {
@@ -16282,6 +16287,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
            Extension,
            UnknownColumn,
            HashExpr,
+            PlaceholderExpr,
        }
        impl<'de> serde::Deserialize<'de> for GeneratedField {
            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -16323,6 +16329,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
                        "extension" => Ok(GeneratedField::Extension),
                        "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn),
                        "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr),
+                        "placeholderExpr" | "placeholder_expr" => Ok(GeneratedField::PlaceholderExpr),
                        _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                    }
                }
@@ -16485,6 +16492,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
                        GeneratedField::HashExpr => {
                            if expr_type__.is_some() {
                                return Err(serde::de::Error::duplicate_field("hashExpr"));
                            }
                            expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr)
;
                        }
+                        GeneratedField::PlaceholderExpr => {
+                            if expr_type__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("placeholderExpr"));
+                            }
+                            expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::PlaceholderExpr)
+;
+                        }
                    }
                }
@@ -17661,6 +17675,114 @@ impl<'de> serde::Deserialize<'de> for PhysicalNot {
        deserializer.deserialize_struct("datafusion.PhysicalNot", FIELDS, GeneratedVisitor)
    }
}
+impl serde::Serialize for PhysicalPlaceholderNode {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if !self.id.is_empty() {
+            len += 1;
+        }
+        if self.field.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalPlaceholderNode", len)?;
+        if !self.id.is_empty() {
+            struct_ser.serialize_field("id", &self.id)?;
+        }
+        if let Some(v) = self.field.as_ref() {
+            struct_ser.serialize_field("field", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for PhysicalPlaceholderNode {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "id",
+            "field",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Id,
+            Field,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl serde::de::Visitor<'_> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "id" => Ok(GeneratedField::Id),
+                            "field" => Ok(GeneratedField::Field),
+                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = PhysicalPlaceholderNode;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                formatter.write_str("struct datafusion.PhysicalPlaceholderNode")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> std::result::Result<PhysicalPlaceholderNode, V::Error>
+            where
+                V: serde::de::MapAccess<'de>,
+            {
+                let mut id__ = None;
+                let mut field__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::Id => {
+                            if id__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("id"));
+                            }
+                            id__ = Some(map_.next_value()?);
+                        }
+                        GeneratedField::Field => {
+                            if field__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("field"));
+                            }
+                            field__ = map_.next_value()?;
+                        }
+                    }
+                }
+                Ok(PhysicalPlaceholderNode {
+                    id: id__.unwrap_or_default(),
+                    field: field__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.PhysicalPlaceholderNode", FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for PhysicalPlanNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index e95cddcc2c612..8c59379b001c2 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1292,7 +1292,7 @@ pub struct PhysicalExprNode {
     pub expr_id: ::core::option::Option<u64>,
     #[prost(
         oneof = "physical_expr_node::ExprType",
-        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21"
+        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22"
     )]
     pub expr_type: ::core::option::Option<physical_expr_node::ExprType>,
 }
@@ -1345,6 +1345,8 @@ pub mod physical_expr_node {
         UnknownColumn(super::UnknownColumn),
         #[prost(message, tag = "21")]
         HashExpr(super::PhysicalHashExprNode),
+        #[prost(message, tag = "22")]
+        PlaceholderExpr(super::PhysicalPlaceholderNode),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -1550,6 +1552,13 @@ pub struct PhysicalHashExprNode {
     pub description: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalPlaceholderNode {
+    #[prost(string, tag = "1")]
+    pub id: ::prost::alloc::string::String,
+    #[prost(message, optional, tag = "2")]
+    pub field: ::core::option::Option<super::datafusion_common::Field>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FilterExecNode {
     #[prost(message, optional, boxed, tag = "1")]
     pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index e424be162648b..2bec45b380fa7 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -42,7 +42,7 @@ use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
 use datafusion_physical_plan::expressions::{
     BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
-    NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list,
+    NegativeExpr, NotExpr, PlaceholderExpr, TryCastExpr, UnKnownColumn, in_list,
 };
 use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
 use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
@@ -495,6 +495,15 @@ pub fn parse_physical_expr_with_converter(
                 hash_expr.description.clone(),
             ))
         }
+        ExprType::PlaceholderExpr(placeholder_expr) => match placeholder_expr.field {
+            Some(ref field) => Arc::new(PlaceholderExpr::new_with_field(
+                placeholder_expr.id.clone(),
+                Arc::new(field.try_into()?),
+            )),
+            None => Arc::new(PlaceholderExpr::new_without_data_type(
+                placeholder_expr.id.clone(),
+            )),
+        },
         ExprType::Extension(extension) => {
             let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                 .inputs
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index a38e59acdab26..33e7d47fabee0 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -37,7 +37,8 @@ use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
 use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use datafusion_physical_plan::expressions::{
     BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
-    LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
+    LikeExpr, Literal, NegativeExpr, NotExpr, PlaceholderExpr, TryCastExpr,
+    UnKnownColumn,
 };
 use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
 use datafusion_physical_plan::udaf::AggregateFunctionExpr;
@@ -507,6 +508,20 @@ pub fn serialize_physical_expr_with_converter(
                 },
             )),
         })
+    } else if let Some(expr) = expr.downcast_ref::<PlaceholderExpr>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
+            expr_type: Some(protobuf::physical_expr_node::ExprType::PlaceholderExpr(
+                protobuf::PhysicalPlaceholderNode {
+                    id: expr.id.clone(),
+                    field: expr
+                        .field
+                        .as_ref()
+                        .map(|f| f.as_ref().try_into())
+                        .transpose()?,
+                },
+            )),
+        })
     } else {
         let mut buf: Vec<u8> = vec![];
         match codec.try_encode_expr(&value, &mut buf) {
diff --git a/datafusion/sqllogictest/test_files/placeholders.slt b/datafusion/sqllogictest/test_files/placeholders.slt
new file mode 100644
index 0000000000000..fe2b748dbb810
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/placeholders.slt
@@ -0,0 +1,506 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## Test physical plans with placeholders.
+##########
+
+statement ok
+CREATE TABLE t1(
+  id INT,
+  name TEXT
+) as VALUES
+  (1, 'Alex'),
+  (2, 'Bob'),
+  (3, 'Alice')
+
+# Filter with multiple placeholders
+query TT
+EXPLAIN SELECT id FROM t1 WHERE name = $1 OR id = $2
+----
+logical_plan
+01)Projection: t1.id
+02)--Filter: t1.name = $1 OR t1.id = $2
+03)----TableScan: t1 projection=[id, name]
+physical_plan
+01)FilterExec: name@1 = $1 OR id@0 = $2, projection=[id@0]
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Projection with placeholder
+query TT
+EXPLAIN SELECT id + $1 FROM t1
+----
+logical_plan
+01)Projection: t1.id + $1
+02)--TableScan: t1 projection=[id]
+physical_plan
+01)ProjectionExec: expr=[id@0 + $1 as t1.id + $1]
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Projection and filter with placeholders
+query TT
+EXPLAIN SELECT id + $1 FROM t1 WHERE name = $2
+----
+logical_plan
+01)Projection: t1.id + $1
+02)--Filter: t1.name = $2
+03)----TableScan: t1 projection=[id, name]
+physical_plan
+01)ProjectionExec: expr=[id@0 + $1 as t1.id + $1]
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----FilterExec: name@1 = $2
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+DROP TABLE t1
+
+statement ok
+CREATE EXTERNAL TABLE agg_order (
+c1 INT NOT NULL,
+c2 INT NOT NULL,
+c3 INT NOT NULL
+)
+STORED AS CSV
+LOCATION '../core/tests/data/aggregate_agg_multi_order.csv'
+OPTIONS ('format.has_header' 'true');
+
+# Aggregate with placeholder in expression and ordering
+query TT
+EXPLAIN SELECT array_agg(c1 + $1 ORDER BY c2 DESC, c3) FROM agg_order;
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[array_agg(agg_order.c1 + $1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
+02)--TableScan: agg_order projection=[c1, c2, c3]
+physical_plan
+01)AggregateExec: mode=Final, gby=[], aggr=[array_agg(agg_order.c1 + $1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[array_agg(agg_order.c1 + $1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
+04)------SortExec: expr=[c2@1 DESC, c3@2 ASC NULLS LAST], preserve_partitioning=[true]
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
+
+statement ok
+DROP TABLE agg_order
+
+statement ok
+CREATE EXTERNAL TABLE alltypes_plain STORED AS PARQUET LOCATION '../../parquet-testing/data/alltypes_plain.parquet';
+
+statement ok
+SET datafusion.execution.parquet.pushdown_filters = true;
+
+# Filter with placeholder and parquet pushdown
+query TT
+EXPLAIN SELECT smallint_col FROM alltypes_plain WHERE int_col = $1;
+----
+logical_plan
+01)Projection: alltypes_plain.smallint_col
+02)--Filter: alltypes_plain.int_col = $1
+03)----TableScan: alltypes_plain projection=[smallint_col, int_col], partial_filters=[alltypes_plain.int_col = $1]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[smallint_col], file_type=parquet, predicate=int_col@4 = $1, pruning_predicate=int_col_null_count@2 != row_count@3 AND int_col_min@0 <= $1 AND $1 <= int_col_max@1, required_guarantees=[]
+
+# Projection with placeholder on parquet table
+query TT
+EXPLAIN SELECT smallint_col + $1 FROM alltypes_plain;
+----
+logical_plan
+01)Projection: alltypes_plain.smallint_col + $1
+02)--TableScan: alltypes_plain projection=[smallint_col]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[smallint_col@3 + $1 as alltypes_plain.smallint_col + $1], file_type=parquet
+
+# Projection and filter with placeholders on parquet table
+query TT
+EXPLAIN select smallint_col + $1 FROM alltypes_plain WHERE int_col = $2
+----
+logical_plan
+01)Projection: alltypes_plain.smallint_col + $1
+02)--Filter: alltypes_plain.int_col = $2
+03)----TableScan: alltypes_plain projection=[smallint_col, int_col], partial_filters=[alltypes_plain.int_col = $2]
+physical_plan
+01)ProjectionExec: expr=[smallint_col@0 + $1 as alltypes_plain.smallint_col + $1]
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[smallint_col, int_col], file_type=parquet, predicate=int_col@4 = $2, pruning_predicate=int_col_null_count@2 != row_count@3 AND int_col_min@0 <= $2 AND $2 <= int_col_max@1, required_guarantees=[]
+
+statement ok
+DROP TABLE alltypes_plain;
+
+##########
+## Joins with placeholders
+##########
+
+statement ok
+CREATE TABLE t1(id INT, name TEXT) AS VALUES (1, 'Alex'), (2, 'Bob');
+
+statement ok
+CREATE TABLE t2(id INT, age INT) AS VALUES (1, 25), (2, 30);
+
+# Join with placeholder in the join key expression
+query TT
+EXPLAIN SELECT t1.name, t2.age FROM t1 JOIN t2 ON t1.id + $1 = t2.id;
+----
+logical_plan
+01)Projection: t1.name, t2.age
+02)--Inner Join: t1.id + $1 = t2.id
+03)----TableScan: t1 projection=[id, name]
+04)----TableScan: t2 projection=[id, age]
+physical_plan
+01)ProjectionExec: expr=[name@1 as name, age@0 as age]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, t1.id + $1@2)], projection=[age@1, name@3]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----ProjectionExec: expr=[id@0 as id, name@1 as name, id@0 + $1 as t1.id + $1]
+05)------DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Join with placeholder in filter
+query TT
+EXPLAIN SELECT t1.name, t2.age FROM t1 JOIN t2 ON t1.id = t2.id WHERE t2.age > $1;
+----
+logical_plan
+01)Projection: t1.name, t2.age
+02)--Inner Join: t1.id = t2.id
+03)----TableScan: t1 projection=[id, name]
+04)----Filter: t2.age > $1
+05)------TableScan: t2 projection=[id, age]
+physical_plan
+01)ProjectionExec: expr=[name@1 as name, age@0 as age]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[age@1, name@3]
+03)----FilterExec: age@1 > $1
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Join with placeholder in projection
+query TT
+EXPLAIN SELECT t1.name, t2.age + $1 FROM t1 JOIN t2 ON t1.id = t2.id;
+----
+logical_plan
+01)Projection: t1.name, t2.age + $1
+02)--Inner Join: t1.id = t2.id
+03)----TableScan: t1 projection=[id, name]
+04)----TableScan: t2 projection=[id, age]
+physical_plan
+01)ProjectionExec: expr=[name@1 as name, age@3 + $1 as t2.age + $1]
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----ProjectionExec: expr=[id@2 as id, name@3 as name, id@0 as id, age@1 as age]
+04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)]
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+06)--------DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Join with placeholder in the ON clause
+query TT
+EXPLAIN SELECT t1.name, t2.age FROM t1 JOIN t2 ON t1.id + t2.id = $1;
+----
+logical_plan
+01)Projection: t1.name, t2.age
+02)--Inner Join: Filter: t1.id + t2.id = $1
+03)----TableScan: t1 projection=[id, name]
+04)----TableScan: t2 projection=[id, age]
+physical_plan
+01)ProjectionExec: expr=[name@1 as name, age@0 as age]
+02)--NestedLoopJoinExec: join_type=Inner, filter=id@0 + id@1 = $1, projection=[age@1, name@3]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+##########
+## Window Functions and UNION with placeholders
+##########
+
+statement ok
+CREATE TABLE t1(id INT, name TEXT) AS VALUES (1, 'Alex'), (2, 'Bob');
+
+# Window function with placeholder
+query TT
+EXPLAIN SELECT id, SUM(id) OVER (PARTITION BY name ORDER BY id) + $1 FROM t1;
+----
+logical_plan
+01)Projection: t1.id, sum(t1.id) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + $1
+02)--WindowAggr: windowExpr=[[sum(CAST(t1.id AS Int64)) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----TableScan: t1 projection=[id, name]
+physical_plan
+01)ProjectionExec: expr=[id@0 as id, sum(t1.id) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 + $1 as sum(t1.id) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + $1]
+02)--BoundedWindowAggExec: wdw=[sum(t1.id) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(t1.id) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+03)----SortExec: expr=[name@1 ASC NULLS LAST, id@0 ASC NULLS LAST], preserve_partitioning=[false]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Window function with a placeholder inside the aggregated expression;
+# here only the BoundedWindowAggExec carries the placeholder.
+query TT +EXPLAIN SELECT id, SUM(id + $1) OVER (PARTITION BY name ORDER BY id) FROM t1; +---- +logical_plan +01)Projection: t1.id, sum(t1.id + $1) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +02)--WindowAggr: windowExpr=[[sum(CAST(t1.id + $1 AS Int64)) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +03)----TableScan: t1 projection=[id, name] +physical_plan +01)ProjectionExec: expr=[id@0 as id, sum(t1.id + $1) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum(t1.id + $1) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +02)--BoundedWindowAggExec: wdw=[sum(t1.id + $1) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(t1.id + $1) PARTITION BY [t1.name] ORDER BY [t1.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----SortExec: expr=[name@1 ASC NULLS LAST, id@0 ASC NULLS LAST], preserve_partitioning=[false] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +# UNION with placeholder +query TT +EXPLAIN SELECT id FROM t1 WHERE id = $1 UNION ALL SELECT id FROM t1 WHERE id = $2; +---- +logical_plan +01)Union +02)--Filter: t1.id = $1 +03)----TableScan: t1 projection=[id] +04)--Filter: t1.id = $2 +05)----TableScan: t1 projection=[id] +physical_plan +01)UnionExec +02)--FilterExec: id@0 = $1 +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)--FilterExec: id@0 = $2 +05)----DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +DROP TABLE t1; + +########## +## Subqueries and CTEs with placeholders +########## + +statement ok +CREATE TABLE t1(id INT, name TEXT) AS VALUES (1, 'Alex'), (2, 'Bob'); + +# Scalar subquery with placeholder +query TT +EXPLAIN SELECT id, (SELECT name FROM t1 WHERE id = $1) FROM t1; +---- +logical_plan +01)Projection: t1.id, __scalar_sq_1.name AS name +02)--Left Join: +03)----TableScan: t1 projection=[id] +04)----SubqueryAlias: __scalar_sq_1 +05)------Projection: t1.name +06)--------Filter: t1.id = $1 +07)----------TableScan: t1 projection=[id, name] +physical_plan +01)ProjectionExec: expr=[id@1 as id, name@0 as name] +02)--NestedLoopJoinExec: join_type=Right +03)----FilterExec: id@0 = $1, projection=[name@1] +04)------DataSourceExec: partitions=1, partition_sizes=[1] +05)----DataSourceExec: partitions=1, partition_sizes=[1] + +# CTE with placeholder +query TT +EXPLAIN WITH cte AS (SELECT * FROM t1 WHERE id = $1) SELECT * FROM cte; +---- +logical_plan +01)SubqueryAlias: cte +02)--Filter: t1.id = $1 +03)----TableScan: t1 projection=[id, name] +physical_plan +01)FilterExec: id@0 = $1 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +DROP TABLE t1; + +########## +## Group By and Order By with placeholders +########## + +statement ok +CREATE TABLE t1(id INT, name TEXT) AS VALUES (1, 'Alex'), (2, 'Bob'), (3, 'Alice'); + +# Group by with placeholder in SELECT +query TT +EXPLAIN SELECT id + $1, COUNT(*) FROM t1 GROUP BY id; +---- +logical_plan +01)Projection: t1.id + $1, count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[t1.id]], aggr=[[count(Int64(1))]] +03)----TableScan: t1 projection=[id] +physical_plan +01)ProjectionExec: expr=[id@0 + $1 as t1.id + $1, count(Int64(1))@1 as count(*)] 
+02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))]
+03)----RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=1
+04)------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))]
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Group by with placeholder in HAVING
+query TT
+EXPLAIN SELECT id, COUNT(*) FROM t1 GROUP BY id HAVING COUNT(*) > $1;
+----
+logical_plan
+01)Projection: t1.id, count(Int64(1)) AS count(*)
+02)--Filter: count(Int64(1)) > CAST($1 AS Int64)
+03)----Aggregate: groupBy=[[t1.id]], aggr=[[count(Int64(1))]]
+04)------TableScan: t1 projection=[id]
+physical_plan
+01)ProjectionExec: expr=[id@0 as id, count(Int64(1))@1 as count(*)]
+02)--FilterExec: count(Int64(1))@1 > $1
+03)----AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))]
+04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=1
+05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))]
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Order by with placeholder
+query TT
+EXPLAIN SELECT id FROM t1 ORDER BY id + $1;
+----
+logical_plan
+01)Sort: t1.id + CAST($1 AS Int32) ASC NULLS LAST
+02)--TableScan: t1 projection=[id]
+physical_plan
+01)SortExec: expr=[id@0 + $1 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Group by and Order by with placeholders
+query TT
+EXPLAIN SELECT name, SUM(id) FROM t1 GROUP BY name ORDER BY SUM(id) + $1;
+----
+logical_plan
+01)Sort: sum(t1.id) + CAST($1 AS Int64) ASC NULLS LAST
+02)--Aggregate: groupBy=[[t1.name]], aggr=[[sum(CAST(t1.id AS Int64))]]
+03)----TableScan: t1 projection=[id, name]
+physical_plan
+01)SortPreservingMergeExec: [sum(t1.id)@1 + $1 ASC NULLS LAST]
+02)--SortExec: expr=[sum(t1.id)@1 + $1 ASC NULLS LAST], preserve_partitioning=[true]
+03)----AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[sum(t1.id)]
+04)------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=1
+05)--------AggregateExec: mode=Partial, gby=[name@1 as name], aggr=[sum(t1.id)]
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+DROP TABLE t1;
+
+##########
+## CAST and TRY_CAST with placeholders
+##########
+
+# CAST with placeholder using the :: shorthand
+query TT
+EXPLAIN SELECT $1::INT;
+----
+logical_plan
+01)Projection: CAST($1 AS Int32)
+02)--EmptyRelation: rows=1
+physical_plan
+01)ProjectionExec: expr=[$1]
+02)--PlaceholderRowExec
+
+# CAST with placeholder
+query TT
+EXPLAIN SELECT CAST($1 AS INT);
+----
+logical_plan
+01)Projection: CAST($1 AS Int32)
+02)--EmptyRelation: rows=1
+physical_plan
+01)ProjectionExec: expr=[$1]
+02)--PlaceholderRowExec
+
+# TRY_CAST with placeholder
+query TT
+EXPLAIN SELECT TRY_CAST($1 AS INT);
+----
+logical_plan
+01)Projection: TRY_CAST($1 AS Int32)
+02)--EmptyRelation: rows=1
+physical_plan
+01)ProjectionExec: expr=[TRY_CAST($1 AS Int32) as $1]
+02)--PlaceholderRowExec
+
+##########
+## IN and BETWEEN with placeholders
+##########
+
+statement ok
+CREATE TABLE t1(id INT, name TEXT) AS VALUES (1, 'Alex'), (2, 'Bob'), (3, 'Alice');
+
+# IN with placeholders
+query TT
+EXPLAIN SELECT id FROM t1 WHERE id IN ($1, $2, $3);
+----
+logical_plan
+01)Filter: t1.id = $1 OR t1.id = $2 OR t1.id = $3
+02)--TableScan: t1 projection=[id]
+physical_plan
+01)FilterExec: id@0 = $1 OR id@0 = $2 OR id@0 = $3
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+# BETWEEN with placeholders
+query TT
+EXPLAIN SELECT id FROM t1 WHERE id BETWEEN $1 AND $2;
+----
+logical_plan
+01)Filter: t1.id >= $1 AND t1.id <= $2
+02)--TableScan: t1 projection=[id]
+physical_plan
+01)FilterExec: id@0 >= $1 AND id@0 <= $2
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+##########
+## String and Arithmetic operations with placeholders
+##########
+
+# String concatenation with placeholders
+query TT
+EXPLAIN SELECT $1::TEXT || $2::TEXT;
+----
+logical_plan
+01)Projection: CAST($1 AS Utf8View) || CAST($2 AS Utf8View)
+02)--EmptyRelation: rows=1
+physical_plan
+01)ProjectionExec: expr=[$1 || $2]
+02)--PlaceholderRowExec
+
+# Arithmetic with placeholders
+query TT
+EXPLAIN SELECT $1 + $2 * $3;
+----
+logical_plan
+01)Projection: $1 + CAST($2 AS Int64) * CAST($3 AS Int64)
+02)--EmptyRelation: rows=1
+physical_plan
+01)ProjectionExec: expr=[$1 + CAST($2 AS Int64) * CAST($3 AS Int64) as $1 + $2 * $3]
+02)--PlaceholderRowExec
+
+##########
+## LIKE and Regex with placeholders
+##########
+
+# LIKE with placeholder
+query TT
+EXPLAIN SELECT * FROM t1 WHERE name LIKE $1;
+----
+logical_plan
+01)Filter: t1.name LIKE $1
+02)--TableScan: t1 projection=[id, name]
+physical_plan
+01)FilterExec: name@1 LIKE $1
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Regex with placeholder
+query TT
+EXPLAIN SELECT * FROM t1 WHERE name ~ $1;
+----
+logical_plan
+01)Filter: t1.name ~ $1
+02)--TableScan: t1 projection=[id, name]
+physical_plan
+01)FilterExec: name@1 ~ $1
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+DROP TABLE t1;