diff --git a/docs/content/append-table/blob.md b/docs/content/append-table/blob.md
index 23cea8aa517a..a175111de7a5 100644
--- a/docs/content/append-table/blob.md
+++ b/docs/content/append-table/blob.md
@@ -90,11 +90,18 @@ For details about the blob file format structure, see [File Format - BLOB]({{< r
Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. |
- blob-as-descriptor |
+ write-blob-from-descriptor |
No |
false |
Boolean |
- When set to true, the blob field input is treated as a serialized BlobDescriptor. Paimon reads from the descriptor's URI and streams the data into Paimon's blob files in small chunks, avoiding loading the entire blob into memory. This is useful for writing very large blobs that cannot fit in memory. When reading, if set to true, returns the BlobDescriptor bytes; if false, returns actual blob bytes. |
+ When set to true, the blob field input is treated as a serialized BlobDescriptor. Paimon reads from the descriptor's URI and streams the data into Paimon's blob files in small chunks, avoiding loading the entire blob into memory. This is useful for writing very large blobs that cannot fit in memory. |
+
+
+ read-blob-as-descriptor |
+ No |
+ false |
+ Boolean |
+ When set to true, reading returns the BlobDescriptor bytes (a reference to the Paimon blob file) instead of the actual blob bytes. This is useful when you only need blob metadata without loading the actual data. |
blob.target-file-size |
@@ -201,10 +208,10 @@ SELECT * FROM image_table WHERE id = 1;
### Blob Descriptor Mode
-When you want to store references from external blob data (stored in object storage) without loading the entire blob into memory, you can use the `blob-as-descriptor` option:
+When you want to write blob data from an external source (such as object storage) into Paimon without loading the entire blob into memory, you can use the `write-blob-from-descriptor` option:
```sql
--- Create table in descriptor mode
+-- Create table with write-blob-from-descriptor enabled
CREATE TABLE blob_table (
id INT,
name STRING,
@@ -213,7 +220,7 @@ CREATE TABLE blob_table (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'image',
- 'blob-as-descriptor' = 'true'
+ 'write-blob-from-descriptor' = 'true'
);
-- Insert with serialized BlobDescriptor bytes
@@ -221,8 +228,11 @@ CREATE TABLE blob_table (
-- Paimon will read from the descriptor's URI and stream data into Paimon's blob files in small chunks, avoiding loading the entire blob into memory
INSERT INTO blob_table VALUES (1, 'photo', X'');
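+
+-- In Spark SQL, the descriptor bytes can also be produced from a file URI with the helper
+-- used in Paimon's Spark tests (assumed Spark-only: `sys.path_to_descriptor`), for example:
+-- INSERT INTO blob_table VALUES (2, 'photo2', sys.path_to_descriptor('file:///path/to/photo.bin'));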
--- Toggle this setting to control read output format:
-ALTER TABLE blob_table SET ('blob-as-descriptor' = 'false');
+-- Control read output format independently:
+ALTER TABLE blob_table SET ('read-blob-as-descriptor' = 'true');
+SELECT * FROM blob_table; -- Returns BlobDescriptor bytes (reference)
+
+ALTER TABLE blob_table SET ('read-blob-as-descriptor' = 'false');
SELECT * FROM blob_table; -- Returns actual blob bytes from Paimon storage
```
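+
+For backward compatibility, the legacy `blob-as-descriptor` key is kept as a fallback for both new options, so it still takes effect when the new keys are not set explicitly. A minimal sketch of this fallback behavior (inferred from the option definitions):
+
+```sql
+-- Legacy key: acts as a fallback for both new options when they are not set
+ALTER TABLE blob_table SET ('blob-as-descriptor' = 'true');
+-- Roughly equivalent to setting both:
+-- 'write-blob-from-descriptor' = 'true'
+-- 'read-blob-as-descriptor' = 'true'
+```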
@@ -426,15 +436,19 @@ long length = descriptor.length(); // Length of the blob data
### Blob Descriptor Mode
-The `blob-as-descriptor` option enables **memory-efficient writing** for very large blobs. When enabled, you provide a `BlobDescriptor` pointing to external data, and Paimon streams the data from the external source into Paimon's `.blob` files without loading the entire blob into memory.
+The `write-blob-from-descriptor` option enables **memory-efficient writing** for very large blobs. When enabled, you provide a `BlobDescriptor` pointing to external data, and Paimon streams the data from the external source into Paimon's `.blob` files without loading the entire blob into memory.
+
+The `read-blob-as-descriptor` option controls the **read output format**. When enabled, reading returns BlobDescriptor bytes instead of actual blob data, which is useful when you only need blob metadata.
**How it works:**
1. **Writing**: You provide a serialized `BlobDescriptor` (containing URI, offset, length) as the blob field value
2. **Paimon copies the data**: Paimon reads from the descriptor's URI in small chunks (e.g., 1024 bytes at a time) and writes to Paimon's `.blob` files
3. **Data is stored in Paimon**: The blob data IS copied to Paimon storage, but in a streaming fashion
+4. **Reading**: You can choose to get BlobDescriptor bytes (a reference) or the actual blob bytes via `read-blob-as-descriptor`
-**Key benefit:**
+**Key benefits:**
- **Memory efficiency**: For very large blobs (e.g., gigabyte-sized videos), you don't need to load the entire file into memory. Paimon streams the data incrementally.
+- **Independent control**: Write and read behaviors can be controlled separately using `write-blob-from-descriptor` and `read-blob-as-descriptor`.
```java
import org.apache.paimon.catalog.Catalog;
@@ -466,14 +480,14 @@ public class BlobDescriptorExample {
Catalog catalog = CatalogFactory.createCatalog(catalogContext);
catalog.createDatabase("my_db", true);
- // Create table with blob-as-descriptor enabled
+ // Create table with write-blob-from-descriptor enabled
Schema schema = Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("video", DataTypes.BLOB())
.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
- .option(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true") // This is not necessary in java api
+ .option(CoreOptions.WRITE_BLOB_FROM_DESCRIPTOR.key(), "true") // Enable memory-efficient writing
.build();
Identifier tableId = Identifier.create("my_db", "video_table");
@@ -543,25 +557,24 @@ public class BlobDescriptorExample {
**Reading blob data with different modes:**
-The `blob-as-descriptor` option also affects how data is returned when reading:
+The `read-blob-as-descriptor` option controls how data is returned when reading:
```sql
--- When blob-as-descriptor = true: Returns BlobDescriptor bytes (reference to Paimon blob file)
-ALTER TABLE video_table SET ('blob-as-descriptor' = 'true');
+-- When read-blob-as-descriptor = true: Returns BlobDescriptor bytes (reference to Paimon blob file)
+ALTER TABLE video_table SET ('read-blob-as-descriptor' = 'true');
SELECT * FROM video_table; -- Returns serialized BlobDescriptor
--- When blob-as-descriptor = false: Returns actual blob bytes
-ALTER TABLE video_table SET ('blob-as-descriptor' = 'false');
+-- When read-blob-as-descriptor = false: Returns actual blob bytes
+ALTER TABLE video_table SET ('read-blob-as-descriptor' = 'false');
SELECT * FROM video_table; -- Returns actual blob bytes from Paimon storage
```
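+
+In Spark SQL, the read switch can also be applied per session through the `spark.paimon.` configuration prefix (as exercised in the Spark tests), without altering the table options. A sketch, assuming Spark SQL:
+
+```sql
+-- Session-level toggle; the table options stay unchanged
+SET spark.paimon.read-blob-as-descriptor=true;
+SELECT * FROM video_table; -- Returns serialized BlobDescriptor
+
+SET spark.paimon.read-blob-as-descriptor=false;
+SELECT * FROM video_table; -- Returns actual blob bytes from Paimon storage
+```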
## Limitations
-1. **Single Blob Field**: Currently, only one blob field per table is supported.
-2. **Append Table Only**: Blob type is designed for append-only tables. Primary key tables are not supported.
-3. **No Predicate Pushdown**: Blob columns cannot be used in filter predicates.
-4. **No Statistics**: Statistics collection is not supported for blob columns.
-5. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled` must be set to `true`.
+1. **Append Table Only**: Blob type is designed for append-only tables. Primary key tables are not supported.
+2. **No Predicate Pushdown**: Blob columns cannot be used in filter predicates.
+3. **No Statistics**: Statistics collection is not supported for blob columns.
+4. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled` must be set to `true`.
## Best Practices
@@ -569,7 +582,7 @@ SELECT * FROM video_table; -- Returns actual blob bytes from Paimon storage
2. **Set Appropriate Target File Size**: Configure `blob.target-file-size` based on your blob sizes. Larger values mean fewer files but larger individual files.
-3. **Consider Descriptor Mode**: For very large blobs that cannot fit in memory, use `blob-as-descriptor` mode to stream data from external sources into Paimon without loading the entire blob into memory.
+3. **Consider Descriptor Mode**: For very large blobs that cannot fit in memory, use `write-blob-from-descriptor` mode to stream data from external sources into Paimon without loading the entire blob into memory. Use `read-blob-as-descriptor` when you only need blob metadata without loading actual data.
4. **Use Partitioning**: Partition your blob tables by date or other dimensions to improve query performance and data management.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 9e4d3f0b330a..6928ad97ca3e 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -50,12 +50,6 @@
Boolean |
Whether to create underlying storage when reading and writing the table. |
-
- blob-as-descriptor |
- false |
- Boolean |
- Write blob field using blob descriptor rather than blob bytes. |
-
blob-field |
(none) |
@@ -152,12 +146,6 @@
String |
Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true. |
-
- table-read.sequence-number.enabled |
- false |
- Boolean |
- Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables. |
-
changelog.num-retained.max |
(none) |
@@ -1043,6 +1031,12 @@
Boolean |
Enable query auth to give Catalog the opportunity to perform column level and row level permission validation on queries. |
+
+ read-blob-as-descriptor |
+ false |
+ Boolean |
+ Read blob field as blob descriptor (returns BlobDescriptor bytes) rather than actual blob bytes. |
+
read.batch-size |
1024 |
@@ -1308,6 +1302,12 @@
Duration |
The delay duration of stream read when scan incremental snapshots. |
+
+ table-read.sequence-number.enabled |
+ false |
+ Boolean |
+ Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables. |
+
tag.automatic-completion |
false |
@@ -1440,6 +1440,12 @@
String |
The Variant shredding schema for writing. |
+
+ write-blob-from-descriptor |
+ false |
+ Boolean |
+ Write blob field from blob descriptor rather than blob bytes. |
+
write-buffer-for-append |
false |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 95f1387491f0..eec116e57d41 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2143,12 +2143,21 @@ public InlineElement getDescription() {
"Specifies column names that should be stored as blob type. "
+ "This is used when you want to treat a BYTES column as a BLOB.");
- public static final ConfigOption<Boolean> BLOB_AS_DESCRIPTOR =
- key("blob-as-descriptor")
+ public static final ConfigOption<Boolean> READ_BLOB_AS_DESCRIPTOR =
+ key("read-blob-as-descriptor")
.booleanType()
.defaultValue(false)
+ .withFallbackKeys("blob-as-descriptor")
.withDescription(
- "Write blob field using blob descriptor rather than blob bytes.");
+ "Read blob field as blob descriptor (returns BlobDescriptor bytes) rather than actual blob bytes. ");
+
+ public static final ConfigOption<Boolean> WRITE_BLOB_FROM_DESCRIPTOR =
+ key("write-blob-from-descriptor")
+ .booleanType()
+ .defaultValue(false)
+ .withFallbackKeys("blob-as-descriptor")
+ .withDescription(
+ "Write blob field from blob descriptor rather than blob bytes. ");
public static final ConfigOption COMMIT_DISCARD_DUPLICATE_FILES =
key("commit.discard-duplicate-files")
@@ -2666,8 +2675,7 @@ public long blobTargetFileSize() {
}
public boolean blobSplitByFileSize() {
- return options.getOptional(BLOB_SPLIT_BY_FILE_SIZE)
- .orElse(!options.get(BLOB_AS_DESCRIPTOR));
+ return options.getOptional(BLOB_SPLIT_BY_FILE_SIZE).orElse(!readBlobAsDescriptor());
}
public long compactionFileSize(boolean hasPrimaryKey) {
@@ -3383,8 +3391,12 @@ public boolean formatTablePartitionOnlyValueInPath() {
return options.get(FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH);
}
- public boolean blobAsDescriptor() {
- return options.get(BLOB_AS_DESCRIPTOR);
+ public boolean readBlobAsDescriptor() {
+ return options.get(READ_BLOB_AS_DESCRIPTOR);
+ }
+
+ public boolean writeBlobFromDescriptor() {
+ return options.get(WRITE_BLOB_FROM_DESCRIPTOR);
}
public boolean postponeBatchWriteFixedBucket() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java b/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java
index 359b9f25e817..6efbee51a80e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java
@@ -108,12 +108,12 @@ public void testBlobSplitByFileSizeDefault() {
assertThat(options.blobSplitByFileSize()).isTrue();
conf = new Options();
- conf.set(CoreOptions.BLOB_AS_DESCRIPTOR, true);
+ conf.set(CoreOptions.READ_BLOB_AS_DESCRIPTOR, true);
options = new CoreOptions(conf);
assertThat(options.blobSplitByFileSize()).isFalse();
conf = new Options();
- conf.set(CoreOptions.BLOB_AS_DESCRIPTOR, false);
+ conf.set(CoreOptions.READ_BLOB_AS_DESCRIPTOR, false);
options = new CoreOptions(conf);
assertThat(options.blobSplitByFileSize()).isTrue();
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 3be60281c00f..544435c713b2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -346,7 +346,7 @@ protected boolean buildForPostponeBucketCompaction(
partitionSpec,
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
- boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+ boolean blobAsDescriptor = table.coreOptions().writeBlobFromDescriptor();
DataStream partitioned =
FlinkStreamPartitioner.partition(
FlinkSinkBuilder.mapToInternalRow(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
index 18eb84a68d4d..1de97f63a41c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
@@ -151,7 +151,7 @@ protected List> buildCompactOperator(
// 2.3 write and then reorganize the committable
// set parallelism to null, and it'll forward parallelism when doWrite()
RowAppendTableSink sink = new RowAppendTableSink(table, null, null);
- boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+ boolean blobAsDescriptor = table.coreOptions().writeBlobFromDescriptor();
DataStream written =
sink.doWrite(
FlinkSinkBuilder.mapToInternalRow(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 46c1f040eccb..b08058160d4a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -206,7 +206,7 @@ public FlinkSinkBuilder clusteringIfPossible(
public DataStreamSink<?> build() {
setParallelismIfAdaptiveConflict();
input = trySortInput(input);
- boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+ boolean blobAsDescriptor = table.coreOptions().writeBlobFromDescriptor();
DataStream input =
mapToInternalRow(
this.input,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index d4edb5cbaac0..f8b83064bbdd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -99,13 +99,14 @@ public CompactorSourceBuilder withPartitionIdleTime(@Nullable Duration partition
if (isContinuous) {
return new ContinuousFileStoreSource(readBuilder, compactBucketsTable.options(), null);
} else {
- Options options = compactBucketsTable.coreOptions().toConfiguration();
+ CoreOptions coreOptions = compactBucketsTable.coreOptions();
+ Options options = coreOptions.toConfiguration();
return new StaticFileStoreSource(
readBuilder,
null,
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE),
- options.get(CoreOptions.BLOB_AS_DESCRIPTOR));
+ coreOptions.readBlobAsDescriptor());
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 4e9855e8f2c6..f938713ce9c9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -57,12 +57,7 @@ public ContinuousFileStoreSource(
@Nullable Long limit,
boolean unordered,
@Nullable NestedProjectedRowData rowData) {
- super(
- readBuilder,
- limit,
- rowData,
- Boolean.parseBoolean(
- options.getOrDefault(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "false")));
+ super(readBuilder, limit, rowData, CoreOptions.fromMap(options).readBlobAsDescriptor());
this.options = options;
this.unordered = unordered;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 3e96dec1ea50..dbb581b96966 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -213,7 +213,7 @@ private DataStream buildStaticFileSource() {
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE),
dynamicPartitionFilteringInfo,
outerProject(),
- options.get(CoreOptions.BLOB_AS_DESCRIPTOR)));
+ new CoreOptions(options).readBlobAsDescriptor()));
}
private DataStream buildContinuousFileSource() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index e99f265d03b4..e5cafaef8287 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -111,11 +111,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
splitAssignMode,
null,
rowData,
- Boolean.parseBoolean(
- table.options()
- .getOrDefault(
- CoreOptions.BLOB_AS_DESCRIPTOR.key(),
- "false")));
+ CoreOptions.fromMap(table.options()).readBlobAsDescriptor());
}
return new PaimonDataStreamScanProvider(
source.getBoundedness() == Boundedness.BOUNDED,
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index 255139e85f95..41bd793a8bdd 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -48,7 +48,7 @@
/** {@link FileFormat} for blob file. */
public class BlobFileFormat extends FileFormat {
- private final boolean blobAsDescriptor;
+ private final boolean readBlobAsDescriptor;
@Nullable public BlobConsumer writeConsumer;
@@ -56,9 +56,9 @@ public BlobFileFormat() {
this(false);
}
- public BlobFileFormat(boolean blobAsDescriptor) {
+ public BlobFileFormat(boolean readBlobAsDescriptor) {
super(BlobFileFormatFactory.IDENTIFIER);
- this.blobAsDescriptor = blobAsDescriptor;
+ this.readBlobAsDescriptor = readBlobAsDescriptor;
}
public static boolean isBlobFile(String fileName) {
@@ -74,7 +74,7 @@ public FormatReaderFactory createReaderFactory(
RowType dataSchemaRowType,
RowType projectedRowType,
@Nullable List<Predicate> filters) {
- return new BlobFormatReaderFactory(blobAsDescriptor);
+ return new BlobFormatReaderFactory(readBlobAsDescriptor);
}
@Override
@@ -113,10 +113,10 @@ public FormatWriter create(PositionOutputStream out, String compression) {
private static class BlobFormatReaderFactory implements FormatReaderFactory {
- private final boolean blobAsDescriptor;
+ private final boolean readBlobAsDescriptor;
public BlobFormatReaderFactory(boolean blobAsDescriptor) {
- this.blobAsDescriptor = blobAsDescriptor;
+ this.readBlobAsDescriptor = blobAsDescriptor;
}
@Override
@@ -129,7 +129,7 @@ public FileRecordReader createReader(Context context) throws IOExce
in = fileIO.newInputStream(filePath);
fileMeta = new BlobFileMeta(in, context.fileSize(), context.selection());
} finally {
- if (blobAsDescriptor) {
+ if (readBlobAsDescriptor) {
IOUtils.closeQuietly(in);
in = null;
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
index 2a54d497093b..47b573b19b50 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
@@ -34,7 +34,7 @@ public String identifier() {
@Override
public FileFormat create(FormatContext formatContext) {
- boolean blobAsDescriptor = formatContext.options().get(CoreOptions.BLOB_AS_DESCRIPTOR);
- return new BlobFileFormat(blobAsDescriptor);
+ CoreOptions coreOptions = new CoreOptions(formatContext.options());
+ return new BlobFileFormat(coreOptions.readBlobAsDescriptor());
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
index 077d92cc292b..760a195a0b53 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
@@ -74,7 +74,7 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se
table.newBatchWriteBuilder(),
writeType,
firstRowIdToPartitionMap,
- coreOptions.blobAsDescriptor(),
+ coreOptions.writeBlobFromDescriptor(),
table.catalogEnvironment().catalogContext())
try {
iter.foreach(row => write.write(row))
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 721c09e2d5ee..8958b57060d1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -136,7 +136,7 @@ case class PaimonSparkWriter(
writeRowTracking,
fullCompactionDeltaCommits,
batchId,
- coreOptions.blobAsDescriptor(),
+ coreOptions.writeBlobFromDescriptor(),
table.catalogEnvironment().catalogContext(),
postponePartitionBucketComputer
)
@@ -449,7 +449,7 @@ case class PaimonSparkWriter(
val toPaimonRow = SparkRowUtils.toPaimonRow(
rowType,
rowKindColIdx,
- table.coreOptions().blobAsDescriptor(),
+ table.coreOptions().writeBlobFromDescriptor(),
table.catalogEnvironment().catalogContext())
bootstrapIterator.asScala
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
index 705017506701..a230147540b9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
@@ -126,7 +126,7 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
override def toBatch: Batch = {
val metadataColumns = metadataFields.map(
field => PaimonMetadataColumn.get(field.name, SparkTypeUtils.toSparkPartitionType(table)))
- PaimonBatch(inputPartitions, readBuilder, coreOptions.blobAsDescriptor(), metadataColumns)
+ PaimonBatch(inputPartitions, readBuilder, coreOptions.readBlobAsDescriptor(), metadataColumns)
}
def estimateStatistics: Statistics = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
index e8fbea0e69a4..f189b94ba855 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
@@ -40,6 +40,7 @@ class PaimonMicroBatchStream(
with Logging {
private val options = Options.fromMap(table.options())
+ private val coreOptions = new CoreOptions(options)
lazy val initOffset: PaimonSourceOffset = {
val initSnapshotId = Math.max(
@@ -91,8 +92,6 @@ class PaimonMicroBatchStream(
.getOrElse(ReadLimit.allAvailable())
}
- private lazy val blobAsDescriptor: Boolean = options.get(CoreOptions.BLOB_AS_DESCRIPTOR)
-
override def getDefaultReadLimit: ReadLimit = defaultReadLimit
override def prepareForTriggerAvailableNow(): Unit = {
@@ -130,7 +129,7 @@ class PaimonMicroBatchStream(
}
override def createReaderFactory(): PartitionReaderFactory = {
- PaimonPartitionReaderFactory(readBuilder, blobAsDescriptor = blobAsDescriptor)
+ PaimonPartitionReaderFactory(readBuilder, blobAsDescriptor = coreOptions.readBlobAsDescriptor())
}
override def initialOffset(): Offset = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
index fbd166a18312..505a30db861c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
@@ -45,7 +45,7 @@ case class PaimonV2DataWriter(
val fullCompactionDeltaCommits: Option[Int] =
Option.apply(coreOptions.fullCompactionDeltaCommits())
- val blobAsDescriptor: Boolean = coreOptions.blobAsDescriptor()
+ val blobAsDescriptor: Boolean = coreOptions.writeBlobFromDescriptor()
val write: TableWriteImpl[InternalRow] = {
writeBuilder
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 5d8e1ef9fb8b..da1c6db2a1e3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -191,6 +191,73 @@ class BlobTestBase extends PaimonSparkTestBase {
}
}
+ test("Blob: test write-blob-from-descriptor and read-blob-as-descriptor") {
+ for (writeBlobFromDescriptor <- Seq("true", "false")) {
+ withTable("t") {
+ val blobData = new Array[Byte](1024 * 1024)
+ RANDOM.nextBytes(blobData)
+
+ sql("""
+ |CREATE TABLE IF NOT EXISTS t (
+ | id STRING,
+ | name STRING,
+ | content BINARY
+ |)
+ |PARTITIONED BY (ds STRING)
+ |TBLPROPERTIES (
+ | 'row-tracking.enabled' = 'true',
+ | 'data-evolution.enabled' = 'true',
+ | 'blob-field' = 'content'
+ |)
+ """.stripMargin)
+
+ withSparkSQLConf("spark.paimon.write-blob-from-descriptor" -> writeBlobFromDescriptor) {
+ if (writeBlobFromDescriptor.toBoolean) {
+ val fileIO = new LocalFileIO
+ val uri = "file://" + tempDBDir.toString + "/external_blob_split_config"
+ try {
+ val outputStream = fileIO.newOutputStream(new Path(uri), true)
+ try outputStream.write(blobData)
+ finally if (outputStream != null) outputStream.close()
+ }
+ sql(s"""
+ |INSERT OVERWRITE TABLE t
+ |PARTITION(ds = '20240126')
+ |VALUES ('1', 'paimon', sys.path_to_descriptor('$uri'))
+ |""".stripMargin)
+ } else {
+ sql(s"""
+ |INSERT OVERWRITE TABLE t
+ |PARTITION(ds = '20240126')
+ |VALUES ('1', 'paimon', X'${bytesToHex(blobData)}')
+ |""".stripMargin)
+ }
+ }
+
+ withSparkSQLConf("spark.paimon.read-blob-as-descriptor" -> "true") {
+ val descriptorBytes =
+ sql("SELECT content FROM t WHERE id = '1'")
+ .collect()(0)
+ .get(0)
+ .asInstanceOf[Array[Byte]]
+ val readDescriptor = BlobDescriptor.deserialize(descriptorBytes)
+ val catalogContext = loadTable("t").catalogEnvironment().catalogContext()
+ val uriReaderFactory = new UriReaderFactory(catalogContext)
+ val blob =
+ Blob.fromDescriptor(uriReaderFactory.create(readDescriptor.uri), readDescriptor)
+ assert(util.Arrays.equals(blobData, blob.toData))
+ }
+
+ withSparkSQLConf("spark.paimon.read-blob-as-descriptor" -> "false") {
+ checkAnswer(
+ sql("SELECT id, name, content, _ROW_ID, _SEQUENCE_NUMBER FROM t WHERE id = '1'"),
+ Seq(Row("1", "paimon", blobData, 0, 1))
+ )
+ }
+ }
+ }
+ }
+
test("Blob: test compaction") {
withTable("t") {
sql(