Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metastore.AddPartitionCommitCallback;
import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.metastore.ChainTableCommitPreCallback;
import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
import org.apache.paimon.metastore.TagPreviewCommitCallback;
import org.apache.paimon.operation.ChangelogDeletion;
Expand All @@ -59,6 +60,7 @@
import org.apache.paimon.table.PartitionHandler;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitPreCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.SuccessFileTagCallback;
import org.apache.paimon.tag.TagAutoManager;
Expand Down Expand Up @@ -329,7 +331,8 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
options.commitDiscardDuplicateFiles(),
conflictDetection,
strictModeChecker,
rollback);
rollback,
createCommitPreCallbacks(commitUser, table));
}

@Override
Expand Down Expand Up @@ -427,6 +430,15 @@ private List<CommitCallback> createCommitCallbacks(String commitUser, FileStoreT
return callbacks;
}

/**
 * Builds the list of callbacks that run before a snapshot is committed.
 *
 * <p>The returned list is empty unless the table options mark this table as a chain table, in
 * which case it holds a single {@link ChainTableCommitPreCallback} bound to {@code table}.
 *
 * @param commitUser the commit user; currently not consulted when building the callbacks
 * @param table the table handed to the chain table pre-callback
 * @return a new mutable list of pre-commit callbacks, possibly empty
 */
private List<CommitPreCallback> createCommitPreCallbacks(
        String commitUser, FileStoreTable table) {
    List<CommitPreCallback> preCallbacks = new ArrayList<>();
    if (!options.isChainTable()) {
        // Only chain tables register a pre-commit check.
        return preCallbacks;
    }
    preCallbacks.add(new ChainTableCommitPreCallback(table));
    return preCallbacks;
}

