Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/content/concepts/rest/dlf.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,26 @@ WITH (
-- 'dlf.token-ecs-role-name' = 'my_ecs_role_name'
);
```

## DLF Endpoint Configuration

Paimon supports two types of DLF endpoints and automatically selects the appropriate signing algorithm:

- **DLF VPC endpoints** (e.g., `cn-hangzhou-vpc.dlf.aliyuncs.com`): Recommended for VPC environments with better performance and lower latency.
- **DLF OpenAPI endpoints** (e.g., `dlfnext.cn-hangzhou.aliyuncs.com`): Supports public network access through Alibaba Cloud API infrastructure.
**Note:** Currently OpenAPI Endpoints only supports database and table names with alphanumeric characters (A-Z, a-z, 0-9) and specific symbols.

Simply configure the endpoint URI, and Paimon will automatically handle the authentication:

```sql
CREATE CATALOG `paimon-rest-catalog`
WITH (
'type' = 'paimon',
'uri' = 'https://${region}-vpc.dlf.aliyuncs.com', -- or OpenAPI endpoint: https://dlfnext.cn-hangzhou.aliyuncs.com
'metastore' = 'rest',
'warehouse' = 'my_instance_name',
'token.provider' = 'dlf',
'dlf.access-key-id'='<access-key-id>',
'dlf.access-key-secret'='<access-key-secret>'
);
```
21 changes: 18 additions & 3 deletions paimon-api/src/main/java/org/apache/paimon/rest/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import org.apache.hc.core5.http.message.BasicHeader;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

import static org.apache.paimon.rest.HttpClientUtils.DEFAULT_HTTP_CLIENT;
Expand Down Expand Up @@ -142,7 +144,7 @@ private <T extends RESTResponse> T exec(HttpUriRequestBase request, Class<T> res
: "response body is null",
response.getCode());
}
errorHandler.accept(error, getRequestId(response));
errorHandler.accept(error, extractRequestId(response));
}
if (responseType != null && responseBodyStr != null) {
return RESTApi.fromJson(responseBodyStr, responseType);
Expand Down Expand Up @@ -184,9 +186,22 @@ public String uri() {
return uri;
}

private static String getRequestId(ClassicHttpResponse response) {
private static String extractRequestId(ClassicHttpResponse response) {
Header header = response.getFirstHeader(LoggingInterceptor.REQUEST_ID_KEY);
return header != null ? header.getValue() : LoggingInterceptor.DEFAULT_REQUEST_ID;
if (header != null && header.getValue() != null) {
return header.getValue();
}

// look for any header containing "request-id"
return Arrays.stream(response.getHeaders())
.filter(
h ->
h.getName() != null
&& h.getName().toLowerCase().contains("request-id"))
.map(Header::getValue)
.filter(Objects::nonNull)
.findFirst()
.orElse(LoggingInterceptor.DEFAULT_REQUEST_ID);
}

private static Header[] getHeaders(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ public class RESTCatalogOptions {
.noDefaultValue()
.withDescription("REST Catalog DLF OSS endpoint.");

public static final ConfigOption<String> DLF_SIGNING_ALGORITHM =
ConfigOptions.key("dlf.signing-algorithm")
.stringType()
.defaultValue("default")
.withDescription(
"DLF signing algorithm. Options: 'default' (for default VPC endpoint), "
+ "'openapi' (for DlfNext/2026-01-18). "
+ "If not set, will be automatically selected based on endpoint host.");

public static final ConfigOption<Boolean> IO_CACHE_ENABLED =
ConfigOptions.key("io-cache.enabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@

import javax.annotation.Nullable;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -53,49 +52,103 @@ public class DLFAuthProvider implements AuthProvider {
protected static final String MEDIA_TYPE = "application/json";

@Nullable private final DLFTokenLoader tokenLoader;
private final String uri;
private final String region;
private final String signingAlgorithm;

@Nullable protected volatile DLFToken token;
private final DLFRequestSigner signer;

public static DLFAuthProvider fromTokenLoader(DLFTokenLoader tokenLoader, String region) {
return new DLFAuthProvider(tokenLoader, null, region);
public static DLFAuthProvider fromTokenLoader(
DLFTokenLoader tokenLoader, String uri, String region, String signingAlgorithm) {
return new DLFAuthProvider(tokenLoader, null, uri, region, signingAlgorithm);
}

public static DLFAuthProvider fromAccessKey(
String accessKeyId, String accessKeySecret, String securityToken, String region) {
String accessKeyId,
String accessKeySecret,
@Nullable String securityToken,
String uri,
String region,
String signingAlgorithm) {
DLFToken token = new DLFToken(accessKeyId, accessKeySecret, securityToken, null);
return new DLFAuthProvider(null, token, region);
return new DLFAuthProvider(null, token, uri, region, signingAlgorithm);
}

public DLFAuthProvider(
@Nullable DLFTokenLoader tokenLoader, @Nullable DLFToken token, String region) {
@Nullable DLFTokenLoader tokenLoader,
@Nullable DLFToken token,
String uri,
String region,
String signingAlgorithm) {
this.tokenLoader = tokenLoader;
this.token = token;
this.uri = uri;
this.region = region;
this.signingAlgorithm = signingAlgorithm;
this.signer = createSigner(signingAlgorithm);
}

@Override
public Map<String, String> mergeAuthHeader(
Map<String, String> baseHeader, RESTAuthParameter restAuthParameter) {
DLFToken token = getFreshToken();
try {
String dateTime =
baseHeader.getOrDefault(
DLF_DATE_HEADER_KEY.toLowerCase(),
ZonedDateTime.now(ZoneOffset.UTC).format(AUTH_DATE_TIME_FORMATTER));
String date = dateTime.substring(0, 8);
Instant now = Instant.now();
String host = extractHost(uri);
Map<String, String> signHeaders =
generateSignHeaders(
restAuthParameter.data(), dateTime, token.getSecurityToken());
signer.signHeaders(
restAuthParameter.data(), now, token.getSecurityToken(), host);
String authorization =
DLFAuthSignature.getAuthorization(
restAuthParameter, token, region, signHeaders, dateTime, date);
signer.authorization(restAuthParameter, token, host, signHeaders);
Map<String, String> headersWithAuth = new HashMap<>(baseHeader);
headersWithAuth.putAll(signHeaders);
headersWithAuth.put(DLF_AUTHORIZATION_HEADER_KEY, authorization);
return headersWithAuth;
} catch (Exception e) {
throw new RuntimeException(e);
throw new RuntimeException("Failed to generate authorization header", e);
}
}

/**
* Extracts the host (with port if present) from a URI string.
*
* <p>Handles URIs in the following formats:
*
* <ul>
* <li>http://hostname/prefix -> hostname
* <li>https://hostname:8080/prefix -> hostname:8080
* <li>http://hostname -> hostname
* <li>https://hostname:8080 -> hostname:8080
* </ul>
*
* @param uri the URI string
* @return the host part (with port if present) of the URI
*/
@VisibleForTesting
static String extractHost(String uri) {
// Remove protocol (http:// or https://)
String withoutProtocol = uri.replaceFirst("^https?://", "");

// Remove path (everything after '/')
int pathIndex = withoutProtocol.indexOf('/');
return pathIndex >= 0 ? withoutProtocol.substring(0, pathIndex) : withoutProtocol;
}

private DLFRequestSigner createSigner(String signingAlgorithm) {
switch (signingAlgorithm) {
case DLFDefaultSigner.IDENTIFIER:
return new DLFDefaultSigner(region);
case DLFOpenApiSigner.IDENTIFIER:
return new DLFOpenApiSigner();
default:
throw new IllegalArgumentException(
"Unknown DLF signing algorithm: "
+ signingAlgorithm
+ ". Supported: "
+ DLFDefaultSigner.IDENTIFIER
+ ", "
+ DLFOpenApiSigner.IDENTIFIER);
}
}

Expand Down Expand Up @@ -135,20 +188,4 @@ private boolean shouldRefresh() {
long now = System.currentTimeMillis();
return expireTime - now < TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
}

public static Map<String, String> generateSignHeaders(
String data, String dateTime, String securityToken) throws Exception {
Map<String, String> signHeaders = new HashMap<>();
signHeaders.put(DLF_DATE_HEADER_KEY, dateTime);
signHeaders.put(DLF_CONTENT_SHA56_HEADER_KEY, DLF_CONTENT_SHA56_VALUE);
signHeaders.put(DLF_AUTH_VERSION_HEADER_KEY, DLFAuthSignature.VERSION);
if (data != null && !data.isEmpty()) {
signHeaders.put(DLF_CONTENT_TYPE_KEY, MEDIA_TYPE);
signHeaders.put(DLF_CONTENT_MD5_HEADER_KEY, DLFAuthSignature.md5(data));
}
if (securityToken != null) {
signHeaders.put(DLF_SECURITY_TOKEN_HEADER_KEY, securityToken);
}
return signHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.options.Options;
import org.apache.paimon.rest.RESTCatalogOptions;
import org.apache.paimon.utils.StringUtils;

import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -36,25 +37,32 @@ public String identifier() {

@Override
public AuthProvider create(Options options) {
String uri = options.get(URI);
String region =
options.getOptional(RESTCatalogOptions.DLF_REGION)
.orElseGet(() -> parseRegionFromUri(options.get(URI)));
.orElseGet(() -> parseRegionFromUri(uri));
String signingAlgorithm =
options.getOptional(RESTCatalogOptions.DLF_SIGNING_ALGORITHM)
.orElseGet(() -> parseSigningAlgoFromUri(uri));

if (options.getOptional(RESTCatalogOptions.DLF_TOKEN_LOADER).isPresent()) {
DLFTokenLoader dlfTokenLoader =
DLFTokenLoaderFactory.createDLFTokenLoader(
options.get(RESTCatalogOptions.DLF_TOKEN_LOADER), options);
return DLFAuthProvider.fromTokenLoader(dlfTokenLoader, region);
return DLFAuthProvider.fromTokenLoader(dlfTokenLoader, uri, region, signingAlgorithm);
} else if (options.getOptional(RESTCatalogOptions.DLF_TOKEN_PATH).isPresent()) {
DLFTokenLoader dlfTokenLoader =
DLFTokenLoaderFactory.createDLFTokenLoader("local_file", options);
return DLFAuthProvider.fromTokenLoader(dlfTokenLoader, region);
return DLFAuthProvider.fromTokenLoader(dlfTokenLoader, uri, region, signingAlgorithm);
} else if (options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_ID).isPresent()
&& options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET).isPresent()) {
return DLFAuthProvider.fromAccessKey(
options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID),
options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET),
options.get(RESTCatalogOptions.DLF_SECURITY_TOKEN),
region);
uri,
region,
signingAlgorithm);
}
throw new IllegalArgumentException("DLF token path or AK must be set for DLF Auth.");
}
Expand All @@ -74,4 +82,25 @@ protected static String parseRegionFromUri(String uri) {
throw new IllegalArgumentException(
"Could not get region from conf or uri, please check your config.");
}

/**
* Parse signing algorithm from uri. Automatically selects the appropriate signer based on the
* endpoint uri.
*
* @param uri endpoint uri
* @return signing algorithm identifier
*/
protected static String parseSigningAlgoFromUri(String uri) {
if (StringUtils.isEmpty(uri)) {
return DLFDefaultSigner.IDENTIFIER;
}

// Check for aliyun openapi endpoints
if (uri.toLowerCase().contains("dlfnext")) {
return DLFOpenApiSigner.IDENTIFIER;
}

// Default to dlf for unknown hosts
return DLFDefaultSigner.IDENTIFIER;
}
}
Loading