From 3c26d2ee5ec395bc9308406376bf4514c05589a0 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 29 Jan 2026 22:56:09 +0800 Subject: [PATCH 1/6] [python/hotfix] correct with_slice row count for splits with multiple data files --- .../scanner/data_evolution_split_generator.py | 100 ++++++++++-------- .../pypaimon/tests/data_evolution_test.py | 62 +++++++++++ 2 files changed, 115 insertions(+), 47 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 2a4628b3d4ef..7e9cc80935c2 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -316,71 +316,77 @@ def _compute_split_file_idx_map( For blob files (which may be rolled), the range is calculated based on each file's first_row_id. """ shard_file_idx_map = {} - + # Find the first non-blob file to determine the row range for this split - data_file = None + current_pos = file_end_pos + data_file_infos = [] for file in split.files: - if not self._is_blob_file(file.file_name): - data_file = file - break - - if data_file is None: + if self._is_blob_file(file.file_name): + continue + file_begin_pos = current_pos + current_pos += file.row_count + data_file_range = self._compute_file_range( + plan_start_pos, plan_end_pos, file_begin_pos, file.row_count + ) + data_file_infos.append((file, data_file_range)) + + if not data_file_infos: # No data file, skip this split shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos return shard_file_idx_map - # Calculate the row range based on the data file position - file_begin_pos = file_end_pos - file_end_pos += data_file.row_count - data_file_first_row_id = data_file.first_row_id if data_file.first_row_id is not None else 0 + next_pos = current_pos - # Determine the row range for the data file in this split using shared helper - data_file_range = self._compute_file_range( - plan_start_pos, plan_end_pos, file_begin_pos, data_file.row_count - ) - # Apply ranges to each file in the split for file in split.files: if self._is_blob_file(file.file_name): # For blob files, calculate range based on their first_row_id - if data_file_range is None: - # Data file is completely within shard, blob files should also be + blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0 + data_file = None + data_file_range = None + for df, fr in data_file_infos: + df_first = df.first_row_id if df.first_row_id is not None else 0 + if df_first <= blob_first_row_id < df_first + df.row_count: + data_file = df + data_file_range = fr + break + if data_file is None or data_file_range is None: continue - elif data_file_range == (-1, -1): + if data_file_range == (-1, -1): # Data file is completely outside shard, blob files should be skipped shard_file_idx_map[file.file_name] = (-1, -1) + continue + data_file_first_row_id = ( + data_file.first_row_id if data_file.first_row_id is not None else 0 + ) + # Blob's position relative to data file start + blob_rel_start = blob_first_row_id - data_file_first_row_id + blob_rel_end = blob_rel_start + file.row_count + # Shard range relative to data file start + shard_start, shard_end = data_file_range + # Intersect blob's range with shard range + intersect_start = max(blob_rel_start, shard_start) + intersect_end = min(blob_rel_end, shard_end) + if intersect_start >= intersect_end: + # Blob file is completely outside shard range + shard_file_idx_map[file.file_name] = 
(-1, -1) + elif intersect_start == blob_rel_start and intersect_end == blob_rel_end: + # Blob file is completely within shard range, no slicing needed + pass else: - # Calculate blob file's position relative to data file's first_row_id - blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0 - # Blob's position relative to data file start - blob_rel_start = blob_first_row_id - data_file_first_row_id - blob_rel_end = blob_rel_start + file.row_count - - # Shard range relative to data file start - shard_start = data_file_range[0] - shard_end = data_file_range[1] - - # Intersect blob's range with shard range - intersect_start = max(blob_rel_start, shard_start) - intersect_end = min(blob_rel_end, shard_end) - - if intersect_start >= intersect_end: - # Blob file is completely outside shard range - shard_file_idx_map[file.file_name] = (-1, -1) - elif intersect_start == blob_rel_start and intersect_end == blob_rel_end: - # Blob file is completely within shard range, no slicing needed - pass - else: - # Convert to file-local indices - local_start = intersect_start - blob_rel_start - local_end = intersect_end - blob_rel_start - shard_file_idx_map[file.file_name] = (local_start, local_end) + # Convert to file-local indices + local_start = intersect_start - blob_rel_start + local_end = intersect_end - blob_rel_start + shard_file_idx_map[file.file_name] = (local_start, local_end) else: # Data file - if data_file_range is not None: - shard_file_idx_map[file.file_name] = data_file_range + for df, fr in data_file_infos: + if df.file_name == file.file_name: + if fr is not None: + shard_file_idx_map[file.file_name] = fr + break - shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos + shard_file_idx_map[self.NEXT_POS_KEY] = next_pos return shard_file_idx_map def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]: diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py index 38034078e770..1ffb7dbcc4b4 100644 --- a/paimon-python/pypaimon/tests/data_evolution_test.py +++ b/paimon-python/pypaimon/tests/data_evolution_test.py @@ -85,6 +85,68 @@ def test_basic(self): ])) self.assertEqual(actual_data, expect_data) + def test_with_slice(self): + pa_schema = pa.schema([ + ("id", pa.int64()), + ("b", pa.int32()), + ("c", pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + "row-tracking.enabled": "true", + "data-evolution.enabled": "true", + "source.split.target-size": "512m", + }, + ) + table_name = "default.test_with_slice_data_evolution" + self.catalog.create_table(table_name, schema, ignore_if_exists=True) + table = self.catalog.get_table(table_name) + + for batch in [ + {"id": [1, 2], "b": [10, 20], "c": [100, 200]}, + {"id": [1001, 2001], "b": [1011, 2011], "c": [1001, 2001]}, + {"id": [-1, -2], "b": [-10, -20], "c": [-100, -200]}, + ]: + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder() + full_splits = rb.new_scan().plan().splits() + full_result = rb.new_read().to_pandas(full_splits) + self.assertEqual( + len(full_result), + 6, + "Full scan should return 6 rows", + ) + self.assertEqual( + sorted(full_result["id"].tolist()), + [-2, -1, 1, 2, 1001, 2001], + "Full set ids mismatch", + ) + + # with_slice(1, 4) -> row indices [1, 2, 3] -> 3 rows with id in (2, 1001, 2001) + scan = 
rb.new_scan().with_slice(1, 4) + splits = scan.plan().splits() + result = rb.new_read().to_pandas(splits) + self.assertEqual( + len(result), + 3, + "with_slice(1, 4) should return 3 rows (indices 1,2,3). " + "Bug: DataEvolutionSplitGenerator returns 2 when split has multiple data files.", + ) + ids = result["id"].tolist() + self.assertEqual( + sorted(ids), + [2, 1001, 2001], + "with_slice(1, 4) should return id in (2, 1001, 2001). Got ids=%s" % ids, + ) + def test_multiple_appends(self): simple_pa_schema = pa.schema([ ('f0', pa.int32()), From 9d670ef7257c269925a251030d302d7e27b104c8 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 29 Jan 2026 23:22:29 +0800 Subject: [PATCH 2/6] fix code format --- .../pypaimon/read/scanner/data_evolution_split_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 7e9cc80935c2..8964dc2dd52c 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -316,7 +316,7 @@ def _compute_split_file_idx_map( For blob files (which may be rolled), the range is calculated based on each file's first_row_id. """ shard_file_idx_map = {} - + # Find the first non-blob file to determine the row range for this split current_pos = file_end_pos data_file_infos = [] From 1d122cdc721b9de1c210a5c5bbd5ec07ca700a10 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 30 Jan 2026 01:07:42 +0800 Subject: [PATCH 3/6] fix redundant recomputation of data file's first_row_id --- .../read/scanner/data_evolution_split_generator.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 8964dc2dd52c..68279634baec 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -344,21 +344,20 @@ def _compute_split_file_idx_map( blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0 data_file = None data_file_range = None + data_file_first_row_id = None for df, fr in data_file_infos: df_first = df.first_row_id if df.first_row_id is not None else 0 if df_first <= blob_first_row_id < df_first + df.row_count: data_file = df data_file_range = fr + data_file_first_row_id = df_first break - if data_file is None or data_file_range is None: + if data_file is None or data_file_range is None or data_file_first_row_id is None: continue if data_file_range == (-1, -1): # Data file is completely outside shard, blob files should be skipped shard_file_idx_map[file.file_name] = (-1, -1) continue - data_file_first_row_id = ( - data_file.first_row_id if data_file.first_row_id is not None else 0 - ) # Blob's position relative to data file start blob_rel_start = blob_first_row_id - data_file_first_row_id blob_rel_end = blob_rel_start + file.row_count From 3d61beecce6b4788cb1284c008c9c81b64339976 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 30 Jan 2026 01:35:49 +0800 Subject: [PATCH 4/6] fix redundant loop --- .../scanner/data_evolution_split_generator.py | 79 ++++++++----------- 1 file changed, 33 insertions(+), 46 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py 
b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 68279634baec..0d3e06c444bc 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -317,7 +317,7 @@ def _compute_split_file_idx_map( """ shard_file_idx_map = {} - # Find the first non-blob file to determine the row range for this split + # First pass: data files only. Compute range and apply directly to avoid second-pass lookup. current_pos = file_end_pos data_file_infos = [] for file in split.files: @@ -329,6 +329,7 @@ def _compute_split_file_idx_map( plan_start_pos, plan_end_pos, file_begin_pos, file.row_count ) data_file_infos.append((file, data_file_range)) + shard_file_idx_map[file.file_name] = data_file_range if not data_file_infos: # No data file, skip this split @@ -337,53 +338,39 @@ def _compute_split_file_idx_map( next_pos = current_pos - # Apply ranges to each file in the split + # Second pass: only blob files (data files already in shard_file_idx_map from first pass) for file in split.files: - if self._is_blob_file(file.file_name): - # For blob files, calculate range based on their first_row_id - blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0 - data_file = None - data_file_range = None - data_file_first_row_id = None - for df, fr in data_file_infos: - df_first = df.first_row_id if df.first_row_id is not None else 0 - if df_first <= blob_first_row_id < df_first + df.row_count: - data_file = df - data_file_range = fr - data_file_first_row_id = df_first - break - if data_file is None or data_file_range is None or data_file_first_row_id is None: - continue - if data_file_range == (-1, -1): - # Data file is completely outside shard, blob files should be skipped - shard_file_idx_map[file.file_name] = (-1, -1) - continue - # Blob's position relative to data file start - blob_rel_start = blob_first_row_id - data_file_first_row_id - blob_rel_end = blob_rel_start + file.row_count - # Shard range relative to data file start - shard_start, shard_end = data_file_range - # Intersect blob's range with shard range - intersect_start = max(blob_rel_start, shard_start) - intersect_end = min(blob_rel_end, shard_end) - if intersect_start >= intersect_end: - # Blob file is completely outside shard range - shard_file_idx_map[file.file_name] = (-1, -1) - elif intersect_start == blob_rel_start and intersect_end == blob_rel_end: - # Blob file is completely within shard range, no slicing needed - pass - else: - # Convert to file-local indices - local_start = intersect_start - blob_rel_start - local_end = intersect_end - blob_rel_start - shard_file_idx_map[file.file_name] = (local_start, local_end) + if not self._is_blob_file(file.file_name): + continue + blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0 + data_file = None + data_file_range = None + data_file_first_row_id = None + for df, fr in data_file_infos: + df_first = df.first_row_id if df.first_row_id is not None else 0 + if df_first <= blob_first_row_id < df_first + df.row_count: + data_file = df + data_file_range = fr + data_file_first_row_id = df_first + break + if data_file is None or data_file_range is None or data_file_first_row_id is None: + continue + if data_file_range == (-1, -1): + shard_file_idx_map[file.file_name] = (-1, -1) + continue + blob_rel_start = blob_first_row_id - data_file_first_row_id + blob_rel_end = blob_rel_start + file.row_count + shard_start, shard_end = data_file_range + 
intersect_start = max(blob_rel_start, shard_start)
+            intersect_end = min(blob_rel_end, shard_end)
+            if intersect_start >= intersect_end:
+                shard_file_idx_map[file.file_name] = (-1, -1)
+            elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
+                pass
             else:
-                # Data file
-                for df, fr in data_file_infos:
-                    if df.file_name == file.file_name:
-                        if fr is not None:
-                            shard_file_idx_map[file.file_name] = fr
-                        break
+                local_start = intersect_start - blob_rel_start
+                local_end = intersect_end - blob_rel_start
+                shard_file_idx_map[file.file_name] = (local_start, local_end)
 
         shard_file_idx_map[self.NEXT_POS_KEY] = next_pos
         return shard_file_idx_map

From b8022fff8a4837266716da147e9768476a97fdf7 Mon Sep 17 00:00:00 2001
From: xiaohongbo
Date: Fri, 30 Jan 2026 01:48:44 +0800
Subject: [PATCH 5/6] fix map TypeError

---
 .../read/scanner/data_evolution_split_generator.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
index 0d3e06c444bc..7e3e5dae5627 100644
--- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -329,7 +329,8 @@ def _compute_split_file_idx_map(
                 plan_start_pos, plan_end_pos, file_begin_pos, file.row_count
             )
             data_file_infos.append((file, data_file_range))
-            shard_file_idx_map[file.file_name] = data_file_range
+            if data_file_range is not None:
+                shard_file_idx_map[file.file_name] = data_file_range
 
         if not data_file_infos:
             # No data file, skip this split
@@ -343,17 +344,15 @@ def _compute_split_file_idx_map(
             if not self._is_blob_file(file.file_name):
                 continue
             blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
-            data_file = None
             data_file_range = None
             data_file_first_row_id = None
             for df, fr in data_file_infos:
                 df_first = df.first_row_id if df.first_row_id is not None else 0
                 if df_first <= blob_first_row_id < df_first + df.row_count:
-                    data_file = df
                     data_file_range = fr
                     data_file_first_row_id = df_first
                     break
-            if data_file is None or data_file_range is None or data_file_first_row_id is None:
+            if data_file_range is None or data_file_first_row_id is None:
                 continue
             if data_file_range == (-1, -1):
                 shard_file_idx_map[file.file_name] = (-1, -1)

From d31860673691290ced636532452681c6dae0c112 Mon Sep 17 00:00:00 2001
From: xiaohongbo
Date: Fri, 30 Jan 2026 02:08:09 +0800
Subject: [PATCH 6/6] remove redundant None check

---
 .../pypaimon/read/scanner/data_evolution_split_generator.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
index 7e3e5dae5627..d053db5f56dc 100644
--- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -352,7 +352,7 @@ def _compute_split_file_idx_map(
                     data_file_range = fr
                     data_file_first_row_id = df_first
                     break
-            if data_file_range is None or data_file_first_row_id is None:
+            if data_file_range is None:
                 continue
             if data_file_range == (-1, -1):
                 shard_file_idx_map[file.file_name] = (-1, -1)
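
Note for reviewers: the fix boils down to interval intersection between the
global row slice and each file's global row range, computed per data file
instead of once for the first data file in the split. Below is a minimal,
self-contained Python sketch of that math. It is illustrative only: the names
(FileMeta, local_range, ranges_for_split) are hypothetical and not part of the
PyPaimon API, and the None / (-1, -1) conventions are assumed to match what
_compute_file_range returns in this module (None = file fully inside the
slice, read it whole; (-1, -1) = fully outside, skip it).

from typing import Dict, List, Optional, Tuple


class FileMeta:
    """Illustrative stand-in for a data file's metadata (not PyPaimon API)."""

    def __init__(self, name: str, first_row_id: int, row_count: int):
        self.name = name
        self.first_row_id = first_row_id
        self.row_count = row_count


def local_range(slice_start: int, slice_end: int,
                file_begin: int, row_count: int) -> Optional[Tuple[int, int]]:
    """Intersect the global slice [slice_start, slice_end) with the file's
    global row range [file_begin, file_begin + row_count).

    Conventions assumed from the patch: None = file fully inside the slice,
    (-1, -1) = fully outside, otherwise a file-local (start, end) to read.
    """
    file_end = file_begin + row_count
    start = max(slice_start, file_begin)
    end = min(slice_end, file_end)
    if start >= end:
        return (-1, -1)
    if start == file_begin and end == file_end:
        return None
    return (start - file_begin, end - file_begin)


def ranges_for_split(data_files: List[FileMeta], slice_start: int,
                     slice_end: int,
                     split_begin: int) -> Dict[str, Optional[Tuple[int, int]]]:
    """Compute a range for every data file in a split while advancing a
    running global position -- the core of the fix. The pre-fix code computed
    one range from the first data file only and applied it to all of them,
    which also advanced NEXT_POS by just that one file's row count.
    """
    ranges: Dict[str, Optional[Tuple[int, int]]] = {}
    pos = split_begin
    for f in data_files:
        ranges[f.name] = local_range(slice_start, slice_end, pos, f.row_count)
        pos += f.row_count
    return ranges


if __name__ == "__main__":
    # Three 2-row files, as written by test_with_slice; slice rows [1, 4).
    files = [FileMeta("f0", 0, 2), FileMeta("f1", 2, 2), FileMeta("f2", 4, 2)]
    print(ranges_for_split(files, 1, 4, 0))
    # -> {'f0': (1, 2), 'f1': None, 'f2': (-1, -1)}: one row from f0, all of
    #    f1, nothing from f2 -- the three rows test_with_slice asserts.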