From 20a893f40164cf9e437a45f3a74287a70ede4867 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 20 Feb 2026 18:01:00 +0100 Subject: [PATCH] Do not lock IcebergIterator::next in StorageObjectStorageStableTaskDistributor --- src/Common/ProfileEvents.cpp | 1 + .../DataLakes/Iceberg/IcebergIterator.cpp | 3 +++ .../DataLakes/Iceberg/IcebergIterator.h | 2 ++ src/Storages/ObjectStorage/IObjectIterator.h | 1 + .../StorageObjectStorageStableTaskDistributor.cpp | 13 +++++++++++++ 5 files changed, 20 insertions(+) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 20c54ddf7cef..d31e3eec9c36 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -307,6 +307,7 @@ M(IcebergAvroFileParsingMicroseconds, "Time spent for parsing avro metadata files for Iceberg tables.", ValueType::Microseconds) \ M(IcebergJsonFileParsing, "Number of times json metadata files have been parsed.", ValueType::Number) \ M(IcebergJsonFileParsingMicroseconds, "Time spent for parsing json metadata files for Iceberg tables.", ValueType::Microseconds) \ + M(IcebergIteratorNextMicroseconds, "Time spent for getting next objects in Iceberg iterator.", ValueType::Microseconds) \ \ M(JoinBuildTableRowCount, "Total number of rows in the build table for a JOIN operation.", ValueType::Number) \ M(JoinProbeTableRowCount, "Total number of rows in the probe table for a JOIN operation.", ValueType::Number) \ diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index 7264647486e6..a3f635072fc0 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -61,6 +61,7 @@ extern const Event IcebergPartitionPrunedFiles; extern const Event IcebergMinMaxIndexPrunedFiles; extern const Event IcebergMetadataReadWaitTimeMicroseconds; extern const Event IcebergMetadataReturnedObjectInfos; +extern const Event IcebergIteratorNextMicroseconds; }; @@ -119,6 +120,8 @@ defineDeletesSpan(ManifestFileEntry data_object_, const std::vector SingleThreadIcebergKeysIterator::next() { + ProfileEventTimeIncrement watch(ProfileEvents::IcebergIteratorNextMicroseconds); + if (!data_snapshot) { return std::nullopt; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h index f59db6c16a2c..1ee3a9239cdf 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h @@ -100,6 +100,8 @@ class IcebergIterator : public IObjectIterator size_t estimatedKeysCount() override; ~IcebergIterator() override; + bool has_concurrent_next() const override { return true; } + private: std::unique_ptr filter_dag; ObjectStoragePtr object_storage; diff --git a/src/Storages/ObjectStorage/IObjectIterator.h b/src/Storages/ObjectStorage/IObjectIterator.h index 4ad74b1a76d6..47e603a36dc8 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.h +++ b/src/Storages/ObjectStorage/IObjectIterator.h @@ -21,6 +21,7 @@ struct IObjectIterator virtual ObjectInfoPtr next(size_t) = 0; virtual size_t estimatedKeysCount() = 0; virtual std::optional getSnapshotVersion() const { return std::nullopt; } + virtual bool has_concurrent_next() const { return false; } }; using ObjectIterator = std::shared_ptr; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index aec80ef8e082..2984facad9d0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -154,6 +154,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter { ObjectInfoPtr object_info; + if (iterator->has_concurrent_next()) + { + object_info = iterator->next(0); + + if (!object_info) + { + LOG_TEST(log, "Iterator is exhausted"); + std::lock_guard lock(mutex); + iterator_exhausted = true; + break; + } + } + else { std::lock_guard lock(mutex); object_info = iterator->next(0);