Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ class IDataLakeMetadata : boost::noncopyable
virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) { throwNotImplemented("alter"); }
virtual void drop(ContextPtr) { }

virtual std::optional<String> partitionKey(ContextPtr) const { return {}; }
virtual std::optional<String> sortingKey(ContextPtr) const { return {}; }

protected:
virtual ObjectIterator
createKeysIterator(Strings && data_files_, ObjectStoragePtr object_storage_, IDataLakeMetadata::FileProgressCallback callback_) const;
Expand Down
51 changes: 51 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/formatWithPossiblyHidingSecrets.h>
#include <Interpreters/IcebergMetadataLog.h>

#include <Storages/ObjectStorage/DataLakes/Common/Common.h>
Expand Down Expand Up @@ -859,6 +860,39 @@ std::optional<size_t> IcebergMetadata::totalBytes(ContextPtr local_context) cons
return result;
}

std::optional<String> IcebergMetadata::partitionKey(ContextPtr context) const
{
auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context);
if (!actual_data_snapshot)
return std::nullopt;
return getPartitionKey(context, actual_table_state_snapshot);
}

std::optional<String> IcebergMetadata::sortingKey(ContextPtr context) const
{
auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context);
if (!actual_data_snapshot)
return std::nullopt;
auto metadata_object = getMetadataJSONObject(
actual_table_state_snapshot.metadata_file_path,
object_storage,
persistent_components.metadata_cache,
context,
log,
persistent_components.metadata_compression_method,
persistent_components.table_uuid);
auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object);
const auto & ch_schema = *persistent_components.schema_processor->getClickhouseTableSchemaById(current_schema_id);
auto display = getSortingKeyDisplayStringFromMetadata(metadata_object, ch_schema);
if (display)
return display;
auto key = getSortingKey(context, actual_table_state_snapshot);
if (!key.expression_list_ast)
return std::nullopt;
return format({context, *key.expression_list_ast});
}


ObjectIterator IcebergMetadata::iterate(
const ActionsDAG * filter_dag,
FileProgressCallback callback,
Expand Down Expand Up @@ -1089,6 +1123,23 @@ ColumnMapperPtr IcebergMetadata::getColumnMapperForCurrentSchema(StorageMetadata
return persistent_components.schema_processor->getColumnMapperById(iceberg_table_state->schema_id);
}

std::optional<String> IcebergMetadata::getPartitionKey(ContextPtr local_context, TableStateSnapshot actual_table_state_snapshot) const
{
auto metadata_object = getMetadataJSONObject(
actual_table_state_snapshot.metadata_file_path,
object_storage,
persistent_components.metadata_cache,
local_context,
log,
persistent_components.metadata_compression_method,
persistent_components.table_uuid);
auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object);
return getPartitionKeyStringFromMetadata(
metadata_object,
*persistent_components.schema_processor->getClickhouseTableSchemaById(current_schema_id),
local_context);
}

