docs/content/maintenance/metrics.md (6 additions & 0 deletions)
@@ -391,6 +391,12 @@ When using Flink to read and write, Paimon has implemented some key standard Flink
<td>Gauge</td>
<td>Time difference between reading the data file and file creation.</td>
</tr>
<tr>
<td>sourceScalingMaxParallelism</td>
<td>Flink Source Enumerator</td>
<td>Gauge</td>
<td>Recommended upper bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the bucket number. For dynamic bucket tables (bucket = -1), this equals the current parallelism. Note: This is a recommendation, not a hard limit.</td>
</tr>
</tbody>
</table>
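
To make the intended consumption concrete, here is a minimal sketch of how an external auto-scaler might apply this gauge. The `SourceParallelismAdvisor` class and the way the gauge value is obtained (e.g. polled from Flink's REST metrics endpoint) are illustrative assumptions, not part of this change.

```java
// Hypothetical consumer of the sourceScalingMaxParallelism gauge. How the
// value is fetched (e.g. via Flink's REST metrics endpoint) is out of scope.
public final class SourceParallelismAdvisor {

    /** Clamps a desired source parallelism to the recommended upper bound. */
    public static int recommend(int desiredParallelism, int scalingMaxParallelism) {
        // The gauge is advisory: for a fixed bucket table, parallelism beyond
        // the bucket number only leaves extra readers idle, since each bucket
        // is consumed by at most one reader.
        return Math.min(desiredParallelism, scalingMaxParallelism);
    }

    public static void main(String[] args) {
        int gaugeValue = 2; // e.g. reported for a table with 'bucket' = '2'
        System.out.println(recommend(8, gaugeValue)); // prints 2
    }
}
```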

ContinuousFileSplitEnumerator.java
@@ -22,6 +22,7 @@
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
@@ -92,6 +93,9 @@ public class ContinuousFileSplitEnumerator

private final int maxSnapshotCount;

// Currently unused; reserved as a placeholder for future metric extensions or updates.
@Nullable private final FileStoreSourceEnumeratorMetrics enumeratorMetrics;

public ContinuousFileSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> remainSplits,
@@ -101,7 +105,8 @@ public ContinuousFileSplitEnumerator(
boolean unordered,
int splitMaxPerTask,
boolean shuffleBucketWithPartition,
int maxSnapshotCount) {
int maxSnapshotCount,
@Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.nextSnapshotId = nextSnapshotId;
@@ -118,6 +123,7 @@ public ContinuousFileSplitEnumerator(
this.consumerProgressCalculator =
new ConsumerProgressCalculator(context.currentParallelism());
this.maxSnapshotCount = maxSnapshotCount;
this.enumeratorMetrics = enumeratorMetrics;
}

@VisibleForTesting
ContinuousFileStoreSource.java
@@ -22,6 +22,7 @@
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamDataTableScan;
@@ -84,12 +85,14 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
splits = checkpoint.splits();
}
StreamTableScan scan = readBuilder.newStreamScan();
FileStoreSourceEnumeratorMetrics enumeratorMetrics = null;
if (metricGroup(context) != null) {
enumeratorMetrics = new FileStoreSourceEnumeratorMetrics(context, options);
((StreamDataTableScan) scan)
.withMetricRegistry(new FlinkMetricRegistry(context.metricGroup()));
}
scan.restore(nextSnapshotId);
return buildEnumerator(context, splits, nextSnapshotId, scan);
return buildEnumerator(context, splits, nextSnapshotId, scan, enumeratorMetrics);
}

@Nullable
@@ -106,7 +109,8 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> splits,
@Nullable Long nextSnapshotId,
StreamTableScan scan) {
StreamTableScan scan,
@Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) {
Options options = Options.fromMap(this.options);
return new ContinuousFileSplitEnumerator(
context,
@@ -117,6 +121,7 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEnumerator(
unordered,
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT),
enumeratorMetrics);
}
}
AlignedContinuousFileSplitEnumerator.java
@@ -23,6 +23,7 @@
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.assigners.AlignedSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
@@ -95,7 +96,8 @@ public AlignedContinuousFileSplitEnumerator(
long alignTimeout,
int splitPerTaskMax,
boolean shuffleBucketWithPartition,
int maxSnapshotCount) {
int maxSnapshotCount,
@Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) {
super(
context,
remainSplits,
@@ -105,7 +107,8 @@ public AlignedContinuousFileSplitEnumerator(
unawareBucket,
splitPerTaskMax,
shuffleBucketWithPartition,
maxSnapshotCount);
maxSnapshotCount,
enumeratorMetrics);
this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
this.nextSnapshotId = nextSnapshotId;
AlignedContinuousFileStoreSource.java
@@ -25,6 +25,7 @@
import org.apache.paimon.flink.source.ContinuousFileStoreSource;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.source.ReadBuilder;
@@ -80,7 +81,8 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> splits,
@Nullable Long nextSnapshotId,
StreamTableScan scan) {
StreamTableScan scan,
@Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) {
Options options = Options.fromMap(this.options);
return new AlignedContinuousFileSplitEnumerator(
context,
@@ -92,6 +94,7 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEnumerator(
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT),
enumeratorMetrics);
}
}
FileStoreSourceEnumeratorMetrics.java (new file)
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.source.metrics;

import org.apache.paimon.CoreOptions;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;

import java.util.Map;

/**
* Source enumerator metrics.
*
* <p>This class registers and manages metrics for the source split enumerator.
*/
public class FileStoreSourceEnumeratorMetrics {

/**
* Metric name for source scaling max parallelism. This metric provides a recommended upper
* bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the
* bucket number; for dynamic bucket tables, this equals the current parallelism. Note: This is
* a recommendation, not a hard limit - users can configure higher parallelism manually if
* needed.
*/
public static final String SCALING_MAX_PARALLELISM = "sourceScalingMaxParallelism";

private final int scalingMaxParallelism;

/**
* Creates enumerator metrics and registers them with the given metric group.
*
* @param context the split enumerator context
* @param options the source options
*/
public FileStoreSourceEnumeratorMetrics(
SplitEnumeratorContext<?> context, Map<String, String> options) {
int bucketNum = CoreOptions.fromMap(options).bucket();
// Dynamic bucket mode uses -1.
// In this case, scaling max parallelism equals current parallelism.
this.scalingMaxParallelism = bucketNum < 0 ? context.currentParallelism() : bucketNum;

context.metricGroup().gauge(SCALING_MAX_PARALLELISM, this::getScalingMaxParallelism);
}

public int getScalingMaxParallelism() {
return scalingMaxParallelism;
}
}
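
The registration in the constructor above relies on Flink's standard `MetricGroup#gauge(String, Gauge)` API. As a standalone illustration of that pattern, here is a minimal sketch; the `ScalingGauges` helper is hypothetical and not part of this change.

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Hypothetical helper showing the same registration pattern: the group keeps
// the Gauge and re-invokes getValue() on every metrics report.
class ScalingGauges {
    static void register(MetricGroup group, int scalingMaxParallelism) {
        group.gauge(
                FileStoreSourceEnumeratorMetrics.SCALING_MAX_PARALLELISM,
                (Gauge<Integer>) () -> scalingMaxParallelism);
    }
}
```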
@@ -927,7 +927,8 @@ public ContinuousFileSplitEnumerator build() {
unawareBucket,
this.splitMaxPerTask,
false,
maxSnapshotCount);
maxSnapshotCount,
null);
}
}

FileStoreSourceMetricsTest.java
@@ -20,6 +20,7 @@

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -54,32 +55,48 @@
/** Tests for file store sources with metrics. */
public class FileStoreSourceMetricsTest {
private FileStoreTable table;
private FileStoreTable fixBucketTable;
private TestingSplitEnumeratorContextWithRegisteringGroup context;
private MetricGroup scanMetricGroup;
private MetricGroup enumeratorMetricGroup;

@BeforeEach
public void before(@TempDir java.nio.file.Path path) throws Exception {
FileIO fileIO = LocalFileIO.create();
Path tablePath = new Path(path.toString());
Path fixBucketTablePath = new Path(path.toString(), "fix_bucket");
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
SchemaManager fixBucketSchemaManager = new SchemaManager(fileIO, fixBucketTablePath);
TableSchema tableSchema =
schemaManager.createTable(
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.BIGINT())
.build());
TableSchema fixBucketTableSchema =
fixBucketSchemaManager.createTable(
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.BIGINT())
.primaryKey("a")
.option("bucket", "2")
.option("bucket-key", "a")
.build());
table = FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
fixBucketTable =
FileStoreTableFactory.create(fileIO, fixBucketTablePath, fixBucketTableSchema);
context = new TestingSplitEnumeratorContextWithRegisteringGroup(1);
scanMetricGroup =
context.metricGroup()
.addGroup("paimon")
.addGroup("table", table.name())
.addGroup("scan");
enumeratorMetricGroup = context.metricGroup();
}

@Test
public void staticFileStoreSourceScanMetricsTest() throws Exception {
writeOnce();
writeOnce(table);
StaticFileStoreSource staticFileStoreSource =
new StaticFileStoreSource(
table.newReadBuilder(),
@@ -98,7 +115,7 @@ public void staticFileStoreSourceScanMetricsTest() throws Exception {

@Test
public void continuousFileStoreSourceScanMetricsTest() throws Exception {
writeOnce();
writeOnce(table);
ContinuousFileStoreSource continuousFileStoreSource =
new ContinuousFileStoreSource(table.newReadBuilder(), table.options(), null);
ContinuousFileSplitEnumerator enumerator =
@@ -114,7 +131,7 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception {
.getValue())
.isEqualTo(1L);

writeAgain();
writeAgain(table);
enumerator.scanNextSnapshot();
assertThat(TestingMetricUtils.getHistogram(scanMetricGroup, "scanDuration").getCount())
.isEqualTo(2);
Expand All @@ -126,7 +143,48 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception {
.isEqualTo(1L);
}

private void writeOnce() throws Exception {
@Test
public void continuousFileStoreFixBucketEnumeratorMetricsTest() throws Exception {
writeOnce(fixBucketTable);

ContinuousFileStoreSource continuousFileStoreSource =
new ContinuousFileStoreSource(
fixBucketTable.newReadBuilder(), fixBucketTable.options(), null);
ContinuousFileSplitEnumerator enumerator =
(ContinuousFileSplitEnumerator)
continuousFileStoreSource.restoreEnumerator(context, null);
enumerator.scanNextSnapshot();

// equals the bucket number when bucket > 0
assertThat(
TestingMetricUtils.getGauge(
enumeratorMetricGroup,
FileStoreSourceEnumeratorMetrics.SCALING_MAX_PARALLELISM)
.getValue())
.isEqualTo(2);
}

@Test
public void continuousFileStoreDynBucketEnumeratorMetricsTest() throws Exception {
writeOnce(table);

ContinuousFileStoreSource continuousFileStoreSource =
new ContinuousFileStoreSource(table.newReadBuilder(), table.options(), null);
ContinuousFileSplitEnumerator enumerator =
(ContinuousFileSplitEnumerator)
continuousFileStoreSource.restoreEnumerator(context, null);
enumerator.scanNextSnapshot();

// equals the current parallelism when bucket < 0
assertThat(
TestingMetricUtils.getGauge(
enumeratorMetricGroup,
FileStoreSourceEnumeratorMetrics.SCALING_MAX_PARALLELISM)
.getValue())
.isEqualTo(1);
}

private void writeOnce(FileStoreTable table) throws Exception {
InnerTableWrite writer = table.newWrite("test");
TableCommitImpl commit = table.newCommit("test");
writer.write(GenericRow.of(1, 2L));
@@ -140,7 +198,7 @@ private void writeOnce() throws Exception {
writer.close();
}

private void writeAgain() throws Exception {
private void writeAgain(FileStoreTable table) throws Exception {
InnerTableWrite writer = table.newWrite("test");
TableCommitImpl commit = table.newCommit("test");
writer.write(GenericRow.of(10, 2L));
@@ -252,7 +252,8 @@ public AlignedContinuousFileSplitEnumerator build() {
timeout,
10,
false,
-1);
-1,
null);
}
}

TestingMetricUtils.java
@@ -24,6 +24,7 @@
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;

import java.lang.reflect.Field;
import java.util.Map;
@@ -46,6 +47,14 @@ public static Histogram getHistogram(MetricGroup group, String metricName) {
@SuppressWarnings("unchecked")
private static Metric getMetric(MetricGroup group, String metricName) {
try {
// Unwrap ProxyMetricGroup: metrics are stored on the underlying parent
// group, not on the proxy itself.
if (ProxyMetricGroup.class.isAssignableFrom(group.getClass())) {
Field parentField =
group.getClass().getSuperclass().getDeclaredField("parentMetricGroup");
parentField.setAccessible(true);
group = (MetricGroup) parentField.get(group);
}

Field field = AbstractMetricGroup.class.getDeclaredField("metrics");
field.setAccessible(true);
return ((Map<String, Metric>) field.get(group)).get(metricName);