feat: Add selectivity-tracking wrapper for dynamic filters #20160

adriangb wants to merge 13 commits into apache:main
Conversation
Force-pushed from 8ccef6c to 9a1ebd9
datafusion/common/src/config.rs (Outdated)
/// Enable selectivity-based disabling of dynamic filters from joins.
///
/// When enabled, join dynamic filters that pass most rows (above the threshold)
/// will be automatically disabled to avoid evaluation overhead. This is useful
/// when the build side of a join covers most of the probe side values, making
/// the filter expensive to evaluate for little benefit.
///
/// The selectivity tracking resets when the dynamic filter is updated (e.g., when
/// the hash table is built), allowing the filter to be re-evaluated with new data.
pub enable_dynamic_filter_selectivity_tracking: bool, default = false

/// Selectivity threshold for disabling join dynamic filters.
///
/// If the filter passes this fraction or more of rows, it will be disabled.
/// The value should be between 0.0 and 1.0.
///
/// For example, 0.95 means that if 95% or more of rows pass the filter, it will be disabled.
/// Only used when `enable_dynamic_filter_selectivity_tracking` is true.
pub dynamic_filter_selectivity_threshold: f64, default = 0.95

/// Minimum number of rows to process before making a selectivity decision
/// for join dynamic filters.
///
/// The filter will remain in a tracking state until this many rows have been
/// processed. This ensures statistical stability before making the disable decision.
/// Only used when `enable_dynamic_filter_selectivity_tracking` is true.
pub dynamic_filter_min_rows_for_selectivity: usize, default = 10_000
Not sure we need all of these, or at least not sure they should be prefixed with `dynamic_filter`.
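For illustration, here is a minimal sketch of setting these options programmatically via `ConfigOptions`. The keys are taken from the diff above; a later commit in this PR renames them to an `adaptive_filter_*` prefix, so treat them as provisional:

```rust
use datafusion::config::ConfigOptions;
use datafusion::error::Result;

fn main() -> Result<()> {
    let mut options = ConfigOptions::new();
    // Keys as they appear in this diff; a later commit renames them
    // to the adaptive_filter_* prefix.
    options.set(
        "datafusion.optimizer.enable_dynamic_filter_selectivity_tracking",
        "true",
    )?;
    options.set(
        "datafusion.optimizer.dynamic_filter_selectivity_threshold",
        "0.9",
    )?;
    options.set(
        "datafusion.optimizer.dynamic_filter_min_rows_for_selectivity",
        "50000",
    )?;
    Ok(())
}
```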
pub struct SelectivityConfig {
    /// Threshold above which the filter is disabled (e.g., 0.95 = 95% selectivity).
    /// If the filter passes this fraction or more of rows, it will be disabled.
    pub threshold: f64,
Could be in GB/s? Rows/s?
    /// If the filter passes this fraction or more of rows, it will be disabled.
    pub threshold: f64,
    /// Minimum rows to process before making a selectivity decision.
    pub min_rows: usize,
Could be in GB? Number of batches? Time? (or all of the above?)
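Riffing on that question, a hypothetical sketch of a decision budget generalized beyond a plain row count. None of these names exist in the PR; this is only one way the knob could be shaped:

```rust
use std::time::Duration;

/// Illustrative only: how much input to observe before deciding whether
/// to disable the filter, measured along several dimensions at once.
pub struct DecisionBudget {
    /// Minimum rows observed before a decision is made.
    pub min_rows: usize,
    /// Minimum bytes observed before a decision is made.
    pub min_bytes: usize,
    /// Minimum number of record batches observed.
    pub min_batches: usize,
    /// Minimum wall-clock time spent tracking.
    pub min_elapsed: Duration,
}

impl DecisionBudget {
    /// The budget is exhausted once *all* minimums are met, so no single
    /// dimension (e.g. a few huge batches) triggers a premature decision.
    pub fn is_exhausted(
        &self,
        rows: usize,
        bytes: usize,
        batches: usize,
        elapsed: Duration,
    ) -> bool {
        rows >= self.min_rows
            && bytes >= self.min_bytes
            && batches >= self.min_batches
            && elapsed >= self.min_elapsed
    }
}
```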
run benchmark tpcds tpch

run benchmark tpcds tpch tpch10

run benchmarks

show benchmark queue

run benchmark tpcds

run benchmark tpch

🤖: Benchmark completed

🤖: Benchmark completed

🤖: Benchmark completed
I expect the benchmarks to look bad: the wrapper currently introduces overhead even in no-op mode. I'm going to push a fix and run the benchmarks again.
run benchmark tpch

run benchmark tpcds

🤖: Benchmark completed

🤖: Benchmark completed

run benchmark tpcds

🤖: Benchmark completed

run benchmark tpcds

🤖: Benchmark completed
Hmm, I am seeing much larger changes locally (1.20x faster). I will have to run again and see what's going on. I do think it's worth pondering what the units should be here. In particular, I think we should incorporate filter evaluation time: something like rows_filtered / compute_time or bytes_filtered / compute_time (bytes/s, higher is better).
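A sketch of the suggested metric, bytes filtered out per second of filter compute time. This is a hypothetical helper for discussion, not code from this PR:

```rust
use std::time::Duration;

/// Hypothetical metric from the discussion above: how many bytes the
/// filter removes per second spent evaluating it. Higher is better; a
/// filter that prunes nothing scores 0 no matter how fast it runs.
fn filter_effectiveness(bytes_in: u64, bytes_out: u64, compute_time: Duration) -> f64 {
    let bytes_filtered = bytes_in.saturating_sub(bytes_out);
    let secs = compute_time.as_secs_f64();
    if secs == 0.0 {
        return 0.0;
    }
    bytes_filtered as f64 / secs // bytes/s
}

fn main() {
    // e.g. a filter that removes 900 MB of a 1 GB stream in 0.5 s
    let score = filter_effectiveness(1_000_000_000, 100_000_000, Duration::from_millis(500));
    println!("{score} bytes filtered per second");
}
```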
Add `SelectivityAwareFilterExpr`, a wrapper `PhysicalExpr` that tracks filter selectivity at runtime and automatically disables filters that aren't pruning enough rows. This addresses the issue where dynamic filters from `HashJoinExec` can be expensive to evaluate for little benefit when the build side covers most of the probe side values.

Key features:
- Selectivity threshold: filter disabled when rows_passed / rows_total >= threshold
- Minimum rows: statistics collected for min_rows before making a decision
- Generation-aware reset: resets when the inner filter updates (e.g., hash table built)
- Permanent disable: once disabled, stays disabled for the rest of the query
- Disabled behavior: returns an all-true array to bypass filter evaluation

New configuration options in OptimizerOptions:
- enable_dynamic_filter_selectivity_tracking (default: false)
- dynamic_filter_selectivity_threshold (default: 0.95)
- dynamic_filter_min_rows_for_selectivity (default: 10000)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
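As a rough illustration of the "returns an all-true array" behavior above, the disabled path could reduce to something like this. This is a sketch, not the PR's exact code, and `evaluate_disabled` is a hypothetical name:

```rust
use std::sync::Arc;

use datafusion::arrow::array::BooleanArray;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::logical_expr::ColumnarValue;

/// Hypothetical helper: once the filter is disabled, skip the inner
/// expression entirely and return a mask that passes every row.
fn evaluate_disabled(batch: &RecordBatch) -> Result<ColumnarValue> {
    let all_true = BooleanArray::from(vec![true; batch.num_rows()]);
    Ok(ColumnarValue::Array(Arc::new(all_true)))
}
```

Whether a scalar `true` could avoid the per-batch allocation depends on how downstream consumers expand `ColumnarValue::Scalar`, so the array form shown here follows the commit's description.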
- Remove duplicate import of DynamicFilterPhysicalExpr and lit
- Add new selectivity tracking config options to information_schema.slt

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Rename configuration options and the wrapper expression for clarity:
- dynamic_filter_min_rows_for_selectivity -> adaptive_filter_min_rows_for_selectivity
- dynamic_filter_selectivity_threshold -> adaptive_filter_selectivity_threshold
- enable_dynamic_filter_selectivity_tracking -> enable_adaptive_filter_selectivity_tracking
- SelectivityAwareFilterExpr -> AdaptiveSelectivityFilterExpr
- selectivity_aware_filter.rs -> adaptive_selectivity_filter.rs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Remove RwLock and optimize the hot path:
- Replace RwLock<SelectivityState> with simple atomics
- Fast path for ACTIVE state: single atomic load, no tracking
- Fast path for DISABLED state: single atomic load, return all-true
- Only call snapshot_generation() in TRACKING state
- No counter updates in ACTIVE state

This eliminates the performance overhead that was causing slowdowns even with threshold=1.0, since the overhead came from lock acquisition and generation checks on every evaluate() call.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
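A minimal sketch of the atomics-based state machine this commit describes. The state names follow the commit message; everything else (field names, encoding, the `record_batch` helper) is illustrative:

```rust
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};

// State encoding is illustrative; only the names come from the commit.
const ACTIVE: u8 = 0;   // filtering normally, no tracking
const TRACKING: u8 = 1; // counting rows until min_rows is reached
const DISABLED: u8 = 2; // filter turned off, return all-true

#[allow(dead_code)]
struct SelectivityState {
    state: AtomicU8,
    rows_total: AtomicUsize,
    rows_passed: AtomicUsize,
}

impl SelectivityState {
    /// Hot path: a single relaxed load decides whether any bookkeeping is
    /// needed. ACTIVE and DISABLED pay one atomic load and nothing more;
    /// only TRACKING touches the counters.
    fn record_batch(&self, rows: usize, passed: usize, min_rows: usize, threshold: f64) {
        if self.state.load(Ordering::Relaxed) != TRACKING {
            return;
        }
        let total = self.rows_total.fetch_add(rows, Ordering::Relaxed) + rows;
        let pass = self.rows_passed.fetch_add(passed, Ordering::Relaxed) + passed;
        if total >= min_rows && pass as f64 / total as f64 >= threshold {
            // Permanent disable: per the PR, once disabled the filter
            // stays disabled for the rest of the query.
            self.state.store(DISABLED, Ordering::Relaxed);
        }
    }
}
```

Relaxed ordering is enough here because the counters are advisory: a slightly stale read only delays the disable decision by a batch or two.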
The snapshot() method was returning None, causing the wrapper to be preserved during snapshotting. Since PruningPredicate doesn't recognize the wrapper type, it fell back to lit(true), which disabled all row group and file pruning.

Changed snapshot() to return the inner expression directly, stripping the wrapper during snapshotting so pruning predicates work correctly.

Also set enable_adaptive_filter_selectivity_tracking default to false since this is an experimental feature. Benchmarks show no slowdowns after this fix.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
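In shape, the fix amounts to handing back the wrapped expression instead of None. A sketch of the idea as a free function (the name `snapshot_inner` is hypothetical):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_expr::PhysicalExpr;

/// Sketch of the fix: where snapshot() previously returned Ok(None)
/// ("keep this expression as-is"), it now surfaces the wrapped inner
/// filter, so consumers like PruningPredicate see an expression type
/// they understand instead of falling back to lit(true).
fn snapshot_inner(inner: &Arc<dyn PhysicalExpr>) -> Result<Option<Arc<dyn PhysicalExpr>>> {
    Ok(Some(Arc::clone(inner)))
}
```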
Simplifies the implementation by removing the generation-aware reset logic:
- No longer tracks the inner filter's generation
- snapshot_generation() returns the inner's generation, or 0 when disabled
- Faster evaluate() path without generation checks

The generation tracking was unnecessary for hash join filters and added overhead to the hot path.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
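A small sketch of the simplified behavior this commit describes, with hypothetical names: the wrapper reports the inner filter's generation while active and 0 once disabled, instead of maintaining a generation of its own.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Illustrative only: the generation the wrapper would report.
struct GenerationView {
    disabled: AtomicBool,
}

impl GenerationView {
    fn snapshot_generation(&self, inner_generation: u64) -> u64 {
        if self.disabled.load(Ordering::Relaxed) {
            0 // behaves like a static expression from here on
        } else {
            inner_generation // delegate to the inner dynamic filter
        }
    }
}
```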
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from a43489c to bc6cfac
Summary

Add `SelectivityAwareFilterExpr`, a wrapper `PhysicalExpr` that tracks filter selectivity at runtime and automatically disables filters that aren't pruning enough rows. This addresses the issue where dynamic filters from `HashJoinExec` can be expensive to evaluate for little benefit when the build side covers most of the probe side values.

Key Features

- Selectivity threshold: the filter is disabled when rows_passed / rows_total >= threshold
- Minimum rows: statistics are collected for min_rows before making a decision

New Configuration Options

Added to OptimizerOptions:
- enable_dynamic_filter_selectivity_tracking (default: false)
- dynamic_filter_selectivity_threshold (default: 0.95)
- dynamic_filter_min_rows_for_selectivity (default: 10000)

Files Changed

- datafusion/physical-expr/src/expressions/selectivity_aware_filter.rs
- datafusion/physical-expr/src/expressions/mod.rs
- datafusion/common/src/config.rs (OptimizerOptions)
- datafusion/physical-plan/src/joins/hash_join/exec.rs
- datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt

Test plan

- SelectivityAwareFilterExpr (6 tests)

🤖 Generated with Claude Code