Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion datafusion/execution/src/cache/cache_unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ mod tests {
use datafusion_common::Statistics;
use datafusion_common::stats::Precision;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::physical_expr::{
ExprExecutionContext, PhysicalExpr,
};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use object_store::ObjectMeta;
use object_store::path::Path;
Expand Down Expand Up @@ -224,6 +226,13 @@ mod tests {
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MockExpr")
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

fn ordering() -> LexOrdering {
Expand Down
9 changes: 8 additions & 1 deletion datafusion/ffi/src/physical_expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::statistics::Distribution;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::physical_expr::{ExprExecutionContext, fmt_sql};

use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
use crate::expr::columnar_value::FFI_ColumnarValue;
Expand Down Expand Up @@ -705,6 +705,13 @@ impl PhysicalExpr for ForeignPhysicalExpr {
fn is_volatile_node(&self) -> bool {
unsafe { (self.expr.is_volatile_node)(&self.expr) }
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

impl Eq for ForeignPhysicalExpr {}
Expand Down
33 changes: 32 additions & 1 deletion datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ use itertools::izip;
/// Shared [`PhysicalExpr`].
pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;

/// Describes execution context for the particular expression.
pub struct ExprExecutionContext {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This context is used to provide additional values for expression execute(...) method. We could make it extendable to allow users to put their own values into it.

/// External parameters.
pub external_params: Arc<[ScalarValue]>,
}

impl ExprExecutionContext {
/// Make a new [`ExprExecutionContext`].
pub fn new(external_param: impl Into<Arc<[ScalarValue]>>) -> Self {
Self {
external_params: external_param.into(),
}
}
}

/// [`PhysicalExpr`]s represent expressions such as `A + 1` or `CAST(c1 AS int)`.
///
/// `PhysicalExpr` knows its type, nullability and can be evaluated directly on
Expand Down Expand Up @@ -430,6 +445,15 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
fn is_volatile_node(&self) -> bool {
false
}

/// Make this expression executable. The most expressions are executable and do not
/// require an additional work so this method could return `self`. However, there are
/// expressions that should be transformed prior to execution, e.g. placeholder that
/// should be resolved into scalar.
fn execute(
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way we support custom expression that should run sum logic before execution (not only dynamic filters or placeholders, but an arbitrary one).

self: Arc<Self>,
context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>>;
}

#[deprecated(
Expand Down Expand Up @@ -662,7 +686,7 @@ pub fn is_volatile(expr: &Arc<dyn PhysicalExpr>) -> bool {

#[cfg(test)]
mod test {
use crate::physical_expr::PhysicalExpr;
use crate::physical_expr::{ExprExecutionContext, PhysicalExpr};
use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Schema};
use datafusion_expr_common::columnar_value::ColumnarValue;
Expand Down Expand Up @@ -707,6 +731,13 @@ mod test {
fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("TestExpr")
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

impl Display for TestExpr {
Expand Down
11 changes: 10 additions & 1 deletion datafusion/physical-expr/src/async_scalar_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use datafusion_common::{internal_err, not_impl_err};
use datafusion_expr::ScalarFunctionArgs;
use datafusion_expr::async_udf::AsyncScalarUDF;
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::physical_expr::{
ExprExecutionContext, PhysicalExpr,
};
use std::any::Any;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
Expand Down Expand Up @@ -247,4 +249,11 @@ impl PhysicalExpr for AsyncFuncExpr {
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.func)
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion_expr::statistics::{
use datafusion_expr::{ColumnarValue, Operator};
use datafusion_physical_expr_common::datum::{apply, apply_cmp};

use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
use kernels::{
bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn,
Expand Down Expand Up @@ -609,6 +610,13 @@ impl PhysicalExpr for BinaryExpr {
write!(f, " {} ", self.op)?;
write_child(f, self.right.as_ref(), precedence)
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

/// Casts dictionary array to result type for binary numerical operators. Such operators
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use datafusion_common::{
internal_datafusion_err, internal_err,
};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
use indexmap::{IndexMap, IndexSet};
use std::borrow::Cow;
use std::hash::Hash;
Expand Down Expand Up @@ -1342,6 +1343,13 @@ impl PhysicalExpr for CaseExpr {
}
write!(f, "END")
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

/// Attempts to const evaluate the given `predicate`.
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion_common::{Result, not_impl_err};
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_expr_common::sort_properties::ExprProperties;
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;

const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
safe: false,
Expand Down Expand Up @@ -237,6 +238,13 @@ impl PhysicalExpr for CastExpr {

write!(f, ")")
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

/// Return a PhysicalExpression representing `expr` casted to
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/cast_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion_common::{
Result, ScalarValue, format::DEFAULT_CAST_OPTIONS, nested_struct::cast_column,
};
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
use std::{
any::Any,
fmt::{self, Display},
Expand Down Expand Up @@ -180,6 +181,13 @@ impl PhysicalExpr for CastColumnExpr {
fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Display::fmt(self, f)
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

#[cfg(test)]
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use arrow::{
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Result, internal_err, plan_err};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;

/// Represents the column at a given index in a RecordBatch
///
Expand Down Expand Up @@ -146,6 +147,13 @@ impl PhysicalExpr for Column {
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name)
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

impl Column {
Expand Down
9 changes: 8 additions & 1 deletion datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode},
};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::DynHash;
use datafusion_physical_expr_common::physical_expr::{DynHash, ExprExecutionContext};

/// State of a dynamic filter, tracking both updates and completion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -445,6 +445,13 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
// Return the current generation of the expression.
self.inner.read().generation
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

#[cfg(test)]
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use datafusion_expr::{ColumnarValue, expr_vec_fmt};

use ahash::RandomState;
use datafusion_common::HashMap;
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
use hashbrown::hash_map::RawEntryMut;

/// Trait for InList static filters
Expand Down Expand Up @@ -862,6 +863,13 @@ impl PhysicalExpr for InListExpr {
}
write!(f, ")")
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

impl PartialEq for InListExpr {
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/is_not_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow::{
use datafusion_common::Result;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
use std::hash::Hash;
use std::{any::Any, sync::Arc};

Expand Down Expand Up @@ -108,6 +109,13 @@ impl PhysicalExpr for IsNotNullExpr {
self.arg.fmt_sql(f)?;
write!(f, " IS NOT NULL")
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

/// Create an IS NOT NULL expression
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/is_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow::{
use datafusion_common::Result;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
use std::hash::Hash;
use std::{any::Any, sync::Arc};

Expand Down Expand Up @@ -107,6 +108,13 @@ impl PhysicalExpr for IsNullExpr {
self.arg.fmt_sql(f)?;
write!(f, " IS NULL")
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

/// Create an IS NULL expression
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, assert_or_internal_err};
use datafusion_expr::{ColumnarValue, Operator};
use datafusion_physical_expr_common::datum::apply_cmp;
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
use std::hash::Hash;
use std::{any::Any, sync::Arc};

Expand Down Expand Up @@ -149,6 +150,13 @@ impl PhysicalExpr for LikeExpr {
write!(f, " {} ", self.op_name())?;
self.pattern.fmt_sql(f)
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

/// used for optimize Dictionary like
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use datafusion_expr::Expr;
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties};
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;

/// Represents a literal value
#[derive(Debug, PartialEq, Eq, Clone)]
Expand Down Expand Up @@ -134,6 +135,13 @@ impl PhysicalExpr for Literal {
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(self, f)
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

/// Create a literal expression
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/negative.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion_expr::{
ColumnarValue,
type_coercion::{is_interval, is_null, is_signed_numeric, is_timestamp},
};
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;

/// Negative expression
#[derive(Debug, Eq)]
Expand Down Expand Up @@ -178,6 +179,13 @@ impl PhysicalExpr for NegativeExpr {
self.arg.fmt_sql(f)?;
write!(f, ")")
}

fn execute(
self: Arc<Self>,
_context: &ExprExecutionContext,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
}

/// Creates a unary expression NEGATIVE
Expand Down
Loading
Loading