buildEn
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
- options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
+ options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT),
+ enumeratorMetrics);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceEnumeratorMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceEnumeratorMetrics.java
new file mode 100644
index 000000000000..e97e5f5e6fe6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceEnumeratorMetrics.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.metrics;
+
+import org.apache.paimon.CoreOptions;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+import java.util.Map;
+
+/**
+ * Source enumerator metrics.
+ *
+ * This class manages metrics for the source split enumerator.
+ */
+public class FileStoreSourceEnumeratorMetrics {
+
+ /**
+ * Metric name for source scaling max parallelism. This metric provides a recommended upper
+ * bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the
+ * bucket number; for dynamic bucket tables, this equals the current parallelism. Note: This is
+ * a recommendation, not a hard limit - users can configure higher parallelism manually if
+ * needed.
+ */
+ public static final String SCALING_MAX_PARALLELISM = "sourceScalingMaxParallelism";
+
+ private final int scalingMaxParallelism;
+
+ /**
+ * Creates enumerator metrics and registers them with the given metric group.
+ *
+ * @param context the split enumerator context
+ * @param options the source options
+ */
+ public FileStoreSourceEnumeratorMetrics(
+ SplitEnumeratorContext> context, Map options) {
+ int bucketNum = CoreOptions.fromMap(options).bucket();
+ // Dynamic bucket mode uses -1.
+ // In this case, scaling max parallelism equals current parallelism.
+ this.scalingMaxParallelism = bucketNum < 0 ? context.currentParallelism() : bucketNum;
+
+ context.metricGroup().gauge(SCALING_MAX_PARALLELISM, this::getScalingMaxParallelism);
+ }
+
+ public int getScalingMaxParallelism() {
+ return scalingMaxParallelism;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 051a09e87458..0077abc21342 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -927,7 +927,8 @@ public ContinuousFileSplitEnumerator build() {
unawareBucket,
this.splitMaxPerTask,
false,
- maxSnapshotCount);
+ maxSnapshotCount,
+ null);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
index 164ab844665c..b741c30e505c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
@@ -20,6 +20,7 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -54,32 +55,48 @@
/** Tests for file store sources with metrics. */
public class FileStoreSourceMetricsTest {
private FileStoreTable table;
+ private FileStoreTable fixBucketTable;
private TestingSplitEnumeratorContextWithRegisteringGroup context;
private MetricGroup scanMetricGroup;
+ private MetricGroup enumeratorMetricGroup;
@BeforeEach
public void before(@TempDir java.nio.file.Path path) throws Exception {
FileIO fileIO = LocalFileIO.create();
Path tablePath = new Path(path.toString());
+ Path fixBucketTablePath = new Path(path.toString(), "fix_bucket");
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+ SchemaManager fixBucketSchemaManager = new SchemaManager(fileIO, fixBucketTablePath);
TableSchema tableSchema =
schemaManager.createTable(
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.BIGINT())
.build());
+ TableSchema fixBucketTableSchema =
+ fixBucketSchemaManager.createTable(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.BIGINT())
+ .primaryKey("a")
+ .option("bucket", "2")
+ .option("bucket-key", "a")
+ .build());
table = FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+ fixBucketTable =
+ FileStoreTableFactory.create(fileIO, fixBucketTablePath, fixBucketTableSchema);
context = new TestingSplitEnumeratorContextWithRegisteringGroup(1);
scanMetricGroup =
context.metricGroup()
.addGroup("paimon")
.addGroup("table", table.name())
.addGroup("scan");
+ enumeratorMetricGroup = context.metricGroup();
}
@Test
public void staticFileStoreSourceScanMetricsTest() throws Exception {
- writeOnce();
+ writeOnce(table);
StaticFileStoreSource staticFileStoreSource =
new StaticFileStoreSource(
table.newReadBuilder(),
@@ -98,7 +115,7 @@ public void staticFileStoreSourceScanMetricsTest() throws Exception {
@Test
public void continuousFileStoreSourceScanMetricsTest() throws Exception {
- writeOnce();
+ writeOnce(table);
ContinuousFileStoreSource continuousFileStoreSource =
new ContinuousFileStoreSource(table.newReadBuilder(), table.options(), null);
ContinuousFileSplitEnumerator enumerator =
@@ -114,7 +131,7 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception {
.getValue())
.isEqualTo(1L);
- writeAgain();
+ writeAgain(table);
enumerator.scanNextSnapshot();
assertThat(TestingMetricUtils.getHistogram(scanMetricGroup, "scanDuration").getCount())
.isEqualTo(2);
@@ -126,7 +143,48 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception {
.isEqualTo(1L);
}
- private void writeOnce() throws Exception {
+ @Test
+ public void continuousFileStoreFixBucketEnumeratorMetricsTest() throws Exception {
+ writeOnce(fixBucketTable);
+
+ ContinuousFileStoreSource continuousFileStoreSource =
+ new ContinuousFileStoreSource(
+ fixBucketTable.newReadBuilder(), fixBucketTable.options(), null);
+ ContinuousFileSplitEnumerator enumerator =
+ (ContinuousFileSplitEnumerator)
+ continuousFileStoreSource.restoreEnumerator(context, null);
+ enumerator.scanNextSnapshot();
+
+ // equal bucketNum when bucket > 0
+ assertThat(
+ TestingMetricUtils.getGauge(
+ enumeratorMetricGroup,
+ FileStoreSourceEnumeratorMetrics.SCALING_MAX_PARALLELISM)
+ .getValue())
+ .isEqualTo(2);
+ }
+
+ @Test
+ public void continuousFileStoreDynBucketEnumeratorMetricsTest() throws Exception {
+ writeOnce(table);
+
+ ContinuousFileStoreSource continuousFileStoreSource =
+ new ContinuousFileStoreSource(table.newReadBuilder(), table.options(), null);
+ ContinuousFileSplitEnumerator enumerator =
+ (ContinuousFileSplitEnumerator)
+ continuousFileStoreSource.restoreEnumerator(context, null);
+ enumerator.scanNextSnapshot();
+
+ // equal parallelism when bucket < 0
+ assertThat(
+ TestingMetricUtils.getGauge(
+ enumeratorMetricGroup,
+ FileStoreSourceEnumeratorMetrics.SCALING_MAX_PARALLELISM)
+ .getValue())
+ .isEqualTo(1);
+ }
+
+ private void writeOnce(FileStoreTable table) throws Exception {
InnerTableWrite writer = table.newWrite("test");
TableCommitImpl commit = table.newCommit("test");
writer.write(GenericRow.of(1, 2L));
@@ -140,7 +198,7 @@ private void writeOnce() throws Exception {
writer.close();
}
- private void writeAgain() throws Exception {
+ private void writeAgain(FileStoreTable table) throws Exception {
InnerTableWrite writer = table.newWrite("test");
TableCommitImpl commit = table.newCommit("test");
writer.write(GenericRow.of(10, 2L));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index a42d69716004..57f264aff270 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -252,7 +252,8 @@ public AlignedContinuousFileSplitEnumerator build() {
timeout,
10,
false,
- -1);
+ -1,
+ null);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java
index 39ab5ae30bcb..901528d80824 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java
@@ -24,6 +24,7 @@
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
import java.lang.reflect.Field;
import java.util.Map;
@@ -46,6 +47,14 @@ public static Histogram getHistogram(MetricGroup group, String metricName) {
@SuppressWarnings("unchecked")
private static Metric getMetric(MetricGroup group, String metricName) {
try {
+ // Handle ProxyMetricGroup wrapper class
+ if (ProxyMetricGroup.class.isAssignableFrom(group.getClass())) {
+ Field parentField =
+ group.getClass().getSuperclass().getDeclaredField("parentMetricGroup");
+ parentField.setAccessible(true);
+ group = (MetricGroup) parentField.get(group);
+ }
+
Field field = AbstractMetricGroup.class.getDeclaredField("metrics");
field.setAccessible(true);
return ((Map) field.get(group)).get(metricName);