Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
Currently, paimon-python only supports batch reads - users must manually poll for new snapshots and manage read progress themselves. This makes it difficult to build streaming data pipelines in Python that react to changes as they occur.
I'm proposing adding an asyncio-based streaming consumer, along with consumer-registration support. Wherever possible I've referred back to the Java implementation to keep the two as similar as possible.
I have a PoC branch for this locally, but I want to get some buy-in on the idea before I start polishing it and pushing it publicly :)
Solution
Architecture
```mermaid
flowchart TD
    SRB[StreamReadBuilder<br/>with_poll_interval_ms<br/>with_consumer_id<br/>with_shard<br/>with_include_row_kind]
    SRB --> ASTS[AsyncStreamingTableScan<br/>stream / stream_sync]
    SRB --> TR[TableRead<br/>reused from batch]
    SRB --> CM[ConsumerManager<br/>checkpoint / restore]
    ASTS --> FUS[FollowUpScanner<br/>per-snapshot]
    ASTS --> IDS[IncrementalDiffScanner<br/>catch-up mode]
```
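To make the diagram concrete, here is a rough sketch of the `StreamReadBuilder` surface it implies. The method names come from the diagram; the signatures, defaults, and chaining behaviour are assumptions, not final API:

```python
# Sketch only: a possible shape for StreamReadBuilder, based on the diagram above.
from typing import Callable, Optional


class StreamReadBuilder:
    """Configures a streaming read and hands out scan/read objects."""

    def __init__(self, table):
        self._table = table
        self._poll_interval_ms = 1000               # assumed default
        self._consumer_id: Optional[str] = None
        self._bucket_filter: Optional[Callable[[int], bool]] = None
        self._include_row_kind = False

    def with_poll_interval_ms(self, interval_ms: int) -> "StreamReadBuilder":
        self._poll_interval_ms = interval_ms
        return self

    def with_consumer_id(self, consumer_id: str) -> "StreamReadBuilder":
        self._consumer_id = consumer_id
        return self

    def with_shard(self, index: int, count: int) -> "StreamReadBuilder":
        # Shard n of m owns every bucket b where b % m == n.
        self._bucket_filter = lambda b: b % count == index
        return self

    def with_include_row_kind(self, include: bool = True) -> "StreamReadBuilder":
        self._include_row_kind = include
        return self
```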
Key Classes
| Component | Purpose |
|---|---|
| `StreamReadBuilder` | API for configuring streaming reads |
| `AsyncStreamingTableScan` | Core streaming engine with async iterator |
| `FollowUpScanner` | Interface for determining which snapshots to scan |
| `DeltaFollowUpScanner` | Scans APPEND commits (`changelog-producer=none`) |
| `ChangelogFollowUpScanner` | Scans changelog manifests (`changelog-producer=input/full-compaction/lookup`) |
| `IncrementalDiffScanner` | Efficient catch-up when many snapshots behind |
| `ConsumerManager` | Persists read progress for recovery |
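To make the scanner split concrete, here is a minimal sketch of what the `FollowUpScanner` hierarchy could look like in Python. Method names follow the table above; the exact signatures are assumptions:

```python
from abc import ABC, abstractmethod


class FollowUpScanner(ABC):
    """Decides, per snapshot, whether it is relevant and how to plan it."""

    @abstractmethod
    def should_scan(self, snapshot) -> bool:
        """True if this snapshot's commit kind should produce a streaming plan."""

    @abstractmethod
    def plan(self, snapshot):
        """Build the splits/plan for a relevant snapshot."""


class DeltaFollowUpScanner(FollowUpScanner):
    """changelog-producer=none: scan the delta files of APPEND commits."""

    def should_scan(self, snapshot) -> bool:
        return snapshot.commit_kind == "APPEND"

    def plan(self, snapshot):
        # Would plan splits from snapshot.delta_manifest_list (helper assumed).
        raise NotImplementedError("sketch only")
```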
API Design
Basic Usage
```python
table = catalog.get_table('database.table')

# Create streaming read builder
stream_builder = table.new_stream_read_builder()
stream_builder.with_poll_interval_ms(1000)

# Create scan and reader
scan = stream_builder.new_streaming_scan()
table_read = stream_builder.new_read()

# Async streaming (recommended)
async for plan in scan.stream():
    arrow_table = table_read.to_arrow(plan.splits())
    process(arrow_table)

# Or synchronous
for plan in scan.stream_sync():
    arrow_table = table_read.to_arrow(plan.splits())
    process(arrow_table)
```
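Since `async for` has to run inside a coroutine, the async variant would be driven with a small wrapper like this sketch (`catalog` and `process()` are placeholders; everything else is the proposed API):

```python
import asyncio

async def main():
    table = catalog.get_table('database.table')
    stream_builder = table.new_stream_read_builder()
    stream_builder.with_poll_interval_ms(1000)

    scan = stream_builder.new_streaming_scan()
    table_read = stream_builder.new_read()

    async for plan in scan.stream():
        process(table_read.to_arrow(plan.splits()))

asyncio.run(main())
```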
Consumer Registration (Progress Tracking)
Compatible with the Java implementation.
```python
stream_builder.with_consumer_id("my-etl-job")

async for plan in scan.stream():
    process(table_read.to_arrow(plan.splits()))
    # Persist progress to {table_path}/consumer/consumer-my-etl-job
    scan.notify_checkpoint_complete(scan.next_snapshot_id)
```
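For illustration, after a restart the same consumer id would let the scan resume from the persisted progress instead of the beginning. The exact restore semantics below are an assumption based on the Java `ConsumerManager`:

```python
# After a crash/restart: register the same consumer id again.
stream_builder = table.new_stream_read_builder()
stream_builder.with_consumer_id("my-etl-job")

scan = stream_builder.new_streaming_scan()   # assumed: restores the next snapshot id from
table_read = stream_builder.new_read()       # {table_path}/consumer/consumer-my-etl-job

for plan in scan.stream_sync():              # resumes where the last checkpoint left off
    process(table_read.to_arrow(plan.splits()))
    scan.notify_checkpoint_complete(scan.next_snapshot_id)
```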
Parallel Consumption
Expose an API that allows building consumers over a subset of the table's buckets.
I'm not proposing any consumer group-membership synchronization here, just the primitives that would let folks build that on top using an external strongly consistent store like etcd/ZooKeeper.
```python
# Consumer 0 of 4 reads buckets 0, 4, 8, ...
stream_builder.with_shard(0, 4)

# Or explicit bucket list
stream_builder.with_buckets([0, 1, 2])

# Or custom filter
stream_builder.with_bucket_filter(lambda b: b % 2 == 0)
```
Implementation Details
Streaming Scan Flow
```mermaid
flowchart LR
    IS[Initial Scan<br/>full state] --> POLL[Poll for New<br/>Snapshots]
    POLL --> PLAN[Create Plan from<br/>Delta/Changelog]
    POLL --> SHOULD{should_scan?}
    SHOULD -->|APPEND| YES[Yes - scan]
    SHOULD -->|COMPACT| NO[No - skip]
    SHOULD -->|OVERWRITE| NO
```
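In code, the follow-up half of this flow reduces to a poll loop along these lines (a sketch only; the helper names `snapshot_manager`, `scanner`, and so on are illustrative):

```python
import asyncio

async def follow_up_loop(snapshot_manager, scanner, next_snapshot_id, poll_interval_ms):
    """Illustrative poll loop; the initial full-state scan has already been emitted."""
    while True:
        snapshot = snapshot_manager.snapshot(next_snapshot_id)
        if snapshot is None:
            # Caught up with the table: wait for the next commit.
            await asyncio.sleep(poll_interval_ms / 1000)
            continue
        if scanner.should_scan(snapshot):        # APPEND (or changelog) -> plan it
            yield scanner.plan(snapshot)
        # COMPACT / OVERWRITE commits are skipped, but the cursor still advances.
        next_snapshot_id += 1
```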
Changelog Producer Modes
| Mode | Scanner | Data Source |
|---|---|---|
| `none` (default) | `DeltaFollowUpScanner` | `delta_manifest_list` |
| `input` | `ChangelogFollowUpScanner` | `changelog_manifest_list` |
| `full-compaction` | `ChangelogFollowUpScanner` | `changelog_manifest_list` |
| `lookup` | `ChangelogFollowUpScanner` | `changelog_manifest_list` |
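Scanner selection would then be a small dispatch on the existing `changelog-producer` table option; the factory function below is hypothetical but mirrors the table above:

```python
def create_follow_up_scanner(table_options: dict):
    # Map the changelog-producer option to the scanners in the table above.
    producer = table_options.get("changelog-producer", "none")
    if producer == "none":
        return DeltaFollowUpScanner()           # plans from delta_manifest_list
    if producer in ("input", "full-compaction", "lookup"):
        return ChangelogFollowUpScanner()       # plans from changelog_manifest_list
    raise ValueError(f"unsupported changelog-producer: {producer}")
```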
Performance Optimizations
These could potentially be added afterwards, but we've already seen these caches benefit us a lot on the Flink side when using S3 (see the sketch after this list):
- Snapshot Caching: LRU cache for snapshot metadata
- Manifest Caching: LRU cache for manifest file contents
- Batch Lookahead: Fetch multiple snapshots in parallel to skip COMPACT commits efficiently
- Prefetching: Background thread prefetches next snapshot while current is processed
- Diff-based Catch-up: When >10ish snapshots behind, use `IncrementalDiffScanner` to compute a file diff instead of reading N `delta_manifest_list`s
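As a concrete example of the first two items, `cachetools` (already listed under Dependencies) is enough for the snapshot/manifest caches; the cache size and the `snapshot_manager` accessor below are placeholders:

```python
from cachetools import LRUCache

# Snapshot files are immutable, so an LRU keyed by snapshot id avoids repeated
# round-trips to object storage (e.g. S3).
snapshot_cache = LRUCache(maxsize=256)

def cached_snapshot(snapshot_manager, snapshot_id):
    snapshot = snapshot_cache.get(snapshot_id)
    if snapshot is None:
        snapshot = snapshot_manager.snapshot(snapshot_id)  # assumed accessor
        snapshot_cache[snapshot_id] = snapshot
    return snapshot
```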
Dependencies
- `cachetools` for LRU caching (already used by paimon-python)
- No additional dependencies required
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!