
Conversation

@beryllw (Contributor) commented on Jan 26, 2026:

Purpose

Linked issue: close #2482

Brief change log

Tests

API and Format

Documentation

@beryllw marked this pull request as draft on January 26, 2026 12:54
@beryllw force-pushed the insertIfNotExists-poc branch 2 times, most recently from 7aa9f61 to 9f5414c, on January 29, 2026 08:06
@beryllw marked this pull request as ready for review on January 29, 2026 08:07
@beryllw changed the title from "[kv] Implement Atomic Lookup-or-Put for KV Tables" to "[kv] Implement insertIfNotExists on tablet server" on Jan 29, 2026
@beryllw force-pushed the insertIfNotExists-poc branch from 5b7aff2 to b2e175a on January 29, 2026 12:47
/**
 * Creates a {@link KeyDecoder} that decodes the given key fields of the row type.
 *
 * @param keyFields the key fields to decode
 */
static KeyDecoder of(RowType rowType, List<String> keyFields) {
    return CompactedKeyDecoder.createKeyDecoder(rowType, keyFields);
}
@beryllw (Contributor, Author) commented:
Will add KeyDecoders for Paimon and Iceberg later.
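For context, a hedged usage sketch of this factory follows. It assumes KeyDecoder exposes a decodeKey(byte[]) method returning InternalRow and that "id" is the table's key field; neither is confirmed by this excerpt, and the Fluss types (KeyDecoder, RowType, InternalRow) are taken as given on the classpath.

import java.util.List;

// Sketch only: decodeKey(byte[]) -> InternalRow is an assumed signature,
// and "id" is a hypothetical key field.
class KeyDecoderUsageSketch {
    InternalRow decodeCompactedKey(RowType rowType, byte[] compactedKey) {
        // build a decoder over the single key field, then recover the key row
        KeyDecoder decoder = KeyDecoder.of(rowType, List.of("id"));
        return decoder.decodeKey(compactedKey);
    }
}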

@platinumhamburg (Contributor) left a comment:

LGTM, just a few minor comments. Overall, this is the implementation with the minimal changes for the current update. It is not yet performance-optimal: during the PutKV process, the original key is encoded and then immediately decoded again. This could be addressed in a future optimization; I suggest adding a TODO comment for it.
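A minimal sketch of the round-trip being described, using hypothetical names (keyEncoder, keyDecoder, putKv) and local stand-in interfaces rather than the PR's actual classes:

// Sketch only: the names and nested interfaces here are illustrative,
// not the actual code in this PR.
class PutKvRoundTripSketch {
    interface InternalRow {}
    interface KeyEncoder { byte[] encodeKey(InternalRow row); }
    interface KeyDecoder { InternalRow decodeKey(byte[] key); }

    private final KeyEncoder keyEncoder;
    private final KeyDecoder keyDecoder;

    PutKvRoundTripSketch(KeyEncoder keyEncoder, KeyDecoder keyDecoder) {
        this.keyEncoder = keyEncoder;
        this.keyDecoder = keyDecoder;
    }

    void putKv(InternalRow row) {
        byte[] encodedKey = keyEncoder.encodeKey(row); // key encoded from the row
        // TODO: avoid this encode-then-decode round-trip; the key row could be
        // projected from `row` directly in a future optimization.
        InternalRow keyRow = keyDecoder.decodeKey(encodedKey);
        // ... proceed with the lookup / put using encodedKey and keyRow ...
    }
}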

@platinumhamburg (Contributor) left a comment:
There is a critical bug that needs to be fixed

@platinumhamburg (Contributor) commented:

I’ve thought about it carefully, and I think KeyRecordBatch is too specific, especially since it’s meant for a temporary solution we all agree isn’t final. We could just remove it entirely and use a simpler approach instead. But if we’re short on time, it’s not a big deal (it doesn’t add much code), so we can keep it for now and clean it up later.

@beryllw force-pushed the insertIfNotExists-poc branch 2 times, most recently from a2d7e77 to 37efb9a, on January 30, 2026 14:28
@beryllw force-pushed the insertIfNotExists-poc branch from bee2bad to 20a2af3 on January 30, 2026 14:48
@wuchong (Member) left a comment:

@beryllw I left some comments. Especially, I added a concurrent test that reveals the correctness issue and should be fixed.

}

@Test
void testLookupWithInsertIfNotExistsAutoIncrement() throws Exception {
@wuchong (Member) commented:

Could you add an additional test for concurrent lookups with insert-if-not-exists and auto-increment enabled?

The test should:

  • Spawn multiple threads (e.g., 3) that concurrently perform lookupWithInsert operations on the same set of keys (e.g., 100, 200, 300).
  • Verify that all threads receive consistent lookup results (i.e., the same inserted values for each key).
  • Confirm that exactly 3 changelog entries are written to the log tablet—one per unique key.

This will validate that concurrent put-and-relookup operations behave correctly under contention, ensuring idempotency and consistency when auto-increment and conditional inserts are enabled.
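A minimal sketch of such a test follows, assuming hypothetical helpers lookupWithInsert(int) (returns the auto-incremented value for a key) and readChangelogEntries() in place of this PR's real client and tablet test utilities, and AssertJ assertions:

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.junit.jupiter.api.Test;

// Sketch only: lookupWithInsert and readChangelogEntries are hypothetical stand-ins.
@Test
void testConcurrentLookupWithInsertIfNotExistsAutoIncrement() throws Exception {
    List<Integer> keys = List.of(100, 200, 300);
    ExecutorService pool = Executors.newFixedThreadPool(3);
    List<Future<Map<Integer, Long>>> futures = new ArrayList<>();
    for (int t = 0; t < 3; t++) {
        futures.add(pool.submit(() -> {
            Map<Integer, Long> results = new HashMap<>();
            for (int key : keys) {
                // each thread races to insert-if-not-exists, then reads the row back
                results.put(key, lookupWithInsert(key));
            }
            return results;
        }));
    }
    // all threads must observe the same auto-incremented value for each key
    Map<Integer, Long> first = futures.get(0).get();
    for (Future<Map<Integer, Long>> future : futures) {
        assertThat(future.get()).isEqualTo(first);
    }
    // exactly one changelog entry per unique key, despite the contention
    assertThat(readChangelogEntries()).hasSize(keys.size());
    pool.shutdown();
}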

Comment on lines +49 to +52
if (lakeFormat == DataLakeFormat.PAIMON) {
// TODO: Implement key decoding support for Paimon lake format
throw new UnsupportedOperationException(
"Paimon lake format does not support key decoding");
@wuchong (Member) commented:

How much effort would it take to support this? I believe it’s essential: since most clusters will enable lakehouse integration, this will become a common and critical path.

If this requires significant work, please create a blocker issue to track it. And please add tests for this case as well.

timeoutMs,
requiredAcks,
produceEntryData,
null,
@wuchong (Member) commented on Feb 1, 2026:

This is incorrect. We should use partial updates to modify only the primary key fields. Otherwise, non-primary-key columns, including auto-increment fields, will be overwritten with null. This issue manifests when multiple threads concurrently call lookupAndInsert on the same keys.

To reproduce this, I’ve added a test: testConcurrentLookupWithInsertIfNotExistsAutoIncrement.

Additionally, we should enforce a validation in PutRequest: when performing a put operation, auto-increment fields must be excluded from the target columns. This ensures that auto-incremented values are never accidentally overwritten during updates. That means INSERT INTO t (id, auto_inc) VALUES ... or INSERT INTO t VALUES ... should fail with an explicit error message stating that the auto-increment fields can't be updated.
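A hedged sketch of that validation, with illustrative method and parameter names; the actual PutRequest-side check in Fluss may be shaped differently:

import java.util.List;

// Sketch only: names and exception choice are illustrative, not Fluss's actual API.
final class PutRequestValidationSketch {
    static void checkNoAutoIncrementTargets(
            List<String> targetColumns, List<String> autoIncrementColumns) {
        for (String column : targetColumns) {
            if (autoIncrementColumns.contains(column)) {
                // Reject writes that would overwrite a server-generated value, so
                // INSERT INTO t (id, auto_inc) VALUES ... fails fast and clearly.
                throw new IllegalArgumentException(
                        "Auto-increment field '"
                                + column
                                + "' cannot be a target column of a put operation; "
                                + "its value is generated by the server.");
            }
        }
    }
}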

}
}

if (insertIfNotExists) {
@xx789633 (Contributor) commented on Feb 2, 2026:

Could you please also add a comment here to indicate that there might be a data race and describe how you solve it? The same key might be inserted by another thread after the lookup check passes.
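One way to resolve that race, sketched here against a toy in-memory map rather than the PR's actual kv tablet code, is an atomic put-if-absent, so the losing writer observes the winner's value instead of overwriting it:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: a toy stand-in for the kv tablet, not the code in this PR.
class InsertIfNotExistsSketch {
    private final Map<String, byte[]> kvStore = new ConcurrentHashMap<>();

    byte[] insertIfNotExists(String encodedKey, byte[] encodedValue) {
        // Data race note: another thread may insert the same key after this
        // caller's lookup miss. putIfAbsent resolves the race atomically: the
        // loser gets the winner's value back, so exactly one insert (and one
        // changelog entry) happens per key.
        byte[] existing = kvStore.putIfAbsent(encodedKey, encodedValue);
        return existing != null ? existing : encodedValue;
    }
}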


Development

Successfully merging this pull request may close these issues: Implement Atomic Lookup-or-Put for KV Tables (#2482).

4 participants