KeyDescription IcebergMetadata::getSortingKey(ContextPtr local_context, TableStateSnapshot actual_table_state_snapshot) const
{
auto metadata_object = getMetadataJSONObject(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ class IcebergMetadata : public IDataLakeMetadata

void drop(ContextPtr context) override;

std::optional<String> partitionKey(ContextPtr) const override;
std::optional<String> sortingKey(ContextPtr) const override;

private:
Iceberg::PersistentTableComponents initializePersistentTableComponents(
StorageObjectStorageConfigurationPtr configuration, IcebergMetadataFilesCachePtr cache_ptr, ContextPtr context_);
Expand All @@ -145,12 +148,14 @@ class IcebergMetadata : public IDataLakeMetadata
getRelevantDataSnapshotFromTableStateSnapshot(Iceberg::TableStateSnapshot table_state_snapshot, ContextPtr local_context) const;
std::pair<Iceberg::IcebergDataSnapshotPtr, Iceberg::TableStateSnapshot> getRelevantState(const ContextPtr & context) const;

std::optional<String> getPartitionKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const;
KeyDescription getSortingKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const;

LoggerPtr log;
const ObjectStoragePtr object_storage;
DB::Iceberg::PersistentTableComponents persistent_components;
const DataLakeStorageSettings & data_lake_settings;
const String write_format;
KeyDescription getSortingKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const;
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ struct IcebergDataSnapshot
std::optional<size_t> total_rows;
std::optional<size_t> total_bytes;
std::optional<size_t> total_position_delete_rows;
std::optional<String> partition_key;
std::optional<String> sorting_key;

std::optional<size_t> getTotalRows() const
{
Expand Down
118 changes: 118 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,124 @@ KeyDescription getSortingKeyDescriptionFromMetadata(Poco::JSON::Object::Ptr meta
return KeyDescription::parse(order_by_str, column_description, local_context, true);
}

/// Format one partition field for display in Iceberg/Spark style, e.g. "day(ts)" or "bucket(16, id)".
static String formatPartitionFieldDisplay(const String & iceberg_transform_name, const String & column_name)
{
std::string name = Poco::toLower(iceberg_transform_name);
if (name == "identity")
return column_name;
if (name == "year" || name == "years")
return "year(" + column_name + ")";
if (name == "month" || name == "months")
return "month(" + column_name + ")";
if (name == "day" || name == "date" || name == "days" || name == "dates")
return "day(" + column_name + ")";
if (name == "hour" || name == "hours")
return "hour(" + column_name + ")";
if (name.starts_with("truncate") && name.back() == ']')
{
auto p = name.find('[');
if (p != std::string::npos)
return "truncate(" + name.substr(p + 1, name.size() - p - 2) + ", " + column_name + ")";
}
if (name.starts_with("bucket") && name.back() == ']')
{
auto p = name.find('[');
if (p != std::string::npos)
return "bucket(" + name.substr(p + 1, name.size() - p - 2) + ", " + column_name + ")";
}
return column_name;
}

std::optional<String> getPartitionKeyStringFromMetadata(Poco::JSON::Object::Ptr metadata_object, const NamesAndTypesList & /* ch_schema */, ContextPtr /* local_context */)
{
if (!metadata_object->has(f_partition_specs) || !metadata_object->has(f_default_spec_id))
return std::nullopt;
auto partition_spec_id = metadata_object->getValue<Int64>(f_default_spec_id);
Poco::JSON::Array::Ptr partition_specs = metadata_object->getArray(f_partition_specs);
std::unordered_map<Int64, String> source_id_to_column_name;
auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object);
auto mapper = createColumnMapper(schema)->getStorageColumnEncoding();
for (const auto & [col_name, source_id] : mapper)
source_id_to_column_name[source_id] = col_name;

Poco::JSON::Object::Ptr partition_spec;
for (size_t i = 0; i < partition_specs->size(); ++i)
{
auto spec = partition_specs->getObject(static_cast<UInt32>(i));
if (spec->getValue<Int64>(f_spec_id) == partition_spec_id)
{
partition_spec = spec;
break;
}
}
if (!partition_spec || !partition_spec->has(f_fields))
return std::nullopt;
auto fields = partition_spec->getArray(f_fields);
if (fields->size() == 0)
return std::nullopt;

std::vector<String> part_exprs;
for (UInt32 i = 0; i < fields->size(); ++i)
{
auto field = fields->getObject(i);
auto source_id = field->getValue<Int64>(f_source_id);
auto it = source_id_to_column_name.find(source_id);
if (it == source_id_to_column_name.end())
return std::nullopt;
String column_name = it->second;
auto iceberg_transform_name = field->getValue<String>(f_transform);
part_exprs.push_back(formatPartitionFieldDisplay(iceberg_transform_name, column_name));
}
String result;
for (size_t i = 0; i < part_exprs.size(); ++i)
{
if (i != 0)
result += ", ";
result += part_exprs[i];
}
return result;
}

std::optional<String> getSortingKeyDisplayStringFromMetadata(Poco::JSON::Object::Ptr metadata_object, const NamesAndTypesList & /* ch_schema */)
{
if (!metadata_object->has(f_sort_orders) || !metadata_object->has(f_default_sort_order_id))
return std::nullopt;
auto sort_order_id = metadata_object->getValue<Int64>(f_default_sort_order_id);
Poco::JSON::Array::Ptr sort_orders = metadata_object->getArray(f_sort_orders);
std::unordered_map<Int64, String> source_id_to_column_name;
auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object);
auto mapper = createColumnMapper(schema)->getStorageColumnEncoding();
for (const auto & [col_name, source_id] : mapper)
source_id_to_column_name[source_id] = col_name;

for (UInt32 i = 0; i < sort_orders->size(); ++i)
{
auto sort_order = sort_orders->getObject(i);
if (sort_order->getValue<Int64>(f_order_id) != sort_order_id)
continue;
auto sort_fields = sort_order->getArray(f_fields);
String result;
for (UInt32 j = 0; j < sort_fields->size(); ++j)
{
auto field = sort_fields->getObject(j);
auto source_id = field->getValue<Int64>(f_source_id);
auto it = source_id_to_column_name.find(source_id);
if (it == source_id_to_column_name.end())
return std::nullopt;
String column_name = it->second;
String direction = field->getValue<String>(f_direction) == "asc" ? " asc" : " desc";
auto iceberg_transform_name = field->getValue<String>(f_transform);
String expr = formatPartitionFieldDisplay(iceberg_transform_name, column_name);
if (!result.empty())
result += ", ";
result += expr + direction;
}
return result.empty() ? std::nullopt : std::optional<String>(result);
}
return std::nullopt;
}

DataTypePtr getFunctionResultType(const String & iceberg_transform_name, DataTypePtr source_type)
{
if (iceberg_transform_name.starts_with("identity") || iceberg_transform_name.starts_with("truncate"))
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <optional>
#include <string>
#include <string_view>
#include <Storages/ObjectStorage/DataLakes/Iceberg/PersistentTableComponents.h>
Expand Down Expand Up @@ -108,6 +109,11 @@ DataTypePtr getFunctionResultType(const String & iceberg_transform_name, DataTyp

KeyDescription getSortingKeyDescriptionFromMetadata(
Poco::JSON::Object::Ptr metadata_object, const NamesAndTypesList & ch_schema, ContextPtr local_context);
/// Returns Iceberg/Spark-style display string for sort order, e.g. "id desc, hour(ts) asc".
std::optional<String> getSortingKeyDisplayStringFromMetadata(
Poco::JSON::Object::Ptr metadata_object, const NamesAndTypesList & ch_schema);
std::optional<String> getPartitionKeyStringFromMetadata(
Poco::JSON::Object::Ptr metadata_object, const NamesAndTypesList & ch_schema, ContextPtr local_context);
void sortBlockByKeyDescription(Block & block, const KeyDescription & sort_description, ContextPtr context);
}

Expand Down
71 changes: 63 additions & 8 deletions src/Storages/System/StorageSystemTables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageView.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
Expand Down Expand Up @@ -605,18 +607,71 @@ class TablesBlockSource : public ISource
ASTPtr expression_ptr;
if (columns_mask[src_index++])
{
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
res_columns[res_index++]->insert(format({context, *expression_ptr}));
else
res_columns[res_index++]->insertDefault();
bool inserted = false;

try
{
// Extract from specific DataLake metadata if suitable
if (auto * obj = dynamic_cast<StorageObjectStorage *>(table.get()))
{
if (auto * dl_meta = obj->getExternalMetadata(context))
{
if (auto p = dl_meta->partitionKey(context); p.has_value())
{
res_columns[res_index++]->insert(*p);
inserted = true;
}
}

}
}
catch (const Exception &)
{
/// Failed to get info. It's not critical, just log it.
tryLogCurrentException("StorageSystemTables");
}

if (!inserted)
{
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
res_columns[res_index++]->insert(format({context, *expression_ptr}));
else
res_columns[res_index++]->insertDefault();
}
}

if (columns_mask[src_index++])
{
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
res_columns[res_index++]->insert(format({context, *expression_ptr}));
else
res_columns[res_index++]->insertDefault();
bool inserted = false;

try
{
// Extract from specific DataLake metadata if suitable
if (auto * obj = dynamic_cast<StorageObjectStorage *>(table.get()))
{
if (auto * dl_meta = obj->getExternalMetadata(context))
{
if (auto p = dl_meta->sortingKey(context); p.has_value())
{
res_columns[res_index++]->insert(*p);
inserted = true;
}
}
}
}
catch (const Exception &)
{
/// Failed to get info. It's not critical, just log it.
tryLogCurrentException("StorageSystemTables");
}

if (!inserted)
{
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
res_columns[res_index++]->insert(format({context, *expression_ptr}));
else
res_columns[res_index++]->insertDefault();
}
}

if (columns_mask[src_index++])
Expand Down
Loading
Loading