diff --git a/docs/content/pypaimon/data-evolution.md b/docs/content/pypaimon/data-evolution.md
index bd0bde03a427..2268090f7374 100644
--- a/docs/content/pypaimon/data-evolution.md
+++ b/docs/content/pypaimon/data-evolution.md
@@ -29,6 +29,13 @@ under the License.
 
 PyPaimon for Data Evolution mode. See [Data Evolution]({{< ref "append-table/data-evolution" >}}).
 
+## Prerequisites
+
+To use partial updates / data evolution, enable both options when creating the table:
+
+- **`row-tracking.enabled`**: `true`
+- **`data-evolution.enabled`**: `true`
+
 ## Update Columns By Row ID
 
 You can create `TableUpdate.update_by_arrow_with_row_id` to update columns to data evolution tables.
@@ -36,7 +43,19 @@ You can create `TableUpdate.update_by_arrow_with_row_id` to update columns to da
 The input data should include the `_ROW_ID` column, update operation will automatically sort and match each `_ROW_ID` to its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes them to a separate file.
 
+### Requirements for `_ROW_ID` updates
+
+- **All rows required**: the input table must contain **exactly the full table row count** (one row per existing row).
+- **Row id coverage**: after sorting by `_ROW_ID`, the row ids must be exactly **0..N-1** (no duplicates, no gaps).
+- **Update columns only**: include `_ROW_ID` plus the columns you want to update (a partial schema is OK).
+
 ```python
+import pyarrow as pa
+from pypaimon import CatalogFactory, Schema
+
+catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
+catalog.create_database('default', False)
+
 simple_pa_schema = pa.schema([
     ('f0', pa.int8()),
     ('f1', pa.int16()),
@@ -78,3 +97,102 @@ table_commit.close()
 # 'f0': [5, 6],
 # 'f1': [-1001, 1002]
 ```
+
+## Compute a New Column (scan + rewrite with shards)
+
+If you want to **compute a derived column** (or **update an existing column based on other columns**) without providing
+`_ROW_ID`, you can use the shard scan + rewrite workflow:
+
+- Read only the columns you need (projection)
+- Compute the new values in the same row order
+- Write only the updated columns back
+- Commit per shard
+
+This is useful for backfilling a newly added column, or recomputing a column from other columns.
+
+### Example: compute `d = c + b - a`
+
+```python
+import pyarrow as pa
+from pypaimon import CatalogFactory, Schema
+
+catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
+catalog.create_database('default', False)
+
+table_schema = pa.schema([
+    ('a', pa.int32()),
+    ('b', pa.int32()),
+    ('c', pa.int32()),
+    ('d', pa.int32()),
+])
+
+schema = Schema.from_pyarrow_schema(
+    table_schema,
+    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
+)
+catalog.create_table('default.t', schema, False)
+table = catalog.get_table('default.t')
+
+# write initial data (a, b, c only)
+write_builder = table.new_batch_write_builder()
+write = write_builder.new_write().with_write_type(['a', 'b', 'c'])
+commit = write_builder.new_commit()
+write.write_arrow(pa.Table.from_pydict({'a': [1, 2], 'b': [10, 20], 'c': [100, 200]}))
+commit.commit(write.prepare_commit())
+write.close()
+commit.close()
+
+# shard update: read (a, b, c), write only (d)
+update = write_builder.new_update()
+update.with_read_projection(['a', 'b', 'c'])
+update.with_update_type(['d'])
+
+shard_idx = 0
+num_shards = 1
+upd = update.new_shard_updator(shard_idx, num_shards)
+reader = upd.arrow_reader()
+
+for batch in iter(reader.read_next_batch, None):
+    a = batch.column('a').to_pylist()
+    b = batch.column('b').to_pylist()
+    c = batch.column('c').to_pylist()
+    d = [ci + bi - ai for ai, bi, ci in zip(a, b, c)]
+
+    upd.update_by_arrow_batch(
+        pa.RecordBatch.from_pydict({'d': d}, schema=pa.schema([('d', pa.int32())]))
+    )
+
+commit_messages = upd.prepare_commit()
+commit = write_builder.new_commit()
+commit.commit(commit_messages)
+commit.close()
+```
+
+### Example: update an existing column `c = b - a`
+
+```python
+update = write_builder.new_update()
+update.with_read_projection(['a', 'b'])
+update.with_update_type(['c'])
+
+upd = update.new_shard_updator(0, 1)
+reader = upd.arrow_reader()
+for batch in iter(reader.read_next_batch, None):
+    a = batch.column('a').to_pylist()
+    b = batch.column('b').to_pylist()
+    c = [bi - ai for ai, bi in zip(a, b)]
+    upd.update_by_arrow_batch(
+        pa.RecordBatch.from_pydict({'c': c}, schema=pa.schema([('c', pa.int32())]))
+    )
+
+commit_messages = upd.prepare_commit()
+commit = write_builder.new_commit()
+commit.commit(commit_messages)
+commit.close()
+```
+
+### Notes
+
+- **Row order matters**: the batches you write must have the **same number of rows** as the batches you read, in the
+  same order for that shard.
+- **Parallelism**: run multiple shards by calling `new_shard_updator(shard_idx, num_shards)` for each shard.
diff --git a/paimon-python/pypaimon/globalindex/range.py b/paimon-python/pypaimon/globalindex/range.py
index f27a637ae71e..19b9b40e9439 100644
--- a/paimon-python/pypaimon/globalindex/range.py
+++ b/paimon-python/pypaimon/globalindex/range.py
@@ -154,7 +154,7 @@ def merge_sorted_as_possible(ranges: List['Range']) -> List['Range']:
         return result
 
     @staticmethod
