diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md index 5d115407e277..7aa8ba759c39 100644 --- a/docs/content/maintenance/metrics.md +++ b/docs/content/maintenance/metrics.md @@ -391,6 +391,12 @@ When using Flink to read and write, Paimon has implemented some key standard Fli Gauge Time difference between reading the data file and file creation. + + sourceScalingMaxParallelism + Flink Source Enumerator + Gauge + Recommended upper bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the bucket number. For dynamic bucket tables (bucket = -1), this equals the current parallelism. Note: This is a recommendation, not a hard limit. + diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java index 87b3f77d5a7d..e55dd1f61fc2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java @@ -22,6 +22,7 @@ import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner; import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner; import org.apache.paimon.flink.source.assigners.SplitAssigner; +import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics; import org.apache.paimon.postpone.PostponeBucketFileStoreWrite; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.sink.ChannelComputer; @@ -92,6 +93,9 @@ public class ContinuousFileSplitEnumerator private final int maxSnapshotCount; + // Currently unused, serves as a placeholder for future metric extensions or updates. 
+ @Nullable private final FileStoreSourceEnumeratorMetrics enumeratorMetrics; + public ContinuousFileSplitEnumerator( SplitEnumeratorContext context, Collection remainSplits, @@ -101,7 +105,8 @@ public ContinuousFileSplitEnumerator( boolean unordered, int splitMaxPerTask, boolean shuffleBucketWithPartition, - int maxSnapshotCount) { + int maxSnapshotCount, + @Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) { checkArgument(discoveryInterval > 0L); this.context = checkNotNull(context); this.nextSnapshotId = nextSnapshotId; @@ -118,6 +123,7 @@ public ContinuousFileSplitEnumerator( this.consumerProgressCalculator = new ConsumerProgressCalculator(context.currentParallelism()); this.maxSnapshotCount = maxSnapshotCount; + this.enumeratorMetrics = enumeratorMetrics; } @VisibleForTesting diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java index 4e9855e8f2c6..1afa5e334191 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java @@ -22,6 +22,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; +import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics; import org.apache.paimon.options.Options; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.StreamDataTableScan; @@ -84,12 +85,14 @@ public SplitEnumerator restoreEnu splits = checkpoint.splits(); } StreamTableScan scan = readBuilder.newStreamScan(); + FileStoreSourceEnumeratorMetrics enumeratorMetrics = null; if (metricGroup(context) != null) { + enumeratorMetrics = new 
FileStoreSourceEnumeratorMetrics(context, options); ((StreamDataTableScan) scan) .withMetricRegistry(new FlinkMetricRegistry(context.metricGroup())); } scan.restore(nextSnapshotId); - return buildEnumerator(context, splits, nextSnapshotId, scan); + return buildEnumerator(context, splits, nextSnapshotId, scan, enumeratorMetrics); } @Nullable @@ -106,7 +109,8 @@ protected SplitEnumerator buildEn SplitEnumeratorContext context, Collection splits, @Nullable Long nextSnapshotId, - StreamTableScan scan) { + StreamTableScan scan, + @Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) { Options options = Options.fromMap(this.options); return new ContinuousFileSplitEnumerator( context, @@ -117,6 +121,7 @@ protected SplitEnumerator buildEn unordered, options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK), options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION), - options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT)); + options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT), + enumeratorMetrics); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java index d1652dfa9e39..ef114db087fc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java @@ -23,6 +23,7 @@ import org.apache.paimon.flink.source.PendingSplitsCheckpoint; import org.apache.paimon.flink.source.assigners.AlignedSplitAssigner; import org.apache.paimon.flink.source.assigners.SplitAssigner; +import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics; import org.apache.paimon.table.source.DataSplit; import 
org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.SnapshotNotExistPlan; @@ -95,7 +96,8 @@ public AlignedContinuousFileSplitEnumerator( long alignTimeout, int splitPerTaskMax, boolean shuffleBucketWithPartition, - int maxSnapshotCount) { + int maxSnapshotCount, + @Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) { super( context, remainSplits, @@ -105,7 +107,8 @@ public AlignedContinuousFileSplitEnumerator( unawareBucket, splitPerTaskMax, shuffleBucketWithPartition, - maxSnapshotCount); + maxSnapshotCount, + enumeratorMetrics); this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN); this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner; this.nextSnapshotId = nextSnapshotId; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java index ab31c13c61b9..cb6b5855b0b0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java @@ -25,6 +25,7 @@ import org.apache.paimon.flink.source.ContinuousFileStoreSource; import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.flink.source.PendingSplitsCheckpoint; +import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.options.Options; import org.apache.paimon.table.source.ReadBuilder; @@ -80,7 +81,8 @@ protected SplitEnumerator buildEn SplitEnumeratorContext context, Collection splits, @Nullable Long nextSnapshotId, - StreamTableScan scan) { + StreamTableScan scan, + @Nullable FileStoreSourceEnumeratorMetrics 
enumeratorMetrics) { Options options = Options.fromMap(this.options); return new AlignedContinuousFileSplitEnumerator( context, @@ -92,6 +94,7 @@ protected SplitEnumerator buildEn options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(), options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK), options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION), - options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT)); + options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT), + enumeratorMetrics); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceEnumeratorMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceEnumeratorMetrics.java new file mode 100644 index 000000000000..e97e5f5e6fe6 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceEnumeratorMetrics.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.source.metrics; + +import org.apache.paimon.CoreOptions; + +import org.apache.flink.api.connector.source.SplitEnumeratorContext; + +import java.util.Map; + +/** + * Source enumerator metrics. + * + *
<p>
This class manages metrics for the source split enumerator. + */ +public class FileStoreSourceEnumeratorMetrics { + + /** + * Metric name for source scaling max parallelism. This metric provides a recommended upper + * bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the + * bucket number; for dynamic bucket tables, this equals the current parallelism. Note: This is + * a recommendation, not a hard limit - users can configure higher parallelism manually if + * needed. + */ + public static final String SCALING_MAX_PARALLELISM = "sourceScalingMaxParallelism"; + + private final int scalingMaxParallelism; + + /** + * Creates enumerator metrics and registers them with the given metric group. + * + * @param context the split enumerator context + * @param options the source options + */ + public FileStoreSourceEnumeratorMetrics( + SplitEnumeratorContext context, Map options) { + int bucketNum = CoreOptions.fromMap(options).bucket(); + // Dynamic bucket mode uses -1. + // In this case, scaling max parallelism equals current parallelism. + this.scalingMaxParallelism = bucketNum < 0 ? 
context.currentParallelism() : bucketNum; + + context.metricGroup().gauge(SCALING_MAX_PARALLELISM, this::getScalingMaxParallelism); + } + + public int getScalingMaxParallelism() { + return scalingMaxParallelism; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java index 051a09e87458..0077abc21342 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java @@ -927,7 +927,8 @@ public ContinuousFileSplitEnumerator build() { unawareBucket, this.splitMaxPerTask, false, - maxSnapshotCount); + maxSnapshotCount, + null); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java index 164ab844665c..b741c30e505c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics; import org.apache.paimon.flink.utils.TestingMetricUtils; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -54,32 +55,48 @@ /** Tests for file store sources with metrics. 
*/ public class FileStoreSourceMetricsTest { private FileStoreTable table; + private FileStoreTable fixBucketTable; private TestingSplitEnumeratorContextWithRegisteringGroup context; private MetricGroup scanMetricGroup; + private MetricGroup enumeratorMetricGroup; @BeforeEach public void before(@TempDir java.nio.file.Path path) throws Exception { FileIO fileIO = LocalFileIO.create(); Path tablePath = new Path(path.toString()); + Path fixBucketTablePath = new Path(path.toString(), "fix_bucket"); SchemaManager schemaManager = new SchemaManager(fileIO, tablePath); + SchemaManager fixBucketSchemaManager = new SchemaManager(fileIO, fixBucketTablePath); TableSchema tableSchema = schemaManager.createTable( Schema.newBuilder() .column("a", DataTypes.INT()) .column("b", DataTypes.BIGINT()) .build()); + TableSchema fixBucketTableSchema = + fixBucketSchemaManager.createTable( + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.BIGINT()) + .primaryKey("a") + .option("bucket", "2") + .option("bucket-key", "a") + .build()); table = FileStoreTableFactory.create(fileIO, tablePath, tableSchema); + fixBucketTable = + FileStoreTableFactory.create(fileIO, fixBucketTablePath, fixBucketTableSchema); context = new TestingSplitEnumeratorContextWithRegisteringGroup(1); scanMetricGroup = context.metricGroup() .addGroup("paimon") .addGroup("table", table.name()) .addGroup("scan"); + enumeratorMetricGroup = context.metricGroup(); } @Test public void staticFileStoreSourceScanMetricsTest() throws Exception { - writeOnce(); + writeOnce(table); StaticFileStoreSource staticFileStoreSource = new StaticFileStoreSource( table.newReadBuilder(), @@ -98,7 +115,7 @@ public void staticFileStoreSourceScanMetricsTest() throws Exception { @Test public void continuousFileStoreSourceScanMetricsTest() throws Exception { - writeOnce(); + writeOnce(table); ContinuousFileStoreSource continuousFileStoreSource = new ContinuousFileStoreSource(table.newReadBuilder(), table.options(), null); 
ContinuousFileSplitEnumerator enumerator = @@ -114,7 +131,7 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception { .getValue()) .isEqualTo(1L); - writeAgain(); + writeAgain(table); enumerator.scanNextSnapshot(); assertThat(TestingMetricUtils.getHistogram(scanMetricGroup, "scanDuration").getCount()) .isEqualTo(2); @@ -126,7 +143,48 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception { .isEqualTo(1L); } - private void writeOnce() throws Exception { + @Test + public void continuousFileStoreFixBucketEnumeratorMetricsTest() throws Exception { + writeOnce(fixBucketTable); + + ContinuousFileStoreSource continuousFileStoreSource = + new ContinuousFileStoreSource( + fixBucketTable.newReadBuilder(), fixBucketTable.options(), null); + ContinuousFileSplitEnumerator enumerator = + (ContinuousFileSplitEnumerator) + continuousFileStoreSource.restoreEnumerator(context, null); + enumerator.scanNextSnapshot(); + + // equal bucketNum when bucket > 0 + assertThat( + TestingMetricUtils.getGauge( + enumeratorMetricGroup, + FileStoreSourceEnumeratorMetrics.SCALING_MAX_PARALLELISM) + .getValue()) + .isEqualTo(2); + } + + @Test + public void continuousFileStoreDynBucketEnumeratorMetricsTest() throws Exception { + writeOnce(table); + + ContinuousFileStoreSource continuousFileStoreSource = + new ContinuousFileStoreSource(table.newReadBuilder(), table.options(), null); + ContinuousFileSplitEnumerator enumerator = + (ContinuousFileSplitEnumerator) + continuousFileStoreSource.restoreEnumerator(context, null); + enumerator.scanNextSnapshot(); + + // equal parallelism when bucket < 0 + assertThat( + TestingMetricUtils.getGauge( + enumeratorMetricGroup, + FileStoreSourceEnumeratorMetrics.SCALING_MAX_PARALLELISM) + .getValue()) + .isEqualTo(1); + } + + private void writeOnce(FileStoreTable table) throws Exception { InnerTableWrite writer = table.newWrite("test"); TableCommitImpl commit = table.newCommit("test"); writer.write(GenericRow.of(1, 2L)); @@ 
-140,7 +198,7 @@ private void writeOnce() throws Exception { writer.close(); } - private void writeAgain() throws Exception { + private void writeAgain(FileStoreTable table) throws Exception { InnerTableWrite writer = table.newWrite("test"); TableCommitImpl commit = table.newCommit("test"); writer.write(GenericRow.of(10, 2L)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java index a42d69716004..57f264aff270 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java @@ -252,7 +252,8 @@ public AlignedContinuousFileSplitEnumerator build() { timeout, 10, false, - -1); + -1, + null); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java index 39ab5ae30bcb..901528d80824 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java @@ -24,6 +24,7 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup; import java.lang.reflect.Field; import java.util.Map; @@ -46,6 +47,14 @@ public static Histogram getHistogram(MetricGroup group, String metricName) { @SuppressWarnings("unchecked") private static Metric getMetric(MetricGroup group, String metricName) { 
try { + // Handle ProxyMetricGroup wrapper class + if (ProxyMetricGroup.class.isAssignableFrom(group.getClass())) { + Field parentField = + group.getClass().getSuperclass().getDeclaredField("parentMetricGroup"); + parentField.setAccessible(true); + group = (MetricGroup) parentField.get(group); + } + Field field = AbstractMetricGroup.class.getDeclaredField("metrics"); field.setAccessible(true); return ((Map) field.get(group)).get(metricName);