118 changes: 118 additions & 0 deletions docs/content/pypaimon/data-evolution.md
@@ -29,14 +29,33 @@ under the License.

PyPaimon supports Data Evolution mode. See [Data Evolution]({{< ref "append-table/data-evolution" >}}).

## Prerequisites

To use partial updates (data evolution), enable both of the following options when creating the table, as shown in the example below:

- **`row-tracking.enabled`**: `true`
- **`data-evolution.enabled`**: `true`
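
For example, a minimal schema definition with both options enabled (the full examples below use the same settings):

```python
import pyarrow as pa
from pypaimon import Schema

pa_schema = pa.schema([
    ('f0', pa.int8()),
    ('f1', pa.int16()),
])

schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    },
)
```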

## Update Columns By Row ID

You can call `TableUpdate.update_by_arrow_with_row_id` to update columns of a data evolution table.

The input data must include the `_ROW_ID` column. The update operation automatically sorts the rows and matches each `_ROW_ID` to
its corresponding `first_row_id`, then groups rows sharing the same `first_row_id` and writes each group to a separate file.

### Requirements for `_ROW_ID` updates

- **All rows required**: the input table must contain **exactly the full table row count** (one row per existing row).
- **Row id coverage**: after sorting by `_ROW_ID`, the row ids must form the exact sequence **0..N-1** (no duplicates, no gaps).
- **Update columns only**: include `_ROW_ID` plus the columns you want to update (partial schema is OK).

```python
import pyarrow as pa
from pypaimon import CatalogFactory, Schema

catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
catalog.create_database('default', False)

simple_pa_schema = pa.schema([
('f0', pa.int8()),
('f1', pa.int16()),
@@ -78,3 +97,102 @@ table_commit.close()
# 'f0': [5, 6],
# 'f1': [-1001, 1002]
```
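
The update call itself is collapsed in the diff above. The sketch below illustrates the intended flow; obtaining the
`TableUpdate` from `write_builder.new_update()` and the prepare/commit sequence are assumptions based on the other
examples on this page, not a verbatim copy of the full example.

```python
# Sketch only: assumes `write_builder` was created as in the full example, and
# that `new_update()` returns a TableUpdate exposing `update_by_arrow_with_row_id`.
update_data = pa.Table.from_pydict({
    '_ROW_ID': [0, 1],      # must cover 0..N-1 after sorting
    'f1': [-1001, 1002],    # only the columns being updated
})

table_update = write_builder.new_update()
table_update.update_by_arrow_with_row_id(update_data)

table_commit = write_builder.new_commit()
table_commit.commit(table_update.prepare_commit())
table_commit.close()
```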

## Compute a New Column (scan + rewrite with shards)

If you want to **compute a derived column** (or **update an existing column based on other columns**) without providing
`_ROW_ID`, you can use the shard scan + rewrite workflow:

- Read only the columns you need (projection)
- Compute the new values in the same row order
- Write only the updated columns back
- Commit per shard

This is useful for backfilling a newly added column, or recomputing a column from other columns.

### Example: compute `d = c + b - a`

```python
import pyarrow as pa
from pypaimon import CatalogFactory, Schema

catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
catalog.create_database('default', False)

table_schema = pa.schema([
('a', pa.int32()),
('b', pa.int32()),
('c', pa.int32()),
('d', pa.int32()),
])

schema = Schema.from_pyarrow_schema(
table_schema,
options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
)
catalog.create_table('default.t', schema, False)
table = catalog.get_table('default.t')

# write initial data (a, b, c only)
write_builder = table.new_batch_write_builder()
write = write_builder.new_write().with_write_type(['a', 'b', 'c'])
commit = write_builder.new_commit()
write.write_arrow(pa.Table.from_pydict({'a': [1, 2], 'b': [10, 20], 'c': [100, 200]}))
commit.commit(write.prepare_commit())
write.close()
commit.close()

# shard update: read (a, b, c), write only (d)
update = write_builder.new_update()
update.with_read_projection(['a', 'b', 'c'])
update.with_update_type(['d'])

shard_idx = 0
num_shards = 1
upd = update.new_shard_updator(shard_idx, num_shards)
reader = upd.arrow_reader()

for batch in iter(reader.read_next_batch, None):
a = batch.column('a').to_pylist()
b = batch.column('b').to_pylist()
c = batch.column('c').to_pylist()
d = [ci + bi - ai for ai, bi, ci in zip(a, b, c)]

upd.update_by_arrow_batch(
pa.RecordBatch.from_pydict({'d': d}, schema=pa.schema([('d', pa.int32())]))
)

commit_messages = upd.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
commit.close()
```

### Example: update an existing column `c = b - a`

```python
update = write_builder.new_update()
update.with_read_projection(['a', 'b'])
update.with_update_type(['c'])

upd = update.new_shard_updator(0, 1)
reader = upd.arrow_reader()
for batch in iter(reader.read_next_batch, None):
a = batch.column('a').to_pylist()
b = batch.column('b').to_pylist()
c = [bi - ai for ai, bi in zip(a, b)]
upd.update_by_arrow_batch(
pa.RecordBatch.from_pydict({'c': c}, schema=pa.schema([('c', pa.int32())]))
)

commit_messages = upd.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
commit.close()
```

### Notes

- **Row order matters**: each batch you write must contain the **same number of rows** as the batch you just read, and the
  rows must stay in the same order within that shard.
- **Parallelism**: process multiple shards by calling `new_shard_updator(shard_idx, num_shards)` once per shard, as in the
  sketch below.
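
A minimal sketch of processing every shard in one process is shown below; it reuses only the calls from the examples
above (creating a fresh `TableUpdate` per shard is an assumption), and each shard commits its own messages. The same
pattern can be distributed across processes or workers by giving each one its own `shard_idx`.

```python
num_shards = 4  # choose the degree of parallelism

for shard_idx in range(num_shards):
    # assumption: a fresh TableUpdate per shard; the per-shard calls below
    # are exactly those used in the examples above
    update = write_builder.new_update()
    update.with_read_projection(['a', 'b'])
    update.with_update_type(['c'])

    upd = update.new_shard_updator(shard_idx, num_shards)
    reader = upd.arrow_reader()
    for batch in iter(reader.read_next_batch, None):
        a = batch.column('a').to_pylist()
        b = batch.column('b').to_pylist()
        c = [bi - ai for ai, bi in zip(a, b)]
        upd.update_by_arrow_batch(
            pa.RecordBatch.from_pydict({'c': c}, schema=pa.schema([('c', pa.int32())]))
        )

    # commit per shard
    commit = write_builder.new_commit()
    commit.commit(upd.prepare_commit())
    commit.close()
```
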
5 changes: 3 additions & 2 deletions paimon-python/pypaimon/globalindex/range.py
@@ -154,7 +154,7 @@ def merge_sorted_as_possible(ranges: List['Range']) -> List['Range']:
return result

@staticmethod
def sort_and_merge_overlap(ranges: List['Range'], merge: bool = True) -> List['Range']:
def sort_and_merge_overlap(ranges: List['Range'], merge: bool = True, adjacent: bool = True) -> List['Range']:
"""
        Sort ranges and optionally merge overlapping ones. When ``adjacent`` is True,
        ranges that merely touch (for example [0, 4] and [5, 9]) are merged as well;
        when False, only ranges that actually overlap are merged.
"""
@@ -166,10 +166,11 @@ def sort_and_merge_overlap(ranges: List['Range'], merge: bool = True) -> List['R
if not merge:
return sorted_ranges

adjacent_value = 1 if adjacent else 0
result = [sorted_ranges[0]]
for r in sorted_ranges[1:]:
last = result[-1]
if r.from_ <= last.to + 1:
if r.from_ <= last.to + adjacent_value:
# Merge with last range
result[-1] = Range(last.from_, max(last.to, r.to))
else:
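
A quick illustration of the new `adjacent` flag (a sketch; the import path is inferred from the file location, and the
inclusive `[from_, to]` bounds come from the surrounding code):

```python
from pypaimon.globalindex.range import Range  # import path assumed

ranges = [Range(5, 9), Range(0, 4)]  # inclusive bounds; the ranges touch but do not overlap

Range.sort_and_merge_overlap(ranges, merge=True, adjacent=True)
# -> [Range(0, 9)]: touching ranges are merged

Range.sort_and_merge_overlap(ranges, merge=True, adjacent=False)
# -> [Range(0, 4), Range(5, 9)]: only truly overlapping ranges are merged
```
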
@@ -77,8 +77,9 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple:
self.end_pos_of_this_subtask
)
elif self.idx_of_this_subtask is not None:
# shard data range: [plan_start_pos, plan_end_pos)
partitioned_files, plan_start_pos, plan_end_pos = self._filter_by_shard(partitioned_files)
partitioned_files = self._filter_by_shard(
partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks
)

def weight_func(file_list: List[DataFileMeta]) -> int:
return max(sum(f.file_size for f in file_list), self.open_file_cost)
@@ -108,9 +109,8 @@ def weight_func(file_list: List[DataFileMeta]) -> int:
flatten_packed_files, packed_files, sorted_entries_list
)

if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
if self.start_pos_of_this_subtask is not None:
splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos)

# Wrap splits with IndexedSplit if row_ranges is provided
if self.row_ranges:
splits = self._wrap_to_indexed_splits(splits)
@@ -242,22 +242,61 @@ def _filter_by_row_range(

return filtered_partitioned_files, plan_start_pos, plan_end_pos

def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple:
"""
Filter file entries by shard for data evolution tables.
"""
# Calculate total rows (excluding blob files)
total_row = sum(
entry.file.row_count
for file_entries in partitioned_files.values()
for entry in file_entries
if not self._is_blob_file(entry.file.file_name)
)
def _filter_by_shard(self, partitioned_files: defaultdict, sub_task_id: int, total_tasks: int) -> defaultdict:
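        """
        Filter file entries by shard for data evolution tables: build inclusive
        row-id ranges from the data files, split the sorted ranges across
        sub-tasks, and keep only files whose first_row_id falls into this
        sub-task's portion of the ranges.
        """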
list_ranges = []
for file_entries in partitioned_files.values():
for entry in file_entries:
first_row_id = entry.file.first_row_id
if first_row_id is None:
raise ValueError("Found None first row id in files")
# Range is inclusive [from_, to], so use row_count - 1
list_ranges.append(Range(first_row_id, first_row_id + entry.file.row_count - 1))

        sorted_ranges = Range.sort_and_merge_overlap(list_ranges, merge=True, adjacent=False)

start_range, end_range = self._divide_ranges(sorted_ranges, sub_task_id, total_tasks)
if start_range is None or end_range is None:
return defaultdict(list)
start_first_row_id = start_range.from_
end_first_row_id = end_range.to

# Calculate shard range using shared helper
start_pos, end_pos = self._compute_shard_range(total_row)
filtered_partitioned_files = {
k: [x for x in v if x.file.first_row_id >= start_first_row_id and x.file.first_row_id <= end_first_row_id]
for k, v in partitioned_files.items()
}

return self._filter_by_row_range(partitioned_files, start_pos, end_pos)
filtered_partitioned_files = {k: v for k, v in filtered_partitioned_files.items() if v}
return defaultdict(list, filtered_partitioned_files)

@staticmethod
def _divide_ranges(
sorted_ranges: List[Range], sub_task_id: int, total_tasks: int
) -> Tuple[Optional[Range], Optional[Range]]:
if not sorted_ranges:
return None, None

num_ranges = len(sorted_ranges)

# If more tasks than ranges, some tasks get nothing
if sub_task_id >= num_ranges:
return None, None

# Calculate balanced distribution of ranges across tasks
base_ranges_per_task = num_ranges // total_tasks
remainder = num_ranges % total_tasks

# Each of the first 'remainder' tasks gets one extra range
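        # Example: 7 ranges across 3 tasks -> base_ranges_per_task = 2, remainder = 1;
        # task 0 gets range indices [0, 2], task 1 gets [3, 4], task 2 gets [5, 6]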
if sub_task_id < remainder:
num_ranges_for_task = base_ranges_per_task + 1
start_idx = sub_task_id * (base_ranges_per_task + 1)
else:
num_ranges_for_task = base_ranges_per_task
start_idx = (
remainder * (base_ranges_per_task + 1) +
(sub_task_id - remainder) * base_ranges_per_task
)
end_idx = start_idx + num_ranges_for_task - 1
return sorted_ranges[start_idx], sorted_ranges[end_idx]

def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
"""
73 changes: 39 additions & 34 deletions paimon-python/pypaimon/tests/blob_table_test.py
@@ -2204,12 +2204,12 @@ def test_blob_write_read_large_data_with_rolling_with_shard(self):
result = table_read.to_arrow(table_scan.plan().splits())

# Verify the data
self.assertEqual(result.num_rows, 54, "Should have 54 rows")
        self.assertEqual(result.num_rows, 80, "Should have 80 rows")
self.assertEqual(result.num_columns, 4, "Should have 4 columns")

# Verify blob data integrity
blob_data = result.column('large_blob').to_pylist()
self.assertEqual(len(blob_data), 54, "Should have 54 blob records")
        self.assertEqual(len(blob_data), 80, "Should have 80 blob records")
# Verify each blob
for i, blob in enumerate(blob_data):
self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1} should be {large_blob_size:,} bytes")
@@ -2264,21 +2264,22 @@ def test_blob_rolling_with_shard(self):
actual_size = len(large_blob_data)
print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 * 1024):.2f} MB)")

write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
# Write 30 records
for record_id in range(30):
test_data = pa.Table.from_pydict({
'id': [record_id], # Unique ID for each row
'metadata': [f'Large blob batch {record_id + 1}'],
'large_blob': [struct.pack('<I', record_id) + large_blob_data]
}, schema=pa_schema)
writer.write_arrow(test_data)
for i in range(3):
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
for record_id in range(10):
test_data = pa.Table.from_pydict({
                    'id': [record_id * i],  # note: ids may repeat across batches
'metadata': [f'Large blob batch {record_id + 1}'],
'large_blob': [struct.pack('<I', record_id) + large_blob_data]
}, schema=pa_schema)
writer.write_arrow(test_data)

commit_messages = writer.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
writer.close()
commit_messages = writer.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
writer.close()

# Read data back
read_builder = table.new_read_builder()
@@ -2304,7 +2305,7 @@ def test_blob_rolling_with_shard(self):
actual2 = table_read.to_arrow(splits2)
splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
actual3 = table_read.to_arrow(splits3)
actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
actual = pa.concat_tables([actual1, actual2, actual3])

# Verify the data
self.assertEqual(actual.num_rows, 30, "Should have 30 rows")
@@ -2337,22 +2338,25 @@ def test_blob_large_data_volume_with_shard(self):
repetitions = large_blob_size // pattern_size
large_blob_data = blob_pattern * repetitions

num_row = 20000
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
expected = pa.Table.from_pydict({
'id': [1] * num_row,
'batch_id': [11] * num_row,
'metadata': [f'Large blob batch {11}'] * num_row,
'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
}, schema=pa_schema)
writer.write_arrow(expected)
for i in range(3):
num_row = 6666
if i == 0:
num_row += 1
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
expected = pa.Table.from_pydict({
'id': [1] * num_row,
'batch_id': [11] * num_row,
'metadata': [f'Large blob batch {11}'] * num_row,
'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
}, schema=pa_schema)
writer.write_arrow(expected)

# Commit all data at once
commit_messages = writer.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
writer.close()
# Commit all data at once
commit_messages = writer.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
writer.close()

# Read data back
read_builder = table.new_read_builder()
@@ -2364,7 +2368,7 @@ def test_blob_large_data_volume_with_shard(self):
self.assertEqual(6666, result.num_rows)
self.assertEqual(4, result.num_columns)

self.assertEqual(expected.slice(13334, 6666), result)
self.assertEqual(expected, result)
splits = read_builder.new_scan().plan().splits()
expected = table_read.to_arrow(splits)
splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
@@ -2490,13 +2494,14 @@ def test_data_blob_writer_with_shard(self):

# Read data back using table API
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan().with_shard(1, 2)
table_scan = read_builder.new_scan().with_shard(0, 2)
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
result = table_read.to_arrow(splits)

# Verify the data was read back correctly
self.assertEqual(result.num_rows, 2, "Should have 2 rows")
        # Only one file was written, so shard 0 receives all of the records
        self.assertEqual(result.num_rows, 5, "Should have 5 rows")
self.assertEqual(result.num_columns, 3, "Should have 3 columns")

def test_blob_write_read_large_data_volume_rolling_with_shard(self):