[spark] Add startup mode for batch read #2532
Conversation
@YannByron @wuchong please help take a look, thank you!
wuchong
left a comment
Thanks, @Yohahaha! It looks like no new tests have been added; should we include some tests for the new configuration option?
Also, as a best practice, we recommend first creating a dedicated issue to describe the feature and proposed APIs before submitting a pull request. The PR can then be linked to that issue. This helps us better track progress and maintain visibility across all subtasks of the umbrella initiative.
```scala
ConfigBuilder
  .key("scan.startup.mode")
  .stringType()
  .defaultValue(StartUpMode.LATEST.toString)
```
Should we use the default FULL mode to stay aligned with the Flink connector?
Using LATEST by default may result in empty results if the user doesn’t explicitly specify a startup mode for the query.
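The suggested change could look like this (a sketch reusing the `ConfigBuilder` chain from the diff; the `SCAN_STARTUP_MODE` field name is an assumption, only the default value changes):

```scala
// Default to FULL to stay aligned with the Flink connector; with LATEST a
// batch query that doesn't set the option would silently return no rows.
val SCAN_STARTUP_MODE = ConfigBuilder
  .key("scan.startup.mode")
  .stringType()
  .defaultValue(StartUpMode.FULL.toString)
```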
```scala
val scanRecords = logScanner.poll(POLL_TIMEOUT)
if ((scanRecords == null || scanRecords.isEmpty) && currentOffset < flussPartition.stopOffset) {
  throw new IllegalStateException(s"No more data from fluss server," +
    s" but current offset $currentOffset not reach the stop offset ${flussPartition.stopOffset}")
```
logScanner.poll() may return empty results when the Fluss server is undergoing recovery, restart, or rebalance. Given that the current POLL_TIMEOUT is set to a very short duration (100ms), this scenario is highly likely to occur.
Currently, the source immediately throws an exception if logScanner.poll() returns no records, which makes it unstable during Fluss server failover events.
A straightforward fix is to increase the POLL_TIMEOUT to 60 seconds. This means the source will wait up to 60 seconds for data during transient server unavailability. If the Fluss server still hasn’t recovered within that window, we can then throw an exception to alert users.
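One way to implement that suggestion is a deadline loop rather than a single long poll, so short polls keep retrying until either data arrives or the 60-second window expires. This is only a sketch: `logScanner`, `POLL_TIMEOUT`, `currentOffset`, and `flussPartition` come from the diff above, while `PollDeadlineMs`, the `pollWithDeadline` name, and the `ScanRecords` return type are assumptions.

```scala
// Assumed deadline, following the review suggestion of waiting up to 60s.
private val PollDeadlineMs = 60000L

private def pollWithDeadline(): ScanRecords = {
  val deadline = System.currentTimeMillis() + PollDeadlineMs
  var records = logScanner.poll(POLL_TIMEOUT)
  // Ride out transient unavailability (recovery, restart, rebalance) instead
  // of failing on the first empty 100ms poll.
  while ((records == null || records.isEmpty) && System.currentTimeMillis() < deadline) {
    records = logScanner.poll(POLL_TIMEOUT)
  }
  if ((records == null || records.isEmpty) && currentOffset < flussPartition.stopOffset) {
    throw new IllegalStateException(
      s"No data from Fluss server within ${PollDeadlineMs}ms, but current offset " +
        s"$currentOffset has not reached the stop offset ${flussPartition.stopOffset}")
  }
  records
}
```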
```scala
if (currentRecords.hasNext) {
  val scanRecord = currentRecords.next()
  currentRow = convertToSparkRow(scanRecord)
  currentOffset += 1
```
We can't simply increment by 1; we should use `currentOffset = scanRecord.logOffset() + 1` instead, because some record batches advance log offsets without containing any records. Simply adding 1 can lead to missing data.
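Applied to the snippet above, the suggested fix would look like this (a sketch over the same names from the diff; `logOffset()` on the scan record is the accessor the reviewer cites):

```scala
if (currentRecords.hasNext) {
  val scanRecord = currentRecords.next()
  currentRow = convertToSparkRow(scanRecord)
  // Advance to one past the record's own log offset: record batches can
  // consume offsets without carrying records, so `+= 1` would fall behind
  // the real position and skip data.
  currentOffset = scanRecord.logOffset() + 1
```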
```scala
logScanner.subscribeFromBeginning(bucketId)
logScanner.subscribe(bucketId, flussPartition.startOffset)
}
pollMoreRecords()
```
When both the start and end offsets are set to LATEST, the stop offset may be less than or equal to the start offset. In such cases, attempting to poll records will cause pollMoreRecords() to throw an exception.
To avoid this, we should explicitly validate that the start offset is strictly less than the stop offset before initiating polling.
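A sketch of that guard, using the names from the diff (`finished` is a hypothetical flag marking the partition as exhausted; the real reader may signal emptiness differently):

```scala
// Only subscribe and poll when the offset range is non-empty; when both
// start and stop resolved to LATEST, startOffset >= stopOffset and polling
// would make pollMoreRecords() throw.
if (flussPartition.startOffset < flussPartition.stopOffset) {
  logScanner.subscribe(bucketId, flussPartition.startOffset)
  pollMoreRecords()
} else {
  finished = true // hypothetical: nothing to read for this partition
}
```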
```scala
tablePath: TablePath,
tableInfo: TableInfo,
readSchema: StructType,
startOffsetsInitializer: OffsetsInitializer,
```
The startOffsetsInitializer is not used in FlussUpsertBatch.
```scala
override def toBatch: Batch = {
  new FlussUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
  val startOffsetsInitializer = FlussScan.startOffsetsInitializer(options)
```
The startOffsetsInitializer is not used in FlussUpsertBatch. For FlussUpsertBatch (primary key tables), we currently only support the FULL startup mode, which reads the kv snapshot first and then switches to the corresponding log offsets. So we should check that the startup mode is FULL and otherwise throw an unsupported exception.
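A sketch of the guard the reviewer describes. The diff only shows `startOffsetsInitializer`, so the `FlussScan.startupMode(options)` accessor and the exact `StartUpMode` comparison are assumptions about this PR's API:

```scala
override def toBatch: Batch = {
  // Primary key tables read the kv snapshot first, then switch to log
  // offsets, so only FULL startup mode is meaningful for batch read.
  val mode = FlussScan.startupMode(options) // hypothetical accessor
  if (mode != StartUpMode.FULL) {
    throw new UnsupportedOperationException(
      s"Primary key tables only support FULL startup mode for batch read, but got $mode")
  }
  new FlussUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
}
```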
Purpose
Add a new option `start.up.mode` to read from different offsets in Fluss. This PR only changes batch-read related classes.
Linked issue: close #2549
Brief change log
Tests
API and Format
Documentation