[flink] Delta Join additional IT tests and docs improvement #2268
@xuyangzhong @wuchong
Hi @fresh-borzoni, I'm a bit busy these days; I'll try my best to take a look after next Wednesday.
xuyangzhong left a comment
Thanks for driving this! I have added some comments. Furthermore, some methods for table creation could be reused.
@xuyangzhong Thank you for the review!
xuyangzhong left a comment
Thanks for updating! There are a few minor comments.
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** IT case for Delta Join optimization in Flink 2.2. */
public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
Inheriting from FlinkTableSourceITCase will result in the repeated execution of all existing cases within FlinkTableSourceITCase.
What about extending FlinkTestBase instead, with a setup along these lines:
private static final String CATALOG_NAME = "testcatalog";
private StreamTableEnvironment tEnv;

@BeforeEach
@Override
public void beforeEach() throws Exception {
    super.beforeEach();
    bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    tEnv = StreamTableEnvironment.create(env);
    tEnv.executeSql(
            String.format(
                    "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
                    CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
    tEnv.executeSql("use catalog " + CATALOG_NAME);
    tEnv.executeSql(String.format("create database if not exists `%s`", DEFAULT_DB));
    tEnv.useDatabase(DEFAULT_DB);
    // start two jobs for this test: one for DML involving the delta join, and the other for DQL
    // to query the results of the sink table
    tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    // Set FORCE strategy for delta join
    tEnv.getConfig()
            .set(
                    OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
                    OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
}

@AfterEach
void after() {
    tEnv.useDatabase(BUILTIN_DATABASE);
    tEnv.executeSql(String.format("drop database `%s` cascade", DEFAULT_DB));
}
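To illustrate why inheriting from an existing IT case re-runs its tests, here is a minimal, self-contained sketch. It does not use JUnit: the `FakeTest` annotation and all class names are hypothetical stand-ins, and the reflection-based `discover` method only approximates how a test framework collects annotated methods, including inherited ones.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class InheritedTestsSketch {

    /** Hypothetical stand-in for a test annotation such as JUnit's @Test. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface FakeTest {}

    /** Stand-in for an existing IT case that already has its own tests. */
    static class ExistingSourceCases {
        @FakeTest public void testScan() {}
        @FakeTest public void testLookup() {}
    }

    /** New test class that inherits from the existing IT case. */
    static class DeltaJoinCasesViaInheritance extends ExistingSourceCases {
        @FakeTest public void testDeltaJoin() {}
    }

    /** New test class that does not inherit any test methods. */
    static class DeltaJoinCasesStandalone {
        @FakeTest public void testDeltaJoin() {}
    }

    /** Collects the names of all public annotated methods, inherited ones included. */
    static List<String> discover(Class<?> testClass) {
        return Arrays.stream(testClass.getMethods())
                .filter(m -> m.isAnnotationPresent(FakeTest.class))
                .map(Method::getName)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [testDeltaJoin, testLookup, testScan]
        System.out.println(discover(DeltaJoinCasesViaInheritance.class));
        // prints [testDeltaJoin]
        System.out.println(discover(DeltaJoinCasesStandalone.class));
    }
}
```

The subclass picks up both parent tests in addition to its own, which is the duplication the comment above warns about; the standalone class runs only the delta join case.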
        String primaryKey,
        String bucketKey,
        String streamType,
        String extraOptions) {
What about using Map<String, String> instead of String here?
private void createSource(
        String tableName,
        String columns,
        String primaryKey,
        String bucketKey,
        String streamType,
        @Nullable Map<String, String> extraOptions) {
    Map<String, String> withOptions = new HashMap<>();
    if (extraOptions != null) {
        withOptions.putAll(extraOptions);
    }
    withOptions.put("connector", "fluss");
    withOptions.put("bucket.key", bucketKey);
    ...
    tEnv.executeSql(
            String.format(
                    "create table %s ( %s, primary key (%s) NOT ENFORCED ) with ( %s )",
                    tableName,
                    columns,
                    primaryKey,
                    withOptions.entrySet().stream()
                            .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
                            .collect(Collectors.joining(", "))));
}
    withOptions.append("'connector' = 'fluss', ");
    withOptions.append(String.format("'bucket.key' = '%s'", bucketKey));

    if ("insert_only".equals(streamType)) {
Using string.equals here feels a bit awkward. What about including it in extraOptions when calling from outside?
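As a rough sketch of the two suggestions combined (a Map for options, with the caller supplying any stream-specific settings instead of a `streamType` string), something like the following could work. The class name, method names, table name, and column definitions are hypothetical; the helper only builds the DDL string and does not call Flink. The `table.delete.behavior` option is taken from the PR description, and which options a given stream type actually needs is left to the caller.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration; not the PR's actual test code. */
public class WithOptionsSketch {

    /** Renders options as a comma-separated list of 'key' = 'value' pairs. */
    static String renderOptions(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
                .collect(Collectors.joining(", "));
    }

    /** Builds the create-table statement; fixed options win over caller-supplied ones. */
    static String createSourceDdl(
            String tableName,
            String columns,
            String primaryKey,
            String bucketKey,
            Map<String, String> extraOptions) {
        // LinkedHashMap keeps insertion order so the generated DDL is deterministic.
        Map<String, String> withOptions = new LinkedHashMap<>();
        withOptions.put("connector", "fluss");
        withOptions.put("bucket.key", bucketKey);
        if (extraOptions != null) {
            extraOptions.forEach(withOptions::putIfAbsent);
        }
        return String.format(
                "create table %s ( %s, primary key (%s) NOT ENFORCED ) with ( %s )",
                tableName, columns, primaryKey, renderOptions(withOptions));
    }

    public static void main(String[] args) {
        // The caller decides which extra options apply; no streamType branching is needed.
        Map<String, String> extra = new LinkedHashMap<>();
        extra.put("table.delete.behavior", "IGNORE");
        System.out.println(createSourceDdl("left_table", "a int, b string", "a", "a", extra));
    }
}
```

With this shape, each test states its own table options at the call site, and the helper no longer needs to know about stream types.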
Hi @fresh-borzoni, I have addressed this comment in a new PR, #2546, which is based on yours, because the deadline is near. Please feel free to cherry-pick the refactoring commit, or comment on that PR and join as a co-author.
@xuyangzhong Sure, I'll take a look today and also address @wuchong's review, if you don't mind.
Purpose
Linked issue: close #2231
Add comprehensive test coverage for Delta Join feature in Flink 2.2 and improve documentation.
Brief change log
table.delete.behavior=IGNORE (not just first_row merge engine)
Tests
testDeltaJoinWithPrimaryKeyTableNoDeletes - normal PK table with delete.behavior=IGNORE
testDeltaJoinOnBucketKey - join on bucket key only
testDeltaJoinFailsWhenFilterOnNonUpsertKeys - filter on non-upsert-key columns fails
testDeltaJoinOnBucketKey - join on bucket key only (not full PK)
testDeltaJoinFailsWhenSourceHasDelete - source with DELETE records fails
testDeltaJoinFailsWhenJoinKeyNotContainIndex - join key not containing index fails
testDeltaJoinFailsWithLeftJoin - LEFT JOIN wouldn't be converted to DeltaJoin
testDeltaJoinFailsWithRightJoin - RIGHT JOIN wouldn't be converted to DeltaJoin
testDeltaJoinFailsWithFullOuterJoin - FULL OUTER JOIN wouldn't be converted to DeltaJoin
testDeltaJoinFailsWithCascadeJoin - cascade join wouldn't be converted to DeltaJoin
API and Format
No
Documentation
Yes - updated docs/engine-flink/delta-joins.md in the Flink 2.2 part.