Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions contrib/curl-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ set (SRCS
"${LIBRARY_DIR}/lib/sendf.c"
"${LIBRARY_DIR}/lib/setopt.c"
"${LIBRARY_DIR}/lib/sha256.c"
"${LIBRARY_DIR}/lib/curl_share.c"
"${LIBRARY_DIR}/lib/slist.c"
"${LIBRARY_DIR}/lib/smb.c"
"${LIBRARY_DIR}/lib/smtp.c"
Expand Down Expand Up @@ -169,6 +170,7 @@ set (SRCS
"${LIBRARY_DIR}/lib/curlx/dynbuf.c"
"${LIBRARY_DIR}/lib/curlx/fopen.c"
"${LIBRARY_DIR}/lib/curlx/inet_ntop.c"
"${LIBRARY_DIR}/lib/curlx/strcopy.c"
"${LIBRARY_DIR}/lib/curlx/inet_pton.c"
"${LIBRARY_DIR}/lib/curlx/multibyte.c"
"${LIBRARY_DIR}/lib/curlx/nonblock.c"
Expand Down
4 changes: 4 additions & 0 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ namespace DatabaseDataLakeSetting
extern const DatabaseDataLakeSettingsString aws_access_key_id;
extern const DatabaseDataLakeSettingsString aws_secret_access_key;
extern const DatabaseDataLakeSettingsString region;
extern const DatabaseDataLakeSettingsString aws_role_arn;
extern const DatabaseDataLakeSettingsString aws_role_session_name;
}

namespace Setting
Expand Down Expand Up @@ -121,6 +123,8 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
.aws_access_key_id = settings[DatabaseDataLakeSetting::aws_access_key_id].value,
.aws_secret_access_key = settings[DatabaseDataLakeSetting::aws_secret_access_key].value,
.region = settings[DatabaseDataLakeSetting::region].value,
.aws_role_arn = settings[DatabaseDataLakeSetting::aws_role_arn].value,
.aws_role_session_name = settings[DatabaseDataLakeSetting::aws_role_session_name].value,
};

switch (settings[DatabaseDataLakeSetting::catalog_type].value)
Expand Down
22 changes: 12 additions & 10 deletions src/Databases/DataLake/DatabaseDataLakeSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ namespace ErrorCodes