-    def sort_and_merge_overlap(ranges: List['Range'], merge: bool = True) -> List['Range']:
+    def sort_and_merge_overlap(ranges: List['Range'], merge: bool = True, adjacent: bool = True) -> List['Range']:
         """
         Sort ranges and optionally merge overlapping ones.
""" @@ -166,10 +166,11 @@ def sort_and_merge_overlap(ranges: List['Range'], merge: bool = True) -> List['R if not merge: return sorted_ranges + adjacent_value = 1 if adjacent else 0 result = [sorted_ranges[0]] for r in sorted_ranges[1:]: last = result[-1] - if r.from_ <= last.to + 1: + if r.from_ <= last.to + adjacent_value: # Merge with last range result[-1] = Range(last.from_, max(last.to, r.to)) else: diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 2a4628b3d4ef..68504a45a2bd 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -77,8 +77,9 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple: self.end_pos_of_this_subtask ) elif self.idx_of_this_subtask is not None: - # shard data range: [plan_start_pos, plan_end_pos) - partitioned_files, plan_start_pos, plan_end_pos = self._filter_by_shard(partitioned_files) + partitioned_files = self._filter_by_shard( + partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks + ) def weight_func(file_list: List[DataFileMeta]) -> int: return max(sum(f.file_size for f in file_list), self.open_file_cost) @@ -108,9 +109,8 @@ def weight_func(file_list: List[DataFileMeta]) -> int: flatten_packed_files, packed_files, sorted_entries_list ) - if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None: + if self.start_pos_of_this_subtask is not None: splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos) - # Wrap splits with IndexedSplit if row_ranges is provided if self.row_ranges: splits = self._wrap_to_indexed_splits(splits) @@ -242,22 +242,61 @@ def _filter_by_row_range( return filtered_partitioned_files, plan_start_pos, plan_end_pos - def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple: - """ - Filter file entries by shard for data evolution tables. 
-        """
-        # Calculate total rows (excluding blob files)
-        total_row = sum(
-            entry.file.row_count
-            for file_entries in partitioned_files.values()
-            for entry in file_entries
-            if not self._is_blob_file(entry.file.file_name)
-        )
+    def _filter_by_shard(self, partitioned_files: defaultdict, sub_task_id: int, total_tasks: int) -> defaultdict:
+        list_ranges = []
+        for file_entries in partitioned_files.values():
+            for entry in file_entries:
+                first_row_id = entry.file.first_row_id
+                if first_row_id is None:
+                    raise ValueError("Found None first row id in files")
+                # Range is inclusive [from_, to], so use row_count - 1
+                list_ranges.append(Range(first_row_id, first_row_id + entry.file.row_count - 1))
+
+        sorted_ranges = Range.sort_and_merge_overlap(list_ranges, True, False)
+
+        start_range, end_range = self._divide_ranges(sorted_ranges, sub_task_id, total_tasks)
+        if start_range is None or end_range is None:
+            return defaultdict(list)
+        start_first_row_id = start_range.from_
+        end_first_row_id = end_range.to
 
-        # Calculate shard range using shared helper
-        start_pos, end_pos = self._compute_shard_range(total_row)
+        filtered_partitioned_files = {
+            k: [x for x in v if x.file.first_row_id >= start_first_row_id and x.file.first_row_id <= end_first_row_id]
+            for k, v in partitioned_files.items()
+        }
 
-        return self._filter_by_row_range(partitioned_files, start_pos, end_pos)
+        filtered_partitioned_files = {k: v for k, v in filtered_partitioned_files.items() if v}
+        return defaultdict(list, filtered_partitioned_files)
+
+    @staticmethod
+    def _divide_ranges(
+        sorted_ranges: List[Range], sub_task_id: int, total_tasks: int
+    ) -> Tuple[Optional[Range], Optional[Range]]:
+        if not sorted_ranges:
+            return None, None
+
+        num_ranges = len(sorted_ranges)
+
+        # If more tasks than ranges, some tasks get nothing
+        if sub_task_id >= num_ranges:
+            return None, None
+
+        # Calculate balanced distribution of ranges across tasks
+        base_ranges_per_task = num_ranges // total_tasks
+        remainder = num_ranges % total_tasks
+
+        # Each of the first 'remainder' tasks gets one extra range
+        if sub_task_id < remainder:
+            num_ranges_for_task = base_ranges_per_task + 1
+            start_idx = sub_task_id * (base_ranges_per_task + 1)
+        else:
+            num_ranges_for_task = base_ranges_per_task
+            start_idx = (
+                remainder * (base_ranges_per_task + 1)
+                + (sub_task_id - remainder) * base_ranges_per_task
+            )
+        end_idx = start_idx + num_ranges_for_task - 1
+        return sorted_ranges[start_idx], sorted_ranges[end_idx]
 
     def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
         """
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 88d2626bb6a7..7670ec9447ac 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -2204,12 +2204,12 @@ def test_blob_write_read_large_data_with_rolling_with_shard(self):
         result = table_read.to_arrow(table_scan.plan().splits())
 
         # Verify the data
-        self.assertEqual(result.num_rows, 54, "Should have 54 rows")
+        self.assertEqual(result.num_rows, 80, "Should have 80 rows")
         self.assertEqual(result.num_columns, 4, "Should have 4 columns")
 
         # Verify blob data integrity
         blob_data = result.column('large_blob').to_pylist()
-        self.assertEqual(len(blob_data), 54, "Should have 54 blob records")
+        self.assertEqual(len(blob_data), 80, "Should have 80 blob records")
 
         # Verify each blob
         for i, blob in enumerate(blob_data):
             self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1} should be {large_blob_size:,} bytes")
