From 3c9d264da05f53a218a73a3866c4d827f7f64587 Mon Sep 17 00:00:00 2001
From: "yangmu.0722"
Date: Wed, 21 Jan 2026 19:07:05 +0800
Subject: [PATCH] [core] Add pre-check when dropping partitions for chain
 tables

Introduce a CommitPreCallback hook that is invoked before a snapshot
is committed, and implement it for chain tables: a snapshot partition
may only be dropped if no delta partitions follow it in the chain
interval, or if a preceding snapshot partition can serve as its
predecessor.

---
 .../org/apache/paimon/AbstractFileStore.java  |  14 +-
 .../ChainTableCommitPreCallback.java          | 207 ++++++++++++
 .../ChainTableOverwriteCommitCallback.java    |  10 +-
 .../paimon/operation/FileStoreCommitImpl.java |  15 +-
 .../paimon/table/sink/CommitPreCallback.java  |  51 +++
 .../apache/paimon/utils/ChainTableUtils.java  |  16 +
 .../paimon/spark/SparkChainTableITCase.java   | 305 ++++++++++--------
 7 files changed, 474 insertions(+), 144 deletions(-)
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/sink/CommitPreCallback.java

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index edf7e16294cf..296c74b27291 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -36,6 +36,7 @@
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.metastore.AddPartitionCommitCallback;
 import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.ChainTableCommitPreCallback;
 import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
 import org.apache.paimon.metastore.TagPreviewCommitCallback;
 import org.apache.paimon.operation.ChangelogDeletion;
@@ -59,6 +60,7 @@
 import org.apache.paimon.table.PartitionHandler;
 import org.apache.paimon.table.sink.CallbackUtils;
 import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.sink.CommitPreCallback;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.SuccessFileTagCallback;
 import org.apache.paimon.tag.TagAutoManager;
@@ -329,7 +331,8 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
                 options.commitDiscardDuplicateFiles(),
                 conflictDetection,
                 strictModeChecker,
-                rollback);
+                rollback,
+                createCommitPreCallbacks(commitUser, table));
     }
 
     @Override
@@ -427,6 +430,15 @@ private List<CommitCallback> createCommitCallbacks(String commitUser, FileStoreT
         return callbacks;
     }
 
+    private List<CommitPreCallback> createCommitPreCallbacks(
+            String commitUser, FileStoreTable table) {
+        List<CommitPreCallback> callbacks = new ArrayList<>();
+        if (options.isChainTable()) {
+            callbacks.add(new ChainTableCommitPreCallback(table));
+        }
+        return callbacks;
+    }
+
     @Override
     @Nullable
     public PartitionExpire newPartitionExpire(String commitUser, FileStoreTable table) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
new file mode 100644
index 000000000000..0b26cb637540
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.operation.commit.ManifestEntryChanges;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitPreCallback;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * {@link CommitPreCallback} implementation for chain tables.
+ *

+ * <p>This callback performs a pre-check before dropping partitions on the snapshot branch of a
+ * chain table. It verifies that a snapshot partition being dropped is either followed by no delta
+ * partitions in the chain interval or has a previous snapshot partition that can serve as its
+ * predecessor.
+ *
+ * <p>The callback is only executed when all of the following conditions are met:
+ *
+ * <ul>
+ *   <li>The table is configured as a chain table and the current branch is the snapshot branch
+ *       (see {@link ChainTableUtils#isScanFallbackSnapshotBranch(CoreOptions)}).
+ *   <li>The committed snapshot kind is {@link CommitKind#OVERWRITE}.
+ *   <li>All table and index manifest entries in the commit are {@link FileKind#DELETE deletes}.
+ * </ul>
+ *
+ * <p>If the validation fails for any of the affected partitions, a {@link RuntimeException} is
+ * thrown and the commit is aborted.
+ *
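+ * <p>For illustration only (the partition values below are hypothetical, mirroring the daily
+ * layout exercised in {@code SparkChainTableITCase}), consider a chain with snapshot partitions
+ * {@code 20260101}, {@code 20260103}, {@code 20260105} and delta partitions {@code 20260101}
+ * through {@code 20260105}:
+ *
+ * <pre>{@code
+ * snapshot branch: 20260101            20260103            20260105
+ * delta branch:    20260101  20260102  20260103  20260104  20260105
+ * }</pre>
+ *
+ * <p>Dropping {@code 20260105} passes the check, because the earlier snapshot partition
+ * {@code 20260103} can serve as its predecessor. Dropping {@code 20260101} fails, because delta
+ * partition {@code 20260102} follows it in the interval and no earlier snapshot partition exists.
+ *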

+ * <p>This implementation keeps only references to the table and its options and does not maintain
+ * mutable state between invocations.
+ */
+public class ChainTableCommitPreCallback implements CommitPreCallback {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ChainTableCommitPreCallback.class);
+
+    private transient FileStoreTable table;
+    private transient CoreOptions coreOptions;
+
+    public ChainTableCommitPreCallback(FileStoreTable table) {
+        this.table = table;
+        this.coreOptions = table.coreOptions();
+    }
+
+    @Override
+    public void call(
+            List<SimpleFileEntry> baseFiles,
+            List<ManifestEntry> deltaFiles,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot) {
+        if (!ChainTableUtils.isScanFallbackSnapshotBranch(coreOptions)) {
+            return;
+        }
+        if (snapshot.commitKind() != CommitKind.OVERWRITE) {
+            return;
+        }
+        if (!isPureDeleteCommit(deltaFiles, indexFiles)) {
+            return;
+        }
+        List<BinaryRow> changedPartitions =
+                ManifestEntryChanges.changedPartitions(deltaFiles, indexFiles);
+        FileStoreTable candidateTable = ChainTableUtils.resolveChainPrimaryTable(table);
+        FileStoreTable deltaTable =
+                candidateTable.switchToBranch(coreOptions.scanFallbackDeltaBranch());
+        RowType partitionType = table.schema().logicalPartitionType();
+        RowDataToObjectArrayConverter partitionConverter =
+                new RowDataToObjectArrayConverter(partitionType);
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        coreOptions.partitionDefaultName(),
+                        partitionType,
+                        table.schema().partitionKeys().toArray(new String[0]),
+                        coreOptions.legacyPartitionName());
+        RecordComparator partitionComparator =
+                CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
+        List<BinaryRow> snapshotPartitions =
+                table.newSnapshotReader().partitionEntries().stream()
+                        .map(PartitionEntry::partition)
+                        .sorted(partitionComparator)
+                        .collect(Collectors.toList());
+        SnapshotReader deltaSnapshotReader = deltaTable.newSnapshotReader();
+        PredicateBuilder builder = new PredicateBuilder(partitionType);
+        for (BinaryRow partition : changedPartitions) {
+            Optional<BinaryRow> preSnapshotPartition =
+                    findPreSnapshotPartition(snapshotPartitions, partition, partitionComparator);
+            Optional<BinaryRow> nextSnapshotPartition =
+                    findNextSnapshotPartition(snapshotPartitions, partition, partitionComparator);
+            Predicate deltaFollowingPredicate =
+                    ChainTableUtils.createTriangularPredicate(
+                            partition, partitionConverter, builder::equal, builder::greaterThan);
+            List<BinaryRow> deltaFollowingPartitions =
+                    deltaSnapshotReader.withPartitionFilter(deltaFollowingPredicate)
+                            .partitionEntries().stream()
+                            .map(PartitionEntry::partition)
+                            .filter(
+                                    deltaPartition ->
+                                            isBeforeNextSnapshotPartition(
+                                                    deltaPartition,
+                                                    nextSnapshotPartition,
+                                                    partitionComparator))
+                            .collect(Collectors.toList());
+            boolean canDrop =
+                    deltaFollowingPartitions.isEmpty() || preSnapshotPartition.isPresent();
+            LOG.info(
+                    "Drop partition, partition={}, canDrop={}, preSnapshotPartition={}, nextSnapshotPartition={}",
+                    partitionComputer.generatePartValues(partition),
+                    canDrop,
+                    generatePartitionValues(preSnapshotPartition, partitionComputer),
+                    generatePartitionValues(nextSnapshotPartition, partitionComputer));
+            if (!canDrop) {
+                throw new RuntimeException(
+                        "Snapshot partition "
+                                + partitionComputer.generatePartValues(partition)
+                                + " cannot be dropped: it is still followed by delta partitions in"
+                                + " the chain interval and has no preceding snapshot partition.");
+            }
+        }
+    }
+
+    private boolean isPureDeleteCommit(
+            List<ManifestEntry> deltaFiles, List<IndexManifestEntry> indexFiles) {
+        return deltaFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE)
+                && indexFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE);
+    }
+
+    private Optional<BinaryRow> findPreSnapshotPartition(
+            List<BinaryRow> snapshotPartitions,
+            BinaryRow partition,
+            RecordComparator partitionComparator) {
+        BinaryRow pre = null;
+        for (BinaryRow snapshotPartition : snapshotPartitions) {
+            if (partitionComparator.compare(snapshotPartition, partition) < 0) {
+                pre = snapshotPartition;
+            } else {
+                break;
+            }
+        }
+        return Optional.ofNullable(pre);
+    }
+
+    private Optional<BinaryRow> findNextSnapshotPartition(
+            List<BinaryRow> snapshotPartitions,
+            BinaryRow partition,
+            RecordComparator partitionComparator) {
+        for (BinaryRow snapshotPartition : snapshotPartitions) {
+            if (partitionComparator.compare(snapshotPartition, partition) > 0) {
+                return Optional.of(snapshotPartition);
+            }
+        }
+        return Optional.empty();
+    }
+
+    private boolean isBeforeNextSnapshotPartition(
+            BinaryRow partition,
+            Optional<BinaryRow> nextSnapshotPartition,
+            RecordComparator partitionComparator) {
+        return !nextSnapshotPartition.isPresent()
+                || partitionComparator.compare(partition, nextSnapshotPartition.get()) < 0;
+    }
+
+    private String generatePartitionValues(
+            Optional<BinaryRow> partition, InternalRowPartitionComputer partitionComputer) {
+        if (!partition.isPresent()) {
+            return "";
+        }
+        return partitionComputer.generatePartValues(partition.get()).toString();
+    }
+
+    @Override
+    public void close() throws Exception {}
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
index 76defb1ed03e..08a8ea82d16d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
@@ -26,8 +26,6 @@
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
-import org.apache.paimon.table.ChainGroupReadTable;
-import org.apache.paimon.table.FallbackReadFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitCallback;
@@ -78,13 +76,7 @@ public void call(
             return;
         }
 
-        // Find the underlying table for writing snapshot branch.
-        FileStoreTable candidateTable = table;
-        if (table instanceof FallbackReadFileStoreTable) {
-            candidateTable =
-                    ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
-                            .wrapped();
-        }
+        FileStoreTable candidateTable = ChainTableUtils.resolveChainPrimaryTable(table);
 
         FileStoreTable snapshotTable =
                 candidateTable.switchToBranch(coreOptions.scanFallbackSnapshotBranch());
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 6494b6185513..38461d6396b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -66,6 +66,7 @@
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.CommitPreCallback;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -166,6 +167,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private CommitMetrics commitMetrics;
     private boolean appendCommitCheckConflict = false;
 
+    private final List<CommitPreCallback> commitPreCallbacks;
+
     public FileStoreCommitImpl(
             SnapshotCommit snapshotCommit,
             FileIO fileIO,
@@ -199,7 +202,8 @@ public FileStoreCommitImpl(
             boolean discardDuplicateFiles,
             ConflictDetection conflictDetection,
             @Nullable StrictModeChecker strictModeChecker,
-            @Nullable CommitRollback rollback) {
+            @Nullable CommitRollback rollback,
+            List<CommitPreCallback> commitPreCallbacks) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
@@ -241,6 +245,7 @@ public FileStoreCommitImpl(
         this.strictModeChecker = strictModeChecker;
         this.conflictDetection = conflictDetection;
         this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile);
+        this.commitPreCallbacks = commitPreCallbacks;
     }
 
     @Override
@@ -999,6 +1004,11 @@ CommitResult tryCommitOnce(
         }
 
         boolean success;
+        final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
+        final List<ManifestEntry> finalDeltaFiles = deltaFiles;
+        commitPreCallbacks.forEach(
+                callback ->
+                        callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot));
         try {
             success = commitSnapshotImpl(newSnapshot, deltaStatistics);
         } catch (Exception e) {
@@ -1035,8 +1045,6 @@ CommitResult tryCommitOnce(
         if (strictModeChecker != null) {
             strictModeChecker.update(newSnapshotId);
         }
-        final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
-        final List<ManifestEntry> finalDeltaFiles = deltaFiles;
         commitCallbacks.forEach(
                 callback ->
                         callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot));
@@ -1179,6 +1187,7 @@ private boolean commitSnapshotImpl(Snapshot newSnapshot, List de
 
     @Override
     public void close() {
+        IOUtils.closeAllQuietly(commitPreCallbacks);
         IOUtils.closeAllQuietly(commitCallbacks);
         IOUtils.closeQuietly(snapshotCommit);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitPreCallback.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitPreCallback.java
new file mode 100644
index 000000000000..d9d3d7b64941
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitPreCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+
+import java.util.List;
+
+/**
+ * Callback which is invoked before a snapshot is committed.
+ *

+ * <p>This hook allows additional validation or bookkeeping to be performed before a snapshot
+ * becomes visible. Implementations can inspect the files that are about to be committed as well as
+ * the {@link Snapshot} metadata and decide whether the commit should proceed.
+ *

+ * <p>If {@link #call(List, List, List, Snapshot)} throws a {@link RuntimeException}, the commit is
+ * aborted and the snapshot will not be committed. Implementations are expected to be fast and
+ * either side effect free or idempotent, because a single logical commit may be retried and this
+ * callback might therefore be invoked multiple times for the same logical changes.
+ *
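+ * <p>A minimal sketch of an implementation (the class name and rejection rule below are
+ * illustrative only, not part of Paimon):
+ *
+ * <pre>{@code
+ * class RejectEmptyCommitCheck implements CommitPreCallback {
+ *
+ *     @Override
+ *     public void call(
+ *             List<SimpleFileEntry> baseFiles,
+ *             List<ManifestEntry> deltaFiles,
+ *             List<IndexManifestEntry> indexFiles,
+ *             Snapshot snapshot) {
+ *         // Veto the commit by throwing before the snapshot becomes visible.
+ *         if (deltaFiles.isEmpty() && indexFiles.isEmpty()) {
+ *             throw new RuntimeException("Rejecting empty commit for snapshot " + snapshot.id());
+ *         }
+ *     }
+ *
+ *     @Override
+ *     public void close() {}
+ * }
+ * }</pre>
+ *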

+ * <p>Implementations may optionally override {@link AutoCloseable#close()} if they hold external
+ * resources that need to be released when the surrounding {@link org.apache.paimon.table.Table}
+ * commit is closed.
+ */
+public interface CommitPreCallback extends AutoCloseable {
+
+    void call(
+            List<SimpleFileEntry> baseFiles,
+            List<ManifestEntry> deltaFiles,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot);
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
index fd4cdff4d689..f6c573440f7d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -25,6 +25,9 @@
 import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.ChainGroupReadTable;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
 
 import java.time.LocalDateTime;
@@ -209,4 +212,17 @@ public static boolean isScanFallbackDeltaBranch(CoreOptions options) {
         return options.isChainTable()
                 && options.scanFallbackDeltaBranch().equalsIgnoreCase(options.branch());
     }
+
+    public static boolean isScanFallbackSnapshotBranch(CoreOptions options) {
+        return options.isChainTable()
+                && options.scanFallbackSnapshotBranch().equalsIgnoreCase(options.branch());
+    }
+
+    public static FileStoreTable resolveChainPrimaryTable(FileStoreTable table) {
+        if (table.coreOptions().isChainTable() && table instanceof FallbackReadFileStoreTable) {
+            return ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
+                    .wrapped();
+        }
+        return table;
+    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index b5a28fe50010..8f25a3841a26 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -33,6 +33,8 @@
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base tests for spark read. 
*/ public class SparkChainTableITCase { @@ -53,70 +55,10 @@ public static void closeMetastore() throws Exception { @Test public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException { - Path warehousePath = new Path("file:" + tempDir.toString()); - SparkSession.Builder builder = - SparkSession.builder() - .config("spark.sql.warehouse.dir", warehousePath.toString()) - // with hive metastore - .config("spark.sql.catalogImplementation", "hive") - .config("hive.metastore.uris", "thrift://localhost:" + PORT) - .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) - .config("spark.sql.catalog.spark_catalog.metastore", "hive") - .config( - "spark.sql.catalog.spark_catalog.hive.metastore.uris", - "thrift://localhost:" + PORT) - .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") - .config( - "spark.sql.catalog.spark_catalog.warehouse", - warehousePath.toString()) - .config( - "spark.sql.extensions", - "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") - .master("local[2]"); - SparkSession spark = builder.getOrCreate(); - spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); - spark.sql("USE spark_catalog.my_db1"); - - /** Create table */ - spark.sql( - "CREATE TABLE IF NOT EXISTS \n" - + " `my_db1`.`chain_test` (\n" - + " `t1` BIGINT COMMENT 't1',\n" - + " `t2` BIGINT COMMENT 't2',\n" - + " `t3` STRING COMMENT 't3'\n" - + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n" - + "WITH\n" - + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" - + " 'bucket-key' = 't1',\n" - + " 'primary-key' = 'dt,t1',\n" - + " 'partition.timestamp-pattern' = '$dt',\n" - + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n" - + " 'chain-table.enabled' = 'true',\n" - + " 'bucket' = '2',\n" - + " 'merge-engine' = 'deduplicate', \n" - + " 'sequence.field' = 't2'\n" - + " )"); - - /** Create branch */ - spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');"); - spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')"); - - /** Set branch */ - spark.sql( - "ALTER TABLE my_db1.chain_test SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot', " - + "'scan.fallback-delta-branch' = 'delta')"); - spark.sql( - "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot'," - + "'scan.fallback-delta-branch' = 'delta')"); - spark.sql( - "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot'," - + "'scan.fallback-delta-branch' = 'delta')"); - spark.close(); - spark = builder.getOrCreate(); + SparkSession.Builder builder = initSparkClient(tempDir); + initDailyChainTable(builder); + SparkSession spark = builder.getOrCreate(); /** Write main branch */ spark.sql( "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 1, '1'),(2, 1, '1');"); @@ -269,80 +211,16 @@ public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOExcepti spark.sql( "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_delta` where dt = '20250814'"); assertThat(df.count()).isEqualTo(1); - spark.close(); - spark = builder.getOrCreate(); - /** Drop table */ - spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); - spark.close(); + dropTable(builder); } @Test public void 
testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException { - Path warehousePath = new Path("file:" + tempDir.toString()); - SparkSession.Builder builder = - SparkSession.builder() - .config("spark.sql.warehouse.dir", warehousePath.toString()) - // with hive metastore - .config("spark.sql.catalogImplementation", "hive") - .config("hive.metastore.uris", "thrift://localhost:" + PORT) - .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) - .config("spark.sql.catalog.spark_catalog.metastore", "hive") - .config( - "spark.sql.catalog.spark_catalog.hive.metastore.uris", - "thrift://localhost:" + PORT) - .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") - .config( - "spark.sql.catalog.spark_catalog.warehouse", - warehousePath.toString()) - .config( - "spark.sql.extensions", - "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") - .master("local[2]"); + SparkSession.Builder builder = initSparkClient(tempDir); + initHourlyChainTable(builder); SparkSession spark = builder.getOrCreate(); - spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); - spark.sql("USE spark_catalog.my_db1"); - - /** Create table */ - spark.sql( - "CREATE TABLE IF NOT EXISTS \n" - + " `my_db1`.`chain_test` (\n" - + " `t1` BIGINT COMMENT 't1',\n" - + " `t2` BIGINT COMMENT 't2',\n" - + " `t3` STRING COMMENT 't3'\n" - + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n" - + "WITH\n" - + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" - + " 'bucket-key' = 't1',\n" - + " 'primary-key' = 'dt,hour,t1',\n" - + " 'partition.timestamp-pattern' = '$dt $hour:00:00',\n" - + " 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n" - + " 'chain-table.enabled' = 'true',\n" - + " 'bucket' = '2',\n" - + " 'merge-engine' = 'deduplicate', \n" - + " 'sequence.field' = 't2'\n" - + " )"); - - /** Create branch */ - spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');"); - spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')"); - - /** Set branch */ - spark.sql( - "ALTER TABLE my_db1.chain_test SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot', " - + "'scan.fallback-delta-branch' = 'delta')"); - spark.sql( - "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot'," - + "'scan.fallback-delta-branch' = 'delta')"); - spark.sql( - "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot'," - + "'scan.fallback-delta-branch' = 'delta')"); - spark.close(); - spark = builder.getOrCreate(); /** Write main branch */ spark.sql( @@ -502,8 +380,173 @@ public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOE spark.close(); spark = builder.getOrCreate(); /** Drop table */ - spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); + spark.close(); + } + @Test + public void testDropSnapshotPartition(@TempDir java.nio.file.Path tempDir) throws IOException { + SparkSession.Builder builder = initSparkClient(tempDir); + initDailyChainTable(builder); + + SparkSession spark = builder.getOrCreate(); + /** Write delta branch */ + spark.sql("set spark.paimon.branch=delta;"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260101') values (1, 
1, '1'),(2, 1, '1');"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260102') values (1, 2, '1-1' ),(3, 1, '1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260103') values (2, 2, '1-1' ),(4, 1, '1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260104') values (3, 2, '1-1' ),(4, 2, '1-1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260105') values (5, 1, '1' ),(6, 1, '1' );"); + + /** Write snapshot branch */ + spark.sql("set spark.paimon.branch=snapshot;"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260101') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260103') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260105') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');"); + spark.close(); + + final SparkSession session = builder.getOrCreate(); + assertThatNoException() + .isThrownBy( + () -> { + session.sql( + "alter table `my_db1`.`chain_test$branch_snapshot` drop partition (dt = '20260105');"); + }); + assertThatThrownBy( + () -> { + session.sql( + "alter table `my_db1`.`chain_test$branch_snapshot` drop partition (dt = '20260101');"); + }); + session.close(); + + dropTable(builder); + } + + private SparkSession.Builder initSparkClient(@TempDir java.nio.file.Path tempDir) { + Path warehousePath = new Path("file:" + tempDir.toString()); + SparkSession.Builder builder = + SparkSession.builder() + .config("spark.sql.warehouse.dir", warehousePath.toString()) + // with hive metastore + .config("spark.sql.catalogImplementation", "hive") + .config("hive.metastore.uris", "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.metastore", "hive") + .config( + "spark.sql.catalog.spark_catalog.hive.metastore.uris", + "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") + .config( + "spark.sql.catalog.spark_catalog.warehouse", + warehousePath.toString()) + .config( + "spark.sql.extensions", + "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") + .master("local[2]"); + return builder; + } + + private void initDailyChainTable(SparkSession.Builder builder) throws IOException { + SparkSession spark = builder.getOrCreate(); + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE spark_catalog.my_db1"); + + /** Create table */ + spark.sql( + "CREATE TABLE IF NOT EXISTS \n" + + " `my_db1`.`chain_test` (\n" + + " `t1` BIGINT COMMENT 't1',\n" + + " `t2` BIGINT COMMENT 't2',\n" + + " `t3` STRING COMMENT 't3'\n" + + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n" + + "WITH\n" + + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" + + " 'bucket-key' = 't1',\n" + + " 'primary-key' = 'dt,t1',\n" + + " 'partition.timestamp-pattern' = '$dt',\n" + + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n" + + " 'chain-table.enabled' = 'true',\n" + + " 'bucket' = '2',\n" + + " 'merge-engine' = 'deduplicate', \n" + + " 'sequence.field' = 
't2'\n" + + " )"); + + /** Create branch */ + spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');"); + spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')"); + + /** Set branch */ + spark.sql( + "ALTER TABLE my_db1.chain_test SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot', " + + "'scan.fallback-delta-branch' = 'delta')"); + spark.sql( + "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot'," + + "'scan.fallback-delta-branch' = 'delta')"); + spark.sql( + "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot'," + + "'scan.fallback-delta-branch' = 'delta')"); + + spark.close(); + } + + private void initHourlyChainTable(SparkSession.Builder builder) throws IOException { + SparkSession spark = builder.getOrCreate(); + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE spark_catalog.my_db1"); + + /** Create table */ + spark.sql( + "CREATE TABLE IF NOT EXISTS \n" + + " `my_db1`.`chain_test` (\n" + + " `t1` BIGINT COMMENT 't1',\n" + + " `t2` BIGINT COMMENT 't2',\n" + + " `t3` STRING COMMENT 't3'\n" + + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n" + + "WITH\n" + + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" + + " 'bucket-key' = 't1',\n" + + " 'primary-key' = 'dt,hour,t1',\n" + + " 'partition.timestamp-pattern' = '$dt $hour:00:00',\n" + + " 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n" + + " 'chain-table.enabled' = 'true',\n" + + " 'bucket' = '2',\n" + + " 'merge-engine' = 'deduplicate', \n" + + " 'sequence.field' = 't2'\n" + + " )"); + + /** Create branch */ + spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');"); + spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')"); + + /** Set branch */ + spark.sql( + "ALTER TABLE my_db1.chain_test SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot', " + + "'scan.fallback-delta-branch' = 'delta')"); + spark.sql( + "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot'," + + "'scan.fallback-delta-branch' = 'delta')"); + spark.sql( + "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot'," + + "'scan.fallback-delta-branch' = 'delta')"); + spark.close(); + } + + private void dropTable(SparkSession.Builder builder) throws IOException { + SparkSession spark = builder.getOrCreate(); + spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); spark.close(); } }