28 changes: 28 additions & 0 deletions datafusion/common/src/config.rs
@@ -1115,6 +1115,34 @@ config_namespace! {
/// See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>
pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150

/// Minimum number of rows to process before making a selectivity decision
/// for adaptive filtering of join dynamic filters.
///
/// The filter will remain in a tracking state until this many rows have been
/// processed. This ensures statistical stability before making the disable decision.
/// Only used when `enable_adaptive_filter_selectivity_tracking` is true.
pub adaptive_filter_min_rows_for_selectivity: usize, default = 100_000

/// Selectivity threshold for adaptive disabling of join dynamic filters.
///
/// If this fraction (or more) of the rows evaluated against the filter pass it,
/// the filter is disabled. The value should be between 0.0 and 1.0.
///
/// For example, a value of 0.95 means the filter is disabled once 95% or more
/// of the evaluated rows pass it.
/// Only used when `enable_adaptive_filter_selectivity_tracking` is true.
pub adaptive_filter_selectivity_threshold: f64, default = 0.50

/// Enable selectivity-based disabling of dynamic filters from joins.
///
/// When enabled, join dynamic filters that pass most rows (above the threshold)
/// will be automatically disabled to avoid evaluation overhead. This is useful
/// when the build side of a join covers most of the probe side values, making
/// the filter expensive to evaluate for little benefit.
///
/// The selectivity tracking resets when the dynamic filter is updated (e.g., when
/// the hash table is built), allowing the filter to be re-evaluated with new data.
pub enable_adaptive_filter_selectivity_tracking: bool, default = true

/// The default filter selectivity used by Filter Statistics
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
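The three options above fit together as one decision rule. Below is a minimal, standalone sketch of that rule (illustrative only; SelectivityTracker and its methods are not DataFusion APIs): keep tracking until adaptive_filter_min_rows_for_selectivity rows have been observed, then disable the filter if the observed pass fraction reaches adaptive_filter_selectivity_threshold, and reset whenever the dynamic filter is updated.

// Standalone sketch of the selectivity-based disabling rule described by the
// new configuration options. Not DataFusion code.
struct SelectivityTracker {
    min_rows: usize,   // adaptive_filter_min_rows_for_selectivity
    threshold: f64,    // adaptive_filter_selectivity_threshold
    rows_seen: usize,
    rows_passed: usize,
    disabled: bool,
}

impl SelectivityTracker {
    fn new(min_rows: usize, threshold: f64) -> Self {
        Self { min_rows, threshold, rows_seen: 0, rows_passed: 0, disabled: false }
    }

    /// Record the outcome of evaluating the filter on one batch.
    fn observe(&mut self, batch_rows: usize, rows_passed: usize) {
        if self.disabled {
            return;
        }
        self.rows_seen += batch_rows;
        self.rows_passed += rows_passed;
        // Only decide once enough rows have been seen for a stable estimate.
        if self.rows_seen >= self.min_rows {
            let selectivity = self.rows_passed as f64 / self.rows_seen as f64;
            if selectivity >= self.threshold {
                // The filter lets most rows through: stop evaluating it.
                self.disabled = true;
            }
        }
    }

    /// Called when the dynamic filter is updated (e.g. the hash table is
    /// built): start tracking again against the new predicate.
    fn reset(&mut self) {
        self.rows_seen = 0;
        self.rows_passed = 0;
        self.disabled = false;
    }

    fn filter_enabled(&self) -> bool {
        !self.disabled
    }
}

fn main() {
    // Defaults from the options above: 100_000 rows, 0.50 threshold.
    let mut tracker = SelectivityTracker::new(100_000, 0.50);
    tracker.observe(100_000, 80_000);   // 80% of rows passed the filter
    assert!(!tracker.filter_enabled()); // too unselective: filter disabled
    tracker.reset();                    // filter updated: tracking restarts
    assert!(tracker.filter_enabled());
}

With the defaults, a dynamic filter that lets more than half of the first 100,000 probe-side rows through stops being evaluated until the next filter update.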
34 changes: 17 additions & 17 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -263,7 +263,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ] AND DynamicFilter [ empty ]
"
);

@@ -287,7 +287,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
"
);
}
@@ -1003,7 +1003,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
Ok:
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ]
",
);

@@ -1037,7 +1037,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
@r"
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] ]
"
);
}
@@ -1213,7 +1213,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ]
"
);

@@ -1247,7 +1247,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ] ]
"
);

@@ -1265,7 +1265,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] ]
"
);

@@ -1405,7 +1405,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ]
"
);

@@ -1437,7 +1437,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] ]
"
);

@@ -1579,8 +1579,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ]
",
);

@@ -1610,8 +1610,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb]) ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ] ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb]) ] ]
"
);
}
@@ -3029,7 +3029,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ]
"
);

@@ -3054,7 +3054,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ false ] ]
"
);
}
@@ -3156,7 +3156,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() {
@r"
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ empty ] ]
"
);

@@ -3179,7 +3179,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() {
@r"
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=AdaptiveSelectivity [ DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ] ]
"
);

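The updated snapshots show each pushed-down join DynamicFilter now wrapped in an AdaptiveSelectivity node, which performs the tracking described by the new options; the wrapping is applied by default because enable_adaptive_filter_selectivity_tracking defaults to true. A hedged tuning sketch follows: the full configuration keys are an assumption (shown here under the datafusion.optimizer. prefix, alongside the other dynamic-filter options); consult the generated configuration docs for the exact names.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Require more evidence before judging selectivity, and only disable
    // filters that are almost completely non-selective.
    // NOTE: key names below are assumed, not confirmed by this diff.
    ctx.sql("SET datafusion.optimizer.adaptive_filter_min_rows_for_selectivity = 500000")
        .await?;
    ctx.sql("SET datafusion.optimizer.adaptive_filter_selectivity_threshold = 0.95")
        .await?;

    // Or opt out of adaptive disabling entirely.
    ctx.sql("SET datafusion.optimizer.enable_adaptive_filter_selectivity_tracking = false")
        .await?;

    Ok(())
}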
1 change: 0 additions & 1 deletion datafusion/physical-expr-common/src/physical_expr.rs
@@ -595,7 +595,6 @@ pub fn snapshot_physical_expr(

/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
///
/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
/// This is used to capture the current state of `PhysicalExpr`s that may contain
/// dynamic references to other operators in order to serialize it over the wire
/// or treat it via downcast matching.
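For reference, a minimal usage sketch of the function whose duplicated doc sentence is removed above. The signature (taking and returning Arc<dyn PhysicalExpr> inside a Result) is assumed from context, not confirmed by this diff.

use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_expr_common::physical_expr::{snapshot_physical_expr, PhysicalExpr};

/// Freeze a possibly-dynamic predicate before serializing a plan.
/// (Assumed signature; see the doc comment above for the intended semantics.)
fn freeze_predicate(pred: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
    // Static expressions are expected to come back unchanged; dynamic ones
    // (such as join-produced dynamic filters) come back as their current
    // concrete snapshot, suitable for serialization or downcast matching.
    snapshot_physical_expr(pred)
}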