
Conversation

@Yohahaha (Contributor) commented Jan 30, 2026

Purpose

Add a new option scan.startup.mode to read from different offsets in Fluss.
This PR only changes the batch-read related classes.

Linked issue: close #2549

Brief change log

Tests

API and Format

Documentation

@Yohahaha (Contributor, Author) commented Feb 2, 2026

@YannByron @wuchong please help take a look, thank you!

@wuchong (Member) left a comment:

Thanks, @Yohahaha! It looks like no new tests have been added; should we include some tests for the new configuration option?

Also, as a best practice, we recommend first creating a dedicated issue to describe the feature and proposed APIs before submitting a pull request. The PR can then be linked to that issue. This helps us better track progress and maintain visibility across all subtasks of the umbrella initiative.

ConfigBuilder
.key("scan.startup.mode")
.stringType()
.defaultValue(StartUpMode.LATEST.toString)
Member commented:

Should we use the default FULL mode to stay aligned with the Flink connector?
Using LATEST by default may result in empty results if the user doesn’t explicitly specify a startup mode for the query.
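The suggested change above can be sketched as a small option-resolution helper. This is an illustrative stand-in, not the PR's actual `ConfigBuilder` code: the mode names and the `fromOptions` helper are assumptions, but the key point is the `"full"` default matching the Flink connector.

```scala
// Hypothetical sketch of startup-mode resolution with FULL as the default.
sealed trait StartupMode
object StartupMode {
  case object Full extends StartupMode
  case object Latest extends StartupMode
  case object Earliest extends StartupMode

  // Resolve the mode from string options; an unset key falls back to FULL,
  // so a plain query reads the full table instead of returning empty results.
  def fromOptions(options: Map[String, String]): StartupMode =
    options.getOrElse("scan.startup.mode", "full").toLowerCase match {
      case "full"     => Full
      case "latest"   => Latest
      case "earliest" => Earliest
      case other =>
        throw new IllegalArgumentException(s"Unknown scan.startup.mode: $other")
    }
}
```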

Comment on lines 47 to +50
val scanRecords = logScanner.poll(POLL_TIMEOUT)
if ((scanRecords == null || scanRecords.isEmpty) && currentOffset < flussPartition.stopOffset) {
throw new IllegalStateException(s"No more data from fluss server," +
s" but current offset $currentOffset not reach the stop offset ${flussPartition.stopOffset}")
Member commented:

logScanner.poll() may return empty results when the Fluss server is undergoing recovery, restart, or rebalance. Given that the current POLL_TIMEOUT is set to a very short duration (100ms), this scenario is highly likely to occur.

Currently, the source immediately throws an exception if logScanner.poll() returns no records, which makes it unstable during Fluss server failover events.

A straightforward fix is to increase the POLL_TIMEOUT to 60 seconds. This means the source will wait up to 60 seconds for data during transient server unavailability. If the Fluss server still hasn’t recovered within that window, we can then throw an exception to alert users.
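The retry behavior suggested above could look roughly like this. It is a minimal sketch, not the PR's code: the `poll` function is abstracted as a plain `Duration => Seq[T]` so the loop logic stands alone, and the timeouts mirror the values discussed in the comment.

```scala
import java.time.Duration

// Illustrative retry loop: keep polling with the short per-call timeout until
// records arrive or an overall deadline expires, then fail loudly so transient
// server failover does not immediately kill the source.
def pollUntilDeadline[T](poll: Duration => Seq[T],
                         pollTimeout: Duration = Duration.ofMillis(100),
                         deadline: Duration = Duration.ofSeconds(60)): Seq[T] = {
  val deadlineNanos = System.nanoTime() + deadline.toNanos
  var records = poll(pollTimeout)
  while (records.isEmpty && System.nanoTime() < deadlineNanos) {
    records = poll(pollTimeout)
  }
  if (records.isEmpty) {
    throw new IllegalStateException(
      s"No records received within ${deadline.toSeconds}s; " +
        "the Fluss server may still be recovering")
  }
  records
}
```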

if (currentRecords.hasNext) {
val scanRecord = currentRecords.next()
currentRow = convertToSparkRow(scanRecord)
currentOffset += 1
Member commented:

We can't simply add 1; we should use currentOffset = scanRecord.logOffset() + 1 instead, because some record batches advance the log offsets without containing any records. Blindly incrementing by 1 can leave currentOffset behind the real position and cause data to be skipped.
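The difference can be shown with a stand-in record type (the real `ScanRecord` lives in the Fluss client; this sketch only models its log offset):

```scala
// Stand-in for Fluss's ScanRecord, for illustration only.
final case class ScanRecord(logOffset: Long, value: String)

// Advance to the offset after the record just consumed. Using the record's
// own logOffset stays correct across batches that bump offsets without
// carrying rows, where a blind `currentOffset += 1` would fall behind.
def nextOffset(record: ScanRecord): Long = record.logOffset + 1
```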

logScanner.subscribeFromBeginning(bucketId)
logScanner.subscribe(bucketId, flussPartition.startOffset)
}
pollMoreRecords()
Member commented:

When both the start and end offsets are set to LATEST, the stop offset may be less than or equal to the start offset. In such cases, attempting to poll records will cause pollMoreRecords() to throw an exception.

To avoid this, we should explicitly validate that the start offset is strictly less than the stop offset before initiating polling.
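The guard suggested above is a one-line predicate; this is a hypothetical helper, with names chosen for illustration:

```scala
// A split has data only when start < stop. When both offsets resolve to
// LATEST, stop <= start and polling must be skipped instead of attempted.
def hasDataToRead(startOffset: Long, stopOffset: Long): Boolean =
  startOffset < stopOffset
```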

tablePath: TablePath,
tableInfo: TableInfo,
readSchema: StructType,
startOffsetsInitializer: OffsetsInitializer,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The startOffsetsInitializer is not used in FlussUpsertBatch.


override def toBatch: Batch = {
new FlussUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
val startOffsetsInitializer = FlussScan.startOffsetsInitializer(options)
Member commented:

The startOffsetsInitializer is not used in FlussUpsertBatch. For FlussUpsertBatch (primary key tables), we currently only support the FULL startup mode, which reads the kv snapshot first and then switches to the corresponding log offsets. So we should check that the startup mode is FULL and otherwise throw an unsupported exception.
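A sketch of the suggested validation, assuming a string-valued mode option (the helper name is hypothetical, not from the PR):

```scala
// Primary key tables read the kv snapshot first and then switch to log
// offsets, so only the FULL startup mode is meaningful for them; reject
// anything else up front with a clear error.
def validateUpsertStartupMode(mode: String): Unit =
  if (!mode.equalsIgnoreCase("full")) {
    throw new UnsupportedOperationException(
      s"Startup mode '$mode' is not supported for primary key tables; " +
        "only FULL is supported.")
  }
```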


Successfully merging this pull request may close these issues: [spark] Add startup mode config