diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
index 2a4628b3d4ef..d053db5f56dc 100644
--- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -317,70 +317,61 @@ def _compute_split_file_idx_map(
         """
         shard_file_idx_map = {}
 
-        # Find the first non-blob file to determine the row range for this split
-        data_file = None
+        # First pass: data files only. Compute range and apply directly to avoid second-pass lookup.
+        current_pos = file_end_pos
+        data_file_infos = []
         for file in split.files:
-            if not self._is_blob_file(file.file_name):
-                data_file = file
-                break
-
-        if data_file is None:
+            if self._is_blob_file(file.file_name):
+                continue
+            file_begin_pos = current_pos
+            current_pos += file.row_count
+            data_file_range = self._compute_file_range(
+                plan_start_pos, plan_end_pos, file_begin_pos, file.row_count
+            )
+            data_file_infos.append((file, data_file_range))
+            if data_file_range is not None:
+                shard_file_idx_map[file.file_name] = data_file_range
+
+        if not data_file_infos:
             # No data file, skip this split
             shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
             return shard_file_idx_map
 
-        # Calculate the row range based on the data file position
-        file_begin_pos = file_end_pos
-        file_end_pos += data_file.row_count
-        data_file_first_row_id = data_file.first_row_id if data_file.first_row_id is not None else 0
+        next_pos = current_pos
 
-        # Determine the row range for the data file in this split using shared helper
-        data_file_range = self._compute_file_range(
-            plan_start_pos, plan_end_pos, file_begin_pos, data_file.row_count
-        )
-
-        # Apply ranges to each file in the split
+        # Second pass: only blob files (data files already in shard_file_idx_map from first pass)
         for file in split.files:
-            if self._is_blob_file(file.file_name):
-                # For blob files, calculate range based on their first_row_id
-                if data_file_range is None:
-                    # Data file is completely within shard, blob files should also be
-                    continue
-                elif data_file_range == (-1, -1):
-                    # Data file is completely outside shard, blob files should be skipped
-                    shard_file_idx_map[file.file_name] = (-1, -1)
-                else:
-                    # Calculate blob file's position relative to data file's first_row_id
-                    blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
-                    # Blob's position relative to data file start
-                    blob_rel_start = blob_first_row_id - data_file_first_row_id
-                    blob_rel_end = blob_rel_start + file.row_count
-
-                    # Shard range relative to data file start
-                    shard_start = data_file_range[0]
-                    shard_end = data_file_range[1]
-
-                    # Intersect blob's range with shard range
-                    intersect_start = max(blob_rel_start, shard_start)
-                    intersect_end = min(blob_rel_end, shard_end)
-
-                    if intersect_start >= intersect_end:
-                        # Blob file is completely outside shard range
-                        shard_file_idx_map[file.file_name] = (-1, -1)
-                    elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
-                        # Blob file is completely within shard range, no slicing needed
-                        pass
-                    else:
-                        # Convert to file-local indices
-                        local_start = intersect_start - blob_rel_start
-                        local_end = intersect_end - blob_rel_start
-                        shard_file_idx_map[file.file_name] = (local_start, local_end)
+            if not self._is_blob_file(file.file_name):
+                continue
+            blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
+            data_file_range = None
+            data_file_first_row_id = None
+            for df, fr in data_file_infos:
+                df_first = df.first_row_id if df.first_row_id is not None else 0
+                if df_first <= blob_first_row_id < df_first + df.row_count:
+                    data_file_range = fr
+                    data_file_first_row_id = df_first
+                    break
+            if data_file_range is None:
+                continue
+            if data_file_range == (-1, -1):
+                shard_file_idx_map[file.file_name] = (-1, -1)
+                continue
+            blob_rel_start = blob_first_row_id - data_file_first_row_id
+            blob_rel_end = blob_rel_start + file.row_count
+            shard_start, shard_end = data_file_range
+            intersect_start = max(blob_rel_start, shard_start)
+            intersect_end = min(blob_rel_end, shard_end)
+            if intersect_start >= intersect_end:
+                shard_file_idx_map[file.file_name] = (-1, -1)
+            elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
+                pass
             else:
-                # Data file
-                if data_file_range is not None:
-                    shard_file_idx_map[file.file_name] = data_file_range
+                local_start = intersect_start - blob_rel_start
+                local_end = intersect_end - blob_rel_start
+                shard_file_idx_map[file.file_name] = (local_start, local_end)
 
-        shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
+        shard_file_idx_map[self.NEXT_POS_KEY] = next_pos
         return shard_file_idx_map
 
     def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]:
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 38034078e770..1ffb7dbcc4b4 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -85,6 +85,68 @@ def test_basic(self):
         ]))
         self.assertEqual(actual_data, expect_data)
 
+    def test_with_slice(self):
+        pa_schema = pa.schema([
+            ("id", pa.int64()),
+            ("b", pa.int32()),
+            ("c", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                "row-tracking.enabled": "true",
+                "data-evolution.enabled": "true",
+                "source.split.target-size": "512m",
+            },
+        )
+        table_name = "default.test_with_slice_data_evolution"
+        self.catalog.create_table(table_name, schema, ignore_if_exists=True)
+        table = self.catalog.get_table(table_name)
+
+        for batch in [
+            {"id": [1, 2], "b": [10, 20], "c": [100, 200]},
+            {"id": [1001, 2001], "b": [1011, 2011], "c": [1001, 2001]},
+            {"id": [-1, -2], "b": [-10, -20], "c": [-100, -200]},
+        ]:
+            wb = table.new_batch_write_builder()
+            tw = wb.new_write()
+            tc = wb.new_commit()
+            tw.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema))
+            tc.commit(tw.prepare_commit())
+            tw.close()
+            tc.close()
+
+        rb = table.new_read_builder()
+        full_splits = rb.new_scan().plan().splits()
+        full_result = rb.new_read().to_pandas(full_splits)
+        self.assertEqual(
+            len(full_result),
+            6,
+            "Full scan should return 6 rows",
+        )
+        self.assertEqual(
+            sorted(full_result["id"].tolist()),
+            [-2, -1, 1, 2, 1001, 2001],
+            "Full set ids mismatch",
+        )
+
+        # with_slice(1, 4) -> row indices [1, 2, 3] -> 3 rows with id in (2, 1001, 2001)
+        scan = rb.new_scan().with_slice(1, 4)
+        splits = scan.plan().splits()
+        result = rb.new_read().to_pandas(splits)
+        self.assertEqual(
+            len(result),
+            3,
+            "with_slice(1, 4) should return 3 rows (indices 1,2,3). "
+            "Bug: DataEvolutionSplitGenerator returns 2 when split has multiple data files.",
+        )
+        ids = result["id"].tolist()
+        self.assertEqual(
+            sorted(ids),
+            [2, 1001, 2001],
+            "with_slice(1, 4) should return id in (2, 1001, 2001). Got ids=%s" % ids,
+        )
+
     def test_multiple_appends(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int32()),