@@ -2264,21 +2264,22 @@ def test_blob_rolling_with_shard(self):
         actual_size = len(large_blob_data)
         print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 * 1024):.2f} MB)")
 
-        write_builder = table.new_batch_write_builder()
-        writer = write_builder.new_write()
 
         # Write 30 records
-        for record_id in range(30):
-            test_data = pa.Table.from_pydict({
-                'id': [record_id],  # Unique ID for each row
-                'metadata': [f'Large blob batch {record_id + 1}'],
-                'large_blob': [struct.pack(' List[CommitMessage]:
         update_by_row_id = TableUpdateByRowId(self.table, self.commit_user)
         update_by_row_id.update_columns(table, self.update_cols)
         return update_by_row_id.commit_messages
+
+
+class ShardTableUpdator:
+
+    def __init__(
+        self,
+        table,
+        projection: Optional[List[str]],
+        write_cols: List[str],
+        commit_user,
+        shard_num: int,
+        total_shard_count: int,
+    ):
+        from pypaimon.table.file_store_table import FileStoreTable
+        self.table: FileStoreTable = table
+        self.projection = projection
+        self.write_cols = write_cols
+        self.commit_user = commit_user
+        self.total_shard_count = total_shard_count
+        self.shard_num = shard_num
+
+        self.write_pos = 0
+        self.writer: Optional[SingleWriter] = None
+        self.dict = defaultdict(list)
+
+        scanner = self.table.new_read_builder().new_scan().with_shard(shard_num, total_shard_count)
+        self.splits = scanner.plan().splits()
+
+        self.row_ranges: List[Tuple[Tuple, Range]] = []
+        for split in self.splits:
+            if not isinstance(split, DataSplit):
+                raise ValueError(f"Split {split} is not DataSplit.")
+            files = split.files
+            ranges = self.compute_from_files(files)
+            for row_range in ranges:
+                self.row_ranges.append((tuple(split.partition.values), row_range))
+
+    @staticmethod
+    def compute_from_files(files: List[DataFileMeta]) -> List[Range]:
+        ranges = []
+        for file in files:
+            ranges.append(Range(file.first_row_id, file.first_row_id + file.row_count - 1))
+
+        return Range.sort_and_merge_overlap(ranges, True, False)
+
+    def arrow_reader(self) -> pyarrow.ipc.RecordBatchReader:
+        read_builder = self.table.new_read_builder()
+        read_builder.with_projection(self.projection)
+        return read_builder.new_read().to_arrow_batch_reader(self.splits)
+
+    def prepare_commit(self) -> List[CommitMessage]:
+        commit_messages = []
+        for (partition, files) in self.dict.items():
+            commit_messages.append(CommitMessage(partition, 0, files))
+        return commit_messages
+
+    def update_by_arrow_batch(self, data: pa.RecordBatch):
+        self._init_writer()
+
+        capacity = self.writer.capacity()
+        if capacity <= 0:
+            raise RuntimeError("Writer has no remaining capacity.")
+
+        # Split the batch across writers if it exceeds the current writer's remaining capacity.
+        first, rest = (data, None) if capacity >= data.num_rows else (data.slice(0, capacity), data.slice(capacity))
+
+        self.writer.write(first)
+        if self.writer.capacity() == 0:
+            self.dict[self.writer.partition()].append(self.writer.end())
+            self.writer = None
+
+        if rest is not None:
+            if self.writer is not None:
+                raise RuntimeError("Should not get here: rest and current writer exist at the same time.")
+            self.update_by_arrow_batch(rest)
+
+    def _init_writer(self):
+        if self.writer is None:
+            if self.write_pos >= len(self.row_ranges):
+                raise RuntimeError(
+                    "No more row ranges to write. "
+                    "Ensure you write exactly the same number of rows as read from this shard."
+ ) + item = self.row_ranges[self.write_pos] + self.write_pos += 1 + partition = item[0] + row_range = item[1] + writer = AppendOnlyDataWriter(self.table, partition, 0, 0, self.table.options, self.write_cols) + writer.target_file_size = MemorySize.of_mebi_bytes(999999999).get_bytes() + self.writer = SingleWriter(writer, partition, row_range.from_, row_range.to - row_range.from_ + 1) + + +class SingleWriter: + + def __init__(self, writer: DataWriter, partition, first_row_id: int, row_count: int): + self.writer: DataWriter = writer + self._partition = partition + self.first_row_id = first_row_id + self.row_count = row_count + self.written_records_count = 0 + + def capacity(self) -> int: + return self.row_count - self.written_records_count + + def write(self, data: pa.RecordBatch): + if data.num_rows > self.capacity(): + raise Exception("Data num size exceeds capacity.") + self.written_records_count += data.num_rows + self.writer.write(data) + return + + def partition(self) -> Tuple: + return self._partition + + def end(self) -> DataFileMeta: + if self.capacity() != 0: + raise Exception("There still capacity left in the writer.") + files = self.writer.prepare_commit() + if len(files) != 1: + raise Exception("Should have one file.") + file = files[0] + if file.row_count != self.row_count: + raise Exception("File row count mismatch.") + file = file.assign_first_row_id(self.first_row_id) + return file