101 changes: 46 additions & 55 deletions paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -317,70 +317,61 @@ def _compute_split_file_idx_map(
         """
         shard_file_idx_map = {}

-        # Find the first non-blob file to determine the row range for this split
-        data_file = None
+        # First pass: data files only. Compute range and apply directly to avoid second-pass lookup.
+        current_pos = file_end_pos
+        data_file_infos = []
         for file in split.files:
-            if not self._is_blob_file(file.file_name):
-                data_file = file
-                break
+            if self._is_blob_file(file.file_name):
+                continue
+            file_begin_pos = current_pos
+            current_pos += file.row_count
+            data_file_range = self._compute_file_range(
+                plan_start_pos, plan_end_pos, file_begin_pos, file.row_count
+            )
+            data_file_infos.append((file, data_file_range))
+            if data_file_range is not None:
+                shard_file_idx_map[file.file_name] = data_file_range

-        if data_file is None:
+        if not data_file_infos:
             # No data file, skip this split
             shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
             return shard_file_idx_map

-        # Calculate the row range based on the data file position
-        file_begin_pos = file_end_pos
-        file_end_pos += data_file.row_count
-        data_file_first_row_id = data_file.first_row_id if data_file.first_row_id is not None else 0
+        next_pos = current_pos

-        # Determine the row range for the data file in this split using shared helper
-        data_file_range = self._compute_file_range(
-            plan_start_pos, plan_end_pos, file_begin_pos, data_file.row_count
-        )
-
-        # Apply ranges to each file in the split
+        # Second pass: only blob files (data files already in shard_file_idx_map from first pass)
         for file in split.files:
-            if self._is_blob_file(file.file_name):
-                # For blob files, calculate range based on their first_row_id
-                if data_file_range is None:
-                    # Data file is completely within shard, blob files should also be
-                    continue
-                elif data_file_range == (-1, -1):
-                    # Data file is completely outside shard, blob files should be skipped
-                    shard_file_idx_map[file.file_name] = (-1, -1)
-                else:
-                    # Calculate blob file's position relative to data file's first_row_id
-                    blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
-                    # Blob's position relative to data file start
-                    blob_rel_start = blob_first_row_id - data_file_first_row_id
-                    blob_rel_end = blob_rel_start + file.row_count
-
-                    # Shard range relative to data file start
-                    shard_start = data_file_range[0]
-                    shard_end = data_file_range[1]
-
-                    # Intersect blob's range with shard range
-                    intersect_start = max(blob_rel_start, shard_start)
-                    intersect_end = min(blob_rel_end, shard_end)
-
-                    if intersect_start >= intersect_end:
-                        # Blob file is completely outside shard range
-                        shard_file_idx_map[file.file_name] = (-1, -1)
-                    elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
-                        # Blob file is completely within shard range, no slicing needed
-                        pass
-                    else:
-                        # Convert to file-local indices
-                        local_start = intersect_start - blob_rel_start
-                        local_end = intersect_end - blob_rel_start
-                        shard_file_idx_map[file.file_name] = (local_start, local_end)
-            else:
-                # Data file
-                if data_file_range is not None:
-                    shard_file_idx_map[file.file_name] = data_file_range
+            if not self._is_blob_file(file.file_name):
+                continue
+            blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
+            data_file_range = None
+            data_file_first_row_id = None
+            for df, fr in data_file_infos:
+                df_first = df.first_row_id if df.first_row_id is not None else 0
+                if df_first <= blob_first_row_id < df_first + df.row_count:
+                    data_file_range = fr
+                    data_file_first_row_id = df_first
+                    break
+            if data_file_range is None:
+                continue
+            if data_file_range == (-1, -1):
+                shard_file_idx_map[file.file_name] = (-1, -1)
+                continue
+            blob_rel_start = blob_first_row_id - data_file_first_row_id
+            blob_rel_end = blob_rel_start + file.row_count
+            shard_start, shard_end = data_file_range
+            intersect_start = max(blob_rel_start, shard_start)
+            intersect_end = min(blob_rel_end, shard_end)
+            if intersect_start >= intersect_end:
+                shard_file_idx_map[file.file_name] = (-1, -1)
+            elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
+                pass
+            else:
+                local_start = intersect_start - blob_rel_start
+                local_end = intersect_end - blob_rel_start
+                shard_file_idx_map[file.file_name] = (local_start, local_end)

-        shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
+        shard_file_idx_map[self.NEXT_POS_KEY] = next_pos
         return shard_file_idx_map

     def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]:
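As a side note for reviewers, the sketch below restates the interval arithmetic the new second pass performs, standalone and outside the patch: a blob file's rows are mapped onto the shard range computed for its covering data file, and the overlap is converted to file-local indices. The helper name and the example values are illustrative only, not part of pypaimon's API.

def blob_local_range(blob_first_row_id, blob_row_count, data_first_row_id, shard_range):
    """Return (start, end) local to the blob file, (-1, -1) if disjoint with
    the shard range, or None if the whole blob file lies inside the shard range."""
    if shard_range == (-1, -1):
        # Covering data file lies entirely outside the shard, so the blob does too.
        return (-1, -1)
    blob_rel_start = blob_first_row_id - data_first_row_id
    blob_rel_end = blob_rel_start + blob_row_count
    shard_start, shard_end = shard_range
    intersect_start = max(blob_rel_start, shard_start)
    intersect_end = min(blob_rel_end, shard_end)
    if intersect_start >= intersect_end:
        return (-1, -1)
    if (intersect_start, intersect_end) == (blob_rel_start, blob_rel_end):
        return None
    return (intersect_start - blob_rel_start, intersect_end - blob_rel_start)

# Example: data file starts at row id 0, shard keeps rows [1, 4); a blob file
# holding rows 2..5 contributes only its first two rows -> (0, 2).
print(blob_local_range(2, 4, 0, (1, 4)))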
62 changes: 62 additions & 0 deletions paimon-python/pypaimon/tests/data_evolution_test.py
@@ -85,6 +85,68 @@ def test_basic(self):
        ]))
        self.assertEqual(actual_data, expect_data)

+    def test_with_slice(self):
+        pa_schema = pa.schema([
+            ("id", pa.int64()),
+            ("b", pa.int32()),
+            ("c", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                "row-tracking.enabled": "true",
+                "data-evolution.enabled": "true",
+                "source.split.target-size": "512m",
+            },
+        )
+        table_name = "default.test_with_slice_data_evolution"
+        self.catalog.create_table(table_name, schema, ignore_if_exists=True)
+        table = self.catalog.get_table(table_name)
+
+        for batch in [
+            {"id": [1, 2], "b": [10, 20], "c": [100, 200]},
+            {"id": [1001, 2001], "b": [1011, 2011], "c": [1001, 2001]},
+            {"id": [-1, -2], "b": [-10, -20], "c": [-100, -200]},
+        ]:
+            wb = table.new_batch_write_builder()
+            tw = wb.new_write()
+            tc = wb.new_commit()
+            tw.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema))
+            tc.commit(tw.prepare_commit())
+            tw.close()
+            tc.close()
+
+        rb = table.new_read_builder()
+        full_splits = rb.new_scan().plan().splits()
+        full_result = rb.new_read().to_pandas(full_splits)
+        self.assertEqual(
+            len(full_result),
+            6,
+            "Full scan should return 6 rows",
+        )
+        self.assertEqual(
+            sorted(full_result["id"].tolist()),
+            [-2, -1, 1, 2, 1001, 2001],
+            "Full set ids mismatch",
+        )
+
+        # with_slice(1, 4) -> row indices [1, 2, 3] -> 3 rows with id in (2, 1001, 2001)
+        scan = rb.new_scan().with_slice(1, 4)
+        splits = scan.plan().splits()
+        result = rb.new_read().to_pandas(splits)
+        self.assertEqual(
+            len(result),
+            3,
+            "with_slice(1, 4) should return 3 rows (indices 1,2,3). "
+            "Bug: DataEvolutionSplitGenerator returns 2 when split has multiple data files.",
+        )
+        ids = result["id"].tolist()
+        self.assertEqual(
+            sorted(ids),
+            [2, 1001, 2001],
+            "with_slice(1, 4) should return id in (2, 1001, 2001). Got ids=%s" % ids,
+        )
+
     def test_multiple_appends(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int32()),