
[Feature] [python] Streaming Read Support in paimon-python #7152

@tub

Description


Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Currently, paimon-python only supports batch reads - users must manually poll for new snapshots and manage read progress themselves. This makes it difficult to build streaming data pipelines in Python that react to changes as they occur.

I'm proposing adding an asyncio-based streaming consumer, along with consumer-registration support. Wherever possible I've referred back to the Java implementation to keep the two as similar as possible.

I have a PoC branch for this locally, but I want to get some buy-in on the idea first before polishing it and pushing it publicly :)

Solution

Architecture

flowchart TD
    SRB[StreamReadBuilder<br/>with_poll_interval_ms<br/>with_consumer_id<br/>with_shard<br/>with_include_row_kind]

    SRB --> ASTS[AsyncStreamingTableScan<br/>stream / stream_sync]
    SRB --> TR[TableRead<br/>reused from batch]
    SRB --> CM[ConsumerManager<br/>checkpoint / restore]

    ASTS --> FUS[FollowUpScanner<br/>per-snapshot]
    ASTS --> IDS[IncrementalDiffScanner<br/>catch-up mode]

Key Classes

| Component | Purpose |
|---|---|
| StreamReadBuilder | API for configuring streaming reads |
| AsyncStreamingTableScan | Core streaming engine with async iterator |
| FollowUpScanner | Interface for determining which snapshots to scan |
| DeltaFollowUpScanner | Scans APPEND commits (changelog-producer=none) |
| ChangelogFollowUpScanner | Scans changelog manifests (changelog-producer=input/full-compaction/lookup) |
| IncrementalDiffScanner | Efficient catch-up when many snapshots behind |
| ConsumerManager | Persists read progress for recovery |
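
For concreteness, here's a rough sketch of the FollowUpScanner contract and the two follow-up variants. Everything beyond the class names listed above (the Snapshot attributes, the manifest_list method) is a placeholder for illustration, not the final API.

from abc import ABC, abstractmethod


class FollowUpScanner(ABC):
    """Decides, per snapshot, whether it should be scanned and from which manifest list."""

    @abstractmethod
    def should_scan(self, snapshot) -> bool:
        """True if this snapshot contributes data to the stream."""

    @abstractmethod
    def manifest_list(self, snapshot) -> str:
        """The manifest list the streaming scan should plan splits from."""


class DeltaFollowUpScanner(FollowUpScanner):
    """changelog-producer=none: stream the delta of APPEND commits."""

    def should_scan(self, snapshot) -> bool:
        # COMPACT and OVERWRITE commits carry no new streaming data in this mode.
        return snapshot.commit_kind == "APPEND"

    def manifest_list(self, snapshot) -> str:
        return snapshot.delta_manifest_list


class ChangelogFollowUpScanner(FollowUpScanner):
    """changelog-producer=input/full-compaction/lookup: stream the changelog manifests."""

    def should_scan(self, snapshot) -> bool:
        return snapshot.changelog_manifest_list is not None

    def manifest_list(self, snapshot) -> str:
        return snapshot.changelog_manifest_list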

API Design

Basic Usage

table = catalog.get_table('database.table')

# Create streaming read builder
stream_builder = table.new_stream_read_builder()
stream_builder.with_poll_interval_ms(1000)

# Create scan and reader
scan = stream_builder.new_streaming_scan()
table_read = stream_builder.new_read()

# Async streaming (recommended)
async for plan in scan.stream():
    arrow_table = table_read.to_arrow(plan.splits())
    process(arrow_table)

# Or synchronous
for plan in scan.stream_sync():
    arrow_table = table_read.to_arrow(plan.splits())
    process(arrow_table)

Consumer Registration (Progress Tracking)

Compatible with the Java implementation.

stream_builder.with_consumer_id("my-etl-job")

async for plan in scan.stream():
    process(table_read.to_arrow(plan.splits()))
    # Persist progress to {table_path}/consumer/consumer-my-etl-job
    scan.notify_checkpoint_complete(scan.next_snapshot_id)
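
To show what the consumer file handling could look like, here's a minimal local-filesystem sketch of ConsumerManager. The real implementation would go through the table's file IO, and while the {"nextSnapshot": N} JSON layout is meant to mirror the Java consumer file, treat the exact paths and field names here as assumptions.

import json
import os


class ConsumerManager:
    """Persists streaming read progress under {table_path}/consumer/."""

    def __init__(self, table_path: str, consumer_id: str):
        self._dir = os.path.join(table_path, "consumer")
        self._path = os.path.join(self._dir, f"consumer-{consumer_id}")

    def checkpoint(self, next_snapshot_id: int) -> None:
        # Write via a temp file + rename so readers never see a partial file.
        os.makedirs(self._dir, exist_ok=True)
        tmp = self._path + ".tmp"
        with open(tmp, "w") as f:
            json.dump({"nextSnapshot": next_snapshot_id}, f)
        os.replace(tmp, self._path)

    def restore(self):
        # Next snapshot to read, or None for a brand-new consumer id.
        try:
            with open(self._path) as f:
                return json.load(f)["nextSnapshot"]
        except FileNotFoundError:
            return None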

Parallel Consumption

Expose an API for building consumers that each read a subset of the buckets.
I'm not proposing we add any consumer group-membership synchronization here, just the primitives that would let folks build that on top using an external strongly consistent store like etcd/ZooKeeper.

# Consumer 0 of 4 reads buckets 0, 4, 8, ...
stream_builder.with_shard(0, 4)

# Or explicit bucket list
stream_builder.with_buckets([0, 1, 2])

# Or custom filter
stream_builder.with_bucket_filter(lambda b: b % 2 == 0)
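
Under the hood, with_shard can just be sugar over the bucket filter; a quick sketch of that equivalence (the helper name is made up for illustration):

def shard_to_bucket_filter(shard_index: int, num_shards: int):
    # Consumer i of n reads every bucket b with b % n == i, so the n
    # consumers together cover every bucket exactly once, with no overlap.
    return lambda bucket: bucket % num_shards == shard_index

# stream_builder.with_shard(0, 4) would then behave like:
# stream_builder.with_bucket_filter(shard_to_bucket_filter(0, 4))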

Implementation Details

Streaming Scan Flow

flowchart LR
    IS[Initial Scan<br/>full state] --> POLL[Poll for New<br/>Snapshots]
    POLL --> PLAN[Create Plan from<br/>Delta/Changelog]
    POLL --> SHOULD{should_scan?}
    SHOULD -->|APPEND| YES[Yes - scan]
    SHOULD -->|COMPACT| NO[No - skip]
    SHOULD -->|OVERWRITE| NO
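
The follow-up loop itself could be a small async generator along these lines; the snapshot_manager/scanner parameters stand in for internal collaborators of AsyncStreamingTableScan and are placeholders, not final names.

import asyncio


async def follow_up_snapshots(snapshot_manager, scanner, start_id, poll_interval_ms):
    """Yield each new snapshot that the FollowUpScanner says should be scanned.

    AsyncStreamingTableScan would turn each yielded snapshot into a plan of
    splits; the initial full-state scan happens once before this loop starts.
    """
    next_id = start_id
    while True:
        latest = snapshot_manager.latest_snapshot_id()
        while next_id <= latest:
            snapshot = snapshot_manager.snapshot(next_id)
            if scanner.should_scan(snapshot):   # e.g. APPEND-only in delta mode
                yield snapshot
            next_id += 1                        # COMPACT / OVERWRITE are skipped
        await asyncio.sleep(poll_interval_ms / 1000)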

Changelog Producer Modes

| Mode | Scanner | Data Source |
|---|---|---|
| none (default) | DeltaFollowUpScanner | delta_manifest_list |
| input | ChangelogFollowUpScanner | changelog_manifest_list |
| full-compaction | ChangelogFollowUpScanner | changelog_manifest_list |
| lookup | ChangelogFollowUpScanner | changelog_manifest_list |
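
Scanner selection would then just key off the table's changelog-producer option; a sketch (the options-dict lookup is an assumption about how the builder sees table options):

def create_follow_up_scanner(table_options: dict) -> FollowUpScanner:
    # Tables without a changelog producer stream the delta manifests of
    # APPEND commits; all other modes read the dedicated changelog manifests.
    if table_options.get("changelog-producer", "none") == "none":
        return DeltaFollowUpScanner()
    return ChangelogFollowUpScanner()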

Performance Optimizations

These could potentially be added later, but we've already seen these caches pay off significantly on the Flink side when reading from S3 (a small cache sketch follows the list):

  1. Snapshot Caching: LRU cache for snapshot metadata
  2. Manifest Caching: LRU cache for manifest file contents
  3. Batch Lookahead: Fetch multiple snapshots in parallel to skip COMPACT commits efficiently
  4. Prefetching: Background thread prefetches next snapshot while current is processed
  5. Diff-based Catch-up: When more than ~10 snapshots behind, use IncrementalDiffScanner to compute a file diff instead of reading N delta_manifest_lists
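
As an example of (1), the snapshot cache can be a thin cachetools wrapper around the snapshot loader; the cache size and function shape here are placeholders:

from cachetools import LRUCache, cached

# Snapshot metadata is immutable once committed, so an LRU cache avoids
# re-reading the same snapshot JSON from object storage on every poll.
_snapshot_cache = LRUCache(maxsize=256)

@cached(_snapshot_cache, key=lambda snapshot_manager, snapshot_id: snapshot_id)
def load_snapshot(snapshot_manager, snapshot_id):
    return snapshot_manager.snapshot(snapshot_id)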

Dependencies

  • cachetools for LRU caching (already used by paimon-python)
  • No additional dependencies required

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
