feat: add plan_splits function for distributed compute#5863
feat: add plan_splits function for distributed compute#5863hamersaw wants to merge 36 commits intolance-format:mainfrom
Conversation
- Add FilteredReadPlan struct using RowAddrTreeMap for row selection - Add get_or_create_plan API for lazy plan computation via OnceCell - Support providing pre-computed plan to FilteredReadExec::try_new - Centralize plan creation in get_or_create_plan_impl - Make RowAddrSelection public in lance-core
- Add FilteredReadInternalPlan (private) using BTreeMap<u32, Vec<Range<u64>>> for efficient local execution without bitmap conversion - Keep FilteredReadPlan (public) using RowAddrTreeMap for distributed execution - Local path: plan_scan() → internal plan → ScopedFragmentRead (zero conversions) - External API: get_or_create_plan() converts internal → external once - with_plan() converts external → internal for distributed workers - Add bitmap_to_ranges() utility in lance-core for efficient bitmap conversion - Use BTreeMap for rows to maintain deterministic fragment order 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
…c implementations in the future Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
rust/lance/src/dataset/scanner.rs
Outdated
There was a problem hiding this comment.
We should just let SplitPlanningOptions implement Default, instead of setting it inline.
There was a problem hiding this comment.
I think the semantics of this make Default not work exactly how I envisioned it. Basically, the user can set max_size_bytes and / or max_row_count (if both then the min is used). However, if the user does not set anything then we fallback to a default. If we use Default to set this, and the user only wants to filter on maximum number of rows they will need to unset the max_size_bytes.
There was a problem hiding this comment.
We could update this to some kind of build logic? splitting_options.with_max_rows(N).build() which allows us to check if neither are set and default there? I'm not convinced this is any clean / clear than the current logic because we still need a fallback in the plan_splits code if neither are set (default value or failure).
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
…an flow rather than creating a new execution path Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
This reverts commit 46ef0c5.
Adding a
plan_splitsfunction to the Scanner to facilitate a single solution for partitioning Lance dataset for efficient distributed compute. This function (1) filters the dataset (using index looking / delete vectors) producing a mapping of fragment IDs to valid row ranges and (2) bin packs these fragment rows ranges into "splits" that target a configurable partition size (in rows count or bytes).The most important areas to agree on:
FilteredReadExec. This is because all refine filters are provided within the pre-computed plan.FilteredReadExecwe maintain the plan whenwith_childrenis called. The problem is that if datafusion attempts an optimization is can clear the plan that we explicitly set. Since this flow injects theFilteredReadExecas a leaf node, it should be OK to copy this over.Example usage through python API: