diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index a18861aa3a69..f4a24ba9d98c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1115,6 +1115,34 @@ config_namespace! { /// See: pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150 + /// Minimum number of rows to process before making a selectivity decision + /// for adaptive filtering of join dynamic filters. + /// + /// The filter will remain in a tracking state until this many rows have been + /// processed. This ensures statistical stability before making the disable decision. + /// Only used when `enable_adaptive_filter_selectivity_tracking` is true. + pub adaptive_filter_min_rows_for_selectivity: usize, default = 100_000 + + /// Selectivity threshold for adaptive disabling of join dynamic filters. + /// + /// If the filter passes this fraction or more of rows, it will be disabled. + /// Value should be between 0.0 and 1.0. + /// + /// For example, 0.95 means if 95% or more of rows pass the filter, it will be disabled. + /// Only used when `enable_adaptive_filter_selectivity_tracking` is true. + pub adaptive_filter_selectivity_threshold: f64, default = 0.50 + + /// Enable selectivity-based disabling of dynamic filters from joins. + /// + /// When enabled, join dynamic filters that pass most rows (above the threshold) + /// will be automatically disabled to avoid evaluation overhead. This is useful + /// when the build side of a join covers most of the probe side values, making + /// the filter expensive to evaluate for little benefit. + /// + /// The selectivity tracking resets when the dynamic filter is updated (e.g., when + /// the hash table is built), allowing the filter to be re-evaluated with new data. + pub enable_adaptive_filter_selectivity_tracking: bool, default = true + /// The default filter selectivity used by Filter Statistics /// when an exact selectivity cannot be determined. Valid values are /// between 0 (no selectivity) and 100 (all rows are selected). 
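The three options above are ordinary session settings; besides the SQL `SET` statements exercised in the tests below, they can be flipped programmatically. A minimal sketch, assuming the `config_namespace!` entries in this diff generate identically named public fields on `ConfigOptions::optimizer`:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Sketch only: the optimizer field names are assumed to match the
    // config_namespace! entries added in this diff.
    let mut config = SessionConfig::new();
    let opts = config.options_mut();
    opts.optimizer.enable_adaptive_filter_selectivity_tracking = true;
    // Disable a join's dynamic filter once 80%+ of probed rows pass it...
    opts.optimizer.adaptive_filter_selectivity_threshold = 0.8;
    // ...but only after at least 50_000 rows have been observed.
    opts.optimizer.adaptive_filter_min_rows_for_selectivity = 50_000;
    let _ctx = SessionContext::new_with_config(config);
}
```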
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 31a21274ad62..2af5773fc341 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -263,7 +263,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false] - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] AND DynamicFilter [ empty ] " ); @@ -287,7 +287,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb] - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ] " ); } @@ -1003,7 +1003,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() { Ok: - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] ", ); @@ -1037,7 +1037,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() { @r" - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] ] " ); } @@ -1213,7 +1213,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { - RepartitionExec: 
partitioning=Hash([a@0, b@1], 12), input_partitions=1 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] " ); @@ -1247,7 +1247,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ] ] " ); @@ -1265,7 +1265,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] ] " ); @@ -1405,7 +1405,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] " ); @@ -1437,7 +1437,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { - HashJoinExec: mode=CollectLeft, 
join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] ] " ); @@ -1579,8 +1579,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] ", ); @@ -1610,8 +1610,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb]) ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ] ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb]) ] ] " ); } @@ -3029,7 +3029,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + - DataSourceExec: file_groups={1 group: 
[[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] " ); @@ -3054,7 +3054,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ false ] ] " ); } @@ -3156,7 +3156,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() { @r" - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] " ); @@ -3179,7 +3179,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() { @r" - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ] ] " ); diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 7107b0a9004d..ab21be790383 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -595,7 +595,6 @@ pub fn snapshot_physical_expr( /// Take a snapshot of the given `PhysicalExpr` if it is dynamic. /// -/// Take a snapshot of this `PhysicalExpr` if it is dynamic. /// This is used to capture the current state of `PhysicalExpr`s that may contain /// dynamic references to other operators in order to serialize it over the wire /// or treat it via downcast matching. diff --git a/datafusion/physical-expr/src/expressions/adaptive_selectivity_filter.rs b/datafusion/physical-expr/src/expressions/adaptive_selectivity_filter.rs new file mode 100644 index 000000000000..83e8ecb9eb71 --- /dev/null +++ b/datafusion/physical-expr/src/expressions/adaptive_selectivity_filter.rs @@ -0,0 +1,481 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A wrapper [`PhysicalExpr`] that tracks filter selectivity at runtime and +//! automatically disables filters that aren't pruning enough rows. + +use std::any::Any; +use std::fmt::Display; +use std::hash::Hash; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use arrow::array::BooleanArray; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::DynHash; + +use crate::PhysicalExpr; +use crate::expressions::lit; + +/// Configuration for selectivity-based filter disabling. +#[derive(Debug, Clone)] +pub struct SelectivityConfig { + /// Threshold at or above which the filter is disabled (e.g., 0.95 = 95% selectivity). + /// If the filter passes this fraction or more of rows, it will be disabled. + pub threshold: f64, + /// Minimum rows to process before making a selectivity decision. + pub min_rows: usize, +} + +impl Default for SelectivityConfig { + fn default() -> Self { + Self { + threshold: 0.95, + min_rows: 10_000, + } + } +} + +// State values for the atomic state machine +const STATE_TRACKING: u8 = 0; +const STATE_ACTIVE: u8 = 1; +const STATE_DISABLED: u8 = 2; + +/// A wrapper [`PhysicalExpr`] that tracks selectivity and can disable filters +/// that pass too many rows. +/// +/// This wrapper is designed to be used with dynamic filters from joins. +/// It monitors how many rows pass through the filter, and if the filter +/// is found to be ineffective (passes most rows), it automatically disables +/// itself to avoid evaluation overhead. +#[derive(Debug)] +pub struct AdaptiveSelectivityFilterExpr { + /// The inner filter expression (typically DynamicFilterPhysicalExpr). + inner: Arc<dyn PhysicalExpr>, + /// Simple atomic state: 0 = Tracking, 1 = Active, 2 = Disabled + /// This allows the hot path to be a single atomic load with no locks. + state: AtomicUsize, + /// Rows that passed the filter (only used in Tracking state). + rows_passed: AtomicUsize, + /// Total rows processed (only used in Tracking state). + rows_total: AtomicUsize, + /// Configuration for selectivity tracking. + config: SelectivityConfig, +} + +impl AdaptiveSelectivityFilterExpr { + /// Create a new `AdaptiveSelectivityFilterExpr` wrapping the given inner expression. + pub fn new(inner: Arc<dyn PhysicalExpr>, config: SelectivityConfig) -> Self { + Self { + inner, + state: AtomicUsize::new(STATE_TRACKING as usize), + rows_passed: AtomicUsize::new(0), + rows_total: AtomicUsize::new(0), + config, + } + } + + /// Get the current selectivity information for observability. + /// + /// Returns `(rows_passed, rows_total, is_disabled)`.
+ #[cfg(test)] + fn selectivity_info(&self) -> (usize, usize, bool) { + let state = self.state.load(Ordering::Relaxed) as u8; + match state { + STATE_TRACKING => { + let passed = self.rows_passed.load(Ordering::Relaxed); + let total = self.rows_total.load(Ordering::Relaxed); + (passed, total, false) + } + STATE_ACTIVE => (0, 0, false), + STATE_DISABLED => (0, 0, true), + _ => (0, 0, false), + } + } + + /// Check if the filter is disabled. + #[cfg(test)] + fn is_disabled(&self) -> bool { + self.state.load(Ordering::Relaxed) as u8 == STATE_DISABLED + } + + /// Get the inner expression. + pub fn inner(&self) -> &Arc<dyn PhysicalExpr> { + &self.inner + } + + /// Update tracking statistics after evaluating a batch. + /// Only called when in TRACKING state. + fn update_tracking(&self, result: &ColumnarValue) { + let (true_count, total_count) = match result { + ColumnarValue::Array(array) => { + let Some(bool_array) = array.as_any().downcast_ref::<BooleanArray>() + else { + // TODO: should this handle / propagate errors instead? + // Can this be a dictionary array or other wrapper type? + return; + }; + (bool_array.true_count(), array.len()) + } + ColumnarValue::Scalar(scalar) => { + if let ScalarValue::Boolean(Some(v)) = scalar { + if *v { (1, 1) } else { (0, 1) } + } else { + // Similarly, should this error? + return; + } + } + }; + + // Update counters + let passed = + self.rows_passed.fetch_add(true_count, Ordering::Relaxed) + true_count; + let new_total = + self.rows_total.fetch_add(total_count, Ordering::Relaxed) + total_count; + + // Check if we've seen enough rows to make a decision + if new_total >= self.config.min_rows { + let selectivity = passed as f64 / new_total as f64; + + // Use compare_exchange to ensure only one thread makes the transition + let new_state = if selectivity >= self.config.threshold { + STATE_DISABLED + } else { + STATE_ACTIVE + }; + + // Try to transition from TRACKING to the new state + // If this fails, another thread already did the transition, which is fine + let _ = self.state.compare_exchange( + STATE_TRACKING as usize, + new_state as usize, + Ordering::Relaxed, + Ordering::Relaxed, + ); + } + } +} + +impl Display for AdaptiveSelectivityFilterExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "AdaptiveSelectivity [ {} ]", self.inner) + } +} + +impl Hash for AdaptiveSelectivityFilterExpr { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + // Hash based on the inner expression + self.inner.dyn_hash(state); + } +} + +impl PartialEq for AdaptiveSelectivityFilterExpr { + fn eq(&self, other: &Self) -> bool { + self.inner.eq(&other.inner) + } +} + +impl Eq for AdaptiveSelectivityFilterExpr {} + +impl PhysicalExpr for AdaptiveSelectivityFilterExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { + vec![&self.inner] + } + + fn with_new_children( + self: Arc<Self>, + children: Vec<Arc<dyn PhysicalExpr>>, + ) -> Result<Arc<dyn PhysicalExpr>> { + if children.len() != 1 { + return Err(datafusion_common::DataFusionError::Internal( + "AdaptiveSelectivityFilterExpr expects exactly one child".to_string(), + )); + } + Ok(Arc::new(Self::new( + Arc::clone(&children[0]), + self.config.clone(), + ))) + } + + fn data_type(&self, input_schema: &Schema) -> Result<DataType> { + self.inner.data_type(input_schema) + } + + fn nullable(&self, input_schema: &Schema) -> Result<bool> { + self.inner.nullable(input_schema) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { + // Fast path: check state first + let state = self.state.load(Ordering::Relaxed) as u8; + if state == STATE_DISABLED { + // Fast path: filter is
disabled, return all-true + return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)))); + } + + // Evaluate inner expression + let result = self.inner.evaluate(batch)?; + + // Update tracking if still in tracking state + if state == STATE_TRACKING { + self.update_tracking(&result); + } + + Ok(result) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.inner.fmt_sql(f) + } + + fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> { + match self.state.load(Ordering::Relaxed) as u8 { + STATE_DISABLED => { + // If disabled, we can return a literal true expression instead + Ok(Some(lit(true) as Arc<dyn PhysicalExpr>)) + } + _ => { + // Return the inner expression directly to strip the wrapper during snapshotting. + // This is important for PruningPredicate which needs to pattern-match on the + // underlying expression types (BinaryExpr, InListExpr, etc.) to build pruning + // predicates. If we return None, the wrapper would be preserved and + // PruningPredicate wouldn't recognize it, falling back to lit(true) which + // disables pruning entirely. + // Note: at this point in tree transformation, the inner has already been + // snapshotted via with_new_children, so self.inner is the snapshotted expression. + Ok(Some(Arc::clone(&self.inner))) + } + } + } + + fn snapshot_generation(&self) -> u64 { + let state = self.state.load(Ordering::Relaxed) as u8; + if state == STATE_DISABLED { + // When disabled, return 0 to indicate static behavior + 0 + } else { + // Pass through to inner expression's generation + self.inner.snapshot_generation() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::expressions::{BinaryExpr, col, lit}; + use arrow::array::{ArrayRef, Int32Array}; + use arrow::datatypes::Field; + use datafusion_expr::Operator; + + fn create_batch(values: Vec<i32>) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(values)) as ArrayRef]) + .unwrap() + } + + fn create_filter_expr(threshold: i32) -> Arc<dyn PhysicalExpr> { + // Create a filter: a < threshold + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + Arc::new(BinaryExpr::new( + col("a", &schema).unwrap(), + Operator::Lt, + lit(threshold), + )) + } + + #[test] + fn test_high_selectivity_filter_gets_disabled() { + // Create a filter that passes 95%+ of rows: a < 100 (all values pass) + let filter = create_filter_expr(100); + let config = SelectivityConfig { + threshold: 0.95, + min_rows: 100, + }; + let wrapper = AdaptiveSelectivityFilterExpr::new(filter, config); + + // Create batches where all rows pass the filter + let batch = create_batch((0..100).collect()); + + // Evaluate - should process and track + let result = wrapper.evaluate(&batch).unwrap(); + let ColumnarValue::Array(arr) = result else { + panic!("Expected array result"); + }; + assert_eq!(arr.len(), 100); + + // After enough rows, the filter should be disabled + assert!( + wrapper.is_disabled(), + "Filter should be disabled after high selectivity" + ); + } + + #[test] + fn test_low_selectivity_filter_stays_active() { + // Create a filter that passes ~50% of rows: a < 50 + let filter = create_filter_expr(50); + let config = SelectivityConfig { + threshold: 0.95, + min_rows: 100, + }; + let wrapper = AdaptiveSelectivityFilterExpr::new(filter, config); + + // Create batch where ~50% pass + let batch = create_batch((0..100).collect()); + + // Evaluate + let _result = wrapper.evaluate(&batch).unwrap(); + + // Filter should stay active
(not disabled) + assert!( + !wrapper.is_disabled(), + "Low selectivity filter should stay active" + ); + } + + #[test] + fn test_disabled_filter_returns_all_true() { + // Create a filter that will be disabled + let filter = create_filter_expr(100); // All pass + let config = SelectivityConfig { + threshold: 0.95, + min_rows: 10, + }; + let wrapper = AdaptiveSelectivityFilterExpr::new(filter, config); + + // First batch - get it disabled + let batch = create_batch((0..100).collect()); + let _ = wrapper.evaluate(&batch).unwrap(); + + assert!(wrapper.is_disabled(), "Filter should be disabled"); + + // Now create a batch where the original filter would return some false + // But since we're disabled, we should get scalar true (efficient bypass) + let batch2 = create_batch(vec![200, 201, 202]); // These would fail a < 100 + let result = wrapper.evaluate(&batch2).unwrap(); + + // Should return scalar true when disabled + let ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))) = result else { + panic!("Expected scalar true result when disabled, got: {result:?}"); + }; + } + + #[test] + fn test_min_rows_threshold_respected() { + let filter = create_filter_expr(100); // All pass + let config = SelectivityConfig { + threshold: 0.95, + min_rows: 1000, // High threshold + }; + let wrapper = AdaptiveSelectivityFilterExpr::new(filter, config); + + // Process less than min_rows + let batch = create_batch((0..100).collect()); + let _ = wrapper.evaluate(&batch).unwrap(); + + // Should still be tracking, not yet disabled + let (passed, total, disabled) = wrapper.selectivity_info(); + assert_eq!(passed, 100); + assert_eq!(total, 100); + assert!( + !disabled, + "Should still be tracking under min_rows threshold" + ); + } + + #[test] + fn test_display() { + let filter = create_filter_expr(50); + let config = SelectivityConfig::default(); + let wrapper = AdaptiveSelectivityFilterExpr::new(filter, config); + + let display = format!("{wrapper}"); + assert!( + display.contains("AdaptiveSelectivity"), + "Display should show wrapper name" + ); + } + + #[test] + fn test_with_new_children() { + let filter = create_filter_expr(50); + let config = SelectivityConfig { + threshold: 0.80, + min_rows: 5000, + }; + let wrapper = Arc::new(AdaptiveSelectivityFilterExpr::new(filter, config)); + + let new_filter = create_filter_expr(75); + let new_wrapper = wrapper.with_new_children(vec![new_filter]).unwrap(); + + // Should create a new wrapper with the new child + let new_wrapper = new_wrapper + .as_any() + .downcast_ref::<AdaptiveSelectivityFilterExpr>() + .unwrap(); + assert!(!new_wrapper.is_disabled()); + } + + #[test] + fn test_active_state_no_tracking_overhead() { + // Test that once in Active state, there's minimal overhead + let filter = create_filter_expr(50); // ~50% pass rate + let config = SelectivityConfig { + threshold: 0.95, + min_rows: 100, + }; + let wrapper = AdaptiveSelectivityFilterExpr::new(filter, config); + + // Process enough rows to transition to Active + let batch = create_batch((0..100).collect()); + let _ = wrapper.evaluate(&batch).unwrap(); + + // Should be in Active state now + assert!(!wrapper.is_disabled()); + let state = wrapper.state.load(Ordering::Relaxed) as u8; + assert_eq!(state, STATE_ACTIVE, "Should be in Active state"); + + // Further evaluations should not update tracking counters + let initial_passed = wrapper.rows_passed.load(Ordering::Relaxed); + let initial_total = wrapper.rows_total.load(Ordering::Relaxed); + + // Evaluate more batches + for _ in 0..10 { + let _ = wrapper.evaluate(&batch).unwrap(); + } + + //
Counters should NOT have changed (no tracking in Active state) + assert_eq!( + wrapper.rows_passed.load(Ordering::Relaxed), + initial_passed, + "Counters should not change in Active state" + ); + assert_eq!( + wrapper.rows_total.load(Ordering::Relaxed), + initial_total, + "Counters should not change in Active state" + ); + } +} diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index c9e02708d6c2..3ff668a56869 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -19,6 +19,7 @@ #[macro_use] mod binary; +mod adaptive_selectivity_filter; mod case; mod cast; mod cast_column; @@ -39,6 +40,7 @@ pub use crate::PhysicalSortExpr; /// Module with some convenient methods used in expression building pub use crate::aggregate::stats::StatsType; +pub use adaptive_selectivity_filter::{AdaptiveSelectivityFilterExpr, SelectivityConfig}; pub use binary::{BinaryExpr, binary, similar_to}; pub use case::{CaseExpr, case}; pub use cast::{CastExpr, cast}; diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index a330ad54cb33..5e5620a4bd62 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -80,7 +80,9 @@ use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumula use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; -use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::expressions::{ + AdaptiveSelectivityFilterExpr, DynamicFilterPhysicalExpr, SelectivityConfig, lit, +}; use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; @@ -1417,7 +1419,25 @@ impl ExecutionPlan for HashJoinExec { { // Add actual dynamic filter to right side (probe side) let dynamic_filter = Self::create_dynamic_filter(&self.on); - right_child = right_child.with_self_filter(dynamic_filter); + + // Optionally wrap with selectivity tracking + let filter_expr: Arc<dyn PhysicalExpr> = if config + .optimizer + .enable_adaptive_filter_selectivity_tracking + { + let selectivity_config = SelectivityConfig { + threshold: config.optimizer.adaptive_filter_selectivity_threshold, + min_rows: config.optimizer.adaptive_filter_min_rows_for_selectivity, + }; + Arc::new(AdaptiveSelectivityFilterExpr::new( + dynamic_filter, + selectivity_config, + )) + } else { + dynamic_filter + }; + + right_child = right_child.with_self_filter(filter_expr); } Ok(FilterDescription::new() @@ -1451,10 +1471,31 @@ impl ExecutionPlan for HashJoinExec { if let Some(filter) = right_child_self_filters.first() { // Note that we don't check PushedDownPredicate::discriminant because even if nothing said // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating - let predicate = Arc::clone(&filter.predicate); - if let Ok(dynamic_filter) = - Arc::downcast::<DynamicFilterPhysicalExpr>(predicate) - { + + // Try to extract the DynamicFilterPhysicalExpr, either directly or from an AdaptiveSelectivityFilterExpr wrapper + let maybe_dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>> = { + let predicate = Arc::clone(&filter.predicate); + // First, try direct downcast to DynamicFilterPhysicalExpr + // Using .clone() instead of Arc::clone because it enables implicit coercion to Arc<dyn Any + Send + Sync> + #[expect(clippy::clone_on_ref_ptr)] + if let Ok(df) =
Arc::downcast::<DynamicFilterPhysicalExpr>(predicate.clone()) + { + Some(df) + } else if let Some(wrapper) = predicate + .as_any() + .downcast_ref::<AdaptiveSelectivityFilterExpr>( + ) { + // Try to get it from an AdaptiveSelectivityFilterExpr wrapper + #[expect(clippy::clone_on_ref_ptr)] + Arc::downcast::<DynamicFilterPhysicalExpr>(wrapper.inner().clone()) + .ok() + } else { + None + } + }; + + if let Some(dynamic_filter) = maybe_dynamic_filter { // We successfully pushed down our self filter - we need to make a new node with the dynamic filter let new_node = Arc::new(HashJoinExec { left: Arc::clone(&self.left), diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index b112d70f427f..3dadfd5ad7b0 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -158,7 +158,7 @@ physical_plan 01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] # Disable Join dynamic filter pushdown statement ok @@ -213,7 +213,7 @@ physical_plan 01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] # Enable TopK, disable Join statement ok @@ -417,7 +417,7 @@ physical_plan 01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] +04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] # Cleanup @@ -457,3 +457,104 @@ SET datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true; statement ok SET datafusion.optimizer.enable_dynamic_filter_pushdown = true; + +# Test 6: Adaptive selectivity-based dynamic filter disabling configuration +# These options control automatic disabling of dynamic filters that pass most rows + +statement ok +SET datafusion.catalog.information_schema = true + +# Verify default values for adaptive selectivity tracking config options +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.enable_adaptive_filter_selectivity_tracking'; +---- +true + +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.adaptive_filter_selectivity_threshold'; +---- +0.5 + +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.adaptive_filter_min_rows_for_selectivity'; +---- +100000 + +# Enable adaptive selectivity tracking +statement ok +SET datafusion.optimizer.enable_adaptive_filter_selectivity_tracking = true; + +# Verify it's enabled +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.enable_adaptive_filter_selectivity_tracking'; +---- +true + +# Set custom threshold and min_rows +statement ok +SET datafusion.optimizer.adaptive_filter_selectivity_threshold = 0.80; + +statement ok +SET datafusion.optimizer.adaptive_filter_min_rows_for_selectivity = 5000; + +# Verify custom values +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.adaptive_filter_selectivity_threshold'; +---- +0.8 + +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.adaptive_filter_min_rows_for_selectivity'; +---- +5000 + +statement ok +SET datafusion.catalog.information_schema = false + +# Test that join queries still work correctly with adaptive selectivity tracking enabled +statement ok +CREATE TABLE sel_left(id INT, data VARCHAR) AS VALUES +(1, 'left1'), +(2, 'left2'), +(3, 'left3'), +(4, 'left4'), +(5, 'left5'); + +statement ok +CREATE TABLE sel_right(id INT, info VARCHAR) AS VALUES +(1, 'right1'), +(3, 'right3'), +(5, 'right5'); + +# Verify join returns correct results with adaptive selectivity tracking enabled +query ITT rowsort +SELECT l.id, l.data, r.info +FROM sel_left l +INNER JOIN sel_right r ON l.id = r.id; +---- +1 left1 right1 +3 left3 right3 +5 left5 right5 + +# Cleanup selectivity test tables +statement ok +DROP TABLE sel_left; + +statement ok +DROP TABLE sel_right; + +# Reset adaptive selectivity tracking configs to defaults +statement ok +SET datafusion.optimizer.enable_adaptive_filter_selectivity_tracking = true; + +statement ok +SET datafusion.optimizer.adaptive_filter_selectivity_threshold = 0.5; + +statement ok +SET datafusion.optimizer.adaptive_filter_min_rows_for_selectivity = 100000; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index e31cdbe0aad2..d9749a48bef2 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -291,8 +291,11 @@ datafusion.format.time_format %H:%M:%S%.f datafusion.format.timestamp_format %Y-%m-%dT%H:%M:%S%.f
datafusion.format.timestamp_tz_format NULL datafusion.format.types_info false +datafusion.optimizer.adaptive_filter_min_rows_for_selectivity 100000 +datafusion.optimizer.adaptive_filter_selectivity_threshold 0.5 datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 +datafusion.optimizer.enable_adaptive_filter_selectivity_tracking true datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true @@ -428,8 +431,11 @@ datafusion.format.time_format %H:%M:%S%.f Time format for time arrays datafusion.format.timestamp_format %Y-%m-%dT%H:%M:%S%.f Timestamp format for timestamp arrays datafusion.format.timestamp_tz_format NULL Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used. datafusion.format.types_info false Show types in visual representation batches +datafusion.optimizer.adaptive_filter_min_rows_for_selectivity 100000 Minimum number of rows to process before making a selectivity decision for adaptive filtering of join dynamic filters. The filter will remain in a tracking state until this many rows have been processed. This ensures statistical stability before making the disable decision. Only used when `enable_adaptive_filter_selectivity_tracking` is true. +datafusion.optimizer.adaptive_filter_selectivity_threshold 0.5 Selectivity threshold for adaptive disabling of join dynamic filters. If the filter passes this fraction or more of rows, it will be disabled. Value should be between 0.0 and 1.0. For example, 0.95 means if 95% or more of rows pass the filter, it will be disabled. Only used when `enable_adaptive_filter_selectivity_tracking` is true. datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). +datafusion.optimizer.enable_adaptive_filter_selectivity_tracking true Enable selectivity-based disabling of dynamic filters from joins. When enabled, join dynamic filters that pass most rows (above the threshold) will be automatically disabled to avoid evaluation overhead. This is useful when the build side of a join covers most of the probe side values, making the filter expensive to evaluate for little benefit. The selectivity tracking resets when the dynamic filter is updated (e.g., when the hash table is built), allowing the filter to be re-evaluated with new data. datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. 
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index 34c5fd97b51f..9358dcb99893 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -340,7 +340,7 @@ physical_plan 10)------------------FilterExec: service@2 = log 11)--------------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 12)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -13)----------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +13)----------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] # Verify results without optimization query TTTIR rowsort @@ -392,7 +392,7 @@ physical_plan 07)------------FilterExec: service@2 = log 08)--------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, 
projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -10)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +10)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] query TTTIR rowsort SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value) diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt b/datafusion/sqllogictest/test_files/projection_pushdown.slt index dd8ca26e4cda..790139b8d98b 100644 --- a/datafusion/sqllogictest/test_files/projection_pushdown.slt +++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt @@ -1432,7 +1432,7 @@ physical_plan 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)] 02)--FilterExec: get_field(s@1, value) > 150, projection=[id@0] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet -04)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] +04)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] # Verify correctness - id matches and value > 150 query II @@ -1470,7 +1470,7 @@ physical_plan 02)--FilterExec: get_field(s@1, value) > 100, projection=[id@0] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet 04)--FilterExec: get_field(s@1, level) > 3, projection=[id@0] -05)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id, s], file_type=parquet, predicate=DynamicFilter [ empty ] +05)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id, s], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] # Verify correctness - id matches, value > 100, and level > 3 # Matching ids where value > 100: 2(200), 3(150), 4(300), 5(250) @@ -1504,7 +1504,7 @@ physical_plan 01)ProjectionExec: 
expr=[id@0 as id, get_field(s@1, label) as simple_struct.s[label], get_field(s@2, role) as join_right.s[role]] 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[id@0, s@1, s@3] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id, s], file_type=parquet, predicate=DynamicFilter [ empty ] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id, s], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] # Verify correctness query ITT @@ -1536,7 +1536,7 @@ logical_plan physical_plan 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id], file_type=parquet -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] # Verify correctness query II diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt index b1cb354e053e..318c7a96f42b 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -302,7 +302,7 @@ physical_plan 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet 03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND AdaptiveSelectivity [ DynamicFilter [ empty ] ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[] statement ok drop table small_table; diff --git a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt index e2c9fa423793..4fdcb8c8393d 100644 --- a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt +++ b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt @@ -383,7 +383,7 @@ physical_plan 14)--------------------------CoalescePartitionsExec 15)----------------------------FilterExec: service@1 = log, projection=[env@0, 
d_dkey@2] 16)------------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -17)--------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +17)--------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] # Verify results without subset satisfaction query TPR rowsort @@ -479,7 +479,7 @@ physical_plan 11)--------------------CoalescePartitionsExec 12)----------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2] 13)------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -14)--------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, 
timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +14)--------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] # Verify results match with subset satisfaction query TPR rowsort diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index aaba453b3541..6bd0f1e64d0f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -161,6 +161,9 @@ The following configuration settings are available: | datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.hash_join_inlist_pushdown_max_size | 131072 | Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides larger than this will use hash table lookups instead. Set to 0 to always use hash table lookups. InList pushdown can be more efficient for small build sides because it can result in better statistics pruning as well as use any bloom filters present on the scan side. InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion. On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory. This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` \* `target_partitions` memory. The default is 128kB per partition. This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases but avoids excessive memory usage or overhead for larger joins. | | datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values | 150 | Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides with more rows than this will use hash table lookups instead. Set to 0 to always use hash table lookups. This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent very large IN lists that might not provide much benefit over hash table lookups. This uses the deduplicated row count once the build side has been evaluated. The default is 150 values per partition. This is inspired by Trino's `max-filter-keys-per-column` setting. See: | +| datafusion.optimizer.adaptive_filter_min_rows_for_selectivity | 100000 | Minimum number of rows to process before making a selectivity decision for adaptive filtering of join dynamic filters. The filter will remain in a tracking state until this many rows have been processed. This ensures statistical stability before making the disable decision. Only used when `enable_adaptive_filter_selectivity_tracking` is true. | +| datafusion.optimizer.adaptive_filter_selectivity_threshold | 0.5 | Selectivity threshold for adaptive disabling of join dynamic filters. 
If the filter passes this fraction or more of rows, it will be disabled. Value should be between 0.0 and 1.0. For example, 0.95 means if 95% or more of rows pass the filter, it will be disabled. Only used when `enable_adaptive_filter_selectivity_tracking` is true. | +| datafusion.optimizer.enable_adaptive_filter_selectivity_tracking | true | Enable selectivity-based disabling of dynamic filters from joins. When enabled, join dynamic filters that pass most rows (above the threshold) will be automatically disabled to avoid evaluation overhead. This is useful when the build side of a join covers most of the probe side values, making the filter expensive to evaluate for little benefit. The selectivity tracking resets when the dynamic filter is updated (e.g., when the hash table is built), allowing the filter to be re-evaluated with new data. | | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. |
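For reference, the heart of the change is the small lock-free state machine inside `AdaptiveSelectivityFilterExpr`. The standalone sketch below distills the transition logic from `update_tracking` and `evaluate` above; the names, the `AtomicU8` narrowing, and the driver in `main` are illustrative, not the diff's exact code:

```rust
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};

const TRACKING: u8 = 0;
const ACTIVE: u8 = 1;
const DISABLED: u8 = 2;

/// Distilled version of the counters and state transition used by
/// AdaptiveSelectivityFilterExpr (illustrative only).
struct SelectivityTracker {
    state: AtomicU8,
    passed: AtomicUsize,
    total: AtomicUsize,
    threshold: f64,
    min_rows: usize,
}

impl SelectivityTracker {
    fn new(threshold: f64, min_rows: usize) -> Self {
        Self {
            state: AtomicU8::new(TRACKING),
            passed: AtomicUsize::new(0),
            total: AtomicUsize::new(0),
            threshold,
            min_rows,
        }
    }

    /// Returns true if the wrapped filter should still be evaluated.
    fn should_evaluate(&self) -> bool {
        // Hot path: a single relaxed atomic load, no locks.
        self.state.load(Ordering::Relaxed) != DISABLED
    }

    /// Feed per-batch counts (rows that passed / rows seen) while tracking.
    fn observe(&self, batch_passed: usize, batch_total: usize) {
        if self.state.load(Ordering::Relaxed) != TRACKING {
            return; // Active and Disabled states pay no tracking cost
        }
        let passed = self.passed.fetch_add(batch_passed, Ordering::Relaxed) + batch_passed;
        let total = self.total.fetch_add(batch_total, Ordering::Relaxed) + batch_total;
        if total >= self.min_rows {
            let next = if passed as f64 / total as f64 >= self.threshold {
                DISABLED // filter passes too many rows to be worth evaluating
            } else {
                ACTIVE // filter is selective enough; keep it, stop counting
            };
            // compare_exchange so only one thread wins the transition out of TRACKING.
            let _ = self
                .state
                .compare_exchange(TRACKING, next, Ordering::Relaxed, Ordering::Relaxed);
        }
    }
}

fn main() {
    let t = SelectivityTracker::new(0.5, 100_000);
    // 99% of probed rows pass the dynamic filter -> it gets disabled once
    // the 100_000-row minimum is crossed (13 * 8_100 = 105_300 rows).
    for _ in 0..13 {
        t.observe(8_019, 8_100);
    }
    assert!(!t.should_evaluate());
    println!("filter disabled after {} rows", t.total.load(Ordering::Relaxed));
}
```

Note how this mirrors the defaults wired through the config above: with `adaptive_filter_selectivity_threshold = 0.5` and `adaptive_filter_min_rows_for_selectivity = 100_000`, a dynamic filter that fails to prune at least half of the first ~100k probe-side rows is replaced by a constant `true` on the hot path, and `snapshot()` collapses it to `lit(true)` so downstream pruning sees no wrapper at all.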