@Override
@Nullable
public PartitionExpire newPartitionExpire(String commitUser, FileStoreTable table) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.metastore;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.commit.ManifestEntryChanges;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitPreCallback;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChainTableUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * {@link CommitPreCallback} implementation for chain tables.
 *
 * <p>This callback performs a pre-check before dropping partitions on the snapshot branch of a
 * chain table. It verifies that a snapshot partition being dropped is either followed by no delta
 * partitions in the chain interval or has a previous snapshot partition that can serve as its
 * predecessor.
 *
 * <p>The callback is only executed when all of the following conditions are met:
 *
 * <ul>
 *   <li>The table is configured as a chain table and the current branch is the snapshot branch
 *       (see {@link ChainTableUtils#isScanFallbackSnapshotBranch(CoreOptions)}).
 *   <li>The committed snapshot kind is {@link CommitKind#OVERWRITE}.
 *   <li>All table and index manifest entries in the commit are {@link FileKind#DELETE deletes}.
 *   <li>The commit touches at least one partition.
 * </ul>
 *
 * <p>If the validation fails for any of the affected partitions, a {@link RuntimeException} is
 * thrown, which the committer is expected to treat as aborting the commit.
 *
 * <p>This implementation keeps only immutable references to the table and its options and does
 * not maintain mutable state between invocations.
 */
public class ChainTableCommitPreCallback implements CommitPreCallback {

    private static final Logger LOG = LoggerFactory.getLogger(ChainTableCommitPreCallback.class);

    // CommitPreCallback is not Serializable, so these are plain final fields (no 'transient').
    private final FileStoreTable table;
    private final CoreOptions coreOptions;

    /**
     * Creates the callback for the given table.
     *
     * @param table the table whose commits are checked; its {@link CoreOptions} are captured once
     */
    public ChainTableCommitPreCallback(FileStoreTable table) {
        this.table = table;
        this.coreOptions = table.coreOptions();
    }

    /**
     * Validates that every partition touched by a pure-delete overwrite commit may be dropped.
     *
     * @param baseFiles base file entries supplied by the committer; not inspected here
     * @param deltaFiles manifest entries of the commit; must all be deletes for the check to run
     * @param indexFiles index manifest entries of the commit; must all be deletes as well
     * @param snapshot the snapshot about to be committed; only its commit kind is read
     * @throws RuntimeException if a dropped snapshot partition is followed by delta partitions
     *     (before the next snapshot partition) and has no previous snapshot partition
     */
    @Override
    public void call(
            List<SimpleFileEntry> baseFiles,
            List<ManifestEntry> deltaFiles,
            List<IndexManifestEntry> indexFiles,
            Snapshot snapshot) {
        if (!ChainTableUtils.isScanFallbackSnapshotBranch(coreOptions)) {
            return;
        }
        if (snapshot.commitKind() != CommitKind.OVERWRITE) {
            return;
        }
        if (!isPureDeleteCommit(deltaFiles, indexFiles)) {
            return;
        }
        List<BinaryRow> changedPartitions =
                ManifestEntryChanges.changedPartitions(deltaFiles, indexFiles);
        if (changedPartitions.isEmpty()) {
            // An empty commit passes the pure-delete test vacuously; there is nothing to
            // validate, so skip resolving tables and scanning partitions.
            return;
        }

        FileStoreTable candidateTable = ChainTableUtils.resolveChainPrimaryTable(table);
        FileStoreTable deltaTable =
                candidateTable.switchToBranch(coreOptions.scanFallbackDeltaBranch());
        RowType partitionType = table.schema().logicalPartitionType();
        RowDataToObjectArrayConverter partitionConverter =
                new RowDataToObjectArrayConverter(partitionType);
        InternalRowPartitionComputer partitionComputer =
                new InternalRowPartitionComputer(
                        coreOptions.partitionDefaultName(),
                        partitionType,
                        table.schema().partitionKeys().toArray(new String[0]),
                        coreOptions.legacyPartitionName());
        RecordComparator partitionComparator =
                CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());

        // Sorted ascending so that the predecessor/successor lookups below can scan in order.
        List<BinaryRow> snapshotPartitions =
                table.newSnapshotReader().partitionEntries().stream()
                        .map(PartitionEntry::partition)
                        .sorted(partitionComparator)
                        .collect(Collectors.toList());

        // NOTE(review): the same reader receives a new partition filter on every iteration;
        // presumably withPartitionFilter replaces the previous filter - confirm.
        SnapshotReader deltaSnapshotReader = deltaTable.newSnapshotReader();
        PredicateBuilder builder = new PredicateBuilder(partitionType);

        for (BinaryRow partition : changedPartitions) {
            Optional<BinaryRow> preSnapshotPartition =
                    findPreSnapshotPartition(snapshotPartitions, partition, partitionComparator);
            Optional<BinaryRow> nextSnapshotPartition =
                    findNextSnapshotPartition(snapshotPartitions, partition, partitionComparator);
            Predicate deltaFollowingPredicate =
                    ChainTableUtils.createTriangularPredicate(
                            partition, partitionConverter, builder::equal, builder::greaterThan);
            // Delta partitions selected by the predicate, restricted to those that sort before
            // the next snapshot partition (if any).
            List<BinaryRow> deltaFollowingPartitions =
                    deltaSnapshotReader.withPartitionFilter(deltaFollowingPredicate)
                            .partitionEntries().stream()
                            .map(PartitionEntry::partition)
                            .filter(
                                    deltaPartition ->
                                            isBeforeNextSnapshotPartition(
                                                    deltaPartition,
                                                    nextSnapshotPartition,
                                                    partitionComparator))
                            .collect(Collectors.toList());
            boolean canDrop =
                    deltaFollowingPartitions.isEmpty() || preSnapshotPartition.isPresent();
            String partitionValues = partitionComputer.generatePartValues(partition).toString();
            LOG.info(
                    "Drop partition, partition={}, canDrop={}, preSnapshotPartition={}, nextSnapshotPartition={}",
                    partitionValues,
                    canDrop,
                    generatePartitionValues(preSnapshotPartition, partitionComputer),
                    generatePartitionValues(nextSnapshotPartition, partitionComputer));
            if (!canDrop) {
                // Keep the original sentence as a prefix and add the offending partition so the
                // failure can be diagnosed from the exception alone.
                throw new RuntimeException(
                        "Snapshot partition cannot be dropped. Partition "
                                + partitionValues
                                + " is followed by delta partitions and has no previous snapshot"
                                + " partition.");
            }
        }
    }

    /** Returns true when every data and index manifest entry of the commit is a delete. */
    private boolean isPureDeleteCommit(
            List<ManifestEntry> deltaFiles, List<IndexManifestEntry> indexFiles) {
        return deltaFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE)
                && indexFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE);
    }

    /**
     * Finds the greatest snapshot partition strictly smaller than {@code partition}.
     *
     * <p>Relies on {@code snapshotPartitions} being sorted ascending by {@code
     * partitionComparator}.
     */
    private Optional<BinaryRow> findPreSnapshotPartition(
            List<BinaryRow> snapshotPartitions,
            BinaryRow partition,
            RecordComparator partitionComparator) {
        BinaryRow pre = null;
        for (BinaryRow snapshotPartition : snapshotPartitions) {
            if (partitionComparator.compare(snapshotPartition, partition) < 0) {
                pre = snapshotPartition;
            } else {
                break;
            }
        }
        return Optional.ofNullable(pre);
    }

    /**
     * Finds the smallest snapshot partition strictly greater than {@code partition}.
     *
     * <p>Relies on {@code snapshotPartitions} being sorted ascending by {@code
     * partitionComparator}.
     */
    private Optional<BinaryRow> findNextSnapshotPartition(
            List<BinaryRow> snapshotPartitions,
            BinaryRow partition,
            RecordComparator partitionComparator) {
        for (BinaryRow snapshotPartition : snapshotPartitions) {
            if (partitionComparator.compare(snapshotPartition, partition) > 0) {
                return Optional.of(snapshotPartition);
            }
        }
        return Optional.empty();
    }

    /**
     * Returns true when there is no next snapshot partition, or when {@code partition} sorts
     * strictly before it.
     */
    private boolean isBeforeNextSnapshotPartition(
            BinaryRow partition,
            Optional<BinaryRow> nextSnapshotPartition,
            RecordComparator partitionComparator) {
        return !nextSnapshotPartition.isPresent()
                || partitionComparator.compare(partition, nextSnapshotPartition.get()) < 0;
    }

    /** Renders an optional partition for logging, using {@code "<none>"} when absent. */
    private String generatePartitionValues(
            Optional<BinaryRow> partition, InternalRowPartitionComputer partitionComputer) {
        return partition
                .map(row -> partitionComputer.generatePartValues(row).toString())
                .orElse("<none>");
    }

    /** No-op: this callback holds no resources of its own. */
    @Override
    public void close() throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.table.ChainGroupReadTable;
import org.apache.paimon.table.FallbackReadFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitCallback;
Expand Down Expand Up @@ -78,13 +76,7 @@ public void call(
return;
}

// Find the underlying table for writing snapshot branch.
FileStoreTable candidateTable = table;
if (table instanceof FallbackReadFileStoreTable) {
candidateTable =
((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
.wrapped();
}
FileStoreTable candidateTable = ChainTableUtils.resolveChainPrimaryTable(table);

FileStoreTable snapshotTable =
candidateTable.switchToBranch(coreOptions.scanFallbackSnapshotBranch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.CommitPreCallback;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
Expand Down Expand Up @@ -166,6 +167,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private CommitMetrics commitMetrics;
private boolean appendCommitCheckConflict = false;

private final List<CommitPreCallback> commitPreCallbacks;

public FileStoreCommitImpl(
SnapshotCommit snapshotCommit,
FileIO fileIO,
Expand Down Expand Up @@ -199,7 +202,8 @@ public FileStoreCommitImpl(
boolean discardDuplicateFiles,
ConflictDetection conflictDetection,
@Nullable StrictModeChecker strictModeChecker,
@Nullable CommitRollback rollback) {
@Nullable CommitRollback rollback,
List<CommitPreCallback> commitPreCallbacks) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
this.schemaManager = schemaManager;
Expand Down Expand Up @@ -241,6 +245,7 @@ public FileStoreCommitImpl(
this.strictModeChecker = strictModeChecker;
this.conflictDetection = conflictDetection;
this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile);
this.commitPreCallbacks = commitPreCallbacks;
}

@Override
Expand Down Expand Up @@ -999,6 +1004,11 @@ CommitResult tryCommitOnce(
}

boolean success;
final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
final List<ManifestEntry> finalDeltaFiles = deltaFiles;
commitPreCallbacks.forEach(
callback ->
callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot));
try {
success = commitSnapshotImpl(newSnapshot, deltaStatistics);
} catch (Exception e) {
Expand Down Expand Up @@ -1035,8 +1045,6 @@ CommitResult tryCommitOnce(
if (strictModeChecker != null) {
strictModeChecker.update(newSnapshotId);
}
final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
final List<ManifestEntry> finalDeltaFiles = deltaFiles;
commitCallbacks.forEach(
callback ->
callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot));
Expand Down Expand Up @@ -1179,6 +1187,7 @@ private boolean commitSnapshotImpl(Snapshot newSnapshot, List<PartitionEntry> de

/**
* Closes the resources owned by this commit: first the pre-commit callbacks, then the commit
* callbacks, and finally the snapshot commit.
*
* <p>NOTE(review): the {@code IOUtils} "quietly" helpers presumably suppress exceptions raised
* while closing, so that one failing resource does not prevent closing the rest - confirm.
*/
@Override
public void close() {
IOUtils.closeAllQuietly(commitPreCallbacks);
IOUtils.closeAllQuietly(commitCallbacks);
IOUtils.closeQuietly(snapshotCommit);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.sink;

import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;

import java.util.List;

/**
* Callback which is invoked before a snapshot is committed.
*
* <p>This hook allows additional validation or bookkeeping to be performed before a snapshot
* becomes visible. Implementations can inspect the files that are about to be committed as well as
* the {@link Snapshot} metadata and decide whether the commit should proceed.
*
* <p>If {@link #call(List, List, List, Snapshot)} throws a {@link RuntimeException}, the commit is
* aborted and the snapshot will not be committed. Implementations are expected to be fast and
* either side-effect free or idempotent, because a single logical commit may be retried and this
* callback might therefore be invoked multiple times for the same logical changes.
*
* <p>Because {@link AutoCloseable#close()} is abstract, every implementation must provide it; an
* implementation that holds no external resources can leave the method body empty. Resources are
* expected to be released when the surrounding commit is closed.
*/
public interface CommitPreCallback extends AutoCloseable {

/**
* Inspects a pending commit and throws to veto it.
*
* @param baseFiles file entries passed by the committer as the base of this commit
* (presumably the existing files the changes apply to - confirm against the caller)
* @param deltaFiles manifest entries describing the data file changes of this commit
* @param indexFiles index manifest entries describing the index file changes of this commit
* @param snapshot the snapshot that is about to be committed
* @throws RuntimeException if the commit must not proceed
*/
void call(
List<SimpleFileEntry> baseFiles,
List<ManifestEntry> deltaFiles,
List<IndexManifestEntry> indexFiles,
Snapshot snapshot);
}
Loading