@xuyangzhong
Contributor

Purpose

This PR is based on #2268 and does some refactoring work.

Linked issue: close #2231

Brief change log

  • Added tests for normal PK tables with table.delete.behavior=IGNORE (not just first_row merge engine)
  • Added tests for joining on bucket key (not just full primary key)
  • Added tests verifying LEFT/RIGHT/FULL OUTER joins fail with appropriate validation error
  • Added test verifying cascade joins are not supported
  • Updated documentation for Flink 2.2 Delta Join with new supported features and limitations

Tests

  • testDeltaJoinWithPrimaryKeyTableNoDeletes - normal PK table with delete.behavior=IGNORE
  • testDeltaJoinOnBucketKey - join on bucket key only (not full PK)
  • testDeltaJoinFailsWhenFilterOnNonUpsertKeys - filter on non-upsert-key columns fails
  • testDeltaJoinFailsWhenSourceHasDelete - source with DELETE records fails
  • testDeltaJoinFailsWhenJoinKeyNotContainIndex - join key not containing the index fails
  • testDeltaJoinFailsWithLeftJoin - LEFT JOIN is not converted to DeltaJoin
  • testDeltaJoinFailsWithRightJoin - RIGHT JOIN is not converted to DeltaJoin
  • testDeltaJoinFailsWithFullOuterJoin - FULL OUTER JOIN is not converted to DeltaJoin
  • testDeltaJoinFailsWithCascadeJoin - cascade join is not converted to DeltaJoin
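
For readers unfamiliar with the setup these tests exercise, here is a minimal sketch of the kind of DDL and query involved: a primary-key table with table.delete.behavior=IGNORE, joined on its bucket key only. It is illustrative and is not the code in this PR. The class `DeltaJoinDdlSketch`, its helper methods, the table names, and the column layout are all hypothetical, and the `bucket.key` option name is an assumption about how the bucket key is declared; only `table.delete.behavior` and its `IGNORE` value come from the description above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of this PR: it only assembles SQL strings.
class DeltaJoinDdlSketch {

    /** Builds a CREATE TABLE statement for a primary-key table with the given options. */
    static String createTableDdl(
            String table, String columns, String primaryKey, Map<String, String> options) {
        StringJoiner with = new StringJoiner(", ", " WITH (", ")");
        options.forEach((key, value) -> with.add("'" + key + "' = '" + value + "'"));
        return "CREATE TABLE " + table + " (" + columns
                + ", PRIMARY KEY (" + primaryKey + ") NOT ENFORCED)" + with;
    }

    /** Builds an INNER JOIN on a single key column from each side. */
    static String joinOnBucketKey(String left, String right, String leftKey, String rightKey) {
        return "SELECT * FROM " + left + " INNER JOIN " + right
                + " ON " + leftKey + " = " + rightKey;
    }

    public static void main(String[] args) {
        // Assumed options: 'bucket.key' is an assumption; 'table.delete.behavior' is from the PR text.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("bucket.key", "c1");
        options.put("table.delete.behavior", "IGNORE");

        // Hypothetical schema: primary key (c1, d1), bucketed on c1 only.
        System.out.println(createTableDdl(
                "left_table", "a1 INT, c1 BIGINT, d1 INT, e1 BIGINT", "c1, d1", options));

        // The join condition uses the bucket key (c1 = c2), not the full primary key.
        System.out.println(joinOnBucketKey("left_table", "right_table", "c1", "c2"));
    }
}
```

The join is INNER on purpose: per the list above, LEFT, RIGHT, and FULL OUTER joins, as well as cascade joins, are not converted to DeltaJoin.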

API and Format

No

Documentation

Yes. Updated the Flink 2.2 section of docs/engine-flink/delta-joins.md.

@xuyangzhong force-pushed the fluss-delta-joins-tests-docs branch from c92a487 to 7e3c93d on February 2, 2026 05:52
Member

@wuchong left a comment

Thanks @xuyangzhong and @fresh-borzoni, this is great and thorough test coverage.

Could you also add tests for partitioned tables?

}

@Test
void testDeltaJoin() throws Exception {
Member

I think we should configure all tests to use table.delete.behavior and join on the bucket key by default, as this aligns with the recommended approach documented for users implementing delta joins.

Then I suggest renaming this method to testDeltaJoinOnPrimaryKey and keeping the test as is, and renaming testDeltaJoinOnBucketKey to testDeltaJoin.

"c1, d1, c2, d2");

// Filter on e1 > e2, where e1 and e2 are NOT part of the upsert key
String sql =
Member

Add a TODO comment above this:

        // TODO: we can add an UpsertFilterOperator that converts UPSERT records not matching the
        //  filter into DELETE records.

"c2",
ImmutableMap.of("table.delete.behavior", "IGNORE"));

// Sink PK (a1, a2) doesn't match upstream update key (c1, d1, c2, d2)
Member

Please add a TODO on this:

// TODO: this depends on Fluss supporting MVCC/point-in-time lookup to support changing upsert keys

"c2",
ImmutableMap.of("table.merge-engine", "first_row"));

String sinkTableName = "sink_table_nondeterministic";
Member

Please add a comment on this:

        // TODO: this should be supported in Flink in the future for non-deterministic functions before sinking

@fresh-borzoni

@xuyangzhong Ack. Since you’re pushing new commits, let's continue with this PR.


Development

Successfully merging this pull request may close these issues.

Add Flink 2.2 Delta Join tests and documentation

3 participants