diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogScopedCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogScopedCacheMgr.java new file mode 100644 index 00000000000000..6b9afc1b74a6de --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogScopedCacheMgr.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +public class CatalogScopedCacheMgr<T> { + private final Map<Long, T> cacheMap = new ConcurrentHashMap<>(); + private final Function<ExternalCatalog, T> cacheFactory; + + public CatalogScopedCacheMgr(Function<ExternalCatalog, T> cacheFactory) { + this.cacheFactory = cacheFactory; + } + + public T getCache(ExternalCatalog catalog) { + return cacheMap.computeIfAbsent(catalog.getId(), id -> cacheFactory.apply(catalog)); + } + + public T getCache(long catalogId) { + return cacheMap.get(catalogId); + } + + public T removeCache(long catalogId) { + return cacheMap.remove(catalogId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 3501fcafb5e3f9..1ccb3801b9b016 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -1450,4 +1450,3 @@ public void reorderColumns(TableIf dorisTable, List newOrder) throws Use { } } } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 8d6aa5522f1f9f..9b44833f01a67c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -36,7 +36,6 @@ import org.apache.doris.datasource.metacache.MetaCache; import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.datasource.paimon.PaimonMetadataCache; -import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.nereids.exceptions.NotSupportedException; @@ -87,13 +86,10 @@ public class ExternalMetaCacheMgr { // This executor is used to
schedule the getting split tasks private ExecutorService scheduleExecutor; - // catalog id -> HiveMetaStoreCache - private final Map hiveCacheMap = Maps.newConcurrentMap(); - - // catalog id -> IcebergMetadataCache - private final Map icebergCacheMap = Maps.newConcurrentMap(); - // catalog id -> table schema cache - private final Map schemaCacheMap = Maps.newHashMap(); + private final CatalogScopedCacheMgr hiveMetaStoreCacheMgr; + private final CatalogScopedCacheMgr icebergMetadataCacheMgr; + private final CatalogScopedCacheMgr paimonMetadataCacheMgr; + private final CatalogScopedCacheMgr schemaCacheMgr; // hudi partition manager private final HudiMetadataCacheMgr hudiMetadataCacheMgr; // all catalogs could share the same fsCache. @@ -101,7 +97,6 @@ public class ExternalMetaCacheMgr { // all external table row count cache. private ExternalRowCountCache rowCountCache; private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; - private final PaimonMetadataCacheMgr paimonMetadataCacheMgr; private final DorisExternalMetaCacheMgr dorisExternalMetaCacheMgr; public ExternalMetaCacheMgr(boolean isCheckpointCatalog) { @@ -132,7 +127,15 @@ public ExternalMetaCacheMgr(boolean isCheckpointCatalog) { hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); - paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor); + hiveMetaStoreCacheMgr = new CatalogScopedCacheMgr<>( + catalog -> new HiveMetaStoreCache((HMSExternalCatalog) catalog, + commonRefreshExecutor, fileListingExecutor)); + icebergMetadataCacheMgr = new CatalogScopedCacheMgr<>( + catalog -> new IcebergMetadataCache(catalog, commonRefreshExecutor)); + schemaCacheMgr = new CatalogScopedCacheMgr<>( + catalog -> new ExternalSchemaCache(catalog, commonRefreshExecutor)); + paimonMetadataCacheMgr = new CatalogScopedCacheMgr<>( + catalog -> new PaimonMetadataCache(catalog, commonRefreshExecutor)); 
dorisExternalMetaCacheMgr = new DorisExternalMetaCacheMgr(commonRefreshExecutor); } @@ -160,30 +163,11 @@ public ExecutorService getScheduleExecutor() { } public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) { - HiveMetaStoreCache cache = hiveCacheMap.get(catalog.getId()); - if (cache == null) { - synchronized (hiveCacheMap) { - if (!hiveCacheMap.containsKey(catalog.getId())) { - hiveCacheMap.put(catalog.getId(), - new HiveMetaStoreCache(catalog, commonRefreshExecutor, fileListingExecutor)); - } - cache = hiveCacheMap.get(catalog.getId()); - } - } - return cache; + return hiveMetaStoreCacheMgr.getCache(catalog); } public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) { - ExternalSchemaCache cache = schemaCacheMap.get(catalog.getId()); - if (cache == null) { - synchronized (schemaCacheMap) { - if (!schemaCacheMap.containsKey(catalog.getId())) { - schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog, commonRefreshExecutor)); - } - cache = schemaCacheMap.get(catalog.getId()); - } - } - return cache; + return schemaCacheMgr.getCache(catalog); } public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) { @@ -203,20 +187,11 @@ public HudiMetadataCacheMgr getHudiMetadataCacheMgr() { } public IcebergMetadataCache getIcebergMetadataCache(ExternalCatalog catalog) { - IcebergMetadataCache cache = icebergCacheMap.get(catalog.getId()); - if (cache == null) { - synchronized (icebergCacheMap) { - if (!icebergCacheMap.containsKey(catalog.getId())) { - icebergCacheMap.put(catalog.getId(), new IcebergMetadataCache(catalog, commonRefreshExecutor)); - } - cache = icebergCacheMap.get(catalog.getId()); - } - } - return cache; + return icebergMetadataCacheMgr.getCache(catalog); } - public PaimonMetadataCache getPaimonMetadataCache() { - return paimonMetadataCacheMgr.getPaimonMetadataCache(); + public PaimonMetadataCache getPaimonMetadataCache(ExternalCatalog catalog) { + return paimonMetadataCacheMgr.getCache(catalog); } 
public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) { @@ -236,41 +211,43 @@ public DorisExternalMetaCacheMgr getDorisExternalMetaCacheMgr() { } public void removeCache(long catalogId) { - if (hiveCacheMap.remove(catalogId) != null) { + if (hiveMetaStoreCacheMgr.removeCache(catalogId) != null) { LOG.info("remove hive metastore cache for catalog {}", catalogId); } - synchronized (schemaCacheMap) { - if (schemaCacheMap.remove(catalogId) != null) { - LOG.info("remove schema cache for catalog {}", catalogId); - } + if (schemaCacheMgr.removeCache(catalogId) != null) { + LOG.info("remove schema cache for catalog {}", catalogId); } - if (icebergCacheMap.remove(catalogId) != null) { + if (icebergMetadataCacheMgr.removeCache(catalogId) != null) { LOG.info("remove iceberg meta cache for catalog {}", catalogId); } hudiMetadataCacheMgr.removeCache(catalogId); maxComputeMetadataCacheMgr.removeCache(catalogId); - paimonMetadataCacheMgr.removeCache(catalogId); + PaimonMetadataCache paimonMetadataCache = paimonMetadataCacheMgr.removeCache(catalogId); + if (paimonMetadataCache != null) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } dorisExternalMetaCacheMgr.removeCache(catalogId); } public void invalidateTableCache(ExternalTable dorisTable) { - synchronized (schemaCacheMap) { - ExternalSchemaCache schemaCache = schemaCacheMap.get(dorisTable.getCatalog().getId()); - if (schemaCache != null) { - schemaCache.invalidateTableCache(dorisTable); - } + ExternalSchemaCache schemaCache = schemaCacheMgr.getCache(dorisTable.getCatalog().getId()); + if (schemaCache != null) { + schemaCache.invalidateTableCache(dorisTable); } - HiveMetaStoreCache hiveMetaCache = hiveCacheMap.get(dorisTable.getCatalog().getId()); + HiveMetaStoreCache hiveMetaCache = hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId()); if (hiveMetaCache != null) { hiveMetaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping()); } - IcebergMetadataCache icebergMetadataCache = 
icebergCacheMap.get(dorisTable.getCatalog().getId()); + IcebergMetadataCache icebergMetadataCache = icebergMetadataCacheMgr.getCache(dorisTable.getCatalog().getId()); if (icebergMetadataCache != null) { icebergMetadataCache.invalidateTableCache(dorisTable); } hudiMetadataCacheMgr.invalidateTableCache(dorisTable); maxComputeMetadataCacheMgr.invalidateTableCache(dorisTable); - paimonMetadataCacheMgr.invalidateTableCache(dorisTable); + PaimonMetadataCache paimonMetadataCache = paimonMetadataCacheMgr.getCache(dorisTable.getCatalog().getId()); + if (paimonMetadataCache != null) { + paimonMetadataCache.invalidateTableCache(dorisTable); + } if (LOG.isDebugEnabled()) { LOG.debug("invalid table cache for {}.{} in catalog {}", dorisTable.getRemoteDbName(), dorisTable.getRemoteName(), dorisTable.getCatalog().getName()); @@ -279,43 +256,45 @@ public void invalidateTableCache(ExternalTable dorisTable) { public void invalidateDbCache(long catalogId, String dbName) { dbName = ClusterNamespace.getNameFromFullName(dbName); - synchronized (schemaCacheMap) { - ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId); - if (schemaCache != null) { - schemaCache.invalidateDbCache(dbName); - } + ExternalSchemaCache schemaCache = schemaCacheMgr.getCache(catalogId); + if (schemaCache != null) { + schemaCache.invalidateDbCache(dbName); } - HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId); + HiveMetaStoreCache metaCache = hiveMetaStoreCacheMgr.getCache(catalogId); if (metaCache != null) { metaCache.invalidateDbCache(dbName); } - IcebergMetadataCache icebergMetadataCache = icebergCacheMap.get(catalogId); + IcebergMetadataCache icebergMetadataCache = icebergMetadataCacheMgr.getCache(catalogId); if (icebergMetadataCache != null) { icebergMetadataCache.invalidateDbCache(catalogId, dbName); } hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName); maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName); - paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName); + 
PaimonMetadataCache paimonMetadataCache = paimonMetadataCacheMgr.getCache(catalogId); + if (paimonMetadataCache != null) { + paimonMetadataCache.invalidateDbCache(catalogId, dbName); + } if (LOG.isDebugEnabled()) { LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId); } } public void invalidateCatalogCache(long catalogId) { - synchronized (schemaCacheMap) { - schemaCacheMap.remove(catalogId); - } - HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId); + schemaCacheMgr.removeCache(catalogId); + HiveMetaStoreCache metaCache = hiveMetaStoreCacheMgr.getCache(catalogId); if (metaCache != null) { metaCache.invalidateAll(); } - IcebergMetadataCache icebergMetadataCache = icebergCacheMap.get(catalogId); + IcebergMetadataCache icebergMetadataCache = icebergMetadataCacheMgr.getCache(catalogId); if (icebergMetadataCache != null) { icebergMetadataCache.invalidateCatalogCache(catalogId); } hudiMetadataCacheMgr.invalidateCatalogCache(catalogId); maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId); - paimonMetadataCacheMgr.invalidateCatalogCache(catalogId); + PaimonMetadataCache paimonMetadataCache = paimonMetadataCacheMgr.getCache(catalogId); + if (paimonMetadataCache != null) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId); if (LOG.isDebugEnabled()) { LOG.debug("invalid catalog cache for {}", catalogId); @@ -323,14 +302,12 @@ public void invalidateCatalogCache(long catalogId) { } public void invalidSchemaCache(long catalogId) { - synchronized (schemaCacheMap) { - schemaCacheMap.remove(catalogId); - } + schemaCacheMgr.removeCache(catalogId); } public void addPartitionsCache(long catalogId, HMSExternalTable table, List partitionNames) { String dbName = ClusterNamespace.getNameFromFullName(table.getDbName()); - HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId); + HiveMetaStoreCache metaCache = hiveMetaStoreCacheMgr.getCache(catalogId); if (metaCache != null) { List 
partitionColumnTypes; try { @@ -348,7 +325,7 @@ public void addPartitionsCache(long catalogId, HMSExternalTable table, List partitionNames) { String dbName = ClusterNamespace.getNameFromFullName(table.getDbName()); - HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId); + HiveMetaStoreCache metaCache = hiveMetaStoreCacheMgr.getCache(catalogId); if (metaCache != null) { metaCache.dropPartitionsCache(table, partitionNames, true); } @@ -358,7 +335,7 @@ public void dropPartitionsCache(long catalogId, HMSExternalTable table, List partitionNames) { - HiveMetaStoreCache metaCache = hiveCacheMap.get(dorisTable.getCatalog().getId()); + HiveMetaStoreCache metaCache = hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId()); if (metaCache != null) { for (String partitionName : partitionNames) { metaCache.invalidatePartitionCache(dorisTable, partitionName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index a1c0236eeb47a8..cf129ea8623f6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -19,6 +19,7 @@ import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; +import org.apache.doris.datasource.metacache.CacheSpec; import org.apache.doris.metric.GaugeMetric; import org.apache.doris.metric.Metric; import org.apache.doris.metric.MetricLabel; @@ -26,7 +27,6 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import lombok.Data; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -49,13 +49,14 @@ public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) { } private void init(ExecutorService executor) { - long schemaCacheTtlSecond = NumberUtils.toLong( - 
(catalog.getProperties().get(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND)), ExternalCatalog.CACHE_NO_TTL); + CacheSpec cacheSpec = CacheSpec.fromTtlValue( + catalog.getProperties().get(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND), + Config.external_cache_expire_time_seconds_after_access, + Config.max_external_schema_cache_num); CacheFactory schemaCacheFactory = new CacheFactory( - OptionalLong.of(schemaCacheTtlSecond >= ExternalCatalog.CACHE_TTL_DISABLE_CACHE - ? schemaCacheTtlSecond : Config.external_cache_expire_time_seconds_after_access), + CacheSpec.toExpireAfterAccess(cacheSpec.getTtlSecond()), OptionalLong.of(Config.external_cache_refresh_time_minutes * 60), - Config.max_external_schema_cache_num, + cacheSpec.getCapacity(), false, null); schemaCache = schemaCacheFactory.buildCache(this::loadSchema, executor); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 039bba4ed0f634..ee8ad8b4fc0fbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -28,11 +28,11 @@ import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; +import org.apache.doris.datasource.metacache.CacheSpec; import org.apache.doris.datasource.operations.ExternalMetadataOperations; import org.apache.doris.datasource.property.metastore.AbstractIcebergProperties; import org.apache.doris.transaction.TransactionManagerFactory; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.iceberg.catalog.Catalog; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -52,12 +52,14 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String 
ICEBERG_DLF = "dlf"; public static final String ICEBERG_S3_TABLES = "s3tables"; public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name"; - public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = "iceberg.table.meta.cache.ttl-second"; - public static final String ICEBERG_MANIFEST_CACHE_ENABLE = "iceberg.manifest.cache.enable"; - public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB = "iceberg.manifest.cache.capacity-mb"; - public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = "iceberg.manifest.cache.ttl-second"; + public static final String ICEBERG_TABLE_CACHE_ENABLE = "meta.cache.iceberg.table.enable"; + public static final String ICEBERG_TABLE_CACHE_TTL_SECOND = "meta.cache.iceberg.table.ttl-second"; + public static final String ICEBERG_TABLE_CACHE_CAPACITY = "meta.cache.iceberg.table.capacity"; + public static final String ICEBERG_MANIFEST_CACHE_ENABLE = "meta.cache.iceberg.manifest.enable"; + public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = "meta.cache.iceberg.manifest.ttl-second"; + public static final String ICEBERG_MANIFEST_CACHE_CAPACITY = "meta.cache.iceberg.manifest.capacity"; public static final boolean DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE = false; - public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 1024; + public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY = 1024; public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND = 48 * 60 * 60; protected String icebergCatalogType; protected Catalog catalog; @@ -86,51 +88,33 @@ protected void initCatalog() { @Override public void checkProperties() throws DdlException { super.checkProperties(); - // check iceberg.table.meta.cache.ttl-second parameter - String tableMetaCacheTtlSecond = catalogProperty.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null); - if (Objects.nonNull(tableMetaCacheTtlSecond) && NumberUtils.toInt(tableMetaCacheTtlSecond, CACHE_NO_TTL) - < CACHE_TTL_DISABLE_CACHE) { - throw new 
DdlException( - "The parameter " + ICEBERG_TABLE_META_CACHE_TTL_SECOND + " is wrong, value is " - + tableMetaCacheTtlSecond); - } - - String manifestCacheEnable = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null); - if (Objects.nonNull(manifestCacheEnable) - && !(manifestCacheEnable.equalsIgnoreCase("true") || manifestCacheEnable.equalsIgnoreCase("false"))) { - throw new DdlException( - "The parameter " + ICEBERG_MANIFEST_CACHE_ENABLE + " is wrong, value is " - + manifestCacheEnable); - } - - String manifestCacheCapacity = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null); - if (Objects.nonNull(manifestCacheCapacity) && NumberUtils.toLong(manifestCacheCapacity, -1) <= 0) { - throw new DdlException( - "The parameter " + ICEBERG_MANIFEST_CACHE_CAPACITY_MB + " is wrong, value is " - + manifestCacheCapacity); - } - - String manifestCacheTtlSecond = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null); - if (Objects.nonNull(manifestCacheTtlSecond) - && NumberUtils.toLong(manifestCacheTtlSecond, CACHE_NO_TTL) < CACHE_TTL_DISABLE_CACHE) { - throw new DdlException( - "The parameter " + ICEBERG_MANIFEST_CACHE_TTL_SECOND + " is wrong, value is " - + manifestCacheTtlSecond); - } + CacheSpec.checkBooleanProperty(catalogProperty.getOrDefault(ICEBERG_TABLE_CACHE_ENABLE, null), + ICEBERG_TABLE_CACHE_ENABLE); + CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_TABLE_CACHE_TTL_SECOND, null), + -1L, ICEBERG_TABLE_CACHE_TTL_SECOND); + CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_TABLE_CACHE_CAPACITY, null), + 0L, ICEBERG_TABLE_CACHE_CAPACITY); + + CacheSpec.checkBooleanProperty(catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null), + ICEBERG_MANIFEST_CACHE_ENABLE); + CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null), + -1L, ICEBERG_MANIFEST_CACHE_TTL_SECOND); + 
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY, null), + 0L, ICEBERG_MANIFEST_CACHE_CAPACITY); catalogProperty.checkMetaStoreAndStorageProperties(AbstractIcebergProperties.class); } @Override public void notifyPropertiesUpdated(Map updatedProps) { super.notifyPropertiesUpdated(updatedProps); - String tableMetaCacheTtl = updatedProps.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null); - if (Objects.nonNull(tableMetaCacheTtl)) { - Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init(); - } + String tableCacheEnable = updatedProps.getOrDefault(ICEBERG_TABLE_CACHE_ENABLE, null); + String tableCacheTtl = updatedProps.getOrDefault(ICEBERG_TABLE_CACHE_TTL_SECOND, null); + String tableCacheCapacity = updatedProps.getOrDefault(ICEBERG_TABLE_CACHE_CAPACITY, null); String manifestCacheEnable = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null); - String manifestCacheCapacity = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null); + String manifestCacheCapacity = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY, null); String manifestCacheTtl = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null); - if (Objects.nonNull(manifestCacheEnable) || Objects.nonNull(manifestCacheCapacity) + if (Objects.nonNull(tableCacheEnable) || Objects.nonNull(tableCacheTtl) || Objects.nonNull(tableCacheCapacity) + || Objects.nonNull(manifestCacheEnable) || Objects.nonNull(manifestCacheCapacity) || Objects.nonNull(manifestCacheTtl)) { Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index 5f0c0700efe0db..40c8ba29184183 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -28,6 +28,7 @@ import org.apache.doris.datasource.NameMapping; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache; +import org.apache.doris.datasource.metacache.CacheSpec; import org.apache.doris.mtmv.MTMVRelatedTableIf; import com.github.benmanes.caffeine.cache.LoadingCache; @@ -35,7 +36,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -65,28 +65,38 @@ public IcebergMetadataCache(ExternalCatalog catalog, ExecutorService executor) { } public void init() { - long tableMetaCacheTtlSecond = NumberUtils.toLong( - catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_TABLE_META_CACHE_TTL_SECOND), - ExternalCatalog.CACHE_NO_TTL); - + CacheSpec tableCacheSpec = resolveTableCacheSpec(); CacheFactory tableCacheFactory = new CacheFactory( - OptionalLong.of(tableMetaCacheTtlSecond >= ExternalCatalog.CACHE_TTL_DISABLE_CACHE - ? 
tableMetaCacheTtlSecond : Config.external_cache_expire_time_seconds_after_access), - OptionalLong.of(Config.external_cache_refresh_time_minutes * 60), - Config.max_external_table_cache_num, + CacheSpec.toExpireAfterAccess(tableCacheSpec.getTtlSecond()), + OptionalLong.empty(), + tableCacheSpec.getCapacity(), true, null); this.tableCache = tableCacheFactory.buildCache(this::loadTableCacheValue, executor); this.viewCache = tableCacheFactory.buildCache(this::loadView, executor); - long manifestCacheCapacityMb = NumberUtils.toLong( - catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY_MB), - IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB); - manifestCacheCapacityMb = Math.max(manifestCacheCapacityMb, 0L); - long manifestCacheTtlSec = NumberUtils.toLong( - catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND), - IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND); - this.manifestCache = new IcebergManifestCache(manifestCacheCapacityMb, manifestCacheTtlSec); + CacheSpec manifestCacheSpec = resolveManifestCacheSpec(); + this.manifestCache = new IcebergManifestCache(manifestCacheSpec.getCapacity(), + manifestCacheSpec.getTtlSecond()); + } + + private CacheSpec resolveTableCacheSpec() { + return CacheSpec.fromProperties(catalog.getProperties(), + IcebergExternalCatalog.ICEBERG_TABLE_CACHE_ENABLE, true, + IcebergExternalCatalog.ICEBERG_TABLE_CACHE_TTL_SECOND, + Config.external_cache_expire_time_seconds_after_access, + IcebergExternalCatalog.ICEBERG_TABLE_CACHE_CAPACITY, + Config.max_external_table_cache_num); + } + + private CacheSpec resolveManifestCacheSpec() { + return CacheSpec.fromProperties(catalog.getProperties(), + IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE, + IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE, + IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND, + IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND, + 
IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY, + IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY); } public Table getIcebergTable(ExternalTable dorisTable) { @@ -169,74 +179,61 @@ private IcebergSnapshotCacheValue loadSnapshot(ExternalTable dorisTable, Table i } public void invalidateCatalogCache(long catalogId) { - tableCache.asMap().entrySet().stream() - .filter(entry -> entry.getKey().nameMapping.getCtlId() == catalogId) - .forEach(entry -> { - ManifestFiles.dropCache(entry.getValue().getIcebergTable().io()); - if (LOG.isDebugEnabled()) { - LOG.info("invalidate iceberg table cache {} when invalidating catalog cache", - entry.getKey().nameMapping, new Exception()); - } - tableCache.invalidate(entry.getKey()); - }); - - viewCache.asMap().entrySet().stream() - .filter(entry -> entry.getKey().nameMapping.getCtlId() == catalogId) - .forEach(entry -> viewCache.invalidate(entry.getKey())); + if (LOG.isDebugEnabled()) { + LOG.debug("invalidate all iceberg table cache when invalidating catalog {}", catalogId); + } + // Invalidate all entries related to the catalog + tableCache.invalidateAll(); + viewCache.invalidateAll(); manifestCache.invalidateAll(); } public void invalidateTableCache(ExternalTable dorisTable) { - long catalogId = dorisTable.getCatalog().getId(); + IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(dorisTable.getOrBuildNameMapping()); + IcebergTableCacheValue tableCacheValue = tableCache.getIfPresent(key); + if (tableCacheValue != null) { + invalidateTableCache(key, tableCacheValue); + } else { + invalidateTableCacheByLocalName(dorisTable); + } + } + + private void invalidateTableCache(IcebergMetadataCacheKey key, IcebergTableCacheValue tableCacheValue) { + ManifestFiles.dropCache(tableCacheValue.getIcebergTable().io()); + if (LOG.isDebugEnabled()) { + LOG.debug("invalidate iceberg table cache {}", key.nameMapping, new Exception()); + } + tableCache.invalidate(key); + viewCache.invalidate(key); + } + + private void 
invalidateTableCacheByLocalName(ExternalTable dorisTable) { String dbName = dorisTable.getDbName(); String tblName = dorisTable.getName(); tableCache.asMap().entrySet().stream() - .filter(entry -> { - IcebergMetadataCacheKey key = entry.getKey(); - return key.nameMapping.getCtlId() == catalogId - && key.nameMapping.getLocalDbName().equals(dbName) - && key.nameMapping.getLocalTblName().equals(tblName); - }) - .forEach(entry -> { - ManifestFiles.dropCache(entry.getValue().getIcebergTable().io()); - if (LOG.isDebugEnabled()) { - LOG.info("invalidate iceberg table cache {}", - entry.getKey().nameMapping, new Exception()); - } - tableCache.invalidate(entry.getKey()); - }); - viewCache.asMap().entrySet().stream() - .filter(entry -> { - IcebergMetadataCacheKey key = entry.getKey(); - return key.nameMapping.getCtlId() == catalogId - && key.nameMapping.getLocalDbName().equals(dbName) - && key.nameMapping.getLocalTblName().equals(tblName); - }) - .forEach(entry -> viewCache.invalidate(entry.getKey())); + .filter(entry -> entry.getKey().nameMapping.getLocalDbName().equals(dbName) + && entry.getKey().nameMapping.getLocalTblName().equals(tblName)) + .forEach(entry -> invalidateTableCache(entry.getKey(), entry.getValue())); + viewCache.asMap().keySet().stream() + .filter(key -> key.nameMapping.getLocalDbName().equals(dbName) + && key.nameMapping.getLocalTblName().equals(tblName)) + .forEach(viewCache::invalidate); } public void invalidateDbCache(long catalogId, String dbName) { tableCache.asMap().entrySet().stream() - .filter(entry -> { - IcebergMetadataCacheKey key = entry.getKey(); - return key.nameMapping.getCtlId() == catalogId - && key.nameMapping.getLocalDbName().equals(dbName); - }) + .filter(entry -> entry.getKey().nameMapping.getLocalDbName().equals(dbName)) .forEach(entry -> { ManifestFiles.dropCache(entry.getValue().getIcebergTable().io()); if (LOG.isDebugEnabled()) { - LOG.info("invalidate iceberg table cache {} when invalidating db cache", + LOG.debug("invalidate 
iceberg table cache {} when invalidating db cache", entry.getKey().nameMapping, new Exception()); } tableCache.invalidate(entry.getKey()); }); - viewCache.asMap().entrySet().stream() - .filter(entry -> { - IcebergMetadataCacheKey key = entry.getKey(); - return key.nameMapping.getCtlId() == catalogId - && key.nameMapping.getLocalDbName().equals(dbName); - }) - .forEach(entry -> viewCache.invalidate(entry.getKey())); + viewCache.asMap().keySet().stream() + .filter(key -> key.nameMapping.getLocalDbName().equals(dbName)) + .forEach(viewCache::invalidate); } private static void initIcebergTableFileIO(Table table, Map props) { @@ -260,6 +257,10 @@ private IcebergMetadataCacheKey(NameMapping nameMapping) { this.nameMapping = nameMapping; } + private static IcebergMetadataCacheKey of(NameMapping nameMapping) { + return new IcebergMetadataCacheKey(nameMapping); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index e14a79cf3ea379..75db779d157549 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -58,6 +58,7 @@ import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache; import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo; +import org.apache.doris.datasource.metacache.CacheSpec; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.datasource.property.metastore.HMSBaseProperties; @@ -1588,11 +1589,14 @@ public static IcebergManifestCache getManifestCache(ExternalCatalog catalog) { } public static boolean isManifestCacheEnabled(ExternalCatalog catalog) { - String enabled = 
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE); - if (enabled == null) { - return IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE; - } - return Boolean.parseBoolean(enabled); + CacheSpec spec = CacheSpec.fromProperties(catalog.getProperties(), + IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE, + IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE, + IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND, + IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND, + IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY, + IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY); + return CacheSpec.isCacheEnabled(spec.isEnable(), spec.getTtlSecond(), spec.getCapacity()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java deleted file mode 100644 index 112f161389bcef..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java +++ /dev/null @@ -1,194 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.datasource.iceberg.cache; - -import org.apache.iceberg.ContentFile; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.StructLike; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -/** - * Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects. - */ -public final class ContentFileEstimator { - private static final long LIST_BASE_WEIGHT = 48L; - private static final long OBJECT_REFERENCE_WEIGHT = 8L; - private static final long CONTENT_FILE_BASE_WEIGHT = 256L; - private static final long STRING_BASE_WEIGHT = 40L; - private static final long CHAR_BYTES = 2L; - private static final long BYTE_BUFFER_BASE_WEIGHT = 16L; - private static final long MAP_BASE_WEIGHT = 48L; - private static final long MAP_ENTRY_OVERHEAD = 24L; - private static final long LONG_OBJECT_WEIGHT = 24L; - private static final long INT_OBJECT_WEIGHT = 16L; - private static final long PARTITION_BASE_WEIGHT = 48L; - private static final long PARTITION_VALUE_BASE_WEIGHT = 8L; - - private ContentFileEstimator() { - } - - public static long estimate(List> files) { - return listReferenceWeight(files) + estimateContentFilesWeight(files); - } - - private static long listReferenceWeight(List files) { - if (files == null || files.isEmpty()) { - return 0L; - } - return LIST_BASE_WEIGHT + (long) files.size() * OBJECT_REFERENCE_WEIGHT; - } - - private static long estimateContentFilesWeight(List> files) { - long total = 0L; - if (files == null) { - return 0L; - } - for (ContentFile file : files) { - total += estimateContentFileWeight(file); - } - return total; - } - - private static long estimateContentFileWeight(ContentFile file) { - if (file == null) { - return 0L; - } - - long weight = CONTENT_FILE_BASE_WEIGHT; - weight += charSequenceWeight(file.path()); - weight += stringWeight(file.manifestLocation()); - weight += byteBufferWeight(file.keyMetadata()); - weight += partitionWeight(file.partition()); - - weight += 
numericMapWeight(file.columnSizes()); - weight += numericMapWeight(file.valueCounts()); - weight += numericMapWeight(file.nullValueCounts()); - weight += numericMapWeight(file.nanValueCounts()); - weight += byteBufferMapWeight(file.lowerBounds()); - weight += byteBufferMapWeight(file.upperBounds()); - - weight += listWeight(file.splitOffsets(), LONG_OBJECT_WEIGHT); - weight += listWeight(file.equalityFieldIds(), INT_OBJECT_WEIGHT); - - weight += optionalLongWeight(file.pos()); - weight += optionalLongWeight(file.dataSequenceNumber()); - weight += optionalLongWeight(file.fileSequenceNumber()); - weight += optionalLongWeight(file.firstRowId()); - weight += optionalIntWeight(file.sortOrderId()); - - if (file instanceof DeleteFile) { - DeleteFile deleteFile = (DeleteFile) file; - weight += stringWeight(deleteFile.referencedDataFile()); - weight += optionalLongWeight(deleteFile.contentOffset()); - weight += optionalLongWeight(deleteFile.contentSizeInBytes()); - } - - return weight; - } - - private static long listWeight(List list, long elementWeight) { - if (list == null || list.isEmpty()) { - return 0L; - } - return LIST_BASE_WEIGHT + (long) list.size() * (OBJECT_REFERENCE_WEIGHT + elementWeight); - } - - private static long numericMapWeight(Map map) { - if (map == null || map.isEmpty()) { - return 0L; - } - return MAP_BASE_WEIGHT + (long) map.size() * (MAP_ENTRY_OVERHEAD + LONG_OBJECT_WEIGHT); - } - - private static long byteBufferMapWeight(Map map) { - if (map == null || map.isEmpty()) { - return 0L; - } - long weight = MAP_BASE_WEIGHT + (long) map.size() * MAP_ENTRY_OVERHEAD; - for (ByteBuffer buffer : map.values()) { - weight += byteBufferWeight(buffer); - } - return weight; - } - - private static long partitionWeight(StructLike partition) { - if (partition == null) { - return 0L; - } - long weight = PARTITION_BASE_WEIGHT + (long) partition.size() * PARTITION_VALUE_BASE_WEIGHT; - for (int i = 0; i < partition.size(); i++) { - Object value = partition.get(i, 
Object.class); - weight += estimateValueWeight(value); - } - return weight; - } - - private static long estimateValueWeight(Object value) { - if (value == null) { - return 0L; - } - if (value instanceof CharSequence) { - return charSequenceWeight((CharSequence) value); - } else if (value instanceof byte[]) { - return BYTE_BUFFER_BASE_WEIGHT + ((byte[]) value).length; - } else if (value instanceof ByteBuffer) { - return byteBufferWeight((ByteBuffer) value); - } else if (value instanceof Long || value instanceof Double) { - return LONG_OBJECT_WEIGHT; - } else if (value instanceof Integer || value instanceof Float) { - return INT_OBJECT_WEIGHT; - } else if (value instanceof Short || value instanceof Character) { - return 4L; - } else if (value instanceof Boolean) { - return 1L; - } - return OBJECT_REFERENCE_WEIGHT; - } - - private static long charSequenceWeight(CharSequence value) { - if (value == null) { - return 0L; - } - return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES; - } - - private static long stringWeight(String value) { - if (value == null) { - return 0L; - } - return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES; - } - - private static long byteBufferWeight(ByteBuffer buffer) { - if (buffer == null) { - return 0L; - } - return BYTE_BUFFER_BASE_WEIGHT + buffer.remaining(); - } - - private static long optionalLongWeight(Long value) { - return value == null ? 0L : LONG_OBJECT_WEIGHT; - } - - private static long optionalIntWeight(Integer value) { - return value == null ? 
0L : INT_OBJECT_WEIGHT; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java index 6c5d79ecb69ace..6016b2ab7e997e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java @@ -17,16 +17,15 @@ package org.apache.doris.datasource.iceberg.cache; +import org.apache.doris.common.CacheFactory; import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.metacache.CacheSpec; import com.github.benmanes.caffeine.cache.CacheLoader; -import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; -import com.github.benmanes.caffeine.cache.Weigher; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.time.Duration; import java.util.Optional; import java.util.concurrent.Callable; @@ -38,20 +37,14 @@ public class IcebergManifestCache { private final LoadingCache cache; - public IcebergManifestCache(long capacityMb, long ttlSec) { - long capacityInBytes = capacityMb * 1024L * 1024L; - Weigher weigher = (key, value) -> { - long weight = Optional.ofNullable(value).map(ManifestCacheValue::getWeightBytes).orElse(0L); - if (weight > Integer.MAX_VALUE) { - return Integer.MAX_VALUE; - } - return (int) weight; - }; - Caffeine builder = Caffeine.newBuilder() - .maximumWeight(capacityInBytes) - .weigher(weigher) - .expireAfterAccess(Duration.ofSeconds(ttlSec)); - cache = builder.build(new CacheLoader() { + public IcebergManifestCache(long capacity, long ttlSec) { + CacheFactory cacheFactory = new CacheFactory( + CacheSpec.toExpireAfterAccess(ttlSec), + java.util.OptionalLong.empty(), + capacity, + true, + null); + cache = cacheFactory.buildCache(new CacheLoader() { @Override public 
ManifestCacheValue load(ManifestCacheKey key) { throw new CacheException("Manifest cache loader should be provided explicitly for key %s", null, key); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java index 91e2f6db72f934..e98ca6b2fb2808 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java @@ -24,27 +24,23 @@ import java.util.List; /** - * Cached manifest payload containing parsed files and an estimated weight. + * Cached manifest payload containing parsed files. */ public class ManifestCacheValue { private final List dataFiles; private final List deleteFiles; - private final long weightBytes; - private ManifestCacheValue(List dataFiles, List deleteFiles, long weightBytes) { + private ManifestCacheValue(List dataFiles, List deleteFiles) { this.dataFiles = dataFiles == null ? Collections.emptyList() : dataFiles; this.deleteFiles = deleteFiles == null ? 
Collections.emptyList() : deleteFiles; - this.weightBytes = weightBytes; } public static ManifestCacheValue forDataFiles(List dataFiles) { - return new ManifestCacheValue(dataFiles, Collections.emptyList(), - estimateWeight(dataFiles, Collections.emptyList())); + return new ManifestCacheValue(dataFiles, Collections.emptyList()); } public static ManifestCacheValue forDeleteFiles(List deleteFiles) { - return new ManifestCacheValue(Collections.emptyList(), deleteFiles, - estimateWeight(Collections.emptyList(), deleteFiles)); + return new ManifestCacheValue(Collections.emptyList(), deleteFiles); } public List getDataFiles() { @@ -54,12 +50,4 @@ public List getDataFiles() { public List getDeleteFiles() { return deleteFiles; } - - public long getWeightBytes() { - return weightBytes; - } - - private static long estimateWeight(List dataFiles, List deleteFiles) { - return ContentFileEstimator.estimate(dataFiles) + ContentFileEstimator.estimate(deleteFiles); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/CacheSpec.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/CacheSpec.java new file mode 100644 index 00000000000000..ca0f1be330d753 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/CacheSpec.java @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.metacache; + +import org.apache.doris.common.DdlException; + +import org.apache.commons.lang3.math.NumberUtils; + +import java.util.Map; +import java.util.OptionalLong; + +/** + * Common cache specification for external metadata caches. + * + *

<p>Semantics:
+ * <ul>
+ *   <li>enable=false disables cache</li>
+ *   <li>ttlSecond=0 disables cache, ttlSecond=-1 means no expiration</li>
+ *   <li>capacity=0 disables cache; capacity is count-based</li>
+ * </ul>
+ */ +public final class CacheSpec { + public static final long CACHE_NO_TTL = -1L; + public static final long CACHE_TTL_DISABLE_CACHE = 0L; + + private final boolean enable; + private final long ttlSecond; + private final long capacity; + + private CacheSpec(boolean enable, long ttlSecond, long capacity) { + this.enable = enable; + this.ttlSecond = ttlSecond; + this.capacity = capacity; + } + + public static CacheSpec fromProperties(Map properties, + String enableKey, boolean defaultEnable, + String ttlKey, long defaultTtlSecond, + String capacityKey, long defaultCapacity) { + boolean enable = getBooleanProperty(properties, enableKey, defaultEnable); + long ttlSecond = getLongProperty(properties, ttlKey, defaultTtlSecond); + long capacity = getLongProperty(properties, capacityKey, defaultCapacity); + if (!isCacheEnabled(enable, ttlSecond, capacity)) { + capacity = 0; + } + return new CacheSpec(enable, ttlSecond, capacity); + } + + /** + * Build a cache spec from a ttl property value and fixed capacity. + * + *

<p>Semantics are compatible with legacy schema cache behavior:
+ * <ul>
+ *   <li>ttlValue is null: use default ttl</li>
+ *   <li>ttl=-1: no expiration</li>
+ *   <li>ttl=0: disable cache by forcing capacity=0</li>
+ *   <li>ttl parse failure: fallback to -1 (no expiration)</li>
+ * </ul>
+ * TODO: Refactor schema cache and its parameters to the unified enable/ttl/capacity model, + * then remove this ttl-only adapter. + */ + public static CacheSpec fromTtlValue(String ttlValue, long defaultTtlSecond, long defaultCapacity) { + long ttlSecond = ttlValue == null ? defaultTtlSecond : NumberUtils.toLong(ttlValue, CACHE_NO_TTL); + long capacity = defaultCapacity; + if (!isCacheEnabled(true, ttlSecond, capacity)) { + capacity = 0; + } + return new CacheSpec(true, ttlSecond, capacity); + } + + public static void checkBooleanProperty(String value, String key) throws DdlException { + if (value == null) { + return; + } + if (!value.equalsIgnoreCase("true") && !value.equalsIgnoreCase("false")) { + throw new DdlException("The parameter " + key + " is wrong, value is " + value); + } + } + + public static void checkLongProperty(String value, long minValue, String key) throws DdlException { + if (value == null) { + return; + } + long parsed; + try { + parsed = Long.parseLong(value); + } catch (NumberFormatException e) { + throw new DdlException("The parameter " + key + " is wrong, value is " + value); + } + if (parsed < minValue) { + throw new DdlException("The parameter " + key + " is wrong, value is " + value); + } + } + + public static boolean isCacheEnabled(boolean enable, long ttlSecond, long capacity) { + return enable && ttlSecond != 0 && capacity != 0; + } + + /** + * Convert ttlSecond to OptionalLong for CacheFactory. + * ttlSecond=-1 means no expiration; ttlSecond=0 disables cache. 
+ */ + public static OptionalLong toExpireAfterAccess(long ttlSecond) { + if (ttlSecond == CACHE_NO_TTL) { + return OptionalLong.empty(); + } + return OptionalLong.of(Math.max(ttlSecond, CACHE_TTL_DISABLE_CACHE)); + } + + private static boolean getBooleanProperty(Map properties, String key, boolean defaultValue) { + String value = properties.get(key); + if (value == null) { + return defaultValue; + } + return Boolean.parseBoolean(value); + } + + private static long getLongProperty(Map properties, String key, long defaultValue) { + String value = properties.get(key); + if (value == null) { + return defaultValue; + } + return NumberUtils.toLong(value, defaultValue); + } + + public boolean isEnable() { + return enable; + } + + public long getTtlSecond() { + return ttlSecond; + } + + public long getCapacity() { + return capacity; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java index 2dd3c0c8c6b6c3..b6a06fd4670c96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java @@ -23,6 +23,7 @@ import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.NameMapping; import org.apache.doris.datasource.SessionContext; +import org.apache.doris.datasource.metacache.CacheSpec; import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -36,6 +37,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; // The subclasses of this class are all deprecated, only for meta persistence compatibility. 
public class PaimonExternalCatalog extends ExternalCatalog { @@ -45,6 +47,9 @@ public class PaimonExternalCatalog extends ExternalCatalog { public static final String PAIMON_HMS = "hms"; public static final String PAIMON_DLF = "dlf"; public static final String PAIMON_REST = "rest"; + public static final String PAIMON_TABLE_CACHE_ENABLE = "meta.cache.paimon.table.enable"; + public static final String PAIMON_TABLE_CACHE_TTL_SECOND = "meta.cache.paimon.table.ttl-second"; + public static final String PAIMON_TABLE_CACHE_CAPACITY = "meta.cache.paimon.table.capacity"; protected String catalogType; protected Catalog catalog; @@ -188,9 +193,27 @@ public Map getPaimonOptionsMap() { @Override public void checkProperties() throws DdlException { super.checkProperties(); + CacheSpec.checkBooleanProperty(catalogProperty.getOrDefault(PAIMON_TABLE_CACHE_ENABLE, null), + PAIMON_TABLE_CACHE_ENABLE); + CacheSpec.checkLongProperty(catalogProperty.getOrDefault(PAIMON_TABLE_CACHE_TTL_SECOND, null), + -1L, PAIMON_TABLE_CACHE_TTL_SECOND); + CacheSpec.checkLongProperty(catalogProperty.getOrDefault(PAIMON_TABLE_CACHE_CAPACITY, null), + 0L, PAIMON_TABLE_CACHE_CAPACITY); catalogProperty.checkMetaStoreAndStorageProperties(AbstractPaimonProperties.class); } + @Override + public void notifyPropertiesUpdated(Map updatedProps) { + super.notifyPropertiesUpdated(updatedProps); + String tableCacheEnable = updatedProps.getOrDefault(PAIMON_TABLE_CACHE_ENABLE, null); + String tableCacheTtl = updatedProps.getOrDefault(PAIMON_TABLE_CACHE_TTL_SECOND, null); + String tableCacheCapacity = updatedProps.getOrDefault(PAIMON_TABLE_CACHE_CAPACITY, null); + if (Objects.nonNull(tableCacheEnable) || Objects.nonNull(tableCacheTtl) + || Objects.nonNull(tableCacheCapacity)) { + PaimonUtils.getPaimonMetadataCache(this).init(); + } + } + @Override public void onClose() { super.onClose(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 174bfa64a2d21f..1782bae59b078d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -20,7 +20,6 @@ import org.apache.doris.analysis.TableScanParams; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionType; @@ -76,12 +75,9 @@ public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTab private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); - private final Table paimonTable; - public PaimonExternalTable(long id, String name, String remoteName, PaimonExternalCatalog catalog, PaimonExternalDatabase db) { super(id, name, remoteName, catalog, db, TableType.PAIMON_EXTERNAL_TABLE); - this.paimonTable = catalog.getPaimonTable(getOrBuildNameMapping()); } public String getPaimonCatalogType() { @@ -96,7 +92,13 @@ protected synchronized void makeSureInitialized() { } public Table getPaimonTable(Optional snapshot) { - return getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable(); + if (snapshot.isPresent()) { + // MTMV scenario: get from snapshot cache + return getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable(); + } else { + // Normal query scenario: get directly from table cache + return PaimonUtils.getPaimonTable(this); + } } private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional tableSnapshot, @@ -109,7 +111,8 @@ private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional scanOptions = new HashMap<>(); @@ -135,18 +138,19 @@ private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional latestSnapshot = table.latestSnapshot(); long latestSnapshotId = 
PaimonSnapshot.INVALID_SNAPSHOT_ID; @@ -160,15 +164,14 @@ private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional splits = paimonTable.newReadBuilder().newScan().plan().splits(); + List splits = getBasePaimonTable().newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); } @@ -320,7 +323,7 @@ public Optional initSchema(SchemaCacheKey key) { makeSureInitialized(); PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key; try { - Table table = ((PaimonExternalCatalog) getCatalog()).getPaimonTable(getOrBuildNameMapping()); + Table table = getBasePaimonTable(); TableSchema tableSchema = ((DataTable) table).schemaManager().schema(paimonSchemaCacheKey.getSchemaId()); List columns = tableSchema.fields(); List dorisColumns = Lists.newArrayListWithCapacity(columns.size()); @@ -353,15 +356,15 @@ null, getCatalog().getName(), key.getNameMapping().getLocalDbName(), private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional snapshot) { PaimonSnapshotCacheValue snapshotCacheValue = getOrFetchSnapshotCacheValue(snapshot); - return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() - .getPaimonSchemaCacheValue(getOrBuildNameMapping(), snapshotCacheValue.getSnapshot().getSchemaId()); + return PaimonUtils.getSchemaCacheValue(this, snapshotCacheValue); } private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional snapshot) { if (snapshot.isPresent()) { return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); } else { - return getPaimonSnapshotCacheValue(Optional.empty(), Optional.empty()); + // Use new lazy-loading snapshot cache API + return PaimonUtils.getSnapshotCacheValue(snapshot, this); } } @@ -373,13 +376,14 @@ public List getSupportedSysTables() { @Override public String getComment() { - return paimonTable.comment().isPresent() ? paimonTable.comment().get() : ""; + Table table = getBasePaimonTable(); + return table.comment().isPresent() ? 
table.comment().get() : ""; } public Map getTableProperties() { - - if (paimonTable instanceof DataTable) { - DataTable dataTable = (DataTable) paimonTable; + Table table = getBasePaimonTable(); + if (table instanceof DataTable) { + DataTable dataTable = (DataTable) table; Map properties = new LinkedHashMap<>(dataTable.coreOptions().toMap()); if (!dataTable.primaryKeys().isEmpty()) { @@ -395,6 +399,10 @@ public Map getTableProperties() { @Override public boolean isPartitionedTable() { makeSureInitialized(); - return !paimonTable.partitionKeys().isEmpty(); + return !getBasePaimonTable().partitionKeys().isEmpty(); + } + + private Table getBasePaimonTable() { + return PaimonUtils.getPaimonTable(this); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index fc58a7f15ebedd..7f118490fdddee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -29,6 +29,7 @@ import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.NameMapping; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.metacache.CacheSpec; import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.collect.Maps; @@ -51,31 +52,65 @@ public class PaimonMetadataCache { - private final LoadingCache snapshotCache; + private final ExecutorService executor; + private final ExternalCatalog catalog; + private LoadingCache tableCache; - public PaimonMetadataCache(ExecutorService executor) { - CacheFactory snapshotCacheFactory = new CacheFactory( - OptionalLong.of(Config.external_cache_expire_time_seconds_after_access), - OptionalLong.of(Config.external_cache_refresh_time_minutes * 60), - Config.max_external_table_cache_num, + public 
PaimonMetadataCache(ExternalCatalog catalog, ExecutorService executor) { + this.executor = executor; + this.catalog = catalog; + init(); + } + + public void init() { + CacheSpec cacheSpec = resolveTableCacheSpec(); + CacheFactory tableCacheFactory = new CacheFactory( + CacheSpec.toExpireAfterAccess(cacheSpec.getTtlSecond()), + OptionalLong.empty(), + cacheSpec.getCapacity(), true, null); - this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), executor); + this.tableCache = tableCacheFactory.buildCache(key -> loadTableCacheValue(key), executor); + } + + private CacheSpec resolveTableCacheSpec() { + return CacheSpec.fromProperties(catalog.getProperties(), + PaimonExternalCatalog.PAIMON_TABLE_CACHE_ENABLE, true, + PaimonExternalCatalog.PAIMON_TABLE_CACHE_TTL_SECOND, + Config.external_cache_expire_time_seconds_after_access, + PaimonExternalCatalog.PAIMON_TABLE_CACHE_CAPACITY, + Config.max_external_table_cache_num); } @NotNull - private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) { + private PaimonTableCacheValue loadTableCacheValue(PaimonTableCacheKey key) { NameMapping nameMapping = key.getNameMapping(); try { - PaimonSnapshot latestSnapshot = loadLatestSnapshot(key); + PaimonExternalCatalog externalCatalog = (PaimonExternalCatalog) Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrException(nameMapping.getCtlId(), + id -> new IOException("Catalog not found: " + id)); + Table table = externalCatalog.getPaimonTable(nameMapping); + return new PaimonTableCacheValue(table); + } catch (Exception e) { + throw new CacheException("failed to load paimon table %s.%s.%s: %s", + e, nameMapping.getCtlId(), nameMapping.getLocalDbName(), + nameMapping.getLocalTblName(), e.getMessage()); + } + } + + @NotNull + private PaimonSnapshotCacheValue loadSnapshot(ExternalTable dorisTable, Table paimonTable) { + NameMapping nameMapping = dorisTable.getOrBuildNameMapping(); + try { + PaimonSnapshot latestSnapshot = loadLatestSnapshot(paimonTable, 
nameMapping); List partitionColumns = getPaimonSchemaCacheValue(nameMapping, latestSnapshot.getSchemaId()).getPartitionColumns(); - PaimonPartitionInfo partitionInfo = loadPartitionInfo(key, partitionColumns); + PaimonPartitionInfo partitionInfo = loadPartitionInfo(nameMapping, partitionColumns); return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot); } catch (Exception e) { - throw new CacheException("failed to load paimon snapshot %s.%s.%s or reason: %s", - e, nameMapping.getCtlId(), nameMapping.getLocalDbName(), nameMapping.getLocalTblName(), - e.getMessage()); + throw new CacheException("failed to load paimon snapshot %s.%s.%s: %s", + e, nameMapping.getCtlId(), nameMapping.getLocalDbName(), + nameMapping.getLocalTblName(), e.getMessage()); } } @@ -97,67 +132,96 @@ public PaimonSchemaCacheValue getPaimonSchemaCacheValue(NameMapping nameMapping, return (PaimonSchemaCacheValue) schemaCacheValue.get(); } - private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List partitionColumns) + private PaimonPartitionInfo loadPartitionInfo(NameMapping nameMapping, List partitionColumns) throws AnalysisException { if (CollectionUtils.isEmpty(partitionColumns)) { return PaimonPartitionInfo.EMPTY; } - NameMapping nameMapping = key.getNameMapping(); PaimonExternalCatalog externalCatalog = (PaimonExternalCatalog) Env.getCurrentEnv().getCatalogMgr() .getCatalogOrAnalysisException(nameMapping.getCtlId()); List paimonPartitions = externalCatalog.getPaimonPartitions(nameMapping); return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); } - private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException { - NameMapping nameMapping = key.getNameMapping(); - PaimonExternalCatalog externalCatalog = (PaimonExternalCatalog) Env.getCurrentEnv().getCatalogMgr() - .getCatalogOrException(nameMapping.getCtlId(), id -> new IOException("Catalog not found: " + id)); - Table table = 
externalCatalog.getPaimonTable(nameMapping); - Table snapshotTable = table; + private PaimonSnapshot loadLatestSnapshot(Table paimonTable, NameMapping nameMapping) { + Table snapshotTable = paimonTable; // snapshotId and schemaId Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID; - Optional optionalSnapshot = table.latestSnapshot(); + Optional optionalSnapshot = paimonTable.latestSnapshot(); if (optionalSnapshot.isPresent()) { latestSnapshotId = optionalSnapshot.get().id(); - snapshotTable = - table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), latestSnapshotId.toString())); + snapshotTable = paimonTable.copy( + Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), latestSnapshotId.toString())); } - DataTable dataTable = (DataTable) table; + DataTable dataTable = (DataTable) paimonTable; long latestSchemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L); return new PaimonSnapshot(latestSnapshotId, latestSchemaId, snapshotTable); } + public Table getPaimonTable(ExternalTable dorisTable) { + PaimonTableCacheKey key = new PaimonTableCacheKey(dorisTable.getOrBuildNameMapping()); + return tableCache.get(key).getPaimonTable(); + } + + public Table getPaimonTable(PaimonTableCacheKey key) { + return tableCache.get(key).getPaimonTable(); + } + + public PaimonSnapshotCacheValue getSnapshotCache(ExternalTable dorisTable) { + PaimonTableCacheKey key = new PaimonTableCacheKey(dorisTable.getOrBuildNameMapping()); + PaimonTableCacheValue tableCacheValue = tableCache.get(key); + return tableCacheValue.getSnapshotCacheValue(() -> loadSnapshot(dorisTable, + tableCacheValue.getPaimonTable())); + } + public void invalidateCatalogCache(long catalogId) { - snapshotCache.asMap().keySet().stream() - .filter(key -> key.getNameMapping().getCtlId() == catalogId) - .forEach(snapshotCache::invalidate); + tableCache.invalidateAll(); } public void invalidateTableCache(ExternalTable dorisTable) { - snapshotCache.asMap().keySet().stream() - 
.filter(key -> key.getNameMapping().getCtlId() == dorisTable.getCatalog().getId() - && key.getNameMapping().getLocalDbName().equals(dorisTable.getDbName()) - && key.getNameMapping().getLocalTblName().equals(dorisTable.getName())) - .forEach(snapshotCache::invalidate); + PaimonTableCacheKey key = new PaimonTableCacheKey(dorisTable.getOrBuildNameMapping()); + tableCache.invalidate(key); } public void invalidateDbCache(long catalogId, String dbName) { - snapshotCache.asMap().keySet().stream() - .filter(key -> key.getNameMapping().getCtlId() == catalogId - && key.getNameMapping().getLocalTblName().equals(dbName)) - .forEach(snapshotCache::invalidate); - } - - public PaimonSnapshotCacheValue getPaimonSnapshot(ExternalTable dorisTable) { - PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(dorisTable.getOrBuildNameMapping()); - return snapshotCache.get(key); + tableCache.asMap().keySet().stream() + .filter(key -> key.getNameMapping().getLocalDbName().equals(dbName)) + .forEach(tableCache::invalidate); } public Map> getCacheStats() { Map> res = Maps.newHashMap(); - res.put("paimon_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), - snapshotCache.estimatedSize())); + res.put("paimon_table_cache", ExternalMetaCacheMgr.getCacheStats(tableCache.stats(), + tableCache.estimatedSize())); return res; } + + static class PaimonTableCacheKey { + private final NameMapping nameMapping; + + public PaimonTableCacheKey(NameMapping nameMapping) { + this.nameMapping = nameMapping; + } + + public NameMapping getNameMapping() { + return nameMapping; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonTableCacheKey that = (PaimonTableCacheKey) o; + return nameMapping.equals(that.nameMapping); + } + + @Override + public int hashCode() { + return nameMapping.hashCode(); + } + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java deleted file mode 100644 index 4493f91893944e..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java +++ /dev/null @@ -1,51 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.datasource.paimon; - -import org.apache.doris.datasource.ExternalTable; - -import java.util.concurrent.ExecutorService; - -public class PaimonMetadataCacheMgr { - - private PaimonMetadataCache paimonMetadataCache; - - public PaimonMetadataCacheMgr(ExecutorService executor) { - this.paimonMetadataCache = new PaimonMetadataCache(executor); - } - - public PaimonMetadataCache getPaimonMetadataCache() { - return paimonMetadataCache; - } - - public void removeCache(long catalogId) { - paimonMetadataCache.invalidateCatalogCache(catalogId); - } - - public void invalidateCatalogCache(long catalogId) { - paimonMetadataCache.invalidateCatalogCache(catalogId); - } - - public void invalidateTableCache(ExternalTable dorisTable) { - paimonMetadataCache.invalidateTableCache(dorisTable); - } - - public void invalidateDbCache(long catalogId, String dbName) { - paimonMetadataCache.invalidateDbCache(catalogId, dbName); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java deleted file mode 100644 index 6154d607f0b2b3..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.datasource.paimon; - -import org.apache.doris.datasource.NameMapping; - -import java.util.StringJoiner; - -public class PaimonSnapshotCacheKey { - private final NameMapping nameMapping; - - public PaimonSnapshotCacheKey(NameMapping nameMapping) { - this.nameMapping = nameMapping; - } - - public NameMapping getNameMapping() { - return nameMapping; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o; - return nameMapping.equals(that.nameMapping); - } - - @Override - public int hashCode() { - return nameMapping.hashCode(); - } - - @Override - public String toString() { - return new StringJoiner(", ", PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]") - .add("catalog=" + nameMapping.getCtlId()) - .add("dbName='" + nameMapping.getLocalDbName() + "'") - .add("tableName='" + nameMapping.getLocalTblName() + "'") - .toString(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTableCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTableCacheValue.java new file mode 100644 index 00000000000000..cbbd9076b65a85 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTableCacheValue.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.paimon.table.Table; + +import java.util.function.Supplier; + +/** + * Cache value for Paimon table metadata. + * Encapsulates the Paimon Table object and provides lazy loading for snapshot cache. + */ +public class PaimonTableCacheValue { + private final Table paimonTable; + + // Lazy-loaded snapshot cache + private volatile boolean snapshotCacheLoaded; + private volatile PaimonSnapshotCacheValue snapshotCacheValue; + + public PaimonTableCacheValue(Table paimonTable) { + this.paimonTable = paimonTable; + } + + public Table getPaimonTable() { + return paimonTable; + } + + /** + * Get snapshot cache value with lazy loading. + * Uses double-checked locking to ensure thread-safe initialization. 
+ * + * @param loader Supplier to load snapshot cache value when needed + * @return The cached or newly loaded snapshot cache value + */ + public PaimonSnapshotCacheValue getSnapshotCacheValue(Supplier loader) { + if (!snapshotCacheLoaded) { + synchronized (this) { + if (!snapshotCacheLoaded) { + snapshotCacheValue = loader.get(); + snapshotCacheLoaded = true; + } + } + } + return snapshotCacheValue; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtils.java new file mode 100644 index 00000000000000..30ec4a1185ea82 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtils.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.Env; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.mvcc.MvccSnapshot; + +import org.apache.paimon.table.Table; + +import java.util.Optional; + +public class PaimonUtils { + + public static Table getPaimonTable(ExternalTable dorisTable) { + return paimonMetadataCache(dorisTable.getCatalog()).getPaimonTable(dorisTable); + } + + public static PaimonSnapshotCacheValue getLatestSnapshotCacheValue(ExternalTable dorisTable) { + return paimonMetadataCache(dorisTable.getCatalog()).getSnapshotCache(dorisTable); + } + + public static PaimonSnapshotCacheValue getSnapshotCacheValue(Optional snapshot, + ExternalTable dorisTable) { + if (snapshot.isPresent() && snapshot.get() instanceof PaimonMvccSnapshot) { + return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); + } + return getLatestSnapshotCacheValue(dorisTable); + } + + public static PaimonSchemaCacheValue getSchemaCacheValue(ExternalTable dorisTable, + PaimonSnapshotCacheValue snapshotValue) { + return getSchemaCacheValue(dorisTable, snapshotValue.getSnapshot().getSchemaId()); + } + + public static PaimonSchemaCacheValue getSchemaCacheValue(ExternalTable dorisTable, long schemaId) { + return paimonMetadataCache(dorisTable.getCatalog()) + .getPaimonSchemaCacheValue(dorisTable.getOrBuildNameMapping(), schemaId); + } + + public static PaimonMetadataCache getPaimonMetadataCache(ExternalCatalog catalog) { + return paimonMetadataCache(catalog); + } + + private static PaimonMetadataCache paimonMetadataCache(ExternalCatalog catalog) { + return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache(catalog); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index d8db43c21fda76..063f962d42e422 
100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -19,7 +19,6 @@ import org.apache.doris.analysis.TableScanParams; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -33,6 +32,7 @@ import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.paimon.PaimonUtil; +import org.apache.doris.datasource.paimon.PaimonUtils; import org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry; import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter; import org.apache.doris.datasource.property.storage.StorageProperties; @@ -203,8 +203,7 @@ public void createScanRangeLocations() throws UserException { private void putHistorySchemaInfo(Long schemaId) { if (currentQuerySchema.putIfAbsent(schemaId, Boolean.TRUE) == null) { PaimonExternalTable table = (PaimonExternalTable) source.getTargetTable(); - TableSchema tableSchema = Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() - .getPaimonSchemaCacheValue(table.getOrBuildNameMapping(), schemaId).getTableSchema(); + TableSchema tableSchema = PaimonUtils.getSchemaCacheValue(table, schemaId).getTableSchema(); params.addToHistorySchemaInfo( PaimonUtil.getSchemaInfo(tableSchema, source.getCatalog().getEnableMappingVarbinary(), source.getCatalog().getEnableMappingTimestampTz())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java index cb2a3e1581688e..d9777d10b39bdf 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java @@ -18,6 +18,8 @@ package org.apache.doris.datasource.property.metastore; import org.apache.doris.common.security.authentication.ExecutionAuthenticator; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.datasource.metacache.CacheSpec; import org.apache.doris.datasource.property.ConnectorProperty; import org.apache.doris.datasource.property.storage.StorageProperties; @@ -136,6 +138,8 @@ public final Catalog initializeCatalog(String catalogName, * @param catalogProps the catalog properties map to add manifest cache properties to */ protected void addManifestCacheProperties(Map catalogProps) { + boolean hasIoManifestCacheEnabled = StringUtils.isNotBlank(ioManifestCacheEnabled) + || StringUtils.isNotBlank(catalogProps.get(CatalogProperties.IO_MANIFEST_CACHE_ENABLED)); if (StringUtils.isNotBlank(ioManifestCacheEnabled)) { catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED, ioManifestCacheEnabled); } @@ -149,6 +153,22 @@ protected void addManifestCacheProperties(Map catalogProps) { if (StringUtils.isNotBlank(ioManifestCacheMaxContentLength)) { catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, ioManifestCacheMaxContentLength); } + + // default enable io manifest cache if the meta.cache.manifest is enabled + if (!hasIoManifestCacheEnabled) { + CacheSpec manifestCacheSpec = CacheSpec.fromProperties(catalogProps, + IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE, + IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE, + IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND, + IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND, + IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY, + IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY); + if 
(CacheSpec.isCacheEnabled(manifestCacheSpec.isEnable(), + manifestCacheSpec.getTtlSecond(), + manifestCacheSpec.getCapacity())) { + catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED, "true"); + } + } } /** diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index a8eaf8bc2f4072..2af0aef97103f4 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1519,16 +1519,47 @@ class Suite implements GroovyInterceptable { // Split by semicolon and execute each statement def statements = sqlStatements.split(';').collect { it.trim() }.findAll { it } def results = [] - + for (stmt in statements) { if (stmt) { results << spark_iceberg(stmt, timeoutSeconds) } } - + return results } + /** + * Execute Spark SQL on the spark-iceberg container with Paimon extensions enabled. + * + * Usage in test suite: + * spark_paimon "CREATE TABLE paimon.test_db.t1 (id INT) USING paimon" + * spark_paimon "INSERT INTO paimon.test_db.t1 VALUES (1)" + * def result = spark_paimon "SELECT * FROM paimon.test_db.t1" + */ + String spark_paimon(String sqlStr, int timeoutSeconds = 120) { + String containerName = getSparkIcebergContainerName() + if (containerName == null) { + throw new RuntimeException("spark-iceberg container not found. 
Please ensure the container is running.") + } + String masterUrl = "spark://${containerName}:7077" + + String escapedSql = sqlStr.replaceAll('"', '\\\\"') + String command = """docker exec ${containerName} spark-sql --master ${masterUrl} --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions -e "${escapedSql}" """ + + logger.info("Executing Spark Paimon SQL: ${sqlStr}".toString()) + logger.info("Container: ${containerName}".toString()) + + try { + String result = cmd(command, timeoutSeconds) + logger.info("Spark Paimon SQL result: ${result}".toString()) + return result + } catch (Exception e) { + logger.error("Spark Paimon SQL failed: ${e.message}".toString()) + throw e + } + } + List> db2_docker(String sqlStr, boolean isOrder = false) { String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "") def (result, meta) = JdbcUtils.executeToList(context.getDB2DockerConnection(), cleanedSqlStr) diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy index d95f05ae76c5b9..fc7a21785f75ff 100644 --- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy @@ -39,7 +39,7 @@ suite("test_iceberg_manifest_cache", "p0,external,doris,external_docker,external "s3.secret_key" = "password", "s3.endpoint" = "http://${externalEnvIp}:${minioPort}", "s3.region" = "us-east-1", - "iceberg.manifest.cache.enable" = "true" + "meta.cache.iceberg.manifest.enable" = "true" ); """ @@ -54,7 +54,7 @@ suite("test_iceberg_manifest_cache", "p0,external,doris,external_docker,external "s3.secret_key" = "password", "s3.endpoint" = "http://${externalEnvIp}:${minioPort}", "s3.region" = "us-east-1", - "iceberg.manifest.cache.enable" = "false" + "meta.cache.iceberg.manifest.enable" = "false" ); """ @@ -116,4 +116,3 @@ 
suite("test_iceberg_manifest_cache", "p0,external,doris,external_docker,external } } } - diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy index 7cc9f6af0b7b95..cf9ad8145d9725 100644 --- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy @@ -58,7 +58,7 @@ suite("test_iceberg_table_cache", "p0,external,doris,external_docker,external_do "s3.secret_key" = "password", "s3.endpoint" = "http://${externalEnvIp}:${minioPort}", "s3.region" = "us-east-1", - "iceberg.table.meta.cache.ttl-second" = "0" + "meta.cache.iceberg.table.ttl-second" = "0" ); """ diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy index 0f3391805b0fa4..2e2a2ea8e9b5c9 100644 --- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy @@ -37,7 +37,7 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}', 'fs.defaultFS' = '${default_fs}', 'warehouse' = '${warehouse}', - 'iceberg.manifest.cache.enable' = 'true' + 'meta.cache.iceberg.manifest.enable' = 'true' ); """ sql """switch ${catalog_name}""" @@ -65,7 +65,7 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern sql """select * from test_iceberg_meta_cache_db.sales""" sql """drop table test_iceberg_meta_cache_db.sales""" - // 2. test catalog with iceberg.table.meta.cache.ttl-second + // 2. 
test catalog with meta.cache.iceberg.table.ttl-second sql """drop catalog if exists ${catalog_name_no_cache};""" test { sql """ @@ -75,8 +75,8 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}', 'fs.defaultFS' = '${default_fs}', 'warehouse' = '${warehouse}', - 'iceberg.manifest.cache.enable' = 'false', - 'iceberg.table.meta.cache.ttl-second' = '-2' + 'meta.cache.iceberg.manifest.enable' = 'false', + 'meta.cache.iceberg.table.ttl-second' = '-2' ); """ exception "is wrong" @@ -90,8 +90,8 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}', 'fs.defaultFS' = '${default_fs}', 'warehouse' = '${warehouse}', - 'iceberg.manifest.cache.enable' = 'false', - 'iceberg.table.meta.cache.ttl-second' = '0' + 'meta.cache.iceberg.manifest.enable' = 'false', + 'meta.cache.iceberg.table.ttl-second' = '0' ); """ sql """switch ${catalog_name_no_cache}""" @@ -126,7 +126,7 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}', 'fs.defaultFS' = '${default_fs}', 'warehouse' = '${warehouse}', - 'iceberg.manifest.cache.enable' = 'false' + 'meta.cache.iceberg.manifest.enable' = 'false' ); """ sql """switch ${catalog_name_no_cache}""" @@ -147,11 +147,11 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern sql """select * from test_iceberg_meta_cache_db.sales""" // alter wrong catalog property test { - sql """alter catalog ${catalog_name_no_cache} set properties ("iceberg.table.meta.cache.ttl-second" = "-2")""" + sql """alter catalog ${catalog_name_no_cache} set properties ("meta.cache.iceberg.table.ttl-second" = "-2")""" exception "is wrong" } // alter catalog property, disable meta cache - sql """alter catalog ${catalog_name_no_cache} set properties 
("iceberg.table.meta.cache.ttl-second" = "0")""" + sql """alter catalog ${catalog_name_no_cache} set properties ("meta.cache.iceberg.table.ttl-second" = "0")""" // select 2 rows sql """select * from test_iceberg_meta_cache_db.sales""" // insert into new value diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_table_meta_cache.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_table_meta_cache.groovy new file mode 100644 index 00000000000000..2a3176688f505f --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table_meta_cache.groovy @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_paimon_table_meta_cache", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable paimon test.") + return + } + + String minioPort = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + String catalogWithCache = "test_paimon_table_cache_with_cache" + String catalogNoCache = "test_paimon_table_cache_no_cache" + String testDb = "paimon_cache_test_db" + + sql """drop catalog if exists ${catalogWithCache}""" + sql """drop catalog if exists ${catalogNoCache}""" + + sql """ + CREATE CATALOG ${catalogWithCache} PROPERTIES ( + 'type' = 'paimon', + 'warehouse' = 's3://warehouse/wh', + 's3.endpoint' = 'http://${externalEnvIp}:${minioPort}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.path.style.access' = 'true' + ); + """ + + sql """ + CREATE CATALOG ${catalogNoCache} PROPERTIES ( + 'type' = 'paimon', + 'warehouse' = 's3://warehouse/wh', + 's3.endpoint' = 'http://${externalEnvIp}:${minioPort}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.path.style.access' = 'true', + 'meta.cache.paimon.table.ttl-second' = '0' + ); + """ + + try { + spark_paimon "CREATE DATABASE IF NOT EXISTS paimon.${testDb}" + + // ==================== Test 1: DML (INSERT) ==================== + logger.info("========== Test 1: DML (INSERT) ==========") + spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_insert" + spark_paimon "CREATE TABLE paimon.${testDb}.test_insert (id INT, name STRING) USING paimon" + spark_paimon "INSERT INTO paimon.${testDb}.test_insert VALUES (1, 'initial')" + + sql """switch ${catalogWithCache}""" + def result1 = sql """select * from ${testDb}.test_insert order by id""" + assertEquals(1, result1.size()) + + sql """switch ${catalogNoCache}""" + def result1NoCache = sql """select * 
from ${testDb}.test_insert order by id""" + assertEquals(1, result1NoCache.size()) + + spark_paimon "INSERT INTO paimon.${testDb}.test_insert VALUES (2, 'external_insert')" + + sql """switch ${catalogWithCache}""" + def result2 = sql """select * from ${testDb}.test_insert order by id""" + assertEquals(1, result2.size()) + + sql """switch ${catalogNoCache}""" + def result2NoCache = sql """select * from ${testDb}.test_insert order by id""" + assertEquals(2, result2NoCache.size()) + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_insert""" + def result3 = sql """select * from ${testDb}.test_insert order by id""" + assertEquals(2, result3.size()) + + // ==================== Test 2: Schema Change (ADD COLUMN) ==================== + logger.info("========== Test 2: Schema Change (ADD COLUMN) ==========") + spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_add_column" + spark_paimon "CREATE TABLE paimon.${testDb}.test_add_column (id INT, name STRING) USING paimon" + spark_paimon "INSERT INTO paimon.${testDb}.test_add_column VALUES (1, 'test')" + + sql """switch ${catalogWithCache}""" + def addColDesc1 = sql """desc ${testDb}.test_add_column""" + assertEquals(2, addColDesc1.size()) + + sql """switch ${catalogNoCache}""" + def addColDesc1NoCache = sql """desc ${testDb}.test_add_column""" + assertEquals(2, addColDesc1NoCache.size()) + + spark_paimon "ALTER TABLE paimon.${testDb}.test_add_column ADD COLUMNS (new_col INT)" + + sql """switch ${catalogWithCache}""" + def addColDesc2 = sql """desc ${testDb}.test_add_column""" + assertEquals(2, addColDesc2.size()) + + sql """switch ${catalogNoCache}""" + def addColDesc2NoCache = sql """desc ${testDb}.test_add_column""" + assertEquals(3, addColDesc2NoCache.size()) + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_add_column""" + def addColDesc3 = sql """desc ${testDb}.test_add_column""" + assertEquals(3, addColDesc3.size()) + } finally { + try { + spark_paimon "DROP 
TABLE IF EXISTS paimon.${testDb}.test_insert" + spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_add_column" + } catch (Exception e) { + logger.warn("Failed to drop paimon tables: ${e.message}".toString()) + } + sql """drop catalog if exists ${catalogWithCache}""" + sql """drop catalog if exists ${catalogNoCache}""" + } +}