Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Generic manager that maintains one cache instance per external catalog, keyed by catalog id.
 *
 * <p>Instances are created lazily and atomically on first access via the factory supplied at
 * construction time. Thread-safe: backed by a {@link ConcurrentHashMap}.
 *
 * @param <T> the concrete cache type managed per catalog
 */
public class CatalogScopedCacheMgr<T> {
    // catalog id -> cache instance. ConcurrentHashMap gives lock-free reads and
    // atomic per-key creation through computeIfAbsent.
    private final Map<Long, T> cacheMap = new ConcurrentHashMap<>();
    // Factory invoked to build the cache for a catalog on first access.
    private final Function<ExternalCatalog, T> cacheFactory;

    /**
     * Creates a manager that builds per-catalog caches with the given factory.
     *
     * @param cacheFactory factory used to create a cache for a catalog; must not be null
     * @throws NullPointerException if {@code cacheFactory} is null
     */
    public CatalogScopedCacheMgr(Function<ExternalCatalog, T> cacheFactory) {
        // Fail fast here instead of deferring the NPE to the first getCache(catalog) call.
        if (cacheFactory == null) {
            throw new NullPointerException("cacheFactory must not be null");
        }
        this.cacheFactory = cacheFactory;
    }

    /**
     * Returns the cache for the given catalog, creating it atomically on first access.
     *
     * <p>Note: the factory may be invoked while the map entry is locked (semantics of
     * {@link ConcurrentHashMap#computeIfAbsent}), so it should be short and must not
     * re-enter this manager.
     *
     * @param catalog the catalog whose cache is requested; must not be null
     * @return the (possibly newly created) cache for the catalog, never null if the
     *         factory never returns null
     */
    public T getCache(ExternalCatalog catalog) {
        return cacheMap.computeIfAbsent(catalog.getId(), id -> cacheFactory.apply(catalog));
    }

    /**
     * Returns the existing cache for the given catalog id without creating one.
     *
     * @param catalogId id of the catalog
     * @return the cache, or null if none has been created for this id
     */
    public T getCache(long catalogId) {
        return cacheMap.get(catalogId);
    }

    /**
     * Removes and returns the cache for the given catalog id.
     *
     * @param catalogId id of the catalog
     * @return the removed cache, or null if none was present
     */
    public T removeCache(long catalogId) {
        return cacheMap.remove(catalogId);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1450,4 +1450,3 @@ public void reorderColumns(TableIf dorisTable, List<String> newOrder) throws Use
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.paimon.PaimonMetadataCache;
import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.nereids.exceptions.NotSupportedException;

Expand Down Expand Up @@ -87,21 +86,17 @@ public class ExternalMetaCacheMgr {
// This executor is used to schedule the getting split tasks
private ExecutorService scheduleExecutor;

// catalog id -> HiveMetaStoreCache
private final Map<Long, HiveMetaStoreCache> hiveCacheMap = Maps.newConcurrentMap();

// catalog id -> IcebergMetadataCache
private final Map<Long, IcebergMetadataCache> icebergCacheMap = Maps.newConcurrentMap();
// catalog id -> table schema cache
private final Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap();
private final CatalogScopedCacheMgr<HiveMetaStoreCache> hiveMetaStoreCacheMgr;
private final CatalogScopedCacheMgr<IcebergMetadataCache> icebergMetadataCacheMgr;
private final CatalogScopedCacheMgr<PaimonMetadataCache> paimonMetadataCacheMgr;
private final CatalogScopedCacheMgr<ExternalSchemaCache> schemaCacheMgr;
// hudi partition manager
private final HudiMetadataCacheMgr hudiMetadataCacheMgr;
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
// all external table row count cache.
private ExternalRowCountCache rowCountCache;
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
private final DorisExternalMetaCacheMgr dorisExternalMetaCacheMgr;

public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
Expand Down Expand Up @@ -132,7 +127,15 @@ public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {

hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
hiveMetaStoreCacheMgr = new CatalogScopedCacheMgr<>(
catalog -> new HiveMetaStoreCache((HMSExternalCatalog) catalog,
commonRefreshExecutor, fileListingExecutor));
icebergMetadataCacheMgr = new CatalogScopedCacheMgr<>(
catalog -> new IcebergMetadataCache(catalog, commonRefreshExecutor));
schemaCacheMgr = new CatalogScopedCacheMgr<>(
catalog -> new ExternalSchemaCache(catalog, commonRefreshExecutor));
paimonMetadataCacheMgr = new CatalogScopedCacheMgr<>(
catalog -> new PaimonMetadataCache(catalog, commonRefreshExecutor));
dorisExternalMetaCacheMgr = new DorisExternalMetaCacheMgr(commonRefreshExecutor);
}

Expand Down Expand Up @@ -160,30 +163,11 @@ public ExecutorService getScheduleExecutor() {
}

public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
HiveMetaStoreCache cache = hiveCacheMap.get(catalog.getId());
if (cache == null) {
synchronized (hiveCacheMap) {
if (!hiveCacheMap.containsKey(catalog.getId())) {
hiveCacheMap.put(catalog.getId(),
new HiveMetaStoreCache(catalog, commonRefreshExecutor, fileListingExecutor));
}
cache = hiveCacheMap.get(catalog.getId());
}
}
return cache;
return hiveMetaStoreCacheMgr.getCache(catalog);
}

public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
ExternalSchemaCache cache = schemaCacheMap.get(catalog.getId());
if (cache == null) {
synchronized (schemaCacheMap) {
if (!schemaCacheMap.containsKey(catalog.getId())) {
schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog, commonRefreshExecutor));
}
cache = schemaCacheMap.get(catalog.getId());
}
}
return cache;
return schemaCacheMgr.getCache(catalog);
}