#define DATABASE_ICEBERG_RELATED_SETTINGS(DECLARE, ALIAS) \
DECLARE(DatabaseDataLakeCatalogType, catalog_type, DatabaseDataLakeCatalogType::NONE, "Catalog type", 0) \
DECLARE(String, catalog_credential, "", "", 0) \
DECLARE(Bool, vended_credentials, true, "Use vended credentials (storage credentials) from catalog", 0) \
DECLARE(String, auth_scope, "PRINCIPAL_ROLE:ALL", "Authorization scope for client credentials or token exchange", 0) \
DECLARE(String, oauth_server_uri, "", "OAuth server uri", 0) \
DECLARE(Bool, oauth_server_use_request_body, true, "Put parameters into request body or query params", 0) \
DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
DECLARE(String, aws_access_key_id, "", "Key for AWS connection for Glue catalog", 0) \
DECLARE(String, aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \
DECLARE(String, region, "", "Region for Glue catalog", 0) \
DECLARE(String, catalog_credential, "", "", 0) \
DECLARE(Bool, vended_credentials, true, "Use vended credentials (storage credentials) from catalog", 0) \
DECLARE(String, auth_scope, "PRINCIPAL_ROLE:ALL", "Authorization scope for client credentials or token exchange", 0) \
DECLARE(String, oauth_server_uri, "", "OAuth server uri", 0) \
DECLARE(Bool, oauth_server_use_request_body, true, "Put parameters into request body or query params", 0) \
DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
DECLARE(String, aws_access_key_id, "", "Key for AWS connection for Glue catalog", 0) \
DECLARE(String, aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \
DECLARE(String, region, "", "Region for Glue catalog", 0) \
DECLARE(String, aws_role_arn, "", "Role arn for AWS connection for Glue catalog", 0) \
DECLARE(String, aws_role_session_name, "", "Role session name for AWS connection for Glue catalog", 0) \
DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \

#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
Expand Down
57 changes: 33 additions & 24 deletions src/Databases/DataLake/GlueCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ namespace DB::Setting
extern const SettingsUInt64 s3_request_timeout_ms;
}

namespace DB::StorageObjectStorageSetting
{
extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
}

namespace DB::DatabaseDataLakeSetting
{
extern const DatabaseDataLakeSettingsString storage_endpoint;
Expand All @@ -96,14 +91,15 @@ GlueCatalog::GlueCatalog(
: ICatalog("")
, DB::WithContext(context_)
, log(getLogger("GlueCatalog(" + settings_.region + ")"))
, credentials(settings_.aws_access_key_id, settings_.aws_secret_access_key)
, region(settings_.region)
, settings(settings_)
, table_engine_definition(table_engine_definition_)
, metadata_objects(CurrentMetrics::MarkCacheBytes, CurrentMetrics::MarkCacheFiles, 1024)
{
DB::S3::CredentialsConfiguration creds_config;
creds_config.use_environment_credentials = true;
creds_config.role_arn = settings.aws_role_arn;
creds_config.role_session_name = settings.aws_role_session_name;

const DB::Settings & global_settings = getContext()->getGlobalContext()->getSettingsRef();

Expand All @@ -126,38 +122,41 @@ GlueCatalog::GlueCatalog(
/* get_request_throttler = */ nullptr,
/* put_request_throttler = */ nullptr);


Aws::Glue::GlueClientConfiguration client_configuration;
client_configuration.maxConnections = static_cast<unsigned>(global_settings[DB::Setting::s3_max_connections]);
client_configuration.connectTimeoutMs = static_cast<unsigned>(global_settings[DB::Setting::s3_connect_timeout_ms]);
client_configuration.requestTimeoutMs = static_cast<unsigned>(global_settings[DB::Setting::s3_request_timeout_ms]);
client_configuration.region = region;
auto endpoint_provider = std::make_shared<Aws::Glue::GlueEndpointProvider>();

Aws::Auth::AWSCredentials credentials(settings_.aws_access_key_id, settings_.aws_secret_access_key);
/// Only for testing when we are mocking glue
if (!endpoint.empty())
{
client_configuration.endpointOverride = endpoint;
endpoint_provider->OverrideEndpoint(endpoint);
Aws::Auth::AWSCredentials fake_credentials_for_fake_catalog;

if (credentials.IsEmpty())
{
/// You can specify any key for fake moto glue, it's just important
/// for it not to be empty.
fake_credentials_for_fake_catalog.SetAWSAccessKeyId("testing");
fake_credentials_for_fake_catalog.SetAWSSecretKey("testing");
credentials.SetAWSAccessKeyId("testing");
credentials.SetAWSSecretKey("testing");
}
else
fake_credentials_for_fake_catalog = credentials;

glue_client = std::make_unique<Aws::Glue::GlueClient>(fake_credentials_for_fake_catalog, endpoint_provider, client_configuration);
Poco::URI uri(endpoint);
if (uri.getScheme() == "http")
poco_config.scheme = Aws::Http::Scheme::HTTP;
}
else
{
LOG_TRACE(log, "Creating AWS glue client with credentials empty {}, region '{}', endpoint '{}'", credentials.IsEmpty(), region, endpoint);
std::shared_ptr<DB::S3::S3CredentialsProviderChain> chain = std::make_shared<DB::S3::S3CredentialsProviderChain>(poco_config, credentials, creds_config);
glue_client = std::make_unique<Aws::Glue::GlueClient>(chain, endpoint_provider, client_configuration);
}

std::shared_ptr<DB::S3::S3CredentialsProviderChain> chain = std::make_shared<DB::S3::S3CredentialsProviderChain>(poco_config, credentials, creds_config);
credentials_provider = chain;
glue_client = std::make_unique<Aws::Glue::GlueClient>(chain, endpoint_provider, client_configuration);
}

GlueCatalog::~GlueCatalog() = default;
Expand Down Expand Up @@ -282,7 +281,6 @@ bool GlueCatalog::tryGetTableMetadata(
request.SetDatabaseName(database_name);
request.SetName(table_name);


auto outcome = glue_client->GetTable(request);
if (outcome.IsSuccess())
{
Expand Down Expand Up @@ -412,8 +410,9 @@ void GlueCatalog::setCredentials(TableMetadata & metadata) const

if (storage_type == StorageType::S3)
{
auto creds = std::make_shared<S3Credentials>(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken());
metadata.setStorageCredentials(creds);
auto credentials = credentials_provider->GetAWSCredentials();
auto s3_creds = std::make_shared<S3Credentials>(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken());
metadata.setStorageCredentials(s3_creds);
}
else
{
Expand Down Expand Up @@ -459,7 +458,7 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
DB::ASTs args = storage->engine->arguments->children;

String storage_endpoint = !settings.storage_endpoint.empty() ? settings.storage_endpoint : metadata_uri;

if (args.empty())
args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
else
Expand All @@ -469,8 +468,12 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
{
if (table_metadata.hasStorageCredentials())
table_metadata.getStorageCredentials()->addCredentialsToEngineArgs(args);
else if (!credentials.IsExpiredOrEmpty())
DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
else
{
auto credentials = credentials_provider->GetAWSCredentials();
if (!credentials.IsExpiredOrEmpty())
DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
}
}

auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
Expand Down Expand Up @@ -529,11 +532,17 @@ String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_lo
else
args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);

if (args.size() == 1 && table_metadata.hasStorageCredentials())
if (args.size() == 1)
{
auto storage_credentials = table_metadata.getStorageCredentials();
if (storage_credentials)
storage_credentials->addCredentialsToEngineArgs(args);
if (table_metadata.hasStorageCredentials())
{
table_metadata.getStorageCredentials()->addCredentialsToEngineArgs(args);
}
else
{
auto credentials = credentials_provider->GetAWSCredentials();
DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
}
}

auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
Expand Down
7 changes: 6 additions & 1 deletion src/Databases/DataLake/GlueCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ namespace Aws::Glue
class GlueClient;
}

namespace Aws::Auth
{
class AWSCredentialsProvider;
}

namespace DataLake
{

Expand Down Expand Up @@ -70,7 +75,7 @@ class GlueCatalog final : public ICatalog, private DB::WithContext

std::unique_ptr<Aws::Glue::GlueClient> glue_client;
const LoggerPtr log;
Aws::Auth::AWSCredentials credentials;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
std::string region;
CatalogSettings settings;
DB::ASTPtr table_engine_definition;
Expand Down
2 changes: 2 additions & 0 deletions src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ DB::SettingsChanges CatalogSettings::allChanged() const
changes.emplace_back("aws_access_key_id", aws_access_key_id);
changes.emplace_back("aws_secret_access_key", aws_secret_access_key);
changes.emplace_back("region", region);
changes.emplace_back("aws_role_arn", aws_role_arn);
changes.emplace_back("aws_role_session_name", aws_role_session_name);

return changes;
}
Expand Down
2 changes: 2 additions & 0 deletions src/Databases/DataLake/ICatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ struct CatalogSettings
String aws_access_key_id;
String aws_secret_access_key;
String region;
String aws_role_arn;
String aws_role_session_name;

DB::SettingsChanges allChanged() const;
};
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ namespace DataLakeStorageSetting
extern const DataLakeStorageSettingsString storage_catalog_url;
extern const DataLakeStorageSettingsString storage_warehouse;
extern const DataLakeStorageSettingsString storage_catalog_credential;

extern DataLakeStorageSettingsString storage_aws_role_arn;
extern DataLakeStorageSettingsString storage_aws_role_session_name;
extern const DataLakeStorageSettingsString storage_auth_scope;
extern const DataLakeStorageSettingsString storage_auth_header;
extern const DataLakeStorageSettingsString storage_oauth_server_uri;
Expand Down Expand Up @@ -287,6 +288,8 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
.aws_access_key_id = (*settings)[DataLakeStorageSetting::storage_aws_access_key_id].value,
.aws_secret_access_key = (*settings)[DataLakeStorageSetting::storage_aws_secret_access_key].value,
.region = (*settings)[DataLakeStorageSetting::storage_region].value,
.aws_role_arn = (*settings)[DataLakeStorageSetting::storage_aws_role_arn].value,
.aws_role_session_name = (*settings)[DataLakeStorageSetting::storage_aws_role_session_name].value
};

return std::make_shared<DataLake::GlueCatalog>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ Metadata format version.
DECLARE(String, storage_aws_access_key_id, "", "Key for AWS connection for Glue catalog", 0) \
DECLARE(String, storage_aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \
DECLARE(String, storage_region, "", "Region for Glue catalog", 0) \
DECLARE(String, storage_aws_role_arn, "", "Role arn for AWS connection for Glue catalog", 0) \
DECLARE(String, storage_aws_role_session_name, "", "Role session name for AWS connection for Glue catalog", 0) \
DECLARE(String, object_storage_endpoint, "", "Object storage endpoint", 0) \
DECLARE(String, storage_catalog_url, "", "Catalog url", 0) \

Expand Down
1 change: 0 additions & 1 deletion tests/integration/compose/docker_compose_glue_catalog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,5 @@ services:
until (/usr/bin/mc config host add minio http://minio:9000 minio ClickHouse_Minio_P@ssw0rd) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/warehouse-glue;
/usr/bin/mc mb minio/warehouse-glue --ignore-existing;
/usr/bin/mc policy set public minio/warehouse-glue;
tail -f /dev/null
"
42 changes: 42 additions & 0 deletions tests/integration/test_database_glue/s3_mocks/mock_sts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import sys
from datetime import datetime, timedelta, timezone

from bottle import request, response, route, run

if len(sys.argv) >= 3:
expected_role = sys.argv[2]
else:
expected_role = 'miniorole'

@route("/")
def ping():
response.content_type = "text/plain"
response.set_header("Content-Length", 2)
return "OK"


@route("/", method="POST")
def sts():
access_key = "minio"
secret_access_key = "wrong_key"

if f"RoleSessionName={expected_role}" in str(request.url):
secret_access_key = "ClickHouse_Minio_P@ssw0rd"

expiration = datetime.now(timezone.utc) + timedelta(hours=1)
expiration_str = expiration.strftime("%Y-%m-%dT%H:%M:%SZ")

response.content_type = "text/xml"
return f"""
<AssumeRoleResponse xmlns="https://sts.amazonaws.com/doc/2011-06-15/">
<AssumeRoleResult>
<Credentials>
<AccessKeyId>{access_key}</AccessKeyId>
<SecretAccessKey>{secret_access_key}</SecretAccessKey>
<Expiration>{expiration_str}</Expiration>
</Credentials>
</AssumeRoleResult>
</AssumeRoleResponse>
"""

run(host="0.0.0.0", port=int(sys.argv[1]))
Loading
Loading