@fresh-borzoni fresh-borzoni commented Dec 28, 2025

Purpose

Linked issue: close #2231

Add comprehensive test coverage for Delta Join feature in Flink 2.2 and improve documentation.

Brief change log

  • Added tests for normal PK tables with table.delete.behavior=IGNORE (not just first_row merge engine)
  • Added tests for joining on bucket key (not just full primary key)
  • Added tests verifying that LEFT/RIGHT/FULL OUTER joins fail with an appropriate validation error
  • Added test verifying cascade joins are not supported
  • Updated documentation for Flink 2.2 Delta Join with new supported features and limitations

Tests

  • testDeltaJoinWithPrimaryKeyTableNoDeletes - normal PK table with delete.behavior=IGNORE
  • testDeltaJoinOnBucketKey - join on bucket key only (not full PK)
  • testDeltaJoinFailsWhenFilterOnNonUpsertKeys - filter on non-upsert-key columns fails
  • testDeltaJoinFailsWhenSourceHasDelete - source with DELETE records fails
  • testDeltaJoinFailsWhenJoinKeyNotContainIndex - join key that does not contain the index fails
  • testDeltaJoinFailsWithLeftJoin - LEFT JOIN is not converted to DeltaJoin
  • testDeltaJoinFailsWithRightJoin - RIGHT JOIN is not converted to DeltaJoin
  • testDeltaJoinFailsWithFullOuterJoin - FULL OUTER JOIN is not converted to DeltaJoin
  • testDeltaJoinFailsWithCascadeJoin - cascade join is not converted to DeltaJoin
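
For readers skimming the list above, the limitations these tests exercise can be restated as a small checklist. This is only a sketch: `DeltaJoinChecklist`, its `violations` method, and the modelling of the index as a set of column names are hypothetical names made up for illustration. It is not how Flink's planner decides whether to convert a join.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical checklist restating the limitations covered by the tests in this PR. */
final class DeltaJoinChecklist {

    /** Join types named in the PR description. */
    enum JoinType { INNER, LEFT, RIGHT, FULL_OUTER }

    /**
     * Returns the reasons a join would not be expected to become a delta join,
     * based only on the limitations listed in this PR. An empty list means none apply.
     * Assumption: the "index" is modelled as a set of column names.
     */
    static List<String> violations(
            JoinType joinType,
            boolean cascadeJoin,
            boolean sourceProducesDeletes,
            Set<String> joinKeys,
            Set<String> indexKeys) {
        List<String> reasons = new ArrayList<>();
        if (joinType != JoinType.INNER) {
            reasons.add("outer join is not converted: " + joinType);
        }
        if (cascadeJoin) {
            reasons.add("cascade join is not converted");
        }
        if (sourceProducesDeletes) {
            reasons.add("source emits DELETE records");
        }
        if (!joinKeys.containsAll(indexKeys)) {
            reasons.add("join key does not contain the index " + indexKeys);
        }
        return reasons;
    }
}
```

The filter-on-non-upsert-key case is left out of the sketch because the PR text does not describe the condition in enough detail to model it.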

API and Format

No

Documentation

Yes - updated the Flink 2.2 section of docs/engine-flink/delta-joins.md.

@fresh-borzoni
Author

@xuyangzhong @wuchong
While studying the Flink 2.2 changes related to Delta Join, I noticed this issue. I've added some tests and improved the documentation, feel free to use the changes if you find them useful.

@fresh-borzoni fresh-borzoni force-pushed the fluss-delta-joins-tests-docs branch from ac23b6b to 6035422 Compare December 31, 2025 13:31
@xuyangzhong
Contributor

Hi, @fresh-borzoni I'm a bit busy these days, I'll try my best to take a look after next Wednesday

@xuyangzhong xuyangzhong left a comment

Thanks for driving this! I have added some comments. Furthermore, some methods for table creation could be reused.

@fresh-borzoni
Author

@xuyangzhong Thank you for the review!
Addressed comments. PTAL 🙏

@fresh-borzoni fresh-borzoni force-pushed the fluss-delta-joins-tests-docs branch from 4048bcf to 790959f Compare January 21, 2026 14:42

@xuyangzhong xuyangzhong left a comment

Thanks for updating! There are a few minor comments.

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** IT case for Delta Join optimization in Flink 2.2. */
public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {

Inheriting from FlinkTableSourceITCase will result in the repeated execution of all existing cases within FlinkTableSourceITCase.

What about extending FlinkTestBase instead, with a setup like this:

    private static final String CATALOG_NAME = "testcatalog";

    private StreamTableEnvironment tEnv;

    @BeforeEach
    @Override
    public void beforeEach() throws Exception {
        super.beforeEach();

        bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql(
                String.format(
                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
        tEnv.executeSql("use catalog " + CATALOG_NAME);
        tEnv.executeSql(String.format("create database if not exists `%s`", DEFAULT_DB));
        tEnv.useDatabase(DEFAULT_DB);

        // start two jobs for this test: one for DML involving the delta join, and the other for DQL
        // to query the results of the sink table
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
        // Set FORCE strategy for delta join
        tEnv.getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
    }


    @AfterEach
    void after() {
        tEnv.useDatabase(BUILTIN_DATABASE);
        tEnv.executeSql(String.format("drop database `%s` cascade", DEFAULT_DB));
    }
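
The concern in this review comment, that extending an existing test class re-runs its cases, can be illustrated without any test framework. The sketch below uses a stand-in annotation (`FakeTest`) and plain reflection; the class names are hypothetical and none of this is JUnit or Fluss code. It only shows that annotated public methods of a superclass are visible on the subclass, which is why a runner that scans the hierarchy picks them up again.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class InheritedTestsDemo {

    /** Stand-in for a test annotation; this is not JUnit's @Test. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface FakeTest {}

    /** Plays the role of an existing IT case that already has its own tests. */
    public static class ExistingITCase {
        @FakeTest public void testScan() {}
        @FakeTest public void testLookup() {}
    }

    /** New test class that extends the existing one only to reuse its setup. */
    public static class NewDeltaJoinITCase extends ExistingITCase {
        @FakeTest public void testDeltaJoin() {}
    }

    /** Collects annotated public methods, including those inherited from superclasses. */
    static List<String> discover(Class<?> testClass) {
        List<String> names = new ArrayList<>();
        for (Method m : testClass.getMethods()) {
            if (m.isAnnotationPresent(FakeTest.class)) {
                names.add(m.getName());
            }
        }
        Collections.sort(names);
        return names;
    }
}
```

Calling `InheritedTestsDemo.discover(InheritedTestsDemo.NewDeltaJoinITCase.class)` returns all three method names, not just `testDeltaJoin`. A base class that holds only setup code avoids this.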

String primaryKey,
String bucketKey,
String streamType,
String extraOptions) {

What about using Map<String, String> instead of String here?

private void createSource(
            String tableName,
            String columns,
            String primaryKey,
            String bucketKey,
            String streamType,
            @Nullable Map<String, String> extraOptions) {
        Map<String, String> withOptions = new HashMap<>();
        if (extraOptions != null) {
            withOptions.putAll(extraOptions);
        }
        withOptions.put("connector", "fluss");
        withOptions.put("bucket.key", bucketKey);

        ...

        tEnv.executeSql(
                String.format(
                        "create table %s ( %s, primary key (%s) NOT ENFORCED ) with ( %s )",
                        tableName,
                        columns,
                        primaryKey,
                        withOptions.entrySet().stream()
                                .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
                                .collect(Collectors.joining(", "))));
    }
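
The option handling in the suggestion above can be tried in isolation. The following is a minimal sketch with hypothetical names (`WithClause`, `render`, `sourceOptions`) and no Flink dependency. It swaps the `HashMap` for a `LinkedHashMap`, which is an assumption of this sketch rather than part of the review suggestion, so that the generated DDL text is stable across runs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class WithClause {

    /** Renders options as "'k1' = 'v1', 'k2' = 'v2'" in the map's iteration order. */
    static String render(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
                .collect(Collectors.joining(", "));
    }

    /**
     * Merges caller-provided options with the fixed ones. Fixed keys are put last,
     * so they overwrite any caller value for the same key, as in the snippet above.
     */
    static Map<String, String> sourceOptions(String bucketKey, Map<String, String> extraOptions) {
        Map<String, String> withOptions = new LinkedHashMap<>();
        if (extraOptions != null) {
            withOptions.putAll(extraOptions);
        }
        withOptions.put("connector", "fluss");
        withOptions.put("bucket.key", bucketKey);
        return withOptions;
    }
}
```

With a plain `HashMap` the rendered clause is still valid SQL, but the option order is unspecified, which makes failure messages and logged DDL harder to compare.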

withOptions.append("'connector' = 'fluss', ");
withOptions.append(String.format("'bucket.key' = '%s'", bucketKey));

if ("insert_only".equals(streamType)) {

Using string.equals here feels a bit awkward. What about including it in extraOptions when calling from outside?
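
One way to read this suggestion is that each call site names the table options it needs, so the helper no longer branches on a stream-type string. The sketch below is hypothetical: `SourceOptionPresets` and `withConnector` are invented for illustration, `table.delete.behavior` = `IGNORE` is taken from the PR description, and the `table.merge-engine` key is an assumption about how the first_row merge engine is configured.

```java
import java.util.Map;
import java.util.TreeMap;

final class SourceOptionPresets {

    /** From the PR description: a normal PK table that ignores deletes. */
    static final Map<String, String> IGNORE_DELETES =
            Map.of("table.delete.behavior", "IGNORE");

    /** Assumed option key and value for a first_row merge engine table. */
    static final Map<String, String> FIRST_ROW =
            Map.of("table.merge-engine", "first_row");

    /** Helper without stream-type branching: it only merges what the caller passed. */
    static Map<String, String> withConnector(Map<String, String> extraOptions) {
        Map<String, String> merged = new TreeMap<>(extraOptions);
        merged.put("connector", "fluss");
        return merged;
    }
}
```

A test would then call something like `withConnector(SourceOptionPresets.IGNORE_DELETES)` and the choice of table flavour stays visible at the call site.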

@xuyangzhong
Contributor

Hi, @fresh-borzoni I have addressed this comment in the new PR #2546, which is based on yours, because the deadline is near. Please feel free to cherry-pick the refactoring commit, or comment on that PR and join as a co-author.

@fresh-borzoni
Author

> Hi, @fresh-borzoni I have addressed this comment in the new PR #2546, which is based on yours, because the deadline is near. Please feel free to cherry-pick the refactoring commit, or comment on that PR and join as a co-author.

@xuyangzhong Sure, I'll take a look today and also address @wuchong's review if you don't mind.

@fresh-borzoni
Author

@wuchong
Since #2546 is now the active PR, I’m closing this one to keep discussion and review in a single place.

Development

Successfully merging this pull request may close these issues.

Add Flink 2.2 Delta Join tests and documentation