public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) {
Expand All @@ -203,20 +187,11 @@ public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
}

public IcebergMetadataCache getIcebergMetadataCache(ExternalCatalog catalog) {
IcebergMetadataCache cache = icebergCacheMap.get(catalog.getId());
if (cache == null) {
synchronized (icebergCacheMap) {
if (!icebergCacheMap.containsKey(catalog.getId())) {
icebergCacheMap.put(catalog.getId(), new IcebergMetadataCache(catalog, commonRefreshExecutor));
}
cache = icebergCacheMap.get(catalog.getId());
}
}
return cache;
return icebergMetadataCacheMgr.getCache(catalog);
}

public PaimonMetadataCache getPaimonMetadataCache() {
return paimonMetadataCacheMgr.getPaimonMetadataCache();
public PaimonMetadataCache getPaimonMetadataCache(ExternalCatalog catalog) {
return paimonMetadataCacheMgr.getCache(catalog);
}

public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
Expand All @@ -236,41 +211,43 @@ public DorisExternalMetaCacheMgr getDorisExternalMetaCacheMgr() {
}

public void removeCache(long catalogId) {
if (hiveCacheMap.remove(catalogId) != null) {
if (hiveMetaStoreCacheMgr.removeCache(catalogId) != null) {
LOG.info("remove hive metastore cache for catalog {}", catalogId);
}
synchronized (schemaCacheMap) {
if (schemaCacheMap.remove(catalogId) != null) {
LOG.info("remove schema cache for catalog {}", catalogId);
}
if (schemaCacheMgr.removeCache(catalogId) != null) {
LOG.info("remove schema cache for catalog {}", catalogId);
}
if (icebergCacheMap.remove(catalogId) != null) {
if (icebergMetadataCacheMgr.removeCache(catalogId) != null) {
LOG.info("remove iceberg meta cache for catalog {}", catalogId);
}
hudiMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
paimonMetadataCacheMgr.removeCache(catalogId);
PaimonMetadataCache paimonMetadataCache = paimonMetadataCacheMgr.removeCache(catalogId);
if (paimonMetadataCache != null) {
paimonMetadataCache.invalidateCatalogCache(catalogId);
}
dorisExternalMetaCacheMgr.removeCache(catalogId);
}

public void invalidateTableCache(ExternalTable dorisTable) {
synchronized (schemaCacheMap) {
ExternalSchemaCache schemaCache = schemaCacheMap.get(dorisTable.getCatalog().getId());
if (schemaCache != null) {
schemaCache.invalidateTableCache(dorisTable);
}
ExternalSchemaCache schemaCache = schemaCacheMgr.getCache(dorisTable.getCatalog().getId());
if (schemaCache != null) {
schemaCache.invalidateTableCache(dorisTable);
}
HiveMetaStoreCache hiveMetaCache = hiveCacheMap.get(dorisTable.getCatalog().getId());
HiveMetaStoreCache hiveMetaCache = hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId());
if (hiveMetaCache != null) {
hiveMetaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping());
}
IcebergMetadataCache icebergMetadataCache = icebergCacheMap.get(dorisTable.getCatalog().getId());
IcebergMetadataCache icebergMetadataCache = icebergMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
if (icebergMetadataCache != null) {
icebergMetadataCache.invalidateTableCache(dorisTable);
}
hudiMetadataCacheMgr.invalidateTableCache(dorisTable);
maxComputeMetadataCacheMgr.invalidateTableCache(dorisTable);
paimonMetadataCacheMgr.invalidateTableCache(dorisTable);
PaimonMetadataCache paimonMetadataCache = paimonMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
if (paimonMetadataCache != null) {
paimonMetadataCache.invalidateTableCache(dorisTable);
}
if (LOG.isDebugEnabled()) {
LOG.debug("invalid table cache for {}.{} in catalog {}", dorisTable.getRemoteDbName(),
dorisTable.getRemoteName(), dorisTable.getCatalog().getName());
Expand All @@ -279,58 +256,58 @@ public void invalidateTableCache(ExternalTable dorisTable) {

public void invalidateDbCache(long catalogId, String dbName) {
dbName = ClusterNamespace.getNameFromFullName(dbName);
synchronized (schemaCacheMap) {
ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
if (schemaCache != null) {
schemaCache.invalidateDbCache(dbName);
}
ExternalSchemaCache schemaCache = schemaCacheMgr.getCache(catalogId);
if (schemaCache != null) {
schemaCache.invalidateDbCache(dbName);
}
HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
HiveMetaStoreCache metaCache = hiveMetaStoreCacheMgr.getCache(catalogId);
if (metaCache != null) {
metaCache.invalidateDbCache(dbName);
}
IcebergMetadataCache icebergMetadataCache = icebergCacheMap.get(catalogId);
IcebergMetadataCache icebergMetadataCache = icebergMetadataCacheMgr.getCache(catalogId);
if (icebergMetadataCache != null) {
icebergMetadataCache.invalidateDbCache(catalogId, dbName);
}
hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
PaimonMetadataCache paimonMetadataCache = paimonMetadataCacheMgr.getCache(catalogId);
if (paimonMetadataCache != null) {
paimonMetadataCache.invalidateDbCache(catalogId, dbName);
}
if (LOG.isDebugEnabled()) {
LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
}
}

public void invalidateCatalogCache(long catalogId) {
synchronized (schemaCacheMap) {
schemaCacheMap.remove(catalogId);
}
HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
schemaCacheMgr.removeCache(catalogId);
HiveMetaStoreCache metaCache = hiveMetaStoreCacheMgr.getCache(catalogId);
if (metaCache != null) {
metaCache.invalidateAll();
}
IcebergMetadataCache icebergMetadataCache = icebergCacheMap.get(catalogId);
IcebergMetadataCache icebergMetadataCache = icebergMetadataCacheMgr.getCache(catalogId);
if (icebergMetadataCache != null) {
icebergMetadataCache.invalidateCatalogCache(catalogId);
}
hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
PaimonMetadataCache paimonMetadataCache = paimonMetadataCacheMgr.getCache(catalogId);
if (paimonMetadataCache != null) {
paimonMetadataCache.invalidateCatalogCache(catalogId);
}
dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId);
if (LOG.isDebugEnabled()) {
LOG.debug("invalid catalog cache for {}", catalogId);
}
}

public void invalidSchemaCache(long catalogId) {
synchronized (schemaCacheMap) {
schemaCacheMap.remove(catalogId);
}
schemaCacheMgr.removeCache(catalogId);
}

public void addPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
HiveMetaStoreCache metaCache = hiveMetaStoreCacheMgr.getCache(catalogId);
if (metaCache != null) {
List<Type> partitionColumnTypes;
try {
Expand All @@ -348,7 +325,7 @@ public void addPartitionsCache(long catalogId, HMSExternalTable table, List<Stri

public void dropPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
HiveMetaStoreCache metaCache = hiveMetaStoreCacheMgr.getCache(catalogId);
if (metaCache != null) {
metaCache.dropPartitionsCache(table, partitionNames, true);
}
Expand All @@ -358,7 +335,7 @@ public void dropPartitionsCache(long catalogId, HMSExternalTable table, List<Str
}

public void invalidatePartitionsCache(ExternalTable dorisTable, List<String> partitionNames) {
HiveMetaStoreCache metaCache = hiveCacheMap.get(dorisTable.getCatalog().getId());
HiveMetaStoreCache metaCache = hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId());
if (metaCache != null) {
for (String partitionName : partitionNames) {
metaCache.invalidatePartitionCache(dorisTable, partitionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;

import com.github.benmanes.caffeine.cache.LoadingCache;
import lombok.Data;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -49,13 +49,14 @@ public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) {
}

private void init(ExecutorService executor) {
long schemaCacheTtlSecond = NumberUtils.toLong(
(catalog.getProperties().get(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND)), ExternalCatalog.CACHE_NO_TTL);
CacheSpec cacheSpec = CacheSpec.fromTtlValue(
catalog.getProperties().get(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND),
Config.external_cache_expire_time_seconds_after_access,
Config.max_external_schema_cache_num);
CacheFactory schemaCacheFactory = new CacheFactory(
OptionalLong.of(schemaCacheTtlSecond >= ExternalCatalog.CACHE_TTL_DISABLE_CACHE
? schemaCacheTtlSecond : Config.external_cache_expire_time_seconds_after_access),
CacheSpec.toExpireAfterAccess(cacheSpec.getTtlSecond()),
OptionalLong.of(Config.external_cache_refresh_time_minutes * 60),
Config.max_external_schema_cache_num,
cacheSpec.getCapacity(),
false,
null);
schemaCache = schemaCacheFactory.buildCache(this::loadSchema, executor);
Expand Down
Loading