Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
#include <Storages/System/attachSystemTables.h>
#include <Storages/System/attachInformationSchemaTables.h>
#include <Storages/Cache/registerRemoteFileMetadatas.h>
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/registerFunctions.h>
Expand Down Expand Up @@ -415,6 +416,9 @@ namespace ServerSetting
extern const ServerSettingsUInt64 keeper_server_socket_send_timeout_sec;
extern const ServerSettingsString hdfs_libhdfs3_conf;
extern const ServerSettingsString config_file;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_size;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries;
}

namespace ErrorCodes
Expand All @@ -425,6 +429,9 @@ namespace ErrorCodes
namespace FileCacheSetting
{
extern const FileCacheSettingsBool load_metadata_asynchronously;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_size;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl;
}

}
Expand Down Expand Up @@ -2737,6 +2744,10 @@ try
/// try set up encryption. There are some errors in config, error will be printed and server wouldn't start.
CompressionCodecEncrypted::Configuration::instance().load(config(), "encryption_codecs");

ObjectStorageListObjectsCache::instance().setMaxSizeInBytes(server_settings[ServerSetting::object_storage_list_objects_cache_size]);
ObjectStorageListObjectsCache::instance().setMaxCount(server_settings[ServerSetting::object_storage_list_objects_cache_max_entries]);
ObjectStorageListObjectsCache::instance().setTTL(server_settings[ServerSetting::object_storage_list_objects_cache_ttl]);

auto replicas_reconnector = ReplicasReconnector::init(global_context);

/// Set current database name before loading tables and databases because
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ enum class AccessType : uint8_t
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM CLEAR SCHEMA CACHE, SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM CLEAR FORMAT SCHEMA CACHE, SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM CLEAR S3 CLIENT CACHE, SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, "SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
Expand Down
4 changes: 4 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,10 @@ The server successfully detected this situation and will download merged part fr
M(RuntimeFilterRowsChecked, "Number of rows checked by JOIN Runtime Filters", ValueType::Number) \
M(RuntimeFilterRowsPassed, "Number of rows that passed (not filtered out by) JOIN Runtime Filters", ValueType::Number) \
M(RuntimeFilterRowsSkipped, "Number of rows in blocks that were skipped by JOIN Runtime Filters", ValueType::Number) \
M(ObjectStorageListObjectsCacheHits, "Number of times object storage list objects operation hit the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \
M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation miss the cache using prefix matching.", ValueType::Number) \


#ifdef APPLY_FOR_EXTERNAL_EVENTS
Expand Down
4 changes: 2 additions & 2 deletions src/Common/TTLCachePolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ class TTLCachePolicy : public ICachePolicy<Key, Mapped, HashFunction, WeightFunc
return res;
}

private:
protected:
using Cache = std::unordered_map<Key, MappedPtr, HashFunction>;
Cache cache;

private:
/// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators
/// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a
/// binary search on the sorted container and erase all left of the found key.
Expand Down
5 changes: 4 additions & 1 deletion src/Core/ServerSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1469,7 +1469,10 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
```xml
<skip_check_for_incorrect_settings>1</skip_check_for_incorrect_settings>
```
)", 0)
)", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_size, 500000000, "Maximum size of ObjectStorage list objects cache in bytes. Zero means disabled.", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum size of ObjectStorage list objects cache in entries. Zero means disabled.", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in ObjectStorage list objects cache in seconds. Zero means unlimited", 0)

/// Settings with a path are server settings with at least one layer of nesting that have a fixed structure (no lists, lists, enumerations, repetitions, ...).
#define LIST_OF_SERVER_SETTINGS_WITH_PATH(DECLARE, ALIAS) \
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7456,6 +7456,9 @@ Write full paths (including s3://) into iceberg metadata files.
)", EXPERIMENTAL) \
DECLARE(String, iceberg_metadata_compression_method, "", R"(
Method to compress `.metadata.json` file.
)", EXPERIMENTAL) \
DECLARE(Bool, use_object_storage_list_objects_cache, false, R"(
Cache the list of objects returned by list objects calls in object storage
)", EXPERIMENTAL) \
DECLARE(Bool, make_distributed_plan, false, R"(
Make distributed query plan.
Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", true, false, "It becomes obsolete."},
{"database_datalake_require_metadata_access", true, true, "New setting."},
{"automatic_parallel_replicas_min_bytes_per_replica", 0, 1_MiB, "Better default value derived from testing results"},
{"use_object_storage_list_objects_cache", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.12",
{
Expand Down Expand Up @@ -289,6 +290,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_experimental_lightweight_update", false, true, "Lightweight updates were moved to Beta."},
{"s3_slow_all_threads_after_retryable_error", false, false, "Added an alias for setting `backup_slow_all_threads_after_retryable_s3_error`"},
{"serialize_string_in_memory_with_zero_byte", true, true, "New setting"},
{"use_object_storage_list_objects_cache", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.7",
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class AzureObjectStorage : public IObjectStorage
const String & description_,
const String & common_key_prefix_);

bool supportsListObjectsCache() override { return true; }

void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override;

/// Sanitizer build may crash with max_keys=1; this looks like a false positive.
Expand Down
2 changes: 2 additions & 0 deletions src/Disks/DiskObjectStorage/ObjectStorages/IObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ class IObjectStorage
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "The method 'tagObjects' is only implemented for S3 and Azure storages");
}
#endif

virtual bool supportsListObjectsCache() { return false; }
};

using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class S3ObjectStorage : public IObjectStorage

ObjectStorageType getType() const override { return ObjectStorageType::S3; }

bool supportsListObjectsCache() override { return true; }

bool exists(const StoredObject & object) const override;

std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
Expand Down
9 changes: 8 additions & 1 deletion src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
#include <Storages/Freeze.h>
#include <Storages/MaterializedView/RefreshTask.h>
#include <Storages/ObjectStorage/Azure/Configuration.h>
Expand Down Expand Up @@ -470,7 +471,12 @@ BlockIO InterpreterSystemQuery::execute()
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for AWS S3");
#endif

case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE);
ObjectStorageListObjectsCache::instance().clear();
break;
}
case Type::CLEAR_FILESYSTEM_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_FILESYSTEM_CACHE);
Expand Down Expand Up @@ -1998,6 +2004,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::CLEAR_SCHEMA_CACHE:
case Type::CLEAR_FORMAT_SCHEMA_CACHE:
case Type::CLEAR_S3_CLIENT_CACHE:
case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
break;
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
case Type::CLEAR_COMPILED_EXPRESSION_CACHE:
case Type::CLEAR_S3_CLIENT_CACHE:
case Type::CLEAR_ICEBERG_METADATA_CACHE:
case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
case Type::RESET_COVERAGE:
case Type::RESTART_REPLICAS:
case Type::JEMALLOC_PURGE:
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
CLEAR_SCHEMA_CACHE,
CLEAR_FORMAT_SCHEMA_CACHE,
CLEAR_S3_CLIENT_CACHE,
DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE,
STOP_LISTEN,
START_LISTEN,
RESTART_REPLICAS,
Expand Down
Loading
Loading