diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java index 4c8ecbd79a..9f18dd401f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java @@ -65,6 +65,10 @@ public final class TableDescriptor implements Serializable { public static final String LOG_OFFSET_COLUMN = "_log_offset"; public static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp"; + // column names for $binlog virtual table nested row fields + public static final String BEFORE_COLUMN = "before"; + public static final String AFTER_COLUMN = "after"; + private final Schema schema; private final @Nullable String comment; private final List partitionKeys; diff --git a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118BinlogVirtualTableITCase.java new file mode 100644 index 0000000000..7dcd9e45a2 --- /dev/null +++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118BinlogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link BinlogVirtualTableITCase} in Flink 1.18. */ +public class Flink118BinlogVirtualTableITCase extends BinlogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119BinlogVirtualTableITCase.java new file mode 100644 index 0000000000..074a300f4c --- /dev/null +++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119BinlogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link BinlogVirtualTableITCase} in Flink 1.19. */ +public class Flink119BinlogVirtualTableITCase extends BinlogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120BinlogVirtualTableITCase.java new file mode 100644 index 0000000000..354f8f1980 --- /dev/null +++ b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120BinlogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link BinlogVirtualTableITCase} in Flink 1.20. */ +public class Flink120BinlogVirtualTableITCase extends BinlogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22BinlogVirtualTableITCase.java new file mode 100644 index 0000000000..d73dad6131 --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22BinlogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link BinlogVirtualTableITCase} in Flink 2.2. 
*/ +public class Flink22BinlogVirtualTableITCase extends BinlogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index c62292c69b..6fd5d147fc 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -245,7 +245,13 @@ public class FlinkConnectorOptions { .withDescription( "The serialized base64 bytes of refresh handler of materialized table."); - // ------------------------------------------------------------------------------------------ + /** Internal option to indicate whether the base table is partitioned for $binlog sources. */ + public static final ConfigOption INTERNAL_BINLOG_IS_PARTITIONED = + ConfigOptions.key("_internal.binlog.is-partitioned") + .booleanType() + .defaultValue(false) + .withDescription( + "Internal option: indicates whether the base table is partitioned for $binlog virtual tables. Not part of public API."); /** Startup mode for the fluss scanner, see {@link #SCAN_STARTUP_MODE}. */ public enum ScanStartupMode implements DescribedEnum { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index c6c434889f..5d81c5e5e6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -23,6 +23,7 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.adapter.CatalogTableAdapter; import org.apache.fluss.flink.lake.LakeFlinkCatalog; import org.apache.fluss.flink.procedure.ProcedureManager; @@ -39,6 +40,7 @@ import org.apache.fluss.utils.IOUtils; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -73,6 +75,7 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.table.types.AbstractDataType; import java.util.ArrayList; import java.util.Collections; @@ -315,11 +318,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath) return getVirtualChangelogTable(objectPath); } else if (tableName.endsWith(BINLOG_TABLE_SUFFIX) && !tableName.contains(LAKE_TABLE_SPLITTER)) { - // TODO: Implement binlog virtual table in future - throw new UnsupportedOperationException( - String.format( - "$binlog virtual tables are not yet supported for table %s", - objectPath)); + return getVirtualBinlogTable(objectPath); } TablePath tablePath = toTablePath(objectPath); @@ -960,4 +959,109 @@ private Schema buildChangelogSchema(Schema originalSchema) { return builder.build(); } + + /** + * Creates a virtual $binlog table by modifying the base table's schema to include metadata + * columns and nested before/after ROW fields. 
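+     * <p>The resulting schema is {@code _change_type}, {@code _log_offset}, {@code _commit_timestamp}, followed by nullable {@code before} and {@code after} ROW columns that mirror the base table's physical columns.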
+ */ + private CatalogBaseTable getVirtualBinlogTable(ObjectPath objectPath) + throws TableNotExistException, CatalogException { + // Extract the base table name (remove $binlog suffix) + String virtualTableName = objectPath.getObjectName(); + String baseTableName = + virtualTableName.substring( + 0, virtualTableName.length() - BINLOG_TABLE_SUFFIX.length()); + + // Get the base table + ObjectPath baseObjectPath = new ObjectPath(objectPath.getDatabaseName(), baseTableName); + TablePath baseTablePath = toTablePath(baseObjectPath); + + try { + // Retrieve base table info + TableInfo tableInfo = admin.getTableInfo(baseTablePath).get(); + + // $binlog is only supported for primary key tables + if (!tableInfo.hasPrimaryKey()) { + throw new UnsupportedOperationException( + String.format( + "$binlog virtual tables are only supported for primary key tables. " + + "Table %s does not have a primary key.", + baseTablePath)); + } + + // Convert to Flink table + CatalogBaseTable catalogBaseTable = FlinkConversions.toFlinkTable(tableInfo); + + if (!(catalogBaseTable instanceof CatalogTable)) { + throw new UnsupportedOperationException( + "Virtual $binlog tables are only supported for regular tables"); + } + + CatalogTable baseTable = (CatalogTable) catalogBaseTable; + + // Build the binlog schema with nested before/after ROW columns + Schema originalSchema = baseTable.getUnresolvedSchema(); + Schema binlogSchema = buildBinlogSchema(originalSchema); + + // Copy options from base table + Map newOptions = new HashMap<>(baseTable.getOptions()); + newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers); + newOptions.putAll(securityConfigs); + + // Store whether the base table is partitioned for the table source to use. + // Since binlog schema has nested columns, we can't use Flink's partition key mechanism. 
+ newOptions.put( + FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED.key(), + String.valueOf(!baseTable.getPartitionKeys().isEmpty())); + + // Create a new CatalogTable with the binlog schema + // Binlog virtual tables don't have partition keys at the top level + return CatalogTableAdapter.toCatalogTable( + binlogSchema, baseTable.getComment(), Collections.emptyList(), newOptions); + + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (t instanceof UnsupportedOperationException) { + throw (UnsupportedOperationException) t; + } + if (isTableNotExist(t)) { + throw new TableNotExistException(getName(), baseObjectPath); + } else { + throw new CatalogException( + String.format( + "Failed to get virtual binlog table %s in %s", + objectPath, getName()), + t); + } + } + } + + private Schema buildBinlogSchema(Schema originalSchema) { + Schema.Builder builder = Schema.newBuilder(); + + // Add metadata columns + builder.column("_change_type", STRING().notNull()); + builder.column("_log_offset", BIGINT().notNull()); + builder.column("_commit_timestamp", TIMESTAMP_LTZ(3).notNull()); + + // Build nested ROW type from original columns for before/after fields + // Using UnresolvedField since physCol.getDataType() returns AbstractDataType (unresolved) + List rowFields = new ArrayList<>(); + for (Schema.UnresolvedColumn col : originalSchema.getColumns()) { + if (col instanceof Schema.UnresolvedPhysicalColumn) { + Schema.UnresolvedPhysicalColumn physCol = (Schema.UnresolvedPhysicalColumn) col; + rowFields.add(DataTypes.FIELD(physCol.getName(), physCol.getDataType())); + } + } + AbstractDataType nestedRowType = + DataTypes.ROW(rowFields.toArray(new DataTypes.UnresolvedField[0])); + + // Add before and after as nullable nested ROW columns + builder.column("before", nestedRowType); + builder.column("after", nestedRowType); + + // Note: We don't copy primary keys or watermarks for virtual tables + + return builder.build(); + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index 7c99b6f4b8..a7a5608de6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -24,6 +24,7 @@ import org.apache.fluss.flink.lake.LakeTableFactory; import org.apache.fluss.flink.sink.FlinkTableSink; import org.apache.fluss.flink.sink.shuffle.DistributionMode; +import org.apache.fluss.flink.source.BinlogFlinkTableSource; import org.apache.fluss.flink.source.ChangelogFlinkTableSource; import org.apache.fluss.flink.source.FlinkTableSource; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; @@ -93,6 +94,11 @@ public DynamicTableSource createDynamicTableSource(Context context) { return createChangelogTableSource(context, tableIdentifier, tableName); } + // Check if this is a $binlog suffix in table name + if (tableName.endsWith(FlinkCatalog.BINLOG_TABLE_SUFFIX)) { + return createBinlogTableSource(context, tableIdentifier, tableName); + } + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final ReadableConfig tableOptions = helper.getOptions(); validateSourceOptions(tableOptions); @@ -327,4 +333,51 @@ private DynamicTableSource createChangelogTableSource( partitionDiscoveryIntervalMs, catalogTableOptions); } + + /** 
Creates a BinlogFlinkTableSource for $binlog virtual tables. */ + private DynamicTableSource createBinlogTableSource( + Context context, ObjectIdentifier tableIdentifier, String tableName) { + // Extract the base table name by removing the $binlog suffix + String baseTableName = + tableName.substring( + 0, tableName.length() - FlinkCatalog.BINLOG_TABLE_SUFFIX.length()); + + boolean isStreamingMode = + context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) + == RuntimeExecutionMode.STREAMING; + + // tableOutputType: [_change_type, _log_offset, _commit_timestamp, before ROW<...>, after + // ROW<...>] + RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType(); + + Map catalogTableOptions = context.getCatalogTable().getOptions(); + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + final ReadableConfig tableOptions = helper.getOptions(); + validateSourceOptions(tableOptions); + + ZoneId timeZone = + FlinkConnectorOptionsUtils.getLocalTimeZone( + context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE)); + final FlinkConnectorOptionsUtils.StartupOptions startupOptions = + FlinkConnectorOptionsUtils.getStartupOptions(tableOptions, timeZone); + + // Check if the table is partitioned from the internal option + boolean isPartitioned = + tableOptions.get(FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED); + + long partitionDiscoveryIntervalMs = + tableOptions + .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL) + .toMillis(); + + return new BinlogFlinkTableSource( + TablePath.of(tableIdentifier.getDatabaseName(), baseTableName), + toFlussClientConfig(catalogTableOptions, context.getConfiguration()), + tableOutputType, + isPartitioned, + isStreamingMode, + startupOptions, + partitionDiscoveryIntervalMs, + catalogTableOptions); + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java new file mode 100644 index 0000000000..a5ec91bf3f --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.client.initializer.OffsetsInitializer; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema; +import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; +import org.apache.fluss.flink.utils.FlinkConversions; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.types.RowType; + +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; + +import javax.annotation.Nullable; + +import java.util.Map; + +/** A Flink table source for the $binlog virtual table. */ +public class BinlogFlinkTableSource implements ScanTableSource { + + private final TablePath tablePath; + private final Configuration flussConfig; + // The binlog output type (includes metadata + nested before/after ROW columns) + private final org.apache.flink.table.types.logical.RowType binlogOutputType; + // The data columns type extracted from the 'before' nested ROW + private final org.apache.flink.table.types.logical.RowType dataColumnsType; + private final boolean isPartitioned; + private final boolean streaming; + private final FlinkConnectorOptionsUtils.StartupOptions startupOptions; + private final long scanPartitionDiscoveryIntervalMs; + private final Map tableOptions; + + // Projection pushdown + @Nullable private int[] projectedFields; + private LogicalType producedDataType; + + @Nullable private Predicate partitionFilters; + + public BinlogFlinkTableSource( + TablePath tablePath, + Configuration flussConfig, + org.apache.flink.table.types.logical.RowType binlogOutputType, + boolean isPartitioned, + boolean streaming, + FlinkConnectorOptionsUtils.StartupOptions startupOptions, + long scanPartitionDiscoveryIntervalMs, + Map tableOptions) { + this.tablePath = tablePath; + this.flussConfig = flussConfig; + this.binlogOutputType = binlogOutputType; + this.isPartitioned = isPartitioned; + this.streaming = streaming; + this.startupOptions = startupOptions; + this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; + this.tableOptions = tableOptions; + + // Extract data columns from the 'before' nested ROW type (index 3) + // The binlog schema is: [_change_type, _log_offset, _commit_timestamp, before, after] + this.dataColumnsType = + (org.apache.flink.table.types.logical.RowType) binlogOutputType.getTypeAt(3); + this.producedDataType = binlogOutputType; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + // Create the Fluss row type for the data columns (the original table columns) + RowType flussRowType = FlinkConversions.toFlussRowType(dataColumnsType); + if (projectedFields != null) { + flussRowType = flussRowType.project(projectedFields); + } + + // Determine the offsets initializer based on startup mode + OffsetsInitializer offsetsInitializer; + switch (startupOptions.startupMode) { + case EARLIEST: + case FULL: + // For binlog, read all log records from the beginning + offsetsInitializer = OffsetsInitializer.earliest(); + break; + case LATEST: + 
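+                // Start from the latest offsets; only records appended after the job starts are read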
offsetsInitializer = OffsetsInitializer.latest(); + break; + case TIMESTAMP: + offsetsInitializer = + OffsetsInitializer.timestamp(startupOptions.startupTimestampMs); + break; + default: + throw new IllegalArgumentException( + "Unsupported startup mode: " + startupOptions.startupMode); + } + + // Create the source with the binlog deserialization schema + FlinkSource source = + new FlinkSource<>( + flussConfig, + tablePath, + false, + isPartitioned, + flussRowType, + projectedFields, + offsetsInitializer, + scanPartitionDiscoveryIntervalMs, + new BinlogDeserializationSchema(), + streaming, + partitionFilters, + null); + + return SourceProvider.of(source); + } + + @Override + public DynamicTableSource copy() { + BinlogFlinkTableSource copy = + new BinlogFlinkTableSource( + tablePath, + flussConfig, + binlogOutputType, + isPartitioned, + streaming, + startupOptions, + scanPartitionDiscoveryIntervalMs, + tableOptions); + copy.producedDataType = producedDataType; + copy.projectedFields = projectedFields; + copy.partitionFilters = partitionFilters; + return copy; + } + + @Override + public String asSummaryString() { + return "FlussBinlogTableSource"; + } + + // TODO: Implement projection pushdown handling for nested before/after columns + // TODO: Implement filter pushdown +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java index 3883c43086..a034807d7c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java @@ -27,7 +27,6 @@ import org.apache.fluss.predicate.Predicate; import org.apache.fluss.types.RowType; -import org.apache.flink.api.connector.source.Source; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; @@ -170,22 +169,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { partitionFilters, null); // Lake source not supported - if (!streaming) { - // Batch mode - changelog virtual tables read from log, not data lake - return new SourceProvider() { - @Override - public boolean isBounded() { - return true; - } - - @Override - public Source createSource() { - return source; - } - }; - } else { - return SourceProvider.of(source); - } + return SourceProvider.of(source); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java new file mode 100644 index 0000000000..1114febb9c --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.deserializer; + +import org.apache.fluss.flink.utils.BinlogRowConverter; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.types.RowType; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +import javax.annotation.Nullable; + +import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType; + +/** + * A deserialization schema that converts {@link LogRecord} objects to Flink's {@link RowData} + * format with nested before/after row structure for the $binlog virtual table. + * + *
<p>
This schema is stateful: it buffers UPDATE_BEFORE (-U) records and returns {@code null} for + * them. When the subsequent UPDATE_AFTER (+U) record arrives, it merges both into a single binlog + * row. The {@link org.apache.fluss.flink.source.emitter.FlinkRecordEmitter} handles null returns by + * skipping emission. + */ +public class BinlogDeserializationSchema implements FlussDeserializationSchema { + + /** + * Converter responsible for transforming Fluss row data into Flink's {@link RowData} format + * with nested before/after structure. Initialized during {@link #open(InitializationContext)}. + */ + private transient BinlogRowConverter converter; + + /** Creates a new BinlogDeserializationSchema. */ + public BinlogDeserializationSchema() {} + + /** Initializes the deserialization schema. */ + @Override + public void open(InitializationContext context) throws Exception { + if (converter == null) { + this.converter = new BinlogRowConverter(context.getRowSchema()); + } + } + + /** + * Deserializes a {@link LogRecord} into a Flink {@link RowData} object with nested before/after + * structure. + */ + @Override + @Nullable + public RowData deserialize(LogRecord record) throws Exception { + if (converter == null) { + throw new IllegalStateException( + "Converter not initialized. The open() method must be called before deserializing records."); + } + return converter.toBinlogRowData(record); + } + + /** + * Returns the TypeInformation for the produced {@link RowData} type including nested + * before/after ROW columns. + */ + @Override + public TypeInformation getProducedType(RowType rowSchema) { + // Build the output type with nested before/after ROW columns + org.apache.flink.table.types.logical.RowType outputType = + BinlogRowConverter.buildBinlogRowType(toFlinkRowType(rowSchema)); + return InternalTypeInfo.of(outputType); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java index 82a3ed87e6..ba4c9d0131 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java @@ -73,8 +73,20 @@ public void emitRecord( } processAndEmitRecord(scanRecord, sourceOutput); } else if (splitState.isLogSplitState()) { - splitState.asLogSplitState().setNextOffset(recordAndPosition.record().logOffset() + 1); - processAndEmitRecord(recordAndPosition.record(), sourceOutput); + // Attempt to process and emit the record. + // For $binlog, this returns true only when a complete row (or the final part of + // a split) is emitted. + boolean emitted = processAndEmitRecord(recordAndPosition.record(), sourceOutput); + + if (emitted) { + // Only advance the offset in state if the record was successfully emitted. + // This ensures that if a crash occurs mid-update (between BEFORE and AFTER), + // the source will re-read the same log offset upon recovery, + // allowing the BinlogDeserializationSchema to correctly reconstruct the state. 
+ splitState + .asLogSplitState() + .setNextOffset(recordAndPosition.record().logOffset() + 1); + } } else if (splitState.isLakeSplit()) { if (lakeRecordRecordEmitter == null) { lakeRecordRecordEmitter = new LakeRecordRecordEmitter<>(this::processAndEmitRecord); @@ -85,7 +97,13 @@ public void emitRecord( } } - private void processAndEmitRecord(ScanRecord scanRecord, SourceOutput sourceOutput) { + /** + * Processes and emits a record. + * + * @return true if a record was emitted, false if deserialize returned null (e.g., for $binlog + * UPDATE_BEFORE records that are buffered pending their UPDATE_AFTER pair) + */ + private boolean processAndEmitRecord(ScanRecord scanRecord, SourceOutput sourceOutput) { OUT record; try { record = deserializationSchema.deserialize(scanRecord); @@ -102,6 +120,8 @@ record = deserializationSchema.deserialize(scanRecord); } else { sourceOutput.collect(record); } + return true; } + return false; } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java new file mode 100644 index 0000000000..3a9f2c9411 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.types.RowType; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.types.RowKind; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** + * A converter that transforms Fluss's {@link LogRecord} to Flink's {@link RowData} with nested + * before/after row structure for the $binlog virtual table. + */ +public class BinlogRowConverter implements RecordToFlinkRowConverter { + + private final FlussRowToFlinkRowConverter baseConverter; + private final org.apache.flink.table.types.logical.RowType producedType; + + /** + * Buffer for the UPDATE_BEFORE (-U) record pending merge with the next UPDATE_AFTER (+U) + * record. Null when no update is in progress. + */ + @Nullable private LogRecord pendingUpdateBefore; + + /** Creates a new BinlogRowConverter. 
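+     * @param rowType the Fluss row type of the data columns, used to convert the nested before/after rows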
*/ + public BinlogRowConverter(RowType rowType) { + this.baseConverter = new FlussRowToFlinkRowConverter(rowType); + this.producedType = buildBinlogRowType(FlinkConversions.toFlinkRowType(rowType)); + } + + /** Converts a LogRecord to a binlog RowData with nested before/after structure. */ + @Nullable + public RowData toBinlogRowData(LogRecord record) { + ChangeType changeType = record.getChangeType(); + + switch (changeType) { + case INSERT: + return buildBinlogRow( + "insert", + record.logOffset(), + record.timestamp(), + null, + baseConverter.toFlinkRowData(record.getRow())); + + case UPDATE_BEFORE: + // Buffer the -U record and return null. + // FlinkRecordEmitter.processAndEmitRecord() skips null results. + this.pendingUpdateBefore = record; + return null; + + case UPDATE_AFTER: + // Merge with the buffered -U record + if (pendingUpdateBefore == null) { + throw new IllegalStateException( + "Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record. " + + "This indicates a corrupted log sequence."); + } + RowData beforeRow = baseConverter.toFlinkRowData(pendingUpdateBefore.getRow()); + RowData afterRow = baseConverter.toFlinkRowData(record.getRow()); + // Use offset and timestamp from the -U record (first entry of update pair) + long offset = pendingUpdateBefore.logOffset(); + long timestamp = pendingUpdateBefore.timestamp(); + pendingUpdateBefore = null; + return buildBinlogRow("update", offset, timestamp, beforeRow, afterRow); + + case DELETE: + return buildBinlogRow( + "delete", + record.logOffset(), + record.timestamp(), + baseConverter.toFlinkRowData(record.getRow()), + null); + + default: + throw new UnsupportedOperationException( + String.format( + "$binlog virtual table does not support change type: %s. " + + "$binlog is only supported for primary key tables.", + changeType)); + } + } + + @Override + @Nullable + public RowData convert(LogRecord record) { + return toBinlogRowData(record); + } + + @Override + public org.apache.flink.table.types.logical.RowType getProducedType() { + return producedType; + } + + /** + * Builds a binlog row with 5 fields: _change_type, _log_offset, _commit_timestamp, before, + * after. + */ + private RowData buildBinlogRow( + String changeType, + long offset, + long timestamp, + @Nullable RowData before, + @Nullable RowData after) { + GenericRowData row = new GenericRowData(5); + row.setField(0, StringData.fromString(changeType)); + row.setField(1, offset); + row.setField(2, TimestampData.fromEpochMillis(timestamp)); + row.setField(3, before); + row.setField(4, after); + row.setRowKind(RowKind.INSERT); + return row; + } + + /** + * Builds the Flink RowType for the binlog virtual table with nested before/after ROW columns. 
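+     * <p>Field layout: [_change_type, _log_offset, _commit_timestamp, before, after].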
+ */ + public static org.apache.flink.table.types.logical.RowType buildBinlogRowType( + org.apache.flink.table.types.logical.RowType originalType) { + List fields = new ArrayList<>(); + + // Add metadata columns + fields.add( + new org.apache.flink.table.types.logical.RowType.RowField( + TableDescriptor.CHANGE_TYPE_COLUMN, new VarCharType(false, 6))); + fields.add( + new org.apache.flink.table.types.logical.RowType.RowField( + TableDescriptor.LOG_OFFSET_COLUMN, new BigIntType(false))); + fields.add( + new org.apache.flink.table.types.logical.RowType.RowField( + TableDescriptor.COMMIT_TIMESTAMP_COLUMN, + new LocalZonedTimestampType(false, 3))); + + // Add nested before and after ROW columns (nullable at the ROW level) + org.apache.flink.table.types.logical.RowType nullableRowType = + new org.apache.flink.table.types.logical.RowType(true, originalType.getFields()); + fields.add( + new org.apache.flink.table.types.logical.RowType.RowField( + TableDescriptor.BEFORE_COLUMN, nullableRowType)); + fields.add( + new org.apache.flink.table.types.logical.RowType.RowField( + TableDescriptor.AFTER_COLUMN, nullableRowType)); + + return new org.apache.flink.table.types.logical.RowType(fields); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java index 6b8081a6d1..a2b339da5d 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java @@ -86,8 +86,19 @@ public org.apache.flink.table.types.logical.RowType getProducedType() { /** Converts a Fluss ChangeType to its string representation for the changelog virtual table. 
*/ private String convertChangeTypeToString(ChangeType changeType) { - // Use the short string representation from ChangeType - return changeType.shortString(); + switch (changeType) { + case APPEND_ONLY: + case INSERT: + return "insert"; + case UPDATE_BEFORE: + return "update_before"; + case UPDATE_AFTER: + return "update_after"; + case DELETE: + return "delete"; + default: + throw new IllegalArgumentException("Unknown change type: " + changeType); + } } /** @@ -103,7 +114,7 @@ public static org.apache.flink.table.types.logical.RowType buildChangelogRowType // Add metadata columns first (using centralized constants from TableDescriptor) fields.add( new org.apache.flink.table.types.logical.RowType.RowField( - TableDescriptor.CHANGE_TYPE_COLUMN, new VarCharType(false, 2))); + TableDescriptor.CHANGE_TYPE_COLUMN, new VarCharType(false, 13))); fields.add( new org.apache.flink.table.types.logical.RowType.RowField( TableDescriptor.LOG_OFFSET_COLUMN, new BigIntType(false))); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index 58e78ccd2a..f4ffd61de1 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -27,6 +27,7 @@ import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; @@ -914,7 +915,7 @@ void testGetChangelogVirtualTable() throws Exception { // Verify options are inherited from base table assertThat(changelogTable.getOptions()).containsEntry("bucket.num", "1"); - // Verify $changelog log tables (append-only with +A change type) + // Verify $changelog log tables (append-only with insert change type) tEnv.executeSql("CREATE TABLE log_table_for_changelog (id INT, name STRING)"); CatalogTable logChangelogTable = @@ -935,6 +936,55 @@ void testGetChangelogVirtualTable() throws Exception { assertThat(logChangelogTable.getUnresolvedSchema()).isEqualTo(expectedLogSchema); } + @Test + void testGetBinlogVirtualTable() throws Exception { + // Create a primary key table with partition + tEnv.executeSql( + "CREATE TABLE pk_table_for_binlog (" + + " id INT NOT NULL," + + " name STRING NOT NULL," + + " amount BIGINT," + + " PRIMARY KEY (id, name) NOT ENFORCED" + + ") PARTITIONED BY (name) " + + "WITH ('bucket.num' = '1')"); + + // Get the $binlog virtual table via catalog API + CatalogTable binlogTable = + (CatalogTable) + catalog.getTable(new ObjectPath(DEFAULT_DB, "pk_table_for_binlog$binlog")); + + // use string representation for assertion to simplify the unresolved schema comparison + assertThat(binlogTable.getUnresolvedSchema().toString()) + .isEqualTo( + "(\n" + + " `_change_type` STRING NOT NULL,\n" + + " `_log_offset` BIGINT NOT NULL,\n" + + " `_commit_timestamp` TIMESTAMP_LTZ(3) NOT NULL,\n" + + " `before` [ROW],\n" + + " `after` [ROW]\n" + + ")"); + + // Binlog virtual tables have empty partition keys (columns are nested) + assertThat(binlogTable.getPartitionKeys()).isEmpty(); + + // Partition info is stored as an internal boolean flag + 
assertThat(binlogTable.getOptions()) + .containsEntry(FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED.key(), "true"); + + // Verify options are inherited from base table + assertThat(binlogTable.getOptions()).containsEntry("bucket.num", "1"); + + // Verify $binlog is NOT supported for log tables (no primary key) + tEnv.executeSql("CREATE TABLE log_table_for_binlog (id INT, name STRING)"); + + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath(DEFAULT_DB, "log_table_for_binlog$binlog"))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("only supported for primary key tables"); + } + /** * Before Flink 2.1, the {@link Schema} did not include an index field. Starting from Flink 2.1, * Flink introduced the concept of an index, and in Fluss, the primary key is considered as an diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java new file mode 100644 index 0000000000..2fa306f886 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.utils.clock.ManualClock; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; +import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; +import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Integration test for $binlog virtual table functionality. 
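+ * <p>Covers DESCRIBE on the virtual table, insert/update/delete change types, SELECT *, partitioned tables, and scan.startup.mode options.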
*/ +abstract class BinlogVirtualTableITCase extends AbstractTestBase { + + protected static final ManualClock CLOCK = new ManualClock(); + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setClusterConf(new Configuration()) + .setNumOfTabletServers(1) + .setClock(CLOCK) + .build(); + + static final String CATALOG_NAME = "testcatalog"; + static final String DEFAULT_DB = "test_binlog_db"; + protected StreamExecutionEnvironment execEnv; + protected StreamTableEnvironment tEnv; + protected static Connection conn; + protected static Admin admin; + + protected static Configuration clientConf; + + @BeforeAll + protected static void beforeAll() { + clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); + conn = ConnectionFactory.createConnection(clientConf); + admin = conn.getAdmin(); + } + + @BeforeEach + void before() { + // Initialize Flink environment + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode()); + + // Initialize catalog and database + String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS)); + tEnv.executeSql( + String.format( + "create catalog %s with ('type' = 'fluss', '%s' = '%s')", + CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); + tEnv.executeSql("use catalog " + CATALOG_NAME); + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + tEnv.executeSql("create database " + DEFAULT_DB); + tEnv.useDatabase(DEFAULT_DB); + // reset clock before each test + CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS); + } + + @AfterEach + void after() { + tEnv.useDatabase(BUILTIN_DATABASE); + tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB)); + } + + /** Deletes rows from a primary key table using the proper delete API. 
*/ + protected static void deleteRows( + Connection connection, TablePath tablePath, List rows) throws Exception { + try (Table table = connection.getTable(tablePath)) { + UpsertWriter writer = table.newUpsert().createWriter(); + for (InternalRow row : rows) { + writer.delete(row); + } + writer.flush(); + } + } + + @Test + public void testDescribeBinlogTable() throws Exception { + // Create a table with various data types to test complex schema + tEnv.executeSql( + "CREATE TABLE describe_test (" + + " id INT NOT NULL," + + " name STRING," + + " amount BIGINT," + + " PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + // Test DESCRIBE on binlog virtual table + CloseableIterator describeResult = + tEnv.executeSql("DESCRIBE describe_test$binlog").collect(); + + List schemaRows = new ArrayList<>(); + while (describeResult.hasNext()) { + schemaRows.add(describeResult.next().toString()); + } + + // Should have 5 columns: _change_type, _log_offset, _commit_timestamp, before, after + assertThat(schemaRows).hasSize(5); + + // Verify metadata columns are listed first + assertThat(schemaRows.get(0)) + .isEqualTo("+I[_change_type, STRING, false, null, null, null]"); + assertThat(schemaRows.get(1)).isEqualTo("+I[_log_offset, BIGINT, false, null, null, null]"); + assertThat(schemaRows.get(2)) + .isEqualTo("+I[_commit_timestamp, TIMESTAMP_LTZ(3), false, null, null, null]"); + + // Verify before and after are ROW types with original columns + assertThat(schemaRows.get(3)) + .isEqualTo( + "+I[before, ROW<`id` INT NOT NULL, `name` STRING, `amount` BIGINT>, true, null, null, null]"); + assertThat(schemaRows.get(4)) + .isEqualTo( + "+I[after, ROW<`id` INT NOT NULL, `name` STRING, `amount` BIGINT>, true, null, null, null]"); + } + + @Test + public void testBinlogUnsupportedForLogTable() throws Exception { + // Create a log table (no primary key) + tEnv.executeSql( + "CREATE TABLE log_table (" + + " event_id INT," + + " event_type STRING" + + ") WITH ('bucket.num' = '1')"); + + // $binlog should fail for log tables + assertThatThrownBy(() -> tEnv.executeSql("DESCRIBE log_table$binlog").collect()) + .hasMessageContaining("only supported for primary key tables"); + } + + @Test + public void testBinlogWithAllChangeTypes() throws Exception { + // Create a primary key table with 1 bucket for consistent log_offset numbers + tEnv.executeSql( + "CREATE TABLE binlog_test (" + + " id INT NOT NULL," + + " name STRING," + + " amount BIGINT," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH ('bucket.num' = '1')"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, "binlog_test"); + + // Start binlog scan + String query = + "SELECT _change_type, _log_offset, " + + "before.id, before.name, before.amount, " + + "after.id, after.name, after.amount " + + "FROM binlog_test$binlog"; + CloseableIterator rowIter = tEnv.executeSql(query).collect(); + + // Test INSERT + CLOCK.advanceTime(Duration.ofMillis(1000)); + writeRows( + conn, + tablePath, + Arrays.asList(row(1, "Item-1", 100L), row(2, "Item-2", 200L)), + false); + + // Collect inserts - each INSERT produces one binlog row + List insertResults = collectRowsWithTimeout(rowIter, 2, false); + assertThat(insertResults).hasSize(2); + + // INSERT: before=null, after=row data + // Format: +I[_change_type, _log_offset, before.id, before.name, before.amount, + // after.id, after.name, after.amount] + assertThat(insertResults.get(0)) + .isEqualTo("+I[insert, 0, null, null, null, 1, Item-1, 100]"); + assertThat(insertResults.get(1)) + .isEqualTo("+I[insert, 1, null, null, null, 2, Item-2, 
200]"); + + // Test UPDATE - should merge -U and +U into single binlog row + CLOCK.advanceTime(Duration.ofMillis(1000)); + writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 150L)), false); + + // UPDATE produces ONE binlog row (not two like changelog) + List updateResults = collectRowsWithTimeout(rowIter, 1, false); + assertThat(updateResults).hasSize(1); + + // UPDATE: before=old row, after=new row, offset=from -U record + assertThat(updateResults.get(0)) + .isEqualTo("+I[update, 2, 1, Item-1, 100, 1, Item-1-Updated, 150]"); + + // Test DELETE + CLOCK.advanceTime(Duration.ofMillis(1000)); + deleteRows(conn, tablePath, Arrays.asList(row(2, "Item-2", 200L))); + + // DELETE produces one binlog row + List deleteResults = collectRowsWithTimeout(rowIter, 1, true); + assertThat(deleteResults).hasSize(1); + + // DELETE: before=row data, after=null + assertThat(deleteResults.get(0)) + .isEqualTo("+I[delete, 4, 2, Item-2, 200, null, null, null]"); + } + + @Test + public void testBinlogSelectStar() throws Exception { + // Test SELECT * which returns the full binlog structure + tEnv.executeSql( + "CREATE TABLE star_test (" + + " id INT NOT NULL," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH ('bucket.num' = '1')"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, "star_test"); + + String query = "SELECT * FROM star_test$binlog"; + CloseableIterator rowIter = tEnv.executeSql(query).collect(); + + // Insert a row + CLOCK.advanceTime(Duration.ofMillis(1000)); + writeRows(conn, tablePath, Arrays.asList(row(1, "Alice")), false); + + List results = collectRowsWithTimeout(rowIter, 1, true); + assertThat(results).hasSize(1); + + // SELECT * returns: _change_type, _log_offset, _commit_timestamp, before, after + // before is null for INSERT, after contains the row + assertThat(results.get(0)) + .isEqualTo("+I[insert, 0, 1970-01-01T00:00:01Z, null, +I[1, Alice]]"); + } + + @Test + public void testBinlogWithPartitionedTable() throws Exception { + // Create a partitioned primary key table + tEnv.executeSql( + "CREATE TABLE partitioned_binlog_test (" + + " id INT NOT NULL," + + " name STRING," + + " region STRING NOT NULL," + + " PRIMARY KEY (id, region) NOT ENFORCED" + + ") PARTITIONED BY (region) WITH ('bucket.num' = '1')"); + + // Insert data into different partitions using Flink SQL + CLOCK.advanceTime(Duration.ofMillis(100)); + tEnv.executeSql( + "INSERT INTO partitioned_binlog_test VALUES " + + "(1, 'Item-1', 'us'), " + + "(2, 'Item-2', 'eu')") + .await(); + + // Query binlog with nested field access + String query = + "SELECT _change_type, after.id, after.name, after.region " + + "FROM partitioned_binlog_test$binlog"; + CloseableIterator rowIter = tEnv.executeSql(query).collect(); + + List results = collectRowsWithTimeout(rowIter, 2, false); + // Sort results for deterministic assertion (partitions may return in any order) + Collections.sort(results); + assertThat(results) + .isEqualTo(Arrays.asList("+I[insert, 1, Item-1, us]", "+I[insert, 2, Item-2, eu]")); + + // Update a record in a specific partition + CLOCK.advanceTime(Duration.ofMillis(100)); + tEnv.executeSql("INSERT INTO partitioned_binlog_test VALUES (1, 'Item-1-Updated', 'us')") + .await(); + + List updateResults = collectRowsWithTimeout(rowIter, 1, true); + assertThat(updateResults).hasSize(1); + assertThat(updateResults.get(0)).isEqualTo("+I[update, 1, Item-1-Updated, us]"); + } + + @Test + public void testBinlogScanStartupMode() throws Exception { + // Create a primary key table with 1 bucket + 
tEnv.executeSql( + "CREATE TABLE startup_binlog_test (" + + " id INT NOT NULL," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH ('bucket.num' = '1')"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, "startup_binlog_test"); + + // Write first batch + CLOCK.advanceTime(Duration.ofMillis(100)); + writeRows(conn, tablePath, Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")), false); + + // Write second batch + CLOCK.advanceTime(Duration.ofMillis(100)); + writeRows(conn, tablePath, Arrays.asList(row(4, "v4"), row(5, "v5")), false); + + // Test scan.startup.mode='earliest' - should read all records from beginning + String optionsEarliest = " /*+ OPTIONS('scan.startup.mode' = 'earliest') */"; + String queryEarliest = + "SELECT _change_type, after.id, after.name FROM startup_binlog_test$binlog" + + optionsEarliest; + CloseableIterator rowIterEarliest = tEnv.executeSql(queryEarliest).collect(); + List earliestResults = collectRowsWithTimeout(rowIterEarliest, 5, true); + assertThat(earliestResults) + .isEqualTo( + Arrays.asList( + "+I[insert, 1, v1]", + "+I[insert, 2, v2]", + "+I[insert, 3, v3]", + "+I[insert, 4, v4]", + "+I[insert, 5, v5]")); + + // Test scan.startup.mode='timestamp' - should read from specific timestamp + String optionsTimestamp = + " /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '150') */"; + String queryTimestamp = + "SELECT _change_type, after.id, after.name FROM startup_binlog_test$binlog" + + optionsTimestamp; + CloseableIterator rowIterTimestamp = tEnv.executeSql(queryTimestamp).collect(); + List timestampResults = collectRowsWithTimeout(rowIterTimestamp, 2, true); + assertThat(timestampResults) + .isEqualTo(Arrays.asList("+I[insert, 4, v4]", "+I[insert, 5, v5]")); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java index 2b9111af89..342772996b 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java @@ -97,7 +97,7 @@ void before() { "create catalog %s with ('type' = 'fluss', '%s' = '%s')", CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); tEnv.executeSql("use catalog " + CATALOG_NAME); - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); tEnv.executeSql("create database " + DEFAULT_DB); tEnv.useDatabase(DEFAULT_DB); // reset clock before each test @@ -220,16 +220,17 @@ public void testChangelogVirtualTableWithLogTable() throws Exception { assertThat(results).hasSize(2); // Format: +I[_change_type, _log_offset, _commit_timestamp, event_id, event_type] - // Log tables use +A (append-only) change type - assertThat(results.get(0)).isEqualTo("+I[+A, 0, 1970-01-01T00:00:01Z, 1, click]"); - assertThat(results.get(1)).isEqualTo("+I[+A, 1, 1970-01-01T00:00:01Z, 2, view]"); + // Log tables use insert (append-only) change type + assertThat(results.get(0)).isEqualTo("+I[insert, 0, 1970-01-01T00:00:01Z, 1, click]"); + assertThat(results.get(1)).isEqualTo("+I[insert, 1, 1970-01-01T00:00:01Z, 2, view]"); // Insert more data with new timestamp CLOCK.advanceTime(Duration.ofMillis(1000)); writeRows(conn, tablePath, Arrays.asList(row(3, 
"purchase")), true); List moreResults = collectRowsWithTimeout(rowIter, 1, true); - assertThat(moreResults.get(0)).isEqualTo("+I[+A, 2, 1970-01-01T00:00:02Z, 3, purchase]"); + assertThat(moreResults.get(0)) + .isEqualTo("+I[insert, 2, 1970-01-01T00:00:02Z, 3, purchase]"); } @Test @@ -254,7 +255,7 @@ public void testProjectionOnChangelogTable() throws Exception { CLOCK.advanceTime(Duration.ofMillis(100)); writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1", 100L, "Desc-1")), false); List insertResult = collectRowsWithTimeout(rowIter, 1, false); - assertThat(insertResult.get(0)).isEqualTo("+I[+I, 1, Item-1]"); + assertThat(insertResult.get(0)).isEqualTo("+I[insert, 1, Item-1]"); // Test UPDATE CLOCK.advanceTime(Duration.ofMillis(100)); @@ -264,15 +265,15 @@ public void testProjectionOnChangelogTable() throws Exception { Arrays.asList(row(1, "Item-1-Updated", 150L, "Desc-1-Updated")), false); List updateResults = collectRowsWithTimeout(rowIter, 2, false); - assertThat(updateResults.get(0)).isEqualTo("+I[-U, 1, Item-1]"); - assertThat(updateResults.get(1)).isEqualTo("+I[+U, 1, Item-1-Updated]"); + assertThat(updateResults.get(0)).isEqualTo("+I[update_before, 1, Item-1]"); + assertThat(updateResults.get(1)).isEqualTo("+I[update_after, 1, Item-1-Updated]"); // Test DELETE CLOCK.advanceTime(Duration.ofMillis(100)); deleteRows( conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 150L, "Desc-1-Updated"))); List deleteResult = collectRowsWithTimeout(rowIter, 1, true); - assertThat(deleteResult.get(0)).isEqualTo("+I[-D, 1, Item-1-Updated]"); + assertThat(deleteResult.get(0)).isEqualTo("+I[delete, 1, Item-1-Updated]"); } @Test @@ -304,20 +305,20 @@ public void testChangelogScanWithAllChangeTypes() throws Exception { // With ManualClock and 1 bucket, we can assert exact row values // Format: +I[_change_type, _log_offset, _commit_timestamp, id, name, amount] - assertThat(results.get(0)).isEqualTo("+I[+I, 0, 1970-01-01T00:00:01Z, 1, Item-1, 100]"); - assertThat(results.get(1)).isEqualTo("+I[+I, 1, 1970-01-01T00:00:01Z, 2, Item-2, 200]"); + assertThat(results.get(0)).isEqualTo("+I[insert, 0, 1970-01-01T00:00:01Z, 1, Item-1, 100]"); + assertThat(results.get(1)).isEqualTo("+I[insert, 1, 1970-01-01T00:00:01Z, 2, Item-2, 200]"); // Test UPDATE operation with new timestamp CLOCK.advanceTime(Duration.ofMillis(1000)); writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 150L)), false); - // Collect update records (should get -U and +U) + // Collect update records (should get update_before and update_after) List updateResults = collectRowsWithTimeout(rowIter, 2, false); assertThat(updateResults).hasSize(2); assertThat(updateResults.get(0)) - .isEqualTo("+I[-U, 2, 1970-01-01T00:00:02Z, 1, Item-1, 100]"); + .isEqualTo("+I[update_before, 2, 1970-01-01T00:00:02Z, 1, Item-1, 100]"); assertThat(updateResults.get(1)) - .isEqualTo("+I[+U, 3, 1970-01-01T00:00:02Z, 1, Item-1-Updated, 150]"); + .isEqualTo("+I[update_after, 3, 1970-01-01T00:00:02Z, 1, Item-1-Updated, 150]"); // Test DELETE operation with new timestamp CLOCK.advanceTime(Duration.ofMillis(1000)); @@ -327,7 +328,7 @@ public void testChangelogScanWithAllChangeTypes() throws Exception { List deleteResult = collectRowsWithTimeout(rowIter, 1, true); assertThat(deleteResult).hasSize(1); assertThat(deleteResult.get(0)) - .isEqualTo("+I[-D, 4, 1970-01-01T00:00:03Z, 2, Item-2, 200]"); + .isEqualTo("+I[delete, 4, 1970-01-01T00:00:03Z, 2, Item-2, 200]"); } @Test @@ -361,7 +362,7 @@ public void testChangelogWithScanStartupMode() throws Exception { 
assertThat(earliestResults).hasSize(5); // All should be INSERT change types for (String result : earliestResults) { - assertThat(result).startsWith("+I[+I,"); + assertThat(result).startsWith("+I[insert,"); } // 2. Test scan.startup.mode='timestamp' - should read records from specific timestamp @@ -374,9 +375,9 @@ public void testChangelogWithScanStartupMode() throws Exception { assertThat(timestampResults).hasSize(2); // Should contain records from batch2 only assertThat(timestampResults) - .containsExactlyInAnyOrder( - "+I[+I, 3, 1970-01-01T00:00:00.200Z, 4, v4]", - "+I[+I, 4, 1970-01-01T00:00:00.200Z, 5, v5]"); + .containsExactly( + "+I[insert, 3, 1970-01-01T00:00:00.200Z, 4, v4]", + "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]"); } @Test @@ -406,15 +407,19 @@ public void testChangelogWithPartitionedTable() throws Exception { // Collect initial inserts List<String> results = collectRowsWithTimeout(rowIter, 3, false); assertThat(results) - .containsExactlyInAnyOrder( - "+I[+I, 1, Item-1, us]", "+I[+I, 2, Item-2, us]", "+I[+I, 3, Item-3, eu]"); + .containsExactly( + "+I[insert, 1, Item-1, us]", + "+I[insert, 2, Item-2, us]", + "+I[insert, 3, Item-3, eu]"); // Update a record in a specific partition CLOCK.advanceTime(Duration.ofMillis(100)); tEnv.executeSql("INSERT INTO partitioned_test VALUES (1, 'Item-1-Updated', 'us')").await(); List<String> updateResults = collectRowsWithTimeout(rowIter, 2, false); assertThat(updateResults) - .containsExactly("+I[-U, 1, Item-1, us]", "+I[+U, 1, Item-1-Updated, us]"); + .containsExactly( + "+I[update_before, 1, Item-1, us]", + "+I[update_after, 1, Item-1-Updated, us]"); rowIter.close(); } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java new file mode 100644 index 0000000000..a09dd7c419 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.record.GenericRecord; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.indexed.IndexedRow; +import org.apache.fluss.row.indexed.IndexedRowWriter; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit test for {@link BinlogRowConverter}. */ +class BinlogRowConverterTest { + + private RowType testRowType; + private BinlogRowConverter converter; + + @BeforeEach + void setUp() { + // Create a simple test table schema: (id INT, name STRING, amount BIGINT) + testRowType = + RowType.builder() + .field("id", DataTypes.INT()) + .field("name", DataTypes.STRING()) + .field("amount", DataTypes.BIGINT()) + .build(); + + converter = new BinlogRowConverter(testRowType); + } + + @Test + void testConvertInsertRecord() throws Exception { + LogRecord record = createLogRecord(ChangeType.INSERT, 100L, 1000L, 1, "Alice", 5000L); + + RowData result = converter.convert(record); + + // Verify row kind is always INSERT for virtual tables + assertThat(result).isNotNull(); + assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT); + + // Verify metadata columns + assertThat(result.getString(0)).isEqualTo(StringData.fromString("insert")); + assertThat(result.getLong(1)).isEqualTo(100L); // log offset + assertThat(result.getTimestamp(2, 3)).isEqualTo(TimestampData.fromEpochMillis(1000L)); + + // Verify before is null for INSERT + assertThat(result.isNullAt(3)).isTrue(); + + // Verify after contains the row data + RowData afterRow = result.getRow(4, 3); + assertThat(afterRow).isNotNull(); + assertThat(afterRow.getInt(0)).isEqualTo(1); // id + assertThat(afterRow.getString(1).toString()).isEqualTo("Alice"); // name + assertThat(afterRow.getLong(2)).isEqualTo(5000L); // amount + } + + @Test + void testConvertUpdateMerge() throws Exception { + // Send -U (UPDATE_BEFORE) - should return null (buffered) + LogRecord beforeRecord = + createLogRecord(ChangeType.UPDATE_BEFORE, 200L, 2000L, 2, "Bob", 3000L); + RowData beforeResult = converter.convert(beforeRecord); + assertThat(beforeResult).isNull(); + + // Send +U (UPDATE_AFTER) - should return merged row + LogRecord afterRecord = + createLogRecord(ChangeType.UPDATE_AFTER, 201L, 2000L, 2, "Bob-Updated", 4000L); + RowData result = converter.convert(afterRecord); + + assertThat(result).isNotNull(); + assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT); + + // Verify metadata columns + assertThat(result.getString(0)).isEqualTo(StringData.fromString("update")); + // Offset and timestamp should be from the -U record (first entry of update pair) + assertThat(result.getLong(1)).isEqualTo(200L); + assertThat(result.getTimestamp(2, 3)).isEqualTo(TimestampData.fromEpochMillis(2000L)); + + // Verify before contains old data + RowData beforeRow = result.getRow(3, 3); + assertThat(beforeRow).isNotNull(); + assertThat(beforeRow.getInt(0)).isEqualTo(2); + assertThat(beforeRow.getString(1).toString()).isEqualTo("Bob"); + 
assertThat(beforeRow.getLong(2)).isEqualTo(3000L); + + // Verify after contains new data + RowData afterRow = result.getRow(4, 3); + assertThat(afterRow).isNotNull(); + assertThat(afterRow.getInt(0)).isEqualTo(2); + assertThat(afterRow.getString(1).toString()).isEqualTo("Bob-Updated"); + assertThat(afterRow.getLong(2)).isEqualTo(4000L); + } + + @Test + void testConvertDeleteRecord() throws Exception { + LogRecord record = createLogRecord(ChangeType.DELETE, 300L, 3000L, 3, "Charlie", 1000L); + + RowData result = converter.convert(record); + + assertThat(result).isNotNull(); + assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT); + + // Verify metadata columns + assertThat(result.getString(0)).isEqualTo(StringData.fromString("delete")); + assertThat(result.getLong(1)).isEqualTo(300L); + assertThat(result.getTimestamp(2, 3)).isEqualTo(TimestampData.fromEpochMillis(3000L)); + + // Verify before contains the deleted row data + RowData beforeRow = result.getRow(3, 3); + assertThat(beforeRow).isNotNull(); + assertThat(beforeRow.getInt(0)).isEqualTo(3); + assertThat(beforeRow.getString(1).toString()).isEqualTo("Charlie"); + assertThat(beforeRow.getLong(2)).isEqualTo(1000L); + + // Verify after is null for DELETE + assertThat(result.isNullAt(4)).isTrue(); + } + + @Test + void testUpdateBeforeReturnsNull() throws Exception { + LogRecord record = + createLogRecord(ChangeType.UPDATE_BEFORE, 400L, 4000L, 4, "Diana", 2000L); + + // UPDATE_BEFORE should return null (buffered for merging) + RowData result = converter.convert(record); + assertThat(result).isNull(); + } + + @Test + void testUpdateAfterWithoutBeforeThrows() throws Exception { + // Sending +U without a preceding -U should throw + LogRecord record = createLogRecord(ChangeType.UPDATE_AFTER, 500L, 5000L, 5, "Eve", 6000L); + + assertThatThrownBy(() -> converter.convert(record)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U)"); + } + + @Test + void testAppendOnlyUnsupported() throws Exception { + LogRecord record = createLogRecord(ChangeType.APPEND_ONLY, 600L, 6000L, 6, "Frank", 7000L); + + assertThatThrownBy(() -> converter.convert(record)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("$binlog virtual table does not support change type"); + } + + @Test + void testProducedTypeHasNestedRowColumns() { + org.apache.flink.table.types.logical.RowType producedType = converter.getProducedType(); + + // Should have 5 columns: _change_type, _log_offset, _commit_timestamp, before, after + assertThat(producedType.getFieldCount()).isEqualTo(5); + + // Check column names + assertThat(producedType.getFieldNames()) + .containsExactly( + "_change_type", "_log_offset", "_commit_timestamp", "before", "after"); + + // Check metadata column types + assertThat(producedType.getTypeAt(0)) + .isInstanceOf(org.apache.flink.table.types.logical.VarCharType.class); + assertThat(producedType.getTypeAt(1)) + .isInstanceOf(org.apache.flink.table.types.logical.BigIntType.class); + assertThat(producedType.getTypeAt(2)) + .isInstanceOf(org.apache.flink.table.types.logical.LocalZonedTimestampType.class); + + // Check before and after are ROW types + assertThat(producedType.getTypeAt(3)) + .isInstanceOf(org.apache.flink.table.types.logical.RowType.class); + assertThat(producedType.getTypeAt(4)) + .isInstanceOf(org.apache.flink.table.types.logical.RowType.class); + + // Check nested ROW has the original columns + org.apache.flink.table.types.logical.RowType beforeType 
= + (org.apache.flink.table.types.logical.RowType) producedType.getTypeAt(3); + assertThat(beforeType.getFieldNames()).containsExactly("id", "name", "amount"); + + // before/after ROW types should be nullable + assertThat(producedType.getTypeAt(3).isNullable()).isTrue(); + assertThat(producedType.getTypeAt(4).isNullable()).isTrue(); + } + + @Test + void testMultipleUpdatesInSequence() throws Exception { + // First update pair + converter.convert(createLogRecord(ChangeType.UPDATE_BEFORE, 10L, 1000L, 1, "A", 100L)); + RowData result1 = + converter.convert( + createLogRecord(ChangeType.UPDATE_AFTER, 11L, 1000L, 1, "B", 200L)); + assertThat(result1).isNotNull(); + assertThat(result1.getString(0)).isEqualTo(StringData.fromString("update")); + + // Second update pair (converter state should be clean after first merge) + converter.convert(createLogRecord(ChangeType.UPDATE_BEFORE, 20L, 2000L, 1, "B", 200L)); + RowData result2 = + converter.convert( + createLogRecord(ChangeType.UPDATE_AFTER, 21L, 2000L, 1, "C", 300L)); + assertThat(result2).isNotNull(); + assertThat(result2.getString(0)).isEqualTo(StringData.fromString("update")); + assertThat(result2.getLong(1)).isEqualTo(20L); // offset from second -U + } + + private LogRecord createLogRecord( + ChangeType changeType, long offset, long timestamp, int id, String name, long amount) + throws Exception { + // Create an IndexedRow with test data + IndexedRow row = new IndexedRow(testRowType.getChildren().toArray(new DataType[0])); + try (IndexedRowWriter writer = + new IndexedRowWriter(testRowType.getChildren().toArray(new DataType[0]))) { + writer.writeInt(id); + writer.writeString(BinaryString.fromString(name)); + writer.writeLong(amount); + writer.complete(); + + row.pointTo(writer.segment(), 0, writer.position()); + + return new GenericRecord(offset, timestamp, changeType, row); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java index c569ba86dd..55f83a5149 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java @@ -65,7 +65,7 @@ void testConvertInsertRecord() throws Exception { assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT); // Verify metadata columns - assertThat(result.getString(0)).isEqualTo(StringData.fromString("+I")); + assertThat(result.getString(0)).isEqualTo(StringData.fromString("insert")); assertThat(result.getLong(1)).isEqualTo(100L); // log offset assertThat(result.getTimestamp(2, 3)).isNotNull(); // commit timestamp @@ -88,7 +88,7 @@ void testConvertUpdateBeforeRecord() throws Exception { assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT); // Verify change type metadata - assertThat(result.getString(0)).isEqualTo(StringData.fromString("-U")); + assertThat(result.getString(0)).isEqualTo(StringData.fromString("update_before")); assertThat(result.getLong(1)).isEqualTo(200L); // Verify physical columns @@ -103,7 +103,7 @@ void testConvertUpdateAfterRecord() throws Exception { RowData result = converter.convert(record); - assertThat(result.getString(0)).isEqualTo(StringData.fromString("+U")); + assertThat(result.getString(0)).isEqualTo(StringData.fromString("update_after")); assertThat(result.getLong(1)).isEqualTo(201L); assertThat(result.getInt(3)).isEqualTo(2); 
assertThat(result.getString(4).toString()).isEqualTo("Bob"); @@ -116,7 +116,7 @@ void testConvertDeleteRecord() throws Exception { RowData result = converter.convert(record); - assertThat(result.getString(0)).isEqualTo(StringData.fromString("-D")); + assertThat(result.getString(0)).isEqualTo(StringData.fromString("delete")); assertThat(result.getLong(1)).isEqualTo(300L); assertThat(result.getInt(3)).isEqualTo(3); assertThat(result.getString(4).toString()).isEqualTo("Charlie"); @@ -151,7 +151,7 @@ void testAllChangeTypes() throws Exception { converter .convert(createLogRecord(ChangeType.INSERT, 1L, 1, "Test", 100L)) .getString(0)) - .isEqualTo(StringData.fromString("+I")); + .isEqualTo(StringData.fromString("insert")); assertThat( converter @@ -159,7 +159,7 @@ void testAllChangeTypes() throws Exception { createLogRecord( ChangeType.UPDATE_BEFORE, 2L, 1, "Test", 100L)) .getString(0)) - .isEqualTo(StringData.fromString("-U")); + .isEqualTo(StringData.fromString("update_before")); assertThat( converter @@ -167,13 +167,13 @@ void testAllChangeTypes() throws Exception { createLogRecord( ChangeType.UPDATE_AFTER, 3L, 1, "Test", 100L)) .getString(0)) - .isEqualTo(StringData.fromString("+U")); + .isEqualTo(StringData.fromString("update_after")); assertThat( converter .convert(createLogRecord(ChangeType.DELETE, 4L, 1, "Test", 100L)) .getString(0)) - .isEqualTo(StringData.fromString("-D")); + .isEqualTo(StringData.fromString("delete")); // For log tables (append-only) assertThat( @@ -182,7 +182,7 @@ void testAllChangeTypes() throws Exception { createLogRecord( ChangeType.APPEND_ONLY, 5L, 1, "Test", 100L)) .getString(0)) - .isEqualTo(StringData.fromString("+A")); + .isEqualTo(StringData.fromString("insert")); } private LogRecord createLogRecord(