
feat: add plan_splits function for distributed compute#5863

Open
hamersaw wants to merge 36 commits into lance-format:main from hamersaw:feature/plan-splits

Conversation


@hamersaw hamersaw commented Jan 30, 2026

Adding a plan_splits function to the Scanner to provide a single solution for partitioning a Lance dataset for efficient distributed compute. This function (1) filters the dataset (using index lookups / deletion vectors), producing a mapping of fragment IDs to valid row ranges, and (2) bin-packs these fragment row ranges into "splits" that target a configurable partition size (in row count or bytes).
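
The bin-packing step described above can be sketched roughly as follows. This is a simplified illustration, not the actual Rust implementation: the input/output shapes, the greedy packing strategy, and packing only by row count are all assumptions made for the sketch.

```python
# Hypothetical sketch of the split planning described above: given a mapping of
# fragment IDs to valid row ranges (after index/deletion-vector filtering),
# greedily bin-pack the ranges into splits of at most `max_row_count` rows.

def plan_splits(fragment_ranges, max_row_count):
    """fragment_ranges: dict of fragment_id -> list of (start, end) row ranges."""
    splits = []                      # each split is a list of (fragment_id, start, end)
    current, current_rows = [], 0
    for frag_id, ranges in sorted(fragment_ranges.items()):  # deterministic order
        for start, end in ranges:
            while start < end:
                # Take as many rows as fit in the current split.
                take = min(end - start, max_row_count - current_rows)
                current.append((frag_id, start, start + take))
                current_rows += take
                start += take
                if current_rows == max_row_count:
                    splits.append(current)
                    current, current_rows = [], 0
    if current:
        splits.append(current)
    return splits
```

Note that a single fragment range may be sliced across multiple splits, and a single split may span multiple fragments, so splits stay close to the target size regardless of fragment layout.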

The most important areas to agree on:

  • We explicitly disable refine filters when injecting the FilteredReadExec. This is because all refine filters are already provided within the pre-computed plan.
  • Within the FilteredReadExec we preserve the plan when with_children is called. The problem is that if DataFusion attempts an optimization, it can clear the plan that we explicitly set. Since this flow injects the FilteredReadExec as a leaf node, it should be OK to copy the plan over.

Example usage through python API:

import lance
from lance import Split
import pyarrow as pa

# insert initial table and create index
dataset = lance.write_dataset(
    pa.Table.from_pylist([{"id": 1, "name": "Alice", "age": 20, "weight": 130.5},
                          {"id": 2, "name": "Bob", "age": 30, "weight": 180.0},
                          {"id": 3, "name": "David", "age": 42, "weight": 200.2}]),
    "memory://lance.test")

dataset.insert(
    pa.Table.from_pylist([{"id": 4, "name": "Ricky", "age": 22, "weight": 150.0},
                          {"id": 5, "name": "Carl", "age": 29, "weight": 120.3}],
    ))

dataset.create_scalar_index(
    column="age",
    index_type="BTREE"
)

# insert more data (unindexed)
dataset.insert(
    pa.Table.from_pylist([{"id": 6, "name": "Carla", "age": 37, "weight": 150.0},
                          {"id": 7, "name": "Eve", "age": 29, "weight": 120.3}],
    ))


scanner = dataset.scanner(columns=["weight", "_rowid", "name"], filter="age >= 30 AND weight <= 200.0")

# evaluate splits
splits = scanner.plan_splits(max_row_count=2)
for split in splits:
    # serialize and deserialize split
    split_bytes = split.to_bytes()
    new_split = Split.from_bytes(split_bytes, dataset._ds)

    # read split data
    split_scanner = dataset.scanner(columns=new_split.output_columns).with_filtered_read_exec(new_split.filtered_read_exec)
    reader = split_scanner.to_reader()

    table = reader.read_all()
    print(table.to_pydict())

LuQQiu and others added 5 commits January 29, 2026 14:23
- Add FilteredReadPlan struct using RowAddrTreeMap for row selection
- Add get_or_create_plan API for lazy plan computation via OnceCell
- Support providing pre-computed plan to FilteredReadExec::try_new
- Centralize plan creation in get_or_create_plan_impl
- Make RowAddrSelection public in lance-core
- Add FilteredReadInternalPlan (private) using BTreeMap<u32, Vec<Range<u64>>>
  for efficient local execution without bitmap conversion
- Keep FilteredReadPlan (public) using RowAddrTreeMap for distributed execution
- Local path: plan_scan() → internal plan → ScopedFragmentRead (zero conversions)
- External API: get_or_create_plan() converts internal → external once
- with_plan() converts external → internal for distributed workers
- Add bitmap_to_ranges() utility in lance-core for efficient bitmap conversion
- Use BTreeMap for rows to maintain deterministic fragment order

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
@github-actions github-actions bot added the enhancement New feature or request label Jan 30, 2026
@codecov

codecov bot commented Jan 30, 2026

Codecov Report

❌ Patch coverage is 93.34187% with 52 lines in your changes missing coverage. Please review.

Files with missing lines            Patch %   Lines
rust/lance/src/dataset/scanner.rs   92.49%    22 Missing and 16 partials ⚠️
rust/lance/src/dataset/split.rs     94.81%    10 Missing and 4 partials ⚠️


…c implementations in the future

@github-actions github-actions bot added the java label Feb 4, 2026
@hamersaw hamersaw marked this pull request as ready for review February 5, 2026 21:33
Contributor


We should just let SplitPlanningOptions implement Default, instead of setting it inline.

Contributor Author


I think the semantics of this make Default not work exactly how I envisioned it. Basically, the user can set max_size_bytes and/or max_row_count (if both are set, the minimum is used). However, if the user does not set anything, we fall back to a default. If we use Default to set this, and the user only wants to limit the maximum number of rows, they will need to unset max_size_bytes.
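
To illustrate the semantics in question with a hedged sketch (the function name, parameters, and default value here are assumptions for illustration, not the actual API): the effective per-split row budget is the minimum of whichever limits the user set, falling back to a default only when neither is set.

```python
DEFAULT_MAX_ROW_COUNT = 1_000_000  # assumed fallback, not the actual default

def effective_row_budget(max_row_count=None, max_size_bytes=None, avg_row_size=None):
    """Resolve the per-split row budget from optional limits.

    If both limits are set, the stricter (minimum) one wins; if neither is set,
    fall back to a default. A byte limit is converted to rows via an average
    row size estimate.
    """
    candidates = []
    if max_row_count is not None:
        candidates.append(max_row_count)
    if max_size_bytes is not None:
        if avg_row_size is None:
            raise ValueError("byte limit requires an average row size estimate")
        candidates.append(max(1, max_size_bytes // avg_row_size))
    return min(candidates) if candidates else DEFAULT_MAX_ROW_COUNT
```

Under these semantics, Default would pre-populate both limits, forcing a user who only cares about row count to explicitly clear the byte limit, which is the awkwardness described above.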

Contributor Author


We could update this to some kind of builder logic? splitting_options.with_max_rows(N).build() would let us check whether neither limit is set and apply the default there. I'm not convinced this is any cleaner or clearer than the current logic, because we still need a fallback in the plan_splits code if neither is set (default value or failure).
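
The builder idea floated here could look roughly like this (purely illustrative; the real type lives on the Rust side and these names, and the default value, are assumptions):

```python
class SplitPlanningOptionsBuilder:
    """Illustrative builder sketch: defaulting happens once, in build()."""

    DEFAULT_MAX_ROW_COUNT = 1_000_000  # assumed fallback value

    def __init__(self):
        self._max_row_count = None
        self._max_size_bytes = None

    def with_max_rows(self, n):
        self._max_row_count = n
        return self

    def with_max_size_bytes(self, n):
        self._max_size_bytes = n
        return self

    def build(self):
        # If neither limit is set, apply the fallback here rather than in
        # plan_splits, so downstream code always sees at least one limit.
        if self._max_row_count is None and self._max_size_bytes is None:
            self._max_row_count = self.DEFAULT_MAX_ROW_COUNT
        return {"max_row_count": self._max_row_count,
                "max_size_bytes": self._max_size_bytes}
```

This centralizes the fallback in build(), but as noted above it mostly relocates the problem rather than removing it.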

…an flow rather than creating a new execution path


Labels

enhancement New feature or request java python
