diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index d98d23821ec7f..9ac986b17565f 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -111,7 +111,9 @@ mod tests { use datafusion_common::Statistics; use datafusion_common::stats::Precision; use datafusion_expr::ColumnarValue; - use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use datafusion_physical_expr_common::physical_expr::{ + ExprExecutionContext, PhysicalExpr, + }; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use object_store::ObjectMeta; use object_store::path::Path; @@ -224,6 +226,13 @@ mod tests { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "MockExpr") } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> datafusion_common::Result> { + Ok(self) + } } fn ordering() -> LexOrdering { diff --git a/datafusion/ffi/src/physical_expr/mod.rs b/datafusion/ffi/src/physical_expr/mod.rs index d268dd613f987..30f7257672446 100644 --- a/datafusion/ffi/src/physical_expr/mod.rs +++ b/datafusion/ffi/src/physical_expr/mod.rs @@ -36,7 +36,7 @@ use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::statistics::Distribution; use datafusion_physical_expr::PhysicalExpr; -use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_expr_common::physical_expr::{ExprExecutionContext, fmt_sql}; use crate::arrow_wrappers::{WrappedArray, WrappedSchema}; use crate::expr::columnar_value::FFI_ColumnarValue; @@ -705,6 +705,13 @@ impl PhysicalExpr for ForeignPhysicalExpr { fn is_volatile_node(&self) -> bool { unsafe { (self.expr.is_volatile_node)(&self.expr) } } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } impl Eq for ForeignPhysicalExpr {} diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 2358a21940912..3f4be80093226 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -43,6 +43,21 @@ use itertools::izip; /// Shared [`PhysicalExpr`]. pub type PhysicalExprRef = Arc; +/// Describes execution context for the particular expression. +pub struct ExprExecutionContext { + /// External parameters. + pub external_params: Arc<[ScalarValue]>, +} + +impl ExprExecutionContext { + /// Make a new [`ExprExecutionContext`]. + pub fn new(external_param: impl Into>) -> Self { + Self { + external_params: external_param.into(), + } + } +} + /// [`PhysicalExpr`]s represent expressions such as `A + 1` or `CAST(c1 AS int)`. /// /// `PhysicalExpr` knows its type, nullability and can be evaluated directly on @@ -430,6 +445,15 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn is_volatile_node(&self) -> bool { false } + + /// Make this expression executable. The most expressions are executable and do not + /// require an additional work so this method could return `self`. However, there are + /// expressions that should be transformed prior to execution, e.g. placeholder that + /// should be resolved into scalar. + fn execute( + self: Arc, + context: &ExprExecutionContext, + ) -> Result>; } #[deprecated( @@ -662,7 +686,7 @@ pub fn is_volatile(expr: &Arc) -> bool { #[cfg(test)] mod test { - use crate::physical_expr::PhysicalExpr; + use crate::physical_expr::{ExprExecutionContext, PhysicalExpr}; use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch}; use arrow::datatypes::{DataType, Schema}; use datafusion_expr_common::columnar_value::ColumnarValue; @@ -707,6 +731,13 @@ mod test { fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("TestExpr") } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> datafusion_common::Result> { + Ok(self) + } } impl Display for TestExpr { diff --git a/datafusion/physical-expr/src/async_scalar_function.rs b/datafusion/physical-expr/src/async_scalar_function.rs index 2f50a17afc39a..01535f08600be 100644 --- a/datafusion/physical-expr/src/async_scalar_function.rs +++ b/datafusion/physical-expr/src/async_scalar_function.rs @@ -25,7 +25,9 @@ use datafusion_common::{internal_err, not_impl_err}; use datafusion_expr::ScalarFunctionArgs; use datafusion_expr::async_udf::AsyncScalarUDF; use datafusion_expr_common::columnar_value::ColumnarValue; -use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_expr_common::physical_expr::{ + ExprExecutionContext, PhysicalExpr, +}; use std::any::Any; use std::fmt::Display; use std::hash::{Hash, Hasher}; @@ -247,4 +249,11 @@ impl PhysicalExpr for AsyncFuncExpr { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.func) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 72eae396e68a6..c3bf333942305 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -42,6 +42,7 @@ use datafusion_expr::statistics::{ use datafusion_expr::{ColumnarValue, Operator}; use datafusion_physical_expr_common::datum::{apply, apply_cmp}; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; use kernels::{ bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar, bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn, @@ -609,6 +610,13 @@ impl PhysicalExpr for BinaryExpr { write!(f, " {} ", self.op)?; write_child(f, self.right.as_ref(), precedence) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// Casts dictionary array to result type for binary numerical operators. Such operators diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 758317d3d2798..802bf9e20db38 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -33,6 +33,7 @@ use datafusion_common::{ internal_datafusion_err, internal_err, }; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; use indexmap::{IndexMap, IndexSet}; use std::borrow::Cow; use std::hash::Hash; @@ -1342,6 +1343,13 @@ impl PhysicalExpr for CaseExpr { } write!(f, "END") } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// Attempts to const evaluate the given `predicate`. diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 6fced231f3e6f..292429f0ff7a3 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -31,6 +31,7 @@ use datafusion_common::{Result, not_impl_err}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; use datafusion_expr_common::sort_properties::ExprProperties; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions { safe: false, @@ -237,6 +238,13 @@ impl PhysicalExpr for CastExpr { write!(f, ")") } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// Return a PhysicalExpression representing `expr` casted to diff --git a/datafusion/physical-expr/src/expressions/cast_column.rs b/datafusion/physical-expr/src/expressions/cast_column.rs index d80b6f4a588a4..d6b87801b3616 100644 --- a/datafusion/physical-expr/src/expressions/cast_column.rs +++ b/datafusion/physical-expr/src/expressions/cast_column.rs @@ -27,6 +27,7 @@ use datafusion_common::{ Result, ScalarValue, format::DEFAULT_CAST_OPTIONS, nested_struct::cast_column, }; use datafusion_expr_common::columnar_value::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; use std::{ any::Any, fmt::{self, Display}, @@ -180,6 +181,13 @@ impl PhysicalExpr for CastColumnExpr { fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { Display::fmt(self, f) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 8c7e8c319fff4..7795e66c2cdfa 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -30,6 +30,7 @@ use arrow::{ use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{Result, internal_err, plan_err}; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; /// Represents the column at a given index in a RecordBatch /// @@ -146,6 +147,13 @@ impl PhysicalExpr for Column { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.name) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } impl Column { diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 6c961e3bb0977..39bccce4fb2e1 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -26,7 +26,7 @@ use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode}, }; use datafusion_expr::ColumnarValue; -use datafusion_physical_expr_common::physical_expr::DynHash; +use datafusion_physical_expr_common::physical_expr::{DynHash, ExprExecutionContext}; /// State of a dynamic filter, tracking both updates and completion. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -445,6 +445,13 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { // Return the current generation of the expression. self.inner.read().generation } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 5c2f1adcd0cf3..18e6521582b4e 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -40,6 +40,7 @@ use datafusion_expr::{ColumnarValue, expr_vec_fmt}; use ahash::RandomState; use datafusion_common::HashMap; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; use hashbrown::hash_map::RawEntryMut; /// Trait for InList static filters @@ -862,6 +863,13 @@ impl PhysicalExpr for InListExpr { } write!(f, ")") } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } impl PartialEq for InListExpr { diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 62be8ebbc13e3..ab855f498d4d0 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -25,6 +25,7 @@ use arrow::{ use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; use std::hash::Hash; use std::{any::Any, sync::Arc}; @@ -108,6 +109,13 @@ impl PhysicalExpr for IsNotNullExpr { self.arg.fmt_sql(f)?; write!(f, " IS NOT NULL") } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// Create an IS NOT NULL expression diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 356fe2a866672..de3106fb87889 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -25,6 +25,7 @@ use arrow::{ use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; use std::hash::Hash; use std::{any::Any, sync::Arc}; @@ -107,6 +108,13 @@ impl PhysicalExpr for IsNullExpr { self.arg.fmt_sql(f)?; write!(f, " IS NULL") } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// Create an IS NULL expression diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs index fc49ca35f0498..70eb25f812bf9 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -21,6 +21,7 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{Result, assert_or_internal_err}; use datafusion_expr::{ColumnarValue, Operator}; use datafusion_physical_expr_common::datum::apply_cmp; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; use std::hash::Hash; use std::{any::Any, sync::Arc}; @@ -149,6 +150,13 @@ impl PhysicalExpr for LikeExpr { write!(f, " {} ", self.op_name())?; self.pattern.fmt_sql(f) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// used for optimize Dictionary like diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 1f3fefc60b7ad..b78b38bf1bc43 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -34,6 +34,7 @@ use datafusion_expr::Expr; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties}; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; /// Represents a literal value #[derive(Debug, PartialEq, Eq, Clone)] @@ -134,6 +135,13 @@ impl PhysicalExpr for Literal { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { std::fmt::Display::fmt(self, f) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// Create a literal expression diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 0c9476bebaaf0..f20a6f82271ff 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -39,6 +39,7 @@ use datafusion_expr::{ ColumnarValue, type_coercion::{is_interval, is_null, is_signed_numeric, is_timestamp}, }; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; /// Negative expression #[derive(Debug, Eq)] @@ -178,6 +179,13 @@ impl PhysicalExpr for NegativeExpr { self.arg.fmt_sql(f)?; write!(f, ")") } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// Creates a unary expression NEGATIVE diff --git a/datafusion/physical-expr/src/expressions/no_op.rs b/datafusion/physical-expr/src/expressions/no_op.rs index ff44a60a862d0..3d12c520cc3f0 100644 --- a/datafusion/physical-expr/src/expressions/no_op.rs +++ b/datafusion/physical-expr/src/expressions/no_op.rs @@ -28,6 +28,7 @@ use arrow::{ }; use datafusion_common::{Result, internal_err}; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; /// A place holder expression, can not be evaluated. /// @@ -80,4 +81,11 @@ impl PhysicalExpr for NoOp { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { std::fmt::Display::fmt(self, f) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index a29ab2ff40f5c..9a1ce958c4e73 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -30,6 +30,7 @@ use datafusion_common::{Result, ScalarValue, cast::as_boolean_array, internal_er use datafusion_expr::ColumnarValue; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::statistics::Distribution::{self, Bernoulli}; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; /// Not expression #[derive(Debug, Eq)] @@ -184,6 +185,13 @@ impl PhysicalExpr for NotExpr { write!(f, "NOT ")?; self.arg.fmt_sql(f) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// Creates a unary expression NOT diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs index c63550f430be7..141d7f9bf515f 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr/src/expressions/try_cast.rs @@ -29,6 +29,7 @@ use compute::can_cast_types; use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; use datafusion_common::{Result, not_impl_err}; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; /// TRY_CAST expression casts an expression to a specific data type and returns NULL on invalid cast #[derive(Debug, Eq)] @@ -125,6 +126,13 @@ impl PhysicalExpr for TryCastExpr { self.expr.fmt_sql(f)?; write!(f, " AS {:?})", self.cast_type) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// Return a PhysicalExpression representing `expr` casted to diff --git a/datafusion/physical-expr/src/expressions/unknown_column.rs b/datafusion/physical-expr/src/expressions/unknown_column.rs index f06d880985f4a..474b8afb2a284 100644 --- a/datafusion/physical-expr/src/expressions/unknown_column.rs +++ b/datafusion/physical-expr/src/expressions/unknown_column.rs @@ -29,6 +29,7 @@ use arrow::{ }; use datafusion_common::{Result, internal_err}; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; #[derive(Debug, Clone, Eq)] pub struct UnKnownColumn { @@ -90,6 +91,13 @@ impl PhysicalExpr for UnKnownColumn { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { std::fmt::Display::fmt(self, f) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } impl Hash for UnKnownColumn { diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index e750bfd79d77d..ac9bc7d0e0e78 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -237,7 +237,9 @@ mod tests { use crate::physical_expr::{ physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, }; - use datafusion_physical_expr_common::physical_expr::is_volatile; + use datafusion_physical_expr_common::physical_expr::{ + ExprExecutionContext, is_volatile, + }; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; @@ -430,6 +432,13 @@ mod tests { fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "mock_volatile({})", self.volatile) } + + fn execute( + self: Arc, + context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } #[test] diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 540fd620c92ce..cd9a96cbcc24a 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -717,6 +717,33 @@ impl Projector { } } + pub fn with_metrics_owned( + mut self, + metrics: &ExecutionPlanMetricsSet, + partition: usize, + ) -> Self { + self.expression_metrics = Some( + self.projection + .create_expression_metrics(metrics, partition), + ); + self + } + + pub fn with_exprs(&self, exprs: &[Arc]) -> Self { + let projection = self + .projection + .iter() + .zip(exprs.into_iter()) + .map(|(src, dst)| ProjectionExpr::new(Arc::clone(dst), src.alias.clone())) + .collect(); + + Self { + projection, + output_schema: Arc::clone(&self.output_schema), + expression_metrics: self.expression_metrics.clone(), + } + } + /// Project a record batch according to this projector's expressions. /// /// # Errors diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index aa090743ad441..2f50e63c17af6 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -48,6 +48,7 @@ use datafusion_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, Volatility, expr_vec_fmt, }; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; /// Physical expression of a scalar function pub struct ScalarFunctionExpr { @@ -362,6 +363,13 @@ impl PhysicalExpr for ScalarFunctionExpr { fn is_volatile_node(&self) -> bool { self.fun.signature().volatility == Volatility::Volatile } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/context.rs b/datafusion/physical-plan/src/context.rs new file mode 100644 index 0000000000000..2cc86e1854b37 --- /dev/null +++ b/datafusion/physical-plan/src/context.rs @@ -0,0 +1,132 @@ +use std::sync::Arc; + +use arrow::array::RecordBatch; +use datafusion_common::{Result, ScalarValue}; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr_common::{ + metrics::ExecutionPlanMetricsSet, physical_expr::ExprExecutionContext, +}; +use futures::TryStreamExt; +use parking_lot::Mutex; + +use crate::ExecutionPlan; + +/// Describes a plan execution context. +#[derive(Default)] +pub struct PlanExecutionContext { + /// User passed parameters. + external_params: Arc<[ScalarValue]>, + /// Associated task context. + task_context: Arc, +} + +impl PlanExecutionContext { + /// Make a new [`PlanExecutionContext`]. + pub fn new( + task_context: Arc, + external_params: impl Into>, + ) -> Self { + Self { + external_params: external_params.into(), + task_context, + } + } + + /// Build a context for the particular node. + pub fn build_node_context( + self: &Arc, + node: Arc, + ) -> Result { + let expr_context = ExprExecutionContext::new(Arc::clone(&self.external_params)); + let exprs = node + .exprs() + .iter() + .map(|expr| Arc::clone(expr).execute(&expr_context)) + .collect::>>()?; + let num_children = node.children().len(); + Ok(PlanNodeExecutionContext { + plan_context: Arc::clone(self), + node, + metrics: ExecutionPlanMetricsSet::new(), + children: Mutex::new((0..num_children).map(|_| None).collect()), + exprs, + }) + } + + /// Project associated task context. + pub fn task_context(&self) -> &Arc { + &self.task_context + } + + pub fn execute( + self: &Arc, + plan: &Arc, + ) -> Result { + let node_context = self.build_node_context(Arc::clone(plan)).map(Arc::new)?; + plan.execute_with(0, &node_context) + } + + pub async fn collect( + self: &Arc, + plan: &Arc, + ) -> Result> { + self.execute(plan)?.try_collect().await + } +} + +/// Describes a particular node execution context. +pub struct PlanNodeExecutionContext { + plan_context: Arc, + node: Arc, + metrics: ExecutionPlanMetricsSet, + /// Context for each plan child initialized lazy. + children: Mutex>]>>, + /// Executable form of expressions. + exprs: Vec>, +} + +impl PlanNodeExecutionContext { + /// Execute child `idx` of the current node. + pub fn execute_child( + &self, + idx: usize, + partition: usize, + ) -> Result { + let child_context = self.get_or_build_child(idx)?; + self.node.children()[idx].execute_with(partition, &child_context) + } + + /// Project a plan context. + pub fn plan_context(&self) -> &Arc { + &self.plan_context + } + + /// Project executable expressions. + pub fn exprs(&self) -> &[Arc] { + &self.exprs + } + + /// Project a plan node. + pub fn node(&self) -> &Arc { + &self.node + } + + /// Return metrics to fill. + pub fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.metrics + } + + fn get_or_build_child(&self, idx: usize) -> Result> { + let mut children = self.children.lock(); + if let Some(context) = children[idx].as_ref() { + return Ok(Arc::clone(context)); + } + let context = self + .plan_context + .build_node_context(Arc::clone(&self.node.children()[idx])) + .map(Arc::new)?; + children[idx] = Some(Arc::clone(&context)); + Ok(context) + } +} diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 52f4829127651..04a646de1ce4b 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::context::PlanNodeExecutionContext; pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, @@ -130,6 +131,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// trait, which is implemented for all `ExecutionPlan`s. fn properties(&self) -> &PlanProperties; + /// Return all expressions associated with plan. + fn exprs(&self) -> Vec> { + vec![] + } + /// Returns an error if this individual node does not conform to its invariants. /// These invariants are typically only checked in debug mode. /// @@ -457,6 +463,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { context: Arc, ) -> Result; + /// Execute this plan in [`PlanNodeExecutionContext`]. + /// + /// [`PlanNodeExecutionContext`] manages execution metrics and is responsible + /// to properly execute children plans. + /// + fn execute_with( + &self, + partition: usize, + context: &Arc, + ) -> Result { + self.execute(partition, Arc::clone(context.plan_context().task_context())) + } + /// Return a snapshot of the set of [`Metric`]s for this /// [`ExecutionPlan`]. If no `Metric`s are available, return None. /// diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index e3d432643cfba..bcef24ef766b7 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -29,7 +29,7 @@ use datafusion_common::Result; use datafusion_common::hash_utils::{create_hashes, with_hashes}; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{ - DynHash, PhysicalExpr, PhysicalExprRef, + DynHash, ExprExecutionContext, PhysicalExpr, PhysicalExprRef, }; use crate::joins::Map; @@ -203,6 +203,13 @@ impl PhysicalExpr for HashExpr { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.description) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } /// Physical expression that checks join keys in a [`Map`] (hash table or array map). @@ -349,6 +356,13 @@ impl PhysicalExpr for HashTableLookupExpr { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.description) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } fn evaluate_columns( diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 9352a143c11f8..3fffa533dab80 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -70,6 +70,7 @@ pub mod coalesce_batches; pub mod coalesce_partitions; pub mod column_rewriter; pub mod common; +pub mod context; pub mod coop; pub mod display; pub mod empty; diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 8d4c775f87348..f07918de16fed 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -216,6 +216,26 @@ impl ProjectionExec { } Ok(alias_map) } + + fn raw_execute( + metrics: &ExecutionPlanMetricsSet, + projector: Projector, + partition: usize, + input: SendableRecordBatchStream, + context: &TaskContext, + ) -> Result { + trace!( + "Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", + partition, + context.session_id(), + context.task_id() + ); + Ok(Box::pin(ProjectionStream::new( + projector, + input, + BaselineMetrics::new(&metrics, partition), + )?)) + } } impl DisplayAs for ProjectionExec { @@ -313,19 +333,30 @@ impl ExecutionPlan for ProjectionExec { partition: usize, context: Arc, ) -> Result { - trace!( - "Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", - partition, - context.session_id(), - context.task_id() - ); - + // Take expressions and metrics from self. let projector = self.projector.with_metrics(&self.metrics, partition); - Ok(Box::pin(ProjectionStream::new( + let input = self.input.execute(partition, Arc::clone(&context))?; + Self::raw_execute(&self.metrics, projector, partition, input, &context) + } + + fn execute_with( + &self, + partition: usize, + context: &Arc, + ) -> Result { + // Take expressions, metrics from context. + let projector = self + .projector + .with_exprs(context.exprs()) + .with_metrics(context.metrics(), partition); + let input = context.execute_child(0, partition)?; + Self::raw_execute( + context.metrics(), projector, - self.input.execute(partition, context)?, - BaselineMetrics::new(&self.metrics, partition), - )?)) + partition, + input, + context.plan_context().task_context().as_ref(), + ) } fn metrics(&self) -> Option { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index b54b7030fc52a..a4d356025a149 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -39,6 +39,7 @@ use datafusion_expr::dml::InsertOp; use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf; use datafusion_functions_aggregate::array_agg::array_agg_udaf; use datafusion_functions_aggregate::min_max::max_udaf; +use datafusion_physical_expr_common::physical_expr::ExprExecutionContext; use prost::Message; use datafusion::arrow::array::ArrayRef; @@ -1024,6 +1025,13 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { std::fmt::Display::fmt(self, f) } + + fn execute( + self: Arc, + _context: &ExprExecutionContext, + ) -> Result> { + Ok(self) + } } #[derive(Debug)]