From 5f348d469acd87d53f746b070ecd6f691e9f84b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 11:44:53 +0800 Subject: [PATCH 01/22] temp --- .../scanner/data_evolution_split_generator.py | 7 +- paimon-python/pypaimon/write/table_update.py | 123 +++++++++++++++++- 2 files changed, 126 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 2a4628b3d4ef..554c935874e8 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -259,14 +259,15 @@ def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple: return self._filter_by_row_range(partitioned_files, start_pos, end_pos) - def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: + @staticmethod + def _split_by_row_id(files: List[DataFileMeta]) -> List[List[DataFileMeta]]: """ Split files by row ID for data evolution tables. """ split_by_row_id = [] # Filter blob files to only include those within the row ID range of non-blob files - sorted_files = self._filter_blob(files) + sorted_files = DataEvolutionSplitGenerator._filter_blob(files) # Split files by firstRowId last_row_id = -1 @@ -280,7 +281,7 @@ def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta] split_by_row_id.append([file]) continue - if not self._is_blob_file(file.file_name) and first_row_id != last_row_id: + if not DataEvolutionSplitGenerator._is_blob_file(file.file_name) and first_row_id != last_row_id: if current_split: split_by_row_id.append(current_split) diff --git a/paimon-python/pypaimon/write/table_update.py b/paimon-python/pypaimon/write/table_update.py index baaf3f18e357..5738c45d4e40 100644 --- a/paimon-python/pypaimon/write/table_update.py +++ b/paimon-python/pypaimon/write/table_update.py @@ -15,12 +15,19 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ -from typing import List +from collections import defaultdict +from typing import List, Optional, Tuple +import pyarrow import pyarrow as pa +from pypaimon.common.memory_size import MemorySize +from pypaimon.globalindex import Range +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.read.split import DataSplit from pypaimon.write.commit_message import CommitMessage from pypaimon.write.table_update_by_row_id import TableUpdateByRowId +from pypaimon.write.writer.data_writer import DataWriter class TableUpdate: @@ -30,6 +37,7 @@ def __init__(self, table, commit_user): self.table: FileStoreTable = table self.commit_user = commit_user self.update_cols = None + self.projection = None def with_update_type(self, update_cols: List[str]): for col in update_cols: @@ -40,7 +48,120 @@ def with_update_type(self, update_cols: List[str]): self.update_cols = update_cols return self + def with_projection(self, projection: List[str]): + self.projection = projection + + def new_shard_updator(self, total_shard_count: int, shard_num: int): + return ShardTableUpdator(self.table, self.projection, self.update_cols, self.commit_user, total_shard_count, shard_num) + def update_by_arrow_with_row_id(self, table: pa.Table) -> List[CommitMessage]: update_by_row_id = TableUpdateByRowId(self.table, self.commit_user) update_by_row_id.update_columns(table, self.update_cols) return update_by_row_id.commit_messages + +class ShardTableUpdator: + + def __init__(self, table, projection: Optional[List[str]], write_cols: List[str], commit_user, total_shard_count: int, shard_num: int): + from pypaimon.table.file_store_table import FileStoreTable + self.table: FileStoreTable = table + self.projection = projection + self.write_cols = write_cols + self.commit_user = commit_user + self.update_cols = None + self.total_shard_count = total_shard_count + self.shard_num = shard_num + + self.write_pos = 0 + self.writer: Optional[SingleWriter] = None + self.dict = defaultdict(list) + + scanner = self.table.new_read_builder().new_scan() + if total_shard_count != 0: + scanner.with_shard(total_shard_count, shard_num) + self.splits = scanner.plan().splits() + + self.row_ranges: List[(Tuple, Range)] = [] + for split in self.splits: + if not isinstance(split, DataSplit): + raise ValueError(f"Split {split} is not DataSplit.") + files = split.files + ranges = self.compute_from_files(files) + for range in ranges: + self.row_ranges.extend((split.partition, range)) + + @staticmethod + def compute_from_files(files: List[DataFileMeta]) -> List[Range]: + ranges = [] + for file in files: + ranges.append(Range(file.first_row_id, file.first_row_id + file.row_count - 1)) + + return Range.merge_sorted_as_possible(ranges) + + def arrow_reader(self) -> pyarrow.ipc.RecordBatchReader: + read_builder = self.table.new_read_builder() + read_builder.with_projection(self.projection) + return read_builder.new_read().to_arrow_batch_reader(self.splits) + + def prepare_commit(self) -> List[CommitMessage]: + commit_messages = [] + for (partition, files) in self.dict.items(): + commit_messages.append(CommitMessage(partition, 0, files)) + return commit_messages + + def update_by_arrow_batch(self, data: pa.RecordBatch): + self._init_writer() + + capacity = self.writer.capacity() + data_in_this_writer = data if capacity >= data.num_rows else data.slice(0, capacity - 1) + data_in_next_writer = None if capacity >= data.num_rows else data.slice(data.num_rows - 1, capacity) 
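# Illustrative aside (not part of the patch): pyarrow.RecordBatch.slice takes
# (offset, length), so splitting a batch at a writer-capacity boundary is usually
# written as head = data.slice(0, capacity) and tail = data.slice(capacity).
# A standalone sketch, assuming the intent is to push exactly `capacity` rows
# into the current writer and carry the remainder over:
#
#     import pyarrow as pa
#     batch = pa.RecordBatch.from_pydict({'d': list(range(10))})
#     capacity = 4
#     head, tail = batch.slice(0, capacity), batch.slice(capacity)
#     assert head.num_rows == 4 and tail.num_rows == 6
#
# The (0, capacity - 1) and (data.num_rows - 1, capacity) arguments in this temp
# commit appear to drop rows at the boundary; treat the sketch above as the
# conventional offset/length form, not as what later patches necessarily settle on.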
+ + self.writer.write(data_in_this_writer) + if self.writer.capacity() == 0: + self.dict[self.writer.partition()].append(self.writer.end()) + self.writer = None + + if (data_in_next_writer is not None): + self.update_by_arrow_batch(data_in_next_writer) + + + def _init_writer(self): + if self.writer is None: + tuple: (Tuple, Range) = self.row_ranges[self.write_pos] + partition = tuple(0) + range = tuple(1) + writer = DataWriter(self.table, partition, 0, 0, self.table.options, self.write_cols) + writer.target_file_size = MemorySize.of_mebi_bytes(999999999) + self.writer = SingleWriter(writer, partition, range.from_, range.to - range.from_ + 1) + + + +class SingleWriter: + + def __init__(self, writer: DataWriter, partition: Tuple, first_row_id: int, row_count: int): + self.writer: DataWriter = writer + self._partition = partition + self.first_row_id = first_row_id + self.row_count = row_count + self.written_records_count = 0 + + + def capacity(self) -> int: + return self.row_count - self.written_records_count + + def write(self, data: pa.RecordBatch): + if data.num_rows > self.capacity(): + raise Exception("Data num size exceeds capacity.") + self.written_records_count += data.num_rows + self.writer.write(data) + return + + def partition(self) -> Tuple: + return self._partition + + def end(self) -> DataFileMeta: + files = self.writer.prepare_commit() + if len(files) != 1: + raise Exception("Should have one file.") + file = files[0] + file = file.assign_first_row_id(self.first_row_id) + return file From 9b17b5ed405eee26bd3800d4038eae27257fc61d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 14:22:55 +0800 Subject: [PATCH 02/22] [python] Enable update --- .../scanner/data_evolution_split_generator.py | 83 +++- .../read/scanner/full_starting_scanner.py | 8 + .../pypaimon/read/scanner/split_generator.py | 5 + paimon-python/pypaimon/read/table_scan.py | 4 + .../tests/shard_table_updator_test.py | 393 ++++++++++++++++++ paimon-python/pypaimon/write/table_update.py | 31 +- 6 files changed, 501 insertions(+), 23 deletions(-) create mode 100644 paimon-python/pypaimon/tests/shard_table_updator_test.py diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 554c935874e8..88a73b62407a 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -77,8 +77,12 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple: self.end_pos_of_this_subtask ) elif self.idx_of_this_subtask is not None: - # shard data range: [plan_start_pos, plan_end_pos) - partitioned_files, plan_start_pos, plan_end_pos = self._filter_by_shard(partitioned_files) + if self.no_slice_split: + no_slice_for_split = True + partitioned_files = self._filter_by_shard_no_slice(partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks) + else: + # shard data range: [plan_start_pos, plan_end_pos) + partitioned_files, plan_start_pos, plan_end_pos = self._filter_by_shard(partitioned_files) def weight_func(file_list: List[DataFileMeta]) -> int: return max(sum(f.file_size for f in file_list), self.open_file_cost) @@ -108,12 +112,12 @@ def weight_func(file_list: List[DataFileMeta]) -> int: flatten_packed_files, packed_files, sorted_entries_list ) - if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None: - splits = self._wrap_to_sliced_splits(splits, plan_start_pos, 
plan_end_pos) - - # Wrap splits with IndexedSplit if row_ranges is provided - if self.row_ranges: - splits = self._wrap_to_indexed_splits(splits) + if not self.no_slice_split: + if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None: + splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos) + # Wrap splits with IndexedSplit if row_ranges is provided + if self.row_ranges: + splits = self._wrap_to_indexed_splits(splits) return splits @@ -259,6 +263,69 @@ def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple: return self._filter_by_row_range(partitioned_files, start_pos, end_pos) + def _filter_by_shard_no_slice(self, partitioned_files: defaultdict, sub_task_id: int, total_tasks: int) -> defaultdict: + list_ranges = [] + + for file_entries in partitioned_files.values(): + for entry in file_entries: + first_row_id = entry.file.first_row_id + if first_row_id is None: + raise ValueError("Found None first row id in files") + # Range is inclusive [from_, to], so use row_count - 1 + list_ranges.append(Range(first_row_id, first_row_id + entry.file.row_count - 1)) + + if not list_ranges: + return defaultdict(list) + + sorted_ranges = Range.sort_and_merge_overlap(list_ranges) + + start_range, end_range = self._divide_ranges(sorted_ranges, sub_task_id, total_tasks) + + if start_range is None or end_range is None: + return defaultdict(list) + + # Filter files that overlap with this subtask's range + # Range is inclusive [from_, to], but _filter_by_row_range expects exclusive end + filtered_partitioned_files, _, _ = self._filter_by_row_range( + partitioned_files, start_range.from_, end_range.to + 1 + ) + return filtered_partitioned_files + + @staticmethod + def _divide_ranges( + sorted_ranges: List[Range], sub_task_id: int, total_tasks: int + ) -> Tuple[Optional[Range], Optional[Range]]: + if not sorted_ranges: + return None, None + + num_ranges = len(sorted_ranges) + + # If more tasks than ranges, some tasks get nothing + if sub_task_id >= num_ranges: + return None, None + + # Calculate balanced distribution of ranges across tasks + base_ranges_per_task = num_ranges // total_tasks + remainder = num_ranges % total_tasks + + # Each of the first 'remainder' tasks gets one extra range + if sub_task_id < remainder: + num_ranges_for_task = base_ranges_per_task + 1 + start_idx = sub_task_id * (base_ranges_per_task + 1) + else: + num_ranges_for_task = base_ranges_per_task + start_idx = ( + remainder * (base_ranges_per_task + 1) + + (sub_task_id - remainder) * base_ranges_per_task + ) + + if num_ranges_for_task == 0: + return None, None + + end_idx = start_idx + num_ranges_for_task - 1 + + return sorted_ranges[start_idx], sorted_ranges[end_idx] + @staticmethod def _split_by_row_id(files: List[DataFileMeta]) -> List[List[DataFileMeta]]: """ diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index f415ab61ef8d..27f5801acfb6 100755 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -75,6 +75,7 @@ def __init__( self.only_read_real_buckets = options.bucket() == BucketMode.POSTPONE_BUCKET.value self.data_evolution = options.data_evolution_enabled() self.deletion_vectors_enabled = options.deletion_vectors_enabled() + self.no_slice_split = None def schema_fields_func(schema_id: int): return self.table.schema_manager.get_schema(schema_id).fields @@ -131,6 +132,9 @@ def scan(self) -> Plan: 
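An aside on the `_divide_ranges` helper introduced above: it assigns each subtask a contiguous block of the merged row-id ranges, with the first `num_ranges % total_tasks` subtasks taking one extra range each. A minimal standalone sketch of the same index arithmetic, using plain `(from_, to)` tuples in place of the pypaimon `Range` class (an assumption of this example only):

from typing import List, Optional, Tuple

RangeT = Tuple[int, int]  # inclusive (from_, to)

def divide_ranges(sorted_ranges: List[RangeT], sub_task_id: int,
                  total_tasks: int) -> Tuple[Optional[RangeT], Optional[RangeT]]:
    # Contiguous-block distribution, mirroring _divide_ranges above.
    n = len(sorted_ranges)
    if n == 0 or sub_task_id >= n:
        return None, None
    base, remainder = divmod(n, total_tasks)
    if sub_task_id < remainder:
        count, start = base + 1, sub_task_id * (base + 1)
    else:
        count = base
        start = remainder * (base + 1) + (sub_task_id - remainder) * base
    return sorted_ranges[start], sorted_ranges[start + count - 1]

# Five merged ranges over three subtasks: task 0 takes ranges 0-1, task 1 takes 2-3, task 2 takes 4.
ranges = [(0, 9), (10, 19), (20, 29), (30, 39), (40, 49)]
assert divide_ranges(ranges, 0, 3) == ((0, 9), (10, 19))
assert divide_ranges(ranges, 1, 3) == ((20, 29), (30, 39))
assert divide_ranges(ranges, 2, 3) == ((40, 49), (40, 49))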
deletion_files_map ) + if self.no_slice_split: + split_generator.with_no_slice_split() + # Configure sharding if needed if self.idx_of_this_subtask is not None: split_generator.with_shard(self.idx_of_this_subtask, self.number_of_para_subtasks) @@ -222,6 +226,10 @@ def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ max_workers=max_workers ) + def with_no_slice_split(self): + self.no_slice_split = True + return self + def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int) -> 'FullStartingScanner': if idx_of_this_subtask >= number_of_para_subtasks: raise ValueError("idx_of_this_subtask must be less than number_of_para_subtasks") diff --git a/paimon-python/pypaimon/read/scanner/split_generator.py b/paimon-python/pypaimon/read/scanner/split_generator.py index 6dab4fc12aa3..b1811ed2e39c 100644 --- a/paimon-python/pypaimon/read/scanner/split_generator.py +++ b/paimon-python/pypaimon/read/scanner/split_generator.py @@ -51,6 +51,11 @@ def __init__( self.number_of_para_subtasks = None self.start_pos_of_this_subtask = None self.end_pos_of_this_subtask = None + self.no_slice_split = None + + + def with_no_slice_split(self): + self.no_slice_split = True def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int): """Configure sharding for parallel processing.""" diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index 8276163450a0..7e1024d0a6a1 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -83,6 +83,10 @@ def _create_starting_scanner(self) -> Optional[StartingScanner]: vector_search=self.vector_search ) + def with_no_slice_split(self): + self.starting_scanner.with_no_slice_split() + return self + def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan': self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks) return self diff --git a/paimon-python/pypaimon/tests/shard_table_updator_test.py b/paimon-python/pypaimon/tests/shard_table_updator_test.py new file mode 100644 index 000000000000..a28603b2e693 --- /dev/null +++ b/paimon-python/pypaimon/tests/shard_table_updator_test.py @@ -0,0 +1,393 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" +import os +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema + + +class ShardTableUpdatorTest(unittest.TestCase): + """Tests for ShardTableUpdator partial column updates in data-evolution mode.""" + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({ + 'warehouse': cls.warehouse + }) + cls.catalog.create_database('default', False) + cls.table_count = 0 + + def _create_unique_table_name(self, prefix='test'): + ShardTableUpdatorTest.table_count += 1 + return f'default.{prefix}_{ShardTableUpdatorTest.table_count}' + + def test_compute_column_d_equals_c_plus_b_minus_a(self): + """ + Test: Create a table with columns a, b, c, d. + Write initial data for a, b, c. + Use ShardTableUpdator to compute d = c + b - a and fill in the d column. + """ + # Step 1: Create table with a, b, c, d columns (all int32) + table_schema = pa.schema([ + ('a', pa.int32()), + ('b', pa.int32()), + ('c', pa.int32()), + ('d', pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + table_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_compute_d', schema, False) + table = self.catalog.get_table('default.test_compute_d') + + # Step 2: Write initial data for a, b, c columns only + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write().with_write_type(['a', 'b', 'c']) + table_commit = write_builder.new_commit() + + init_data = pa.Table.from_pydict({ + 'a': [1, 2, 3, 4, 5], + 'b': [10, 20, 30, 40, 50], + 'c': [100, 200, 300, 400, 500], + }, schema=pa.schema([('a', pa.int32()), ('b', pa.int32()), ('c', pa.int32())])) + + table_write.write_arrow(init_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # Step 3: Use ShardTableUpdator to compute d = c + b - a + table_update = write_builder.new_update() + table_update.with_read_projection(['a', 'b', 'c']) + table_update.with_update_type(['d']) + + shard_updator = table_update.new_shard_updator(0, 1) + + # Read data using arrow_reader + reader = shard_updator.arrow_reader() + + for batch in iter(reader.read_next_batch, None): + # Compute d = c + b - a + a_values = batch.column('a').to_pylist() + b_values = batch.column('b').to_pylist() + c_values = batch.column('c').to_pylist() + + d_values = [c + b - a for a, b, c in zip(a_values, b_values, c_values)] + + # Create batch with d column + new_batch = pa.RecordBatch.from_pydict({ + 'd': d_values, + }, schema=pa.schema([('d', pa.int32())])) + + # Write d column + shard_updator.update_by_arrow_batch(new_batch) + + # Prepare and commit + commit_messages = shard_updator.prepare_commit() + commit = write_builder.new_commit() + commit.commit(commit_messages) + commit.close() + + # Step 4: Verify the result + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + # Expected values: + # Row 0: d = 100 + 10 - 1 = 109 + # Row 1: d = 200 + 20 - 2 = 218 + # Row 2: d = 300 + 30 - 3 = 327 + # Row 3: d = 400 + 40 - 4 = 436 + # Row 4: d = 500 + 50 - 5 = 545 + expected = pa.Table.from_pydict({ + 'a': [1, 2, 3, 4, 5], + 'b': [10, 20, 30, 40, 50], + 'c': [100, 200, 300, 400, 500], + 'd': [109, 218, 327, 436, 545], + }, schema=table_schema) + + print("\n=== Actual Data ===") + 
print(actual.to_pandas()) + print("\n=== Expected Data ===") + print(expected.to_pandas()) + + self.assertEqual(actual, expected) + print("\n✅ Test passed! Column d = c + b - a computed correctly!") + + def test_update_existing_column(self): + """ + Test: Update an existing column's values using ShardTableUpdator. + Create table with a, b columns, then update b = a * 2. + """ + table_name = self._create_unique_table_name('update_existing') + table_schema = pa.schema([ + ('a', pa.int32()), + ('b', pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + table_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table(table_name, schema, False) + table = self.catalog.get_table(table_name) + + # Write initial data + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + init_data = pa.Table.from_pydict({ + 'a': [1, 2, 3, 4, 5], + 'b': [10, 20, 30, 40, 50], # Original values + }, schema=table_schema) + + table_write.write_arrow(init_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # Use ShardTableUpdator to update b = a * 2 + table_update = write_builder.new_update() + table_update.with_read_projection(['a']) + table_update.with_update_type(['b']) + + shard_updator = table_update.new_shard_updator(0, 1) + reader = shard_updator.arrow_reader() + + for batch in iter(reader.read_next_batch, None): + a_values = batch.column('a').to_pylist() + new_b_values = [a * 2 for a in a_values] + + new_batch = pa.RecordBatch.from_pydict({ + 'b': new_b_values, + }, schema=pa.schema([('b', pa.int32())])) + + shard_updator.update_by_arrow_batch(new_batch) + + commit_messages = shard_updator.prepare_commit() + commit = write_builder.new_commit() + commit.commit(commit_messages) + commit.close() + + # Verify results + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + expected = pa.Table.from_pydict({ + 'a': [1, 2, 3, 4, 5], + 'b': [2, 4, 6, 8, 10], # Updated: a * 2 + }, schema=table_schema) + + print("\n=== Update Existing Column ===") + print("Actual:", actual.to_pandas().to_dict()) + print("Expected:", expected.to_pandas().to_dict()) + + self.assertEqual(actual, expected) + print("✅ Test passed!") + + def test_sharding_distributes_work(self): + """ + Test: Verify that sharding divides splits among multiple shards. 
+ """ + table_name = self._create_unique_table_name('sharding') + table_schema = pa.schema([ + ('x', pa.int32()), + ('y', pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + table_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table(table_name, schema, False) + table = self.catalog.get_table(table_name) + + # Write multiple files to create multiple splits + write_builder = table.new_batch_write_builder() + table_commit = write_builder.new_commit() + + all_commits = [] + for i in range(3): + table_write = write_builder.new_write().with_write_type(['x']) + data = pa.Table.from_pydict({ + 'x': list(range(i * 10, (i + 1) * 10)), + }, schema=pa.schema([('x', pa.int32())])) + table_write.write_arrow(data) + all_commits.extend(table_write.prepare_commit()) + table_write.close() + + table_commit.commit(all_commits) + table_commit.close() + + # Test sharding with 2 shards + table_update = write_builder.new_update() + table_update.with_read_projection(['x']) + table_update.with_update_type(['y']) + + shard_0 = table_update.new_shard_updator(0, 2) + shard_1 = table_update.new_shard_updator(1, 2) + + # Both shards should have some splits, or one could have all + total_splits_shard_0 = len(shard_0.splits) + total_splits_shard_1 = len(shard_1.splits) + total_splits = total_splits_shard_0 + total_splits_shard_1 + + print(f"\n=== Sharding Test ===") + print(f"Shard 0 splits: {total_splits_shard_0}") + print(f"Shard 1 splits: {total_splits_shard_1}") + print(f"Total splits: {total_splits}") + + # At least one shard should have splits + self.assertTrue(total_splits > 0, "Should have splits") + print("✅ Test passed!") + + def test_multiple_columns_update(self): + """ + Test: Update multiple columns at once. 
+ """ + table_name = self._create_unique_table_name('multi_cols') + table_schema = pa.schema([ + ('a', pa.int32()), + ('b', pa.int32()), + ('c', pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + table_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table(table_name, schema, False) + table = self.catalog.get_table(table_name) + + # Write initial data for 'a' only + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write().with_write_type(['a']) + table_commit = write_builder.new_commit() + + init_data = pa.Table.from_pydict({ + 'a': [1, 2, 3, 4, 5], + }, schema=pa.schema([('a', pa.int32())])) + + table_write.write_arrow(init_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # Use ShardTableUpdator to add b and c (b = a + 1, c = a + 2) + table_update = write_builder.new_update() + table_update.with_read_projection(['a']) + table_update.with_update_type(['b', 'c']) + + shard_updator = table_update.new_shard_updator(0, 1) + reader = shard_updator.arrow_reader() + + for batch in iter(reader.read_next_batch, None): + a_values = batch.column('a').to_pylist() + b_values = [a + 1 for a in a_values] + c_values = [a + 2 for a in a_values] + + new_batch = pa.RecordBatch.from_pydict({ + 'b': b_values, + 'c': c_values, + }, schema=pa.schema([('b', pa.int32()), ('c', pa.int32())])) + + shard_updator.update_by_arrow_batch(new_batch) + + commit_messages = shard_updator.prepare_commit() + commit = write_builder.new_commit() + commit.commit(commit_messages) + commit.close() + + # Verify + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + expected = pa.Table.from_pydict({ + 'a': [1, 2, 3, 4, 5], + 'b': [2, 3, 4, 5, 6], + 'c': [3, 4, 5, 6, 7], + }, schema=table_schema) + + print("\n=== Multiple Columns Update ===") + print("Actual:", actual.to_pandas()) + + self.assertEqual(actual, expected) + print("✅ Test passed!") + + def test_row_ranges_computed_correctly(self): + """ + Test: Verify that row_ranges are computed correctly from files. 
+ """ + table_name = self._create_unique_table_name('row_ranges') + table_schema = pa.schema([ + ('val', pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + table_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table(table_name, schema, False) + table = self.catalog.get_table(table_name) + + # Write data + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + data = pa.Table.from_pydict({ + 'val': [10, 20, 30, 40, 50], + }, schema=table_schema) + + table_write.write_arrow(data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # Create shard updator and check row_ranges + table_update = write_builder.new_update() + table_update.with_read_projection(['val']) + table_update.with_update_type(['val']) + + shard_updator = table_update.new_shard_updator(0, 1) + + print("\n=== Row Ranges Test ===") + print(f"Row ranges: {shard_updator.row_ranges}") + + # Should have at least one row range + self.assertTrue(len(shard_updator.row_ranges) > 0, "Should have row ranges") + + # Each row range should have partition and range + for partition, row_range in shard_updator.row_ranges: + self.assertIsNotNone(row_range.from_, "Range should have from_") + self.assertIsNotNone(row_range.to, "Range should have to") + print(f" Partition: {partition}, Range: {row_range.from_} - {row_range.to}") + + print("✅ Test passed!") + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/table_update.py b/paimon-python/pypaimon/write/table_update.py index 5738c45d4e40..7364bba79f52 100644 --- a/paimon-python/pypaimon/write/table_update.py +++ b/paimon-python/pypaimon/write/table_update.py @@ -28,6 +28,7 @@ from pypaimon.write.commit_message import CommitMessage from pypaimon.write.table_update_by_row_id import TableUpdateByRowId from pypaimon.write.writer.data_writer import DataWriter +from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter class TableUpdate: @@ -48,7 +49,7 @@ def with_update_type(self, update_cols: List[str]): self.update_cols = update_cols return self - def with_projection(self, projection: List[str]): + def with_read_projection(self, projection: List[str]): self.projection = projection def new_shard_updator(self, total_shard_count: int, shard_num: int): @@ -61,7 +62,7 @@ def update_by_arrow_with_row_id(self, table: pa.Table) -> List[CommitMessage]: class ShardTableUpdator: - def __init__(self, table, projection: Optional[List[str]], write_cols: List[str], commit_user, total_shard_count: int, shard_num: int): + def __init__(self, table, projection: Optional[List[str]], write_cols: List[str], commit_user, shard_num: int, total_shard_count: int, ): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table self.projection = projection @@ -75,9 +76,7 @@ def __init__(self, table, projection: Optional[List[str]], write_cols: List[str] self.writer: Optional[SingleWriter] = None self.dict = defaultdict(list) - scanner = self.table.new_read_builder().new_scan() - if total_shard_count != 0: - scanner.with_shard(total_shard_count, shard_num) + scanner = self.table.new_read_builder().new_scan().with_shard(shard_num, total_shard_count).with_no_slice_split() self.splits = scanner.plan().splits() self.row_ranges: List[(Tuple, Range)] = [] @@ -86,8 +85,8 @@ def __init__(self, table, projection: Optional[List[str]], write_cols: 
List[str] raise ValueError(f"Split {split} is not DataSplit.") files = split.files ranges = self.compute_from_files(files) - for range in ranges: - self.row_ranges.extend((split.partition, range)) + for row_range in ranges: + self.row_ranges.append((tuple(split.partition.values), row_range)) @staticmethod def compute_from_files(files: List[DataFileMeta]) -> List[Range]: @@ -126,18 +125,18 @@ def update_by_arrow_batch(self, data: pa.RecordBatch): def _init_writer(self): if self.writer is None: - tuple: (Tuple, Range) = self.row_ranges[self.write_pos] - partition = tuple(0) - range = tuple(1) - writer = DataWriter(self.table, partition, 0, 0, self.table.options, self.write_cols) - writer.target_file_size = MemorySize.of_mebi_bytes(999999999) - self.writer = SingleWriter(writer, partition, range.from_, range.to - range.from_ + 1) - + item = self.row_ranges[self.write_pos] + self.write_pos += 1 + partition = item[0] + row_range = item[1] + writer = AppendOnlyDataWriter(self.table, partition, 0, 0, self.table.options, self.write_cols) + writer.target_file_size = MemorySize.of_mebi_bytes(999999999).get_bytes() + self.writer = SingleWriter(writer, partition, row_range.from_, row_range.to - row_range.from_ + 1) class SingleWriter: - def __init__(self, writer: DataWriter, partition: Tuple, first_row_id: int, row_count: int): + def __init__(self, writer: DataWriter, partition, first_row_id: int, row_count: int): self.writer: DataWriter = writer self._partition = partition self.first_row_id = first_row_id @@ -159,6 +158,8 @@ def partition(self) -> Tuple: return self._partition def end(self) -> DataFileMeta: + if self.capacity() != 0: + raise Exception("There still capacity left in the writer.") files = self.writer.prepare_commit() if len(files) != 1: raise Exception("Should have one file.") From c73c91f79a750ea9b1d8c6b6a5ec5a2d0966c8ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 14:44:30 +0800 Subject: [PATCH 03/22] Fix minus --- paimon-python/pypaimon/globalindex/range.py | 5 +++-- .../scanner/data_evolution_split_generator.py | 15 +-------------- 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/paimon-python/pypaimon/globalindex/range.py b/paimon-python/pypaimon/globalindex/range.py index f27a637ae71e..19b9b40e9439 100644 --- a/paimon-python/pypaimon/globalindex/range.py +++ b/paimon-python/pypaimon/globalindex/range.py @@ -154,7 +154,7 @@ def merge_sorted_as_possible(ranges: List['Range']) -> List['Range']: return result @staticmethod - def sort_and_merge_overlap(ranges: List['Range'], merge: bool = True) -> List['Range']: + def sort_and_merge_overlap(ranges: List['Range'], merge: bool = True, adjacent: bool = True) -> List['Range']: """ Sort ranges and optionally merge overlapping ones. 
""" @@ -166,10 +166,11 @@ def sort_and_merge_overlap(ranges: List['Range'], merge: bool = True) -> List['R if not merge: return sorted_ranges + adjacent_value = 1 if adjacent else 0 result = [sorted_ranges[0]] for r in sorted_ranges[1:]: last = result[-1] - if r.from_ <= last.to + 1: + if r.from_ <= last.to + adjacent_value: # Merge with last range result[-1] = Range(last.from_, max(last.to, r.to)) else: diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 88a73b62407a..8aa7d4300029 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -78,7 +78,6 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple: ) elif self.idx_of_this_subtask is not None: if self.no_slice_split: - no_slice_for_split = True partitioned_files = self._filter_by_shard_no_slice(partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks) else: # shard data range: [plan_start_pos, plan_end_pos) @@ -265,7 +264,6 @@ def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple: def _filter_by_shard_no_slice(self, partitioned_files: defaultdict, sub_task_id: int, total_tasks: int) -> defaultdict: list_ranges = [] - for file_entries in partitioned_files.values(): for entry in file_entries: first_row_id = entry.file.first_row_id @@ -274,16 +272,10 @@ def _filter_by_shard_no_slice(self, partitioned_files: defaultdict, sub_task_id: # Range is inclusive [from_, to], so use row_count - 1 list_ranges.append(Range(first_row_id, first_row_id + entry.file.row_count - 1)) - if not list_ranges: - return defaultdict(list) - - sorted_ranges = Range.sort_and_merge_overlap(list_ranges) + sorted_ranges = Range.sort_and_merge_overlap(list_ranges, True, False) start_range, end_range = self._divide_ranges(sorted_ranges, sub_task_id, total_tasks) - if start_range is None or end_range is None: - return defaultdict(list) - # Filter files that overlap with this subtask's range # Range is inclusive [from_, to], but _filter_by_row_range expects exclusive end filtered_partitioned_files, _, _ = self._filter_by_row_range( @@ -318,12 +310,7 @@ def _divide_ranges( remainder * (base_ranges_per_task + 1) + (sub_task_id - remainder) * base_ranges_per_task ) - - if num_ranges_for_task == 0: - return None, None - end_idx = start_idx + num_ranges_for_task - 1 - return sorted_ranges[start_idx], sorted_ranges[end_idx] @staticmethod From 4e8a2c3698f18908fd36269dba00a989f9edd700 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 14:56:52 +0800 Subject: [PATCH 04/22] minus --- .../pypaimon/read/scanner/data_evolution_split_generator.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 8aa7d4300029..0f0daf8e3141 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -313,15 +313,14 @@ def _divide_ranges( end_idx = start_idx + num_ranges_for_task - 1 return sorted_ranges[start_idx], sorted_ranges[end_idx] - @staticmethod - def _split_by_row_id(files: List[DataFileMeta]) -> List[List[DataFileMeta]]: + def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: """ Split files by row ID for data evolution 
tables. """ split_by_row_id = [] # Filter blob files to only include those within the row ID range of non-blob files - sorted_files = DataEvolutionSplitGenerator._filter_blob(files) + sorted_files = self._filter_blob(files) # Split files by firstRowId last_row_id = -1 From be8f4f142e5fb407bc468486e638a84638e4a63c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 14:58:02 +0800 Subject: [PATCH 05/22] minus --- .../pypaimon/read/scanner/data_evolution_split_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 0f0daf8e3141..822eb4a19db1 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -334,7 +334,7 @@ def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta] split_by_row_id.append([file]) continue - if not DataEvolutionSplitGenerator._is_blob_file(file.file_name) and first_row_id != last_row_id: + if not self._is_blob_file(file.file_name) and first_row_id != last_row_id: if current_split: split_by_row_id.append(current_split) From 9ee5fa1558152c2799b3427d4658fedb2cdb8dfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 15:01:54 +0800 Subject: [PATCH 06/22] minus --- paimon-python/pypaimon/read/scanner/starting_scanner.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/paimon-python/pypaimon/read/scanner/starting_scanner.py b/paimon-python/pypaimon/read/scanner/starting_scanner.py index 7e6cdfd81aea..566022984066 100644 --- a/paimon-python/pypaimon/read/scanner/starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/starting_scanner.py @@ -26,3 +26,7 @@ class StartingScanner(ABC): @abstractmethod def scan(self) -> Plan: """Plan the files to read.""" + + + def with_no_slice_split(self): + return self \ No newline at end of file From a7236146e27d477ef5957432e429299568e4839d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 15:02:59 +0800 Subject: [PATCH 07/22] minus --- paimon-python/pypaimon/read/scanner/starting_scanner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/read/scanner/starting_scanner.py b/paimon-python/pypaimon/read/scanner/starting_scanner.py index 566022984066..41569618d08b 100644 --- a/paimon-python/pypaimon/read/scanner/starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/starting_scanner.py @@ -29,4 +29,5 @@ def scan(self) -> Plan: def with_no_slice_split(self): - return self \ No newline at end of file + return self + \ No newline at end of file From bbab35a369450c7889716bf78444bdd1b79941a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 15:06:12 +0800 Subject: [PATCH 08/22] minus --- .../pypaimon/read/scanner/starting_scanner.py | 1 - paimon-python/pypaimon/write/table_update.py | 14 ++++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/starting_scanner.py b/paimon-python/pypaimon/read/scanner/starting_scanner.py index 41569618d08b..4af3ed65bc88 100644 --- a/paimon-python/pypaimon/read/scanner/starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/starting_scanner.py @@ -30,4 +30,3 @@ def scan(self) -> Plan: def with_no_slice_split(self): return self - \ No newline at end of file diff --git 
a/paimon-python/pypaimon/write/table_update.py b/paimon-python/pypaimon/write/table_update.py index 7364bba79f52..336b1c72293f 100644 --- a/paimon-python/pypaimon/write/table_update.py +++ b/paimon-python/pypaimon/write/table_update.py @@ -53,16 +53,19 @@ def with_read_projection(self, projection: List[str]): self.projection = projection def new_shard_updator(self, total_shard_count: int, shard_num: int): - return ShardTableUpdator(self.table, self.projection, self.update_cols, self.commit_user, total_shard_count, shard_num) + return ShardTableUpdator(self.table, self.projection, self.update_cols, self.commit_user, total_shard_count, + shard_num) def update_by_arrow_with_row_id(self, table: pa.Table) -> List[CommitMessage]: update_by_row_id = TableUpdateByRowId(self.table, self.commit_user) update_by_row_id.update_columns(table, self.update_cols) return update_by_row_id.commit_messages + class ShardTableUpdator: - def __init__(self, table, projection: Optional[List[str]], write_cols: List[str], commit_user, shard_num: int, total_shard_count: int, ): + def __init__(self, table, projection: Optional[List[str]], write_cols: List[str], commit_user, shard_num: int, + total_shard_count: int, ): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table self.projection = projection @@ -76,7 +79,8 @@ def __init__(self, table, projection: Optional[List[str]], write_cols: List[str] self.writer: Optional[SingleWriter] = None self.dict = defaultdict(list) - scanner = self.table.new_read_builder().new_scan().with_shard(shard_num, total_shard_count).with_no_slice_split() + scanner = self.table.new_read_builder().new_scan().with_shard(shard_num, + total_shard_count).with_no_slice_split() self.splits = scanner.plan().splits() self.row_ranges: List[(Tuple, Range)] = [] @@ -94,7 +98,7 @@ def compute_from_files(files: List[DataFileMeta]) -> List[Range]: for file in files: ranges.append(Range(file.first_row_id, file.first_row_id + file.row_count - 1)) - return Range.merge_sorted_as_possible(ranges) + return Range.sort_and_merge_overlap(ranges, True, False) def arrow_reader(self) -> pyarrow.ipc.RecordBatchReader: read_builder = self.table.new_read_builder() @@ -122,7 +126,6 @@ def update_by_arrow_batch(self, data: pa.RecordBatch): if (data_in_next_writer is not None): self.update_by_arrow_batch(data_in_next_writer) - def _init_writer(self): if self.writer is None: item = self.row_ranges[self.write_pos] @@ -143,7 +146,6 @@ def __init__(self, writer: DataWriter, partition, first_row_id: int, row_count: self.row_count = row_count self.written_records_count = 0 - def capacity(self) -> int: return self.row_count - self.written_records_count From ddd3477c7d41736161561f8321f4fd63ab8156be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 15:14:36 +0800 Subject: [PATCH 09/22] minus --- paimon-python/pypaimon/write/table_update.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paimon-python/pypaimon/write/table_update.py b/paimon-python/pypaimon/write/table_update.py index 336b1c72293f..e2490a98e5d0 100644 --- a/paimon-python/pypaimon/write/table_update.py +++ b/paimon-python/pypaimon/write/table_update.py @@ -166,5 +166,7 @@ def end(self) -> DataFileMeta: if len(files) != 1: raise Exception("Should have one file.") file = files[0] + if file.row_count != self.row_count: + raise Exception("File row count mismatch.") file = file.assign_first_row_id(self.first_row_id) return file From ac7c1423f09c8cfcf02eaf8f840999681c0dbb6a Mon Sep 17 00:00:00 2001 
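An aside before the next commit: patch 08 switches `compute_from_files` to `Range.sort_and_merge_overlap(ranges, True, False)`, i.e. it merges with the `adjacent` flag added in patch 03 turned off, so only genuinely overlapping file ranges collapse while touching-but-distinct files keep separate ranges. A small worked example of that flag under the same comparison logic (`r.from_ <= last.to + adjacent_value`); the tuple-based helper is illustrative only, not the pypaimon `Range` API:

# Illustrative stand-in for Range.sort_and_merge_overlap; ranges are inclusive (from_, to) tuples.
def sort_and_merge(ranges, merge=True, adjacent=True):
    result = []
    adjacent_value = 1 if adjacent else 0
    for from_, to in sorted(ranges):
        if merge and result and from_ <= result[-1][1] + adjacent_value:
            result[-1] = (result[-1][0], max(result[-1][1], to))   # merge into previous range
        else:
            result.append((from_, to))
    return result

files = [(0, 4), (5, 9), (8, 12)]   # two adjacent file ranges, one overlapping the second
assert sort_and_merge(files, adjacent=True) == [(0, 12)]           # adjacency also merges
assert sort_and_merge(files, adjacent=False) == [(0, 4), (5, 12)]  # only the real overlap merges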
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 17:23:18 +0800 Subject: [PATCH 10/22] Fix minus --- .../scanner/data_evolution_split_generator.py | 19 +- paimon-python/pypaimon/read/table_read.py | 8 + .../tests/shard_table_updator_test.py | 316 +++++++----------- 3 files changed, 142 insertions(+), 201 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 822eb4a19db1..7f1a3578b37a 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -275,13 +275,18 @@ def _filter_by_shard_no_slice(self, partitioned_files: defaultdict, sub_task_id: sorted_ranges = Range.sort_and_merge_overlap(list_ranges, True, False) start_range, end_range = self._divide_ranges(sorted_ranges, sub_task_id, total_tasks) - - # Filter files that overlap with this subtask's range - # Range is inclusive [from_, to], but _filter_by_row_range expects exclusive end - filtered_partitioned_files, _, _ = self._filter_by_row_range( - partitioned_files, start_range.from_, end_range.to + 1 - ) - return filtered_partitioned_files + if start_range is None or end_range is None: + return defaultdict(list) + start_first_row_id = start_range.from_ + end_first_row_id = end_range.to + + filtered_partitioned_files = { + k: [x for x in v if x.file.first_row_id >= start_first_row_id and x.file.first_row_id <= end_first_row_id] + for k, v in partitioned_files.items() + } + + filtered_partitioned_files = {k: v for k, v in filtered_partitioned_files.items() if v} + return defaultdict(list, filtered_partitioned_files) @staticmethod def _divide_ranges( diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index f546c4be6b3a..535e9da475e8 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -80,10 +80,18 @@ def to_arrow(self, splits: List[Split]) -> Optional[pyarrow.Table]: schema = PyarrowFieldParser.from_paimon_schema(self.read_type) table_list = [] + i = 0 for batch in iter(batch_reader.read_next_batch, None): if batch.num_rows == 0: continue + i+=1 + if i == 381: + i = 381 + if i == 382: + i = 382 table_list.append(self._try_to_pad_batch_by_schema(batch, schema)) + print("sdfasdf:" + str(i)) + pyarrow.Table.from_batches(table_list) if not table_list: return pyarrow.Table.from_arrays([pyarrow.array([], type=field.type) for field in schema], schema=schema) diff --git a/paimon-python/pypaimon/tests/shard_table_updator_test.py b/paimon-python/pypaimon/tests/shard_table_updator_test.py index a28603b2e693..8acdeb31650d 100644 --- a/paimon-python/pypaimon/tests/shard_table_updator_test.py +++ b/paimon-python/pypaimon/tests/shard_table_updator_test.py @@ -58,8 +58,9 @@ def test_compute_column_d_equals_c_plus_b_minus_a(self): table_schema, options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} ) - self.catalog.create_table('default.test_compute_d', schema, False) - table = self.catalog.get_table('default.test_compute_d') + name = self._create_unique_table_name() + self.catalog.create_table(name, schema, False) + table = self.catalog.get_table(name) # Step 2: Write initial data for a, b, c columns only write_builder = table.new_batch_write_builder() @@ -136,139 +137,106 @@ def test_compute_column_d_equals_c_plus_b_minus_a(self): self.assertEqual(actual, expected) print("\n✅ Test passed! 
Column d = c + b - a computed correctly!") - def test_update_existing_column(self): + def test_compute_column_d_equals_c_plus_b_minus_a2(self): """ - Test: Update an existing column's values using ShardTableUpdator. - Create table with a, b columns, then update b = a * 2. + Test: Create a table with columns a, b, c, d. + Write initial data for a, b, c. + Use ShardTableUpdator to compute d = c + b - a and fill in the d column. """ - table_name = self._create_unique_table_name('update_existing') + # Step 1: Create table with a, b, c, d columns (all int32) table_schema = pa.schema([ ('a', pa.int32()), ('b', pa.int32()), + ('c', pa.int32()), + ('d', pa.int32()), ]) schema = Schema.from_pyarrow_schema( table_schema, options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} ) - self.catalog.create_table(table_name, schema, False) - table = self.catalog.get_table(table_name) + name = self._create_unique_table_name() + self.catalog.create_table(name, schema, False) + table = self.catalog.get_table(name) - # Write initial data - write_builder = table.new_batch_write_builder() - table_write = write_builder.new_write() - table_commit = write_builder.new_commit() + # Step 2: Write initial data for a, b, c columns only + for i in range(1000): + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write().with_write_type(['a', 'b', 'c']) + table_commit = write_builder.new_commit() + + init_data = pa.Table.from_pydict({ + 'a': [1, 2, 3, 4, 5], + 'b': [10, 20, 30, 40, 50], + 'c': [100, 200, 300, 400, 500], + }, schema=pa.schema([('a', pa.int32()), ('b', pa.int32()), ('c', pa.int32())])) + + table_write.write_arrow(init_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() - init_data = pa.Table.from_pydict({ - 'a': [1, 2, 3, 4, 5], - 'b': [10, 20, 30, 40, 50], # Original values - }, schema=table_schema) + # Step 3: Use ShardTableUpdator to compute d = c + b - a + table_update = write_builder.new_update() + table_update.with_read_projection(['a', 'b', 'c']) + table_update.with_update_type(['d']) - table_write.write_arrow(init_data) - table_commit.commit(table_write.prepare_commit()) - table_write.close() - table_commit.close() + for i in range(10): + shard_updator = table_update.new_shard_updator(i, 10) - # Use ShardTableUpdator to update b = a * 2 - table_update = write_builder.new_update() - table_update.with_read_projection(['a']) - table_update.with_update_type(['b']) + # Read data using arrow_reader + reader = shard_updator.arrow_reader() - shard_updator = table_update.new_shard_updator(0, 1) - reader = shard_updator.arrow_reader() + for batch in iter(reader.read_next_batch, None): + # Compute d = c + b - a + a_values = batch.column('a').to_pylist() + b_values = batch.column('b').to_pylist() + c_values = batch.column('c').to_pylist() - for batch in iter(reader.read_next_batch, None): - a_values = batch.column('a').to_pylist() - new_b_values = [a * 2 for a in a_values] + d_values = [c + b - a for a, b, c in zip(a_values, b_values, c_values)] - new_batch = pa.RecordBatch.from_pydict({ - 'b': new_b_values, - }, schema=pa.schema([('b', pa.int32())])) + # Create batch with d column + new_batch = pa.RecordBatch.from_pydict({ + 'd': d_values, + }, schema=pa.schema([('d', pa.int32())])) - shard_updator.update_by_arrow_batch(new_batch) + # Write d column + shard_updator.update_by_arrow_batch(new_batch) - commit_messages = shard_updator.prepare_commit() - commit = write_builder.new_commit() - 
commit.commit(commit_messages) - commit.close() + # Prepare and commit + commit_messages = shard_updator.prepare_commit() + commit = write_builder.new_commit() + commit.commit(commit_messages) + commit.close() - # Verify results + # Step 4: Verify the result read_builder = table.new_read_builder() table_scan = read_builder.new_scan() table_read = read_builder.new_read() actual = table_read.to_arrow(table_scan.plan().splits()) + # Expected values: + # Row 0: d = 100 + 10 - 1 = 109 + # Row 1: d = 200 + 20 - 2 = 218 + # Row 2: d = 300 + 30 - 3 = 327 + # Row 3: d = 400 + 40 - 4 = 436 + # Row 4: d = 500 + 50 - 5 = 545 expected = pa.Table.from_pydict({ - 'a': [1, 2, 3, 4, 5], - 'b': [2, 4, 6, 8, 10], # Updated: a * 2 + 'a': [1, 2, 3, 4, 5] * 1000, + 'b': [10, 20, 30, 40, 50] * 1000, + 'c': [100, 200, 300, 400, 500] * 1000, + 'd': [109, 218, 327, 436, 545] * 1000, }, schema=table_schema) - print("\n=== Update Existing Column ===") - print("Actual:", actual.to_pandas().to_dict()) - print("Expected:", expected.to_pandas().to_dict()) - - self.assertEqual(actual, expected) - print("✅ Test passed!") - - def test_sharding_distributes_work(self): - """ - Test: Verify that sharding divides splits among multiple shards. - """ - table_name = self._create_unique_table_name('sharding') - table_schema = pa.schema([ - ('x', pa.int32()), - ('y', pa.int32()), - ]) - schema = Schema.from_pyarrow_schema( - table_schema, - options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} - ) - self.catalog.create_table(table_name, schema, False) - table = self.catalog.get_table(table_name) - - # Write multiple files to create multiple splits - write_builder = table.new_batch_write_builder() - table_commit = write_builder.new_commit() - - all_commits = [] - for i in range(3): - table_write = write_builder.new_write().with_write_type(['x']) - data = pa.Table.from_pydict({ - 'x': list(range(i * 10, (i + 1) * 10)), - }, schema=pa.schema([('x', pa.int32())])) - table_write.write_arrow(data) - all_commits.extend(table_write.prepare_commit()) - table_write.close() - - table_commit.commit(all_commits) - table_commit.close() - - # Test sharding with 2 shards - table_update = write_builder.new_update() - table_update.with_read_projection(['x']) - table_update.with_update_type(['y']) - - shard_0 = table_update.new_shard_updator(0, 2) - shard_1 = table_update.new_shard_updator(1, 2) - - # Both shards should have some splits, or one could have all - total_splits_shard_0 = len(shard_0.splits) - total_splits_shard_1 = len(shard_1.splits) - total_splits = total_splits_shard_0 + total_splits_shard_1 - print(f"\n=== Sharding Test ===") - print(f"Shard 0 splits: {total_splits_shard_0}") - print(f"Shard 1 splits: {total_splits_shard_1}") - print(f"Total splits: {total_splits}") + print("\n=== Actual Data ===") + print(actual.to_pandas()) + print("\n=== Expected Data ===") + print(expected.to_pandas()) - # At least one shard should have splits - self.assertTrue(total_splits > 0, "Should have splits") - print("✅ Test passed!") + self.assertEqual(actual, expected) + print("\n✅ Test passed! Column d = c + b - a computed correctly!") - def test_multiple_columns_update(self): - """ - Test: Update multiple columns at once. 
- """ - table_name = self._create_unique_table_name('multi_cols') + def test_compute_column_with_existing_column(self): table_schema = pa.schema([ ('a', pa.int32()), ('b', pa.int32()), @@ -278,116 +246,76 @@ def test_multiple_columns_update(self): table_schema, options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} ) - self.catalog.create_table(table_name, schema, False) - table = self.catalog.get_table(table_name) + name = self._create_unique_table_name() + self.catalog.create_table(name, schema, False) + table = self.catalog.get_table(name) - # Write initial data for 'a' only - write_builder = table.new_batch_write_builder() - table_write = write_builder.new_write().with_write_type(['a']) - table_commit = write_builder.new_commit() + # Step 2: Write initial data for a, b, c columns only + for i in range(1000): + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write().with_write_type(['a', 'b', 'c']) + table_commit = write_builder.new_commit() + + init_data = pa.Table.from_pydict({ + 'a': [1, 2, 3, 4, 5], + 'b': [10, 20, 30, 40, 50], + 'c': [100, 200, 300, 400, 500], + }, schema=pa.schema([('a', pa.int32()), ('b', pa.int32()), ('c', pa.int32())])) + + table_write.write_arrow(init_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() - init_data = pa.Table.from_pydict({ - 'a': [1, 2, 3, 4, 5], - }, schema=pa.schema([('a', pa.int32())])) + # Step 3: Use ShardTableUpdator to compute d = c + b - a + table_update = write_builder.new_update() + table_update.with_read_projection(['a', 'b']) + table_update.with_update_type(['c']) - table_write.write_arrow(init_data) - table_commit.commit(table_write.prepare_commit()) - table_write.close() - table_commit.close() + for i in range(10): + shard_updator = table_update.new_shard_updator(i, 10) - # Use ShardTableUpdator to add b and c (b = a + 1, c = a + 2) - table_update = write_builder.new_update() - table_update.with_read_projection(['a']) - table_update.with_update_type(['b', 'c']) + # Read data using arrow_reader + reader = shard_updator.arrow_reader() - shard_updator = table_update.new_shard_updator(0, 1) - reader = shard_updator.arrow_reader() + for batch in iter(reader.read_next_batch, None): + a_values = batch.column('a').to_pylist() + b_values = batch.column('b').to_pylist() - for batch in iter(reader.read_next_batch, None): - a_values = batch.column('a').to_pylist() - b_values = [a + 1 for a in a_values] - c_values = [a + 2 for a in a_values] + c_values = [b - a for a, b in zip(a_values, b_values)] - new_batch = pa.RecordBatch.from_pydict({ - 'b': b_values, - 'c': c_values, - }, schema=pa.schema([('b', pa.int32()), ('c', pa.int32())])) + new_batch = pa.RecordBatch.from_pydict({ + 'c': c_values, + }, schema=pa.schema([('c', pa.int32())])) - shard_updator.update_by_arrow_batch(new_batch) + shard_updator.update_by_arrow_batch(new_batch) - commit_messages = shard_updator.prepare_commit() - commit = write_builder.new_commit() - commit.commit(commit_messages) - commit.close() + # Prepare and commit + commit_messages = shard_updator.prepare_commit() + commit = write_builder.new_commit() + commit.commit(commit_messages) + commit.close() - # Verify + # Step 4: Verify the result read_builder = table.new_read_builder() table_scan = read_builder.new_scan() table_read = read_builder.new_read() actual = table_read.to_arrow(table_scan.plan().splits()) expected = pa.Table.from_pydict({ - 'a': [1, 2, 3, 4, 5], - 'b': [2, 3, 4, 5, 6], - 'c': [3, 4, 5, 6, 7], 
- }, schema=table_schema) - - print("\n=== Multiple Columns Update ===") - print("Actual:", actual.to_pandas()) - - self.assertEqual(actual, expected) - print("✅ Test passed!") - - def test_row_ranges_computed_correctly(self): - """ - Test: Verify that row_ranges are computed correctly from files. - """ - table_name = self._create_unique_table_name('row_ranges') - table_schema = pa.schema([ - ('val', pa.int32()), - ]) - schema = Schema.from_pyarrow_schema( - table_schema, - options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} - ) - self.catalog.create_table(table_name, schema, False) - table = self.catalog.get_table(table_name) - - # Write data - write_builder = table.new_batch_write_builder() - table_write = write_builder.new_write() - table_commit = write_builder.new_commit() - - data = pa.Table.from_pydict({ - 'val': [10, 20, 30, 40, 50], + 'a': [1, 2, 3, 4, 5] * 1000, + 'b': [10, 20, 30, 40, 50] * 1000, + 'c': [9, 18, 27, 36, 45] * 1000, }, schema=table_schema) - table_write.write_arrow(data) - table_commit.commit(table_write.prepare_commit()) - table_write.close() - table_commit.close() - - # Create shard updator and check row_ranges - table_update = write_builder.new_update() - table_update.with_read_projection(['val']) - table_update.with_update_type(['val']) - - shard_updator = table_update.new_shard_updator(0, 1) - - print("\n=== Row Ranges Test ===") - print(f"Row ranges: {shard_updator.row_ranges}") - # Should have at least one row range - self.assertTrue(len(shard_updator.row_ranges) > 0, "Should have row ranges") - - # Each row range should have partition and range - for partition, row_range in shard_updator.row_ranges: - self.assertIsNotNone(row_range.from_, "Range should have from_") - self.assertIsNotNone(row_range.to, "Range should have to") - print(f" Partition: {partition}, Range: {row_range.from_} - {row_range.to}") - - print("✅ Test passed!") + print("\n=== Actual Data ===") + print(actual.to_pandas()) + print("\n=== Expected Data ===") + print(expected.to_pandas()) + self.assertEqual(actual, expected) + print("\n✅ Test passed! 
Column d = c + b - a computed correctly!") if __name__ == '__main__': unittest.main() From ce2855d480843b0a40dcb7bc93c51f0303920f46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 17:55:21 +0800 Subject: [PATCH 11/22] minus --- .../scanner/data_evolution_split_generator.py | 36 ++++--------------- .../read/scanner/full_starting_scanner.py | 8 ----- .../pypaimon/read/scanner/split_generator.py | 3 -- .../pypaimon/read/scanner/starting_scanner.py | 4 --- paimon-python/pypaimon/read/table_scan.py | 4 --- paimon-python/pypaimon/write/table_update.py | 3 +- 6 files changed, 8 insertions(+), 50 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index 7f1a3578b37a..d9bc3b86bcd5 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -77,11 +77,7 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple: self.end_pos_of_this_subtask ) elif self.idx_of_this_subtask is not None: - if self.no_slice_split: - partitioned_files = self._filter_by_shard_no_slice(partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks) - else: - # shard data range: [plan_start_pos, plan_end_pos) - partitioned_files, plan_start_pos, plan_end_pos = self._filter_by_shard(partitioned_files) + partitioned_files = self._filter_by_shard(partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks) def weight_func(file_list: List[DataFileMeta]) -> int: return max(sum(f.file_size for f in file_list), self.open_file_cost) @@ -111,12 +107,11 @@ def weight_func(file_list: List[DataFileMeta]) -> int: flatten_packed_files, packed_files, sorted_entries_list ) - if not self.no_slice_split: - if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None: - splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos) - # Wrap splits with IndexedSplit if row_ranges is provided - if self.row_ranges: - splits = self._wrap_to_indexed_splits(splits) + if self.start_pos_of_this_subtask is not None: + splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos) + # Wrap splits with IndexedSplit if row_ranges is provided + if self.row_ranges: + splits = self._wrap_to_indexed_splits(splits) return splits @@ -245,24 +240,7 @@ def _filter_by_row_range( return filtered_partitioned_files, plan_start_pos, plan_end_pos - def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple: - """ - Filter file entries by shard for data evolution tables. 
- """ - # Calculate total rows (excluding blob files) - total_row = sum( - entry.file.row_count - for file_entries in partitioned_files.values() - for entry in file_entries - if not self._is_blob_file(entry.file.file_name) - ) - - # Calculate shard range using shared helper - start_pos, end_pos = self._compute_shard_range(total_row) - - return self._filter_by_row_range(partitioned_files, start_pos, end_pos) - - def _filter_by_shard_no_slice(self, partitioned_files: defaultdict, sub_task_id: int, total_tasks: int) -> defaultdict: + def _filter_by_shard(self, partitioned_files: defaultdict, sub_task_id: int, total_tasks: int) -> defaultdict: list_ranges = [] for file_entries in partitioned_files.values(): for entry in file_entries: diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index 27f5801acfb6..f415ab61ef8d 100755 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -75,7 +75,6 @@ def __init__( self.only_read_real_buckets = options.bucket() == BucketMode.POSTPONE_BUCKET.value self.data_evolution = options.data_evolution_enabled() self.deletion_vectors_enabled = options.deletion_vectors_enabled() - self.no_slice_split = None def schema_fields_func(schema_id: int): return self.table.schema_manager.get_schema(schema_id).fields @@ -132,9 +131,6 @@ def scan(self) -> Plan: deletion_files_map ) - if self.no_slice_split: - split_generator.with_no_slice_split() - # Configure sharding if needed if self.idx_of_this_subtask is not None: split_generator.with_shard(self.idx_of_this_subtask, self.number_of_para_subtasks) @@ -226,10 +222,6 @@ def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ max_workers=max_workers ) - def with_no_slice_split(self): - self.no_slice_split = True - return self - def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int) -> 'FullStartingScanner': if idx_of_this_subtask >= number_of_para_subtasks: raise ValueError("idx_of_this_subtask must be less than number_of_para_subtasks") diff --git a/paimon-python/pypaimon/read/scanner/split_generator.py b/paimon-python/pypaimon/read/scanner/split_generator.py index b1811ed2e39c..f3d8d81dcbe6 100644 --- a/paimon-python/pypaimon/read/scanner/split_generator.py +++ b/paimon-python/pypaimon/read/scanner/split_generator.py @@ -54,9 +54,6 @@ def __init__( self.no_slice_split = None - def with_no_slice_split(self): - self.no_slice_split = True - def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int): """Configure sharding for parallel processing.""" if idx_of_this_subtask >= number_of_para_subtasks: diff --git a/paimon-python/pypaimon/read/scanner/starting_scanner.py b/paimon-python/pypaimon/read/scanner/starting_scanner.py index 4af3ed65bc88..7e6cdfd81aea 100644 --- a/paimon-python/pypaimon/read/scanner/starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/starting_scanner.py @@ -26,7 +26,3 @@ class StartingScanner(ABC): @abstractmethod def scan(self) -> Plan: """Plan the files to read.""" - - - def with_no_slice_split(self): - return self diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index 7e1024d0a6a1..8276163450a0 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -83,10 +83,6 @@ def _create_starting_scanner(self) -> Optional[StartingScanner]: vector_search=self.vector_search ) - 
def with_no_slice_split(self): - self.starting_scanner.with_no_slice_split() - return self - def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan': self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks) return self diff --git a/paimon-python/pypaimon/write/table_update.py b/paimon-python/pypaimon/write/table_update.py index e2490a98e5d0..6f0075af5f1c 100644 --- a/paimon-python/pypaimon/write/table_update.py +++ b/paimon-python/pypaimon/write/table_update.py @@ -79,8 +79,7 @@ def __init__(self, table, projection: Optional[List[str]], write_cols: List[str] self.writer: Optional[SingleWriter] = None self.dict = defaultdict(list) - scanner = self.table.new_read_builder().new_scan().with_shard(shard_num, - total_shard_count).with_no_slice_split() + scanner = self.table.new_read_builder().new_scan().with_shard(shard_num, total_shard_count) self.splits = scanner.plan().splits() self.row_ranges: List[(Tuple, Range)] = [] From 86b9f6139e20c9d3fb41710b1937740084a9ec0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 18:00:16 +0800 Subject: [PATCH 12/22] minus --- paimon-python/pypaimon/read/table_read.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 535e9da475e8..5a5859430467 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -84,14 +84,7 @@ def to_arrow(self, splits: List[Split]) -> Optional[pyarrow.Table]: for batch in iter(batch_reader.read_next_batch, None): if batch.num_rows == 0: continue - i+=1 - if i == 381: - i = 381 - if i == 382: - i = 382 table_list.append(self._try_to_pad_batch_by_schema(batch, schema)) - print("sdfasdf:" + str(i)) - pyarrow.Table.from_batches(table_list) if not table_list: return pyarrow.Table.from_arrays([pyarrow.array([], type=field.type) for field in schema], schema=schema) From 196b62e58e70780ff5e84321d85ad72142ecd9f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 18:00:24 +0800 Subject: [PATCH 13/22] minus --- paimon-python/pypaimon/read/table_read.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 5a5859430467..f546c4be6b3a 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -80,7 +80,6 @@ def to_arrow(self, splits: List[Split]) -> Optional[pyarrow.Table]: schema = PyarrowFieldParser.from_paimon_schema(self.read_type) table_list = [] - i = 0 for batch in iter(batch_reader.read_next_batch, None): if batch.num_rows == 0: continue From b6f1bda364a3af9217e73f8a48ac57c066e04041 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 18:02:10 +0800 Subject: [PATCH 14/22] minus --- paimon-python/pypaimon/read/scanner/split_generator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-python/pypaimon/read/scanner/split_generator.py b/paimon-python/pypaimon/read/scanner/split_generator.py index f3d8d81dcbe6..1b2f15e11bc9 100644 --- a/paimon-python/pypaimon/read/scanner/split_generator.py +++ b/paimon-python/pypaimon/read/scanner/split_generator.py @@ -51,7 +51,6 @@ def __init__( self.number_of_para_subtasks = None self.start_pos_of_this_subtask = None self.end_pos_of_this_subtask = None - self.no_slice_split = None def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int): From 
39037df62a7c69b7aa202fbc7ed51c98c3b9fcc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 18:04:10 +0800 Subject: [PATCH 15/22] minus --- paimon-python/pypaimon/read/scanner/split_generator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-python/pypaimon/read/scanner/split_generator.py b/paimon-python/pypaimon/read/scanner/split_generator.py index 1b2f15e11bc9..6dab4fc12aa3 100644 --- a/paimon-python/pypaimon/read/scanner/split_generator.py +++ b/paimon-python/pypaimon/read/scanner/split_generator.py @@ -52,7 +52,6 @@ def __init__( self.start_pos_of_this_subtask = None self.end_pos_of_this_subtask = None - def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int): """Configure sharding for parallel processing.""" if idx_of_this_subtask >= number_of_para_subtasks: From 5211863d15aa0e5f0e165856718b343184893a55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 20:31:41 +0800 Subject: [PATCH 16/22] minus --- .../pypaimon/read/scanner/data_evolution_split_generator.py | 4 +++- paimon-python/pypaimon/tests/shard_table_updator_test.py | 2 -- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index d9bc3b86bcd5..68504a45a2bd 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -77,7 +77,9 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple: self.end_pos_of_this_subtask ) elif self.idx_of_this_subtask is not None: - partitioned_files = self._filter_by_shard(partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks) + partitioned_files = self._filter_by_shard( + partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks + ) def weight_func(file_list: List[DataFileMeta]) -> int: return max(sum(f.file_size for f in file_list), self.open_file_cost) diff --git a/paimon-python/pypaimon/tests/shard_table_updator_test.py b/paimon-python/pypaimon/tests/shard_table_updator_test.py index 8acdeb31650d..908a4554cdd3 100644 --- a/paimon-python/pypaimon/tests/shard_table_updator_test.py +++ b/paimon-python/pypaimon/tests/shard_table_updator_test.py @@ -227,7 +227,6 @@ def test_compute_column_d_equals_c_plus_b_minus_a2(self): 'd': [109, 218, 327, 436, 545] * 1000, }, schema=table_schema) - print("\n=== Actual Data ===") print(actual.to_pandas()) print("\n=== Expected Data ===") @@ -308,7 +307,6 @@ def test_compute_column_with_existing_column(self): 'c': [9, 18, 27, 36, 45] * 1000, }, schema=table_schema) - print("\n=== Actual Data ===") print(actual.to_pandas()) print("\n=== Expected Data ===") From da48b7d7e4e6eb8bd56a3c02db92d7283e4aeef3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 29 Jan 2026 21:07:18 +0800 Subject: [PATCH 17/22] minus --- .../pypaimon/tests/blob_table_test.py | 68 ++++++++++--------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py index 88d2626bb6a7..b0b5678c2206 100755 --- a/paimon-python/pypaimon/tests/blob_table_test.py +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -2204,12 +2204,12 @@ def test_blob_write_read_large_data_with_rolling_with_shard(self): result = table_read.to_arrow(table_scan.plan().splits()) # Verify the data - 
self.assertEqual(result.num_rows, 54, "Should have 54 rows")
+ self.assertEqual(result.num_rows, 80, "Should have 80 rows")
self.assertEqual(result.num_columns, 4, "Should have 4 columns")
# Verify blob data integrity
blob_data = result.column('large_blob').to_pylist()
- self.assertEqual(len(blob_data), 54, "Should have 54 blob records")
+ self.assertEqual(len(blob_data), 80, "Should have 80 blob records")
# Verify each blob
for i, blob in enumerate(blob_data):
self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1} should be {large_blob_size:,} bytes")
@@ -2264,21 +2264,22 @@ def test_blob_rolling_with_shard(self):
actual_size = len(large_blob_data)
print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 * 1024):.2f} MB)")
- write_builder = table.new_batch_write_builder()
- writer = write_builder.new_write()
# Write 30 records
- for record_id in range(30):
- test_data = pa.Table.from_pydict({
- 'id': [record_id], # Unique ID for each row
- 'metadata': [f'Large blob batch {record_id + 1}'],
- 'large_blob': [struct.pack(' Date: Thu, 29 Jan 2026 21:09:42 +0800
Subject: [PATCH 18/22] minus
---
paimon-python/pypaimon/tests/blob_table_test.py | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index b0b5678c2206..2ad47d4cf683 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -2494,13 +2494,14 @@ def test_data_blob_writer_with_shard(self):
# Read data back using table API
read_builder = table.new_read_builder()
- table_scan = read_builder.new_scan().with_shard(1, 2)
+ table_scan = read_builder.new_scan().with_shard(0, 2)
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
result = table_read.to_arrow(splits)
# Verify the data was read back correctly
- self.assertEqual(result.num_rows, 2, "Should have 2 rows")
+ # Just one data file, so shard 0 reads all of its records
+ self.assertEqual(result.num_rows, 5, "Should have 5 rows")
self.assertEqual(result.num_columns, 3, "Should have 3 columns")
def test_blob_write_read_large_data_volume_rolling_with_shard(self):
From 4686755f6234d9a6018257e67a8d2431a42dcefa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?=
Date: Thu, 29 Jan 2026 21:11:27 +0800
Subject: [PATCH 19/22] Fix minus
---
paimon-python/pypaimon/tests/blob_table_test.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 2ad47d4cf683..7670ec9447ac 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -2345,10 +2345,10 @@ def test_blob_large_data_volume_with_shard(self):
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
expected = pa.Table.from_pydict({
- 'id': [1] * num_row,
- 'batch_id': [11] * num_row,
- 'metadata': [f'Large blob batch {11}'] * num_row,
- 'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+ 'id': [1] * num_row,
+ 'batch_id': [11] * num_row,
+ 'metadata': [f'Large blob batch {11}'] * num_row,
+ 'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
}, schema=pa_schema)
writer.write_arrow(expected)
From 6ce5356bb9b34f3100cfbcb839576dc57cbb99f6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?=
Date: Fri, 30 Jan 2026 10:32:51 +0800
Subject: [PATCH 20/22] minus --- paimon-python/pypaimon/write/table_update.py | 50 +++++++++++++++----- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/paimon-python/pypaimon/write/table_update.py b/paimon-python/pypaimon/write/table_update.py index 6f0075af5f1c..8e3f91bde416 100644 --- a/paimon-python/pypaimon/write/table_update.py +++ b/paimon-python/pypaimon/write/table_update.py @@ -52,9 +52,21 @@ def with_update_type(self, update_cols: List[str]): def with_read_projection(self, projection: List[str]): self.projection = projection - def new_shard_updator(self, total_shard_count: int, shard_num: int): - return ShardTableUpdator(self.table, self.projection, self.update_cols, self.commit_user, total_shard_count, - shard_num) + def new_shard_updator(self, shard_num: int, total_shard_count: int): + """Create a shard updater for scan+rewrite style updates. + + Args: + shard_num: Index of this shard/subtask. + total_shard_count: Total number of shards/subtasks. + """ + return ShardTableUpdator( + self.table, + self.projection, + self.update_cols, + self.commit_user, + shard_num, + total_shard_count, + ) def update_by_arrow_with_row_id(self, table: pa.Table) -> List[CommitMessage]: update_by_row_id = TableUpdateByRowId(self.table, self.commit_user) @@ -64,14 +76,20 @@ def update_by_arrow_with_row_id(self, table: pa.Table) -> List[CommitMessage]: class ShardTableUpdator: - def __init__(self, table, projection: Optional[List[str]], write_cols: List[str], commit_user, shard_num: int, - total_shard_count: int, ): + def __init__( + self, + table, + projection: Optional[List[str]], + write_cols: List[str], + commit_user, + shard_num: int, + total_shard_count: int, + ): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table self.projection = projection self.write_cols = write_cols self.commit_user = commit_user - self.update_cols = None self.total_shard_count = total_shard_count self.shard_num = shard_num @@ -114,19 +132,29 @@ def update_by_arrow_batch(self, data: pa.RecordBatch): self._init_writer() capacity = self.writer.capacity() - data_in_this_writer = data if capacity >= data.num_rows else data.slice(0, capacity - 1) - data_in_next_writer = None if capacity >= data.num_rows else data.slice(data.num_rows - 1, capacity) + if capacity <= 0: + raise RuntimeError("Writer has no remaining capacity.") + + # Split the batch across writers. + first, rest = (data, None) if capacity >= data.num_rows else (data.slice(0, capacity), data.slice(capacity)) - self.writer.write(data_in_this_writer) + self.writer.write(first) if self.writer.capacity() == 0: self.dict[self.writer.partition()].append(self.writer.end()) self.writer = None - if (data_in_next_writer is not None): - self.update_by_arrow_batch(data_in_next_writer) + if rest is not None: + if self.writer is not None: + raise RuntimeError("Should not get here, rest and current writer exist in the same time.") + self.update_by_arrow_batch(rest) def _init_writer(self): if self.writer is None: + if self.write_pos >= len(self.row_ranges): + raise RuntimeError( + "No more row ranges to write. " + "Ensure you write exactly the same number of rows as read from this shard." 
+ ) item = self.row_ranges[self.write_pos] self.write_pos += 1 partition = item[0] From 8c7320a8b33dfd0e91fc81e5ef1dbbdb204291c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 30 Jan 2026 10:49:10 +0800 Subject: [PATCH 21/22] minus --- .../pypaimon/tests/shard_table_updator_test.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/paimon-python/pypaimon/tests/shard_table_updator_test.py b/paimon-python/pypaimon/tests/shard_table_updator_test.py index 908a4554cdd3..641f545b4711 100644 --- a/paimon-python/pypaimon/tests/shard_table_updator_test.py +++ b/paimon-python/pypaimon/tests/shard_table_updator_test.py @@ -87,7 +87,7 @@ def test_compute_column_d_equals_c_plus_b_minus_a(self): # Read data using arrow_reader reader = shard_updator.arrow_reader() - + for batch in iter(reader.read_next_batch, None): # Compute d = c + b - a a_values = batch.column('a').to_pylist() @@ -181,6 +181,7 @@ def test_compute_column_d_equals_c_plus_b_minus_a2(self): table_update.with_update_type(['d']) for i in range(10): + d_all_values = [] shard_updator = table_update.new_shard_updator(i, 10) # Read data using arrow_reader @@ -193,14 +194,14 @@ def test_compute_column_d_equals_c_plus_b_minus_a2(self): c_values = batch.column('c').to_pylist() d_values = [c + b - a for a, b, c in zip(a_values, b_values, c_values)] + d_all_values.extend(d_values) - # Create batch with d column - new_batch = pa.RecordBatch.from_pydict({ - 'd': d_values, - }, schema=pa.schema([('d', pa.int32())])) - - # Write d column - shard_updator.update_by_arrow_batch(new_batch) + # Concatenate all computed values and update once for this shard + new_batch = pa.RecordBatch.from_pydict( + {'d': d_all_values}, + schema=pa.schema([('d', pa.int32())]), + ) + shard_updator.update_by_arrow_batch(new_batch) # Prepare and commit commit_messages = shard_updator.prepare_commit() From 35f86213624adbfecfd4f1bf5a5bda08d76d9753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 30 Jan 2026 10:51:34 +0800 Subject: [PATCH 22/22] Add docs --- docs/content/pypaimon/data-evolution.md | 118 ++++++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/docs/content/pypaimon/data-evolution.md b/docs/content/pypaimon/data-evolution.md index bd0bde03a427..2268090f7374 100644 --- a/docs/content/pypaimon/data-evolution.md +++ b/docs/content/pypaimon/data-evolution.md @@ -29,6 +29,13 @@ under the License. PyPaimon for Data Evolution mode. See [Data Evolution]({{< ref "append-table/data-evolution" >}}). +## Prerequisites + +To use partial updates / data evolution, enable both options when creating the table: + +- **`row-tracking.enabled`**: `true` +- **`data-evolution.enabled`**: `true` + ## Update Columns By Row ID You can create `TableUpdate.update_by_arrow_with_row_id` to update columns to data evolution tables. @@ -36,7 +43,19 @@ You can create `TableUpdate.update_by_arrow_with_row_id` to update columns to da The input data should include the `_ROW_ID` column, update operation will automatically sort and match each `_ROW_ID` to its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes them to a separate file. +### Requirements for `_ROW_ID` updates + +- **All rows required**: the input table must contain **exactly the full table row count** (one row per existing row). +- **Row id coverage**: after sorting by `_ROW_ID`, it must be **0..N-1** (no duplicates, no gaps). 
+- **Update columns only**: include `_ROW_ID` plus the columns you want to update (partial schema is OK). + ```python +import pyarrow as pa +from pypaimon import CatalogFactory, Schema + +catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'}) +catalog.create_database('default', False) + simple_pa_schema = pa.schema([ ('f0', pa.int8()), ('f1', pa.int16()), @@ -78,3 +97,102 @@ table_commit.close() # 'f0': [5, 6], # 'f1': [-1001, 1002] ``` + +## Compute a New Column (scan + rewrite with shards) + +If you want to **compute a derived column** (or **update an existing column based on other columns**) without providing +`_ROW_ID`, you can use the shard scan + rewrite workflow: + +- Read only the columns you need (projection) +- Compute the new values in the same row order +- Write only the updated columns back +- Commit per shard + +This is useful for backfilling a newly added column, or recomputing a column from other columns. + +### Example: compute `d = c + b - a` + +```python +import pyarrow as pa +from pypaimon import CatalogFactory, Schema + +catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'}) +catalog.create_database('default', False) + +table_schema = pa.schema([ + ('a', pa.int32()), + ('b', pa.int32()), + ('c', pa.int32()), + ('d', pa.int32()), +]) + +schema = Schema.from_pyarrow_schema( + table_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}, +) +catalog.create_table('default.t', schema, False) +table = catalog.get_table('default.t') + +# write initial data (a, b, c only) +write_builder = table.new_batch_write_builder() +write = write_builder.new_write().with_write_type(['a', 'b', 'c']) +commit = write_builder.new_commit() +write.write_arrow(pa.Table.from_pydict({'a': [1, 2], 'b': [10, 20], 'c': [100, 200]})) +commit.commit(write.prepare_commit()) +write.close() +commit.close() + +# shard update: read (a, b, c), write only (d) +update = write_builder.new_update() +update.with_read_projection(['a', 'b', 'c']) +update.with_update_type(['d']) + +shard_idx = 0 +num_shards = 1 +upd = update.new_shard_updator(shard_idx, num_shards) +reader = upd.arrow_reader() + +for batch in iter(reader.read_next_batch, None): + a = batch.column('a').to_pylist() + b = batch.column('b').to_pylist() + c = batch.column('c').to_pylist() + d = [ci + bi - ai for ai, bi, ci in zip(a, b, c)] + + upd.update_by_arrow_batch( + pa.RecordBatch.from_pydict({'d': d}, schema=pa.schema([('d', pa.int32())])) + ) + +commit_messages = upd.prepare_commit() +commit = write_builder.new_commit() +commit.commit(commit_messages) +commit.close() +``` + +### Example: update an existing column `c = b - a` + +```python +update = write_builder.new_update() +update.with_read_projection(['a', 'b']) +update.with_update_type(['c']) + +upd = update.new_shard_updator(0, 1) +reader = upd.arrow_reader() +for batch in iter(reader.read_next_batch, None): + a = batch.column('a').to_pylist() + b = batch.column('b').to_pylist() + c = [bi - ai for ai, bi in zip(a, b)] + upd.update_by_arrow_batch( + pa.RecordBatch.from_pydict({'c': c}, schema=pa.schema([('c', pa.int32())])) + ) + +commit_messages = upd.prepare_commit() +commit = write_builder.new_commit() +commit.commit(commit_messages) +commit.close() +``` + +### Notes + +- **Row order matters**: the batches you write must have the **same number of rows** as the batches you read, in the + same order for that shard. +- **Parallelism**: run multiple shards by calling `new_shard_updator(shard_idx, num_shards)` for each shard.
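+
+A minimal sketch of that parallel pattern, reusing the `d = c + b - a` example above. The 4-shard count and the single-process loop are illustrative assumptions only; in practice each shard would usually run in its own worker process:
+
+```python
+# Illustrative only: 4 shards driven sequentially from one process.
+num_shards = 4
+update = write_builder.new_update()
+update.with_read_projection(['a', 'b', 'c'])
+update.with_update_type(['d'])
+
+for shard_idx in range(num_shards):
+    upd = update.new_shard_updator(shard_idx, num_shards)
+    reader = upd.arrow_reader()
+    for batch in iter(reader.read_next_batch, None):
+        a = batch.column('a').to_pylist()
+        b = batch.column('b').to_pylist()
+        c = batch.column('c').to_pylist()
+        d = [ci + bi - ai for ai, bi, ci in zip(a, b, c)]
+        # Write back exactly as many rows as were read in this batch, in the same order.
+        upd.update_by_arrow_batch(
+            pa.RecordBatch.from_pydict({'d': d}, schema=pa.schema([('d', pa.int32())]))
+        )
+    # Each shard prepares and commits its own messages.
+    commit = write_builder.new_commit()
+    commit.commit(upd.prepare_commit())
+    commit.close()
+```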