Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public final class TableDescriptor implements Serializable {
public static final String LOG_OFFSET_COLUMN = "_log_offset";
public static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp";

// Column names of the nested before/after ROW fields exposed by the $binlog virtual table;
// referenced by the binlog schema builder so the names are defined in exactly one place.
public static final String BEFORE_COLUMN = "before";
public static final String AFTER_COLUMN = "after";

private final Schema schema;
private final @Nullable String comment;
private final List<String> partitionKeys;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source;

/**
 * Runs {@link BinlogVirtualTableITCase} against the Flink 1.18 runtime. All test logic lives in
 * the parent class; this subclass only exists so the shared tests execute in this Flink module.
 */
public class Flink118BinlogVirtualTableITCase extends BinlogVirtualTableITCase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source;

/**
 * Runs {@link BinlogVirtualTableITCase} against the Flink 1.19 runtime. All test logic lives in
 * the parent class; this subclass only exists so the shared tests execute in this Flink module.
 */
public class Flink119BinlogVirtualTableITCase extends BinlogVirtualTableITCase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source;

/**
 * Runs {@link BinlogVirtualTableITCase} against the Flink 1.20 runtime. All test logic lives in
 * the parent class; this subclass only exists so the shared tests execute in this Flink module.
 */
public class Flink120BinlogVirtualTableITCase extends BinlogVirtualTableITCase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source;

/**
 * Runs {@link BinlogVirtualTableITCase} against the Flink 2.2 runtime. All test logic lives in
 * the parent class; this subclass only exists so the shared tests execute in this Flink module.
 */
public class Flink22BinlogVirtualTableITCase extends BinlogVirtualTableITCase {}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,13 @@ public class FlinkConnectorOptions {
.withDescription(
"The serialized base64 bytes of refresh handler of materialized table.");

// ------------------------------------------------------------------------------------------
/**
 * Internal option recording whether the base table of a {@code $binlog} virtual table is
 * partitioned. Written by the catalog when it synthesizes the virtual table's options and read
 * back by the table-source factory; needed because the binlog schema nests the physical columns
 * inside before/after ROW fields, so Flink's regular partition-key mechanism cannot be used.
 * Not part of the public API.
 */
public static final ConfigOption<Boolean> INTERNAL_BINLOG_IS_PARTITIONED =
        ConfigOptions.key("_internal.binlog.is-partitioned")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Internal option: indicates whether the base table is partitioned for $binlog virtual tables. Not part of public API.");

/** Startup mode for the fluss scanner, see {@link #SCAN_STARTUP_MODE}. */
public enum ScanStartupMode implements DescribedEnum {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.adapter.CatalogTableAdapter;
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.flink.procedure.ProcedureManager;
import org.apache.fluss.metadata.TableDescriptor;
Expand All @@ -39,6 +40,7 @@
import org.apache.fluss.utils.IOUtils;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand Down Expand Up @@ -73,6 +75,7 @@
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.table.types.AbstractDataType;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -315,11 +318,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
return getVirtualChangelogTable(objectPath);
} else if (tableName.endsWith(BINLOG_TABLE_SUFFIX)
&& !tableName.contains(LAKE_TABLE_SPLITTER)) {
// TODO: Implement binlog virtual table in future
throw new UnsupportedOperationException(
String.format(
"$binlog virtual tables are not yet supported for table %s",
objectPath));
return getVirtualBinlogTable(objectPath);
}

TablePath tablePath = toTablePath(objectPath);
Expand Down Expand Up @@ -960,4 +959,109 @@ private Schema buildChangelogSchema(Schema originalSchema) {

return builder.build();
}

/**
 * Creates a virtual {@code $binlog} table by deriving a new schema from the base table:
 * change-event metadata columns plus nested before/after ROW fields (see
 * {@code buildBinlogSchema}).
 *
 * @param objectPath path of the virtual table; its name ends with the {@code $binlog} suffix
 * @return the catalog table describing the virtual binlog table
 * @throws TableNotExistException if the base table does not exist
 * @throws CatalogException if fetching or converting the base table fails
 */
private CatalogBaseTable getVirtualBinlogTable(ObjectPath objectPath)
        throws TableNotExistException, CatalogException {
    // Extract the base table name (remove $binlog suffix)
    String virtualTableName = objectPath.getObjectName();
    String baseTableName =
            virtualTableName.substring(
                    0, virtualTableName.length() - BINLOG_TABLE_SUFFIX.length());

    // Resolve the base table that the virtual table is derived from.
    ObjectPath baseObjectPath = new ObjectPath(objectPath.getDatabaseName(), baseTableName);
    TablePath baseTablePath = toTablePath(baseObjectPath);

    try {
        // Retrieve base table info (blocks on the admin client future).
        TableInfo tableInfo = admin.getTableInfo(baseTablePath).get();

        // $binlog is only supported for primary key tables
        if (!tableInfo.hasPrimaryKey()) {
            throw new UnsupportedOperationException(
                    String.format(
                            "$binlog virtual tables are only supported for primary key tables. "
                                    + "Table %s does not have a primary key.",
                            baseTablePath));
        }

        // Convert to Flink table
        CatalogBaseTable catalogBaseTable = FlinkConversions.toFlinkTable(tableInfo);

        if (!(catalogBaseTable instanceof CatalogTable)) {
            throw new UnsupportedOperationException(
                    "Virtual $binlog tables are only supported for regular tables");
        }

        CatalogTable baseTable = (CatalogTable) catalogBaseTable;

        // Build the binlog schema with nested before/after ROW columns
        Schema originalSchema = baseTable.getUnresolvedSchema();
        Schema binlogSchema = buildBinlogSchema(originalSchema);

        // Copy options from the base table and add connection settings.
        Map<String, String> newOptions = new HashMap<>(baseTable.getOptions());
        newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
        newOptions.putAll(securityConfigs);

        // Store whether the base table is partitioned for the table source to use.
        // Since binlog schema has nested columns, we can't use Flink's partition key mechanism.
        newOptions.put(
                FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED.key(),
                String.valueOf(!baseTable.getPartitionKeys().isEmpty()));

        // Create a new CatalogTable with the binlog schema.
        // Binlog virtual tables don't have partition keys at the top level.
        return CatalogTableAdapter.toCatalogTable(
                binlogSchema, baseTable.getComment(), Collections.emptyList(), newOptions);

    } catch (Exception e) {
        Throwable t = ExceptionUtils.stripExecutionException(e);
        // The blocking get() above may be interrupted; restore the thread's interrupt flag
        // before translating the failure so callers can still observe the interruption.
        if (t instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        if (t instanceof UnsupportedOperationException) {
            throw (UnsupportedOperationException) t;
        }
        if (isTableNotExist(t)) {
            throw new TableNotExistException(getName(), baseObjectPath);
        } else {
            throw new CatalogException(
                    String.format(
                            "Failed to get virtual binlog table %s in %s",
                            objectPath, getName()),
                    t);
        }
    }
}

/**
 * Builds the schema of a {@code $binlog} virtual table: change-event metadata columns followed
 * by nullable nested {@code before}/{@code after} ROW columns mirroring the base table's
 * physical columns.
 *
 * <p>Primary keys and watermarks of the base table are intentionally not copied for virtual
 * tables.
 */
private Schema buildBinlogSchema(Schema originalSchema) {
    Schema.Builder builder = Schema.newBuilder();

    // Add metadata columns. Use the TableDescriptor constants so the column names are defined
    // in one place. NOTE(review): "_change_type" stays a literal here; if TableDescriptor
    // exposes a constant for it as well, switch to that — confirm.
    builder.column("_change_type", STRING().notNull());
    builder.column(TableDescriptor.LOG_OFFSET_COLUMN, BIGINT().notNull());
    builder.column(TableDescriptor.COMMIT_TIMESTAMP_COLUMN, TIMESTAMP_LTZ(3).notNull());

    // Build nested ROW type from original columns for before/after fields.
    // Using UnresolvedField since physCol.getDataType() returns AbstractDataType (unresolved).
    List<DataTypes.UnresolvedField> rowFields = new ArrayList<>();
    for (Schema.UnresolvedColumn col : originalSchema.getColumns()) {
        if (col instanceof Schema.UnresolvedPhysicalColumn) {
            Schema.UnresolvedPhysicalColumn physCol = (Schema.UnresolvedPhysicalColumn) col;
            rowFields.add(DataTypes.FIELD(physCol.getName(), physCol.getDataType()));
        }
    }
    AbstractDataType<?> nestedRowType =
            DataTypes.ROW(rowFields.toArray(new DataTypes.UnresolvedField[0]));

    // Add before and after as nullable nested ROW columns, named via the shared constants
    // declared in TableDescriptor for the $binlog nested row fields.
    builder.column(TableDescriptor.BEFORE_COLUMN, nestedRowType);
    builder.column(TableDescriptor.AFTER_COLUMN, nestedRowType);

    // Note: We don't copy primary keys or watermarks for virtual tables

    return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.fluss.flink.lake.LakeTableFactory;
import org.apache.fluss.flink.sink.FlinkTableSink;
import org.apache.fluss.flink.sink.shuffle.DistributionMode;
import org.apache.fluss.flink.source.BinlogFlinkTableSource;
import org.apache.fluss.flink.source.ChangelogFlinkTableSource;
import org.apache.fluss.flink.source.FlinkTableSource;
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
Expand Down Expand Up @@ -93,6 +94,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
return createChangelogTableSource(context, tableIdentifier, tableName);
}

// Check if this is a $binlog suffix in table name
if (tableName.endsWith(FlinkCatalog.BINLOG_TABLE_SUFFIX)) {
return createBinlogTableSource(context, tableIdentifier, tableName);
}

FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig tableOptions = helper.getOptions();
validateSourceOptions(tableOptions);
Expand Down Expand Up @@ -327,4 +333,51 @@ private DynamicTableSource createChangelogTableSource(
partitionDiscoveryIntervalMs,
catalogTableOptions);
}

/** Creates a {@link BinlogFlinkTableSource} for {@code $binlog} virtual tables. */
private DynamicTableSource createBinlogTableSource(
        Context context, ObjectIdentifier tableIdentifier, String tableName) {
    // Strip the $binlog suffix to recover the name of the underlying physical table.
    int suffixStart = tableName.length() - FlinkCatalog.BINLOG_TABLE_SUFFIX.length();
    String physicalTableName = tableName.substring(0, suffixStart);

    boolean streaming =
            RuntimeExecutionMode.STREAMING
                    == context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE);

    // Produced type: [_change_type, _log_offset, _commit_timestamp, before ROW<...>,
    // after ROW<...>]
    RowType producedRowType = (RowType) context.getPhysicalRowDataType().getLogicalType();

    Map<String, String> catalogTableOptions = context.getCatalogTable().getOptions();
    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    ReadableConfig tableOptions = helper.getOptions();
    validateSourceOptions(tableOptions);

    ZoneId zone =
            FlinkConnectorOptionsUtils.getLocalTimeZone(
                    context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE));
    FlinkConnectorOptionsUtils.StartupOptions startup =
            FlinkConnectorOptionsUtils.getStartupOptions(tableOptions, zone);

    // Whether the base table is partitioned is carried through an internal catalog option,
    // because the binlog schema nests the physical columns and declares no partition keys.
    boolean basePartitioned =
            tableOptions.get(FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED);

    long discoveryIntervalMs =
            tableOptions
                    .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
                    .toMillis();

    return new BinlogFlinkTableSource(
            TablePath.of(tableIdentifier.getDatabaseName(), physicalTableName),
            toFlussClientConfig(catalogTableOptions, context.getConfiguration()),
            producedRowType,
            basePartitioned,
            streaming,
            startup,
            discoveryIntervalMs,
            catalogTableOptions);
}
}
Loading