57 changes: 35 additions & 22 deletions docs/content/append-table/blob.md
@@ -90,11 +90,18 @@ For details about the blob file format structure, see [File Format - BLOB]({{< r
<td>Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB.</td>
</tr>
<tr>
<td><h5>blob-as-descriptor</h5></td>
<td><h5>write-blob-from-descriptor</h5></td>
<td>No</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When set to true, the blob field input is treated as a serialized BlobDescriptor. Paimon reads from the descriptor's URI and streams the data into Paimon's blob files in small chunks, avoiding loading the entire blob into memory. This is useful for writing very large blobs that cannot fit in memory. When reading, if set to true, returns the BlobDescriptor bytes; if false, returns actual blob bytes.</td>
<td>When set to true, the blob field input is treated as a serialized BlobDescriptor. Paimon reads from the descriptor's URI and streams the data into Paimon's blob files in small chunks, avoiding loading the entire blob into memory. This is useful for writing very large blobs that cannot fit in memory.</td>
</tr>
<tr>
<td><h5>read-blob-as-descriptor</h5></td>
<td>No</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When set to true, returns the BlobDescriptor bytes (reference to Paimon blob file) instead of actual blob bytes when reading. This is useful when you need the blob metadata without loading the actual data.</td>
</tr>
<tr>
<td><h5>blob.target-file-size</h5></td>
@@ -201,10 +208,10 @@ SELECT * FROM image_table WHERE id = 1;

### Blob Descriptor Mode

When you want to store references from external blob data (stored in object storage) without loading the entire blob into memory, you can use the `blob-as-descriptor` option:
When you want to write blob data that already resides in object storage without loading the entire blob into memory, you can use the `write-blob-from-descriptor` option:

```sql
-- Create table in descriptor mode
-- Create table with write blob from descriptor mode enabled
CREATE TABLE blob_table (
id INT,
name STRING,
@@ -213,16 +220,19 @@ CREATE TABLE blob_table (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'image',
'blob-as-descriptor' = 'true'
'write-blob-from-descriptor' = 'true'
);

-- Insert with serialized BlobDescriptor bytes
-- The BlobDescriptor contains: version (1 byte) + uri_length (4 bytes) + uri_bytes + offset (8 bytes) + length (8 bytes)
-- Paimon will read from the descriptor's URI and stream data into Paimon's blob files in small chunks, avoiding loading the entire blob into memory
INSERT INTO blob_table VALUES (1, 'photo', X'<serialized_blob_descriptor_hex>');

-- Toggle this setting to control read output format:
ALTER TABLE blob_table SET ('blob-as-descriptor' = 'false');
-- Control read output format independently:
ALTER TABLE blob_table SET ('read-blob-as-descriptor' = 'true');
SELECT * FROM blob_table; -- Returns BlobDescriptor bytes (reference)

ALTER TABLE blob_table SET ('read-blob-as-descriptor' = 'false');
SELECT * FROM blob_table; -- Returns actual blob bytes from Paimon storage
```
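
The hex placeholder above stands in for real serialized descriptor bytes. As a rough sketch of the layout described in the comments (version + uri_length + uri_bytes + offset + length), a descriptor could be packed as follows; this is not the official Paimon serializer, and the big-endian byte order and the version value `1` are assumptions:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: packs the documented BlobDescriptor layout into a byte array.
// Prefer the BlobDescriptor serialization utilities from the Paimon API where available.
public final class BlobDescriptorBytes {

    public static byte[] pack(String uri, long offset, long length) {
        byte[] uriBytes = uri.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + uriBytes.length + 8 + 8);
        buffer.put((byte) 1);            // version (1 byte, assumed to be 1)
        buffer.putInt(uriBytes.length);  // uri_length (4 bytes)
        buffer.put(uriBytes);            // uri_bytes
        buffer.putLong(offset);          // offset into the external object (8 bytes)
        buffer.putLong(length);          // number of bytes to read from the URI (8 bytes)
        return buffer.array();
    }
}
```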

@@ -426,15 +436,19 @@ long length = descriptor.length(); // Length of the blob data

### Blob Descriptor Mode

The `blob-as-descriptor` option enables **memory-efficient writing** for very large blobs. When enabled, you provide a `BlobDescriptor` pointing to external data, and Paimon streams the data from the external source into Paimon's `.blob` files without loading the entire blob into memory.
The `write-blob-from-descriptor` option enables **memory-efficient writing** for very large blobs. When enabled, you provide a `BlobDescriptor` pointing to external data, and Paimon streams the data from the external source into Paimon's `.blob` files without loading the entire blob into memory.

The `read-blob-as-descriptor` option controls the **read output format**. When enabled, reading returns BlobDescriptor bytes instead of actual blob data, which is useful when you only need blob metadata.

**How it works:**
1. **Writing**: You provide a serialized `BlobDescriptor` (containing URI, offset, length) as the blob field value
2. **Paimon copies the data**: Paimon reads from the descriptor's URI in small chunks (e.g., 1024 bytes at a time) and writes to Paimon's `.blob` files
3. **Data is stored in Paimon**: The blob data IS copied to Paimon storage, but in a streaming fashion
4. **Reading**: You can choose to get BlobDescriptor bytes (reference) or actual blob bytes

**Key benefit:**
**Key benefits:**
- **Memory efficiency**: For very large blobs (e.g., gigabyte-sized videos), you don't need to load the entire file into memory. Paimon streams the data incrementally.
- **Independent control**: Write and read behaviors can be controlled separately using `write-blob-from-descriptor` and `read-blob-as-descriptor`.

```java
import org.apache.paimon.catalog.Catalog;
@@ -466,14 +480,14 @@ public class BlobDescriptorExample {
Catalog catalog = CatalogFactory.createCatalog(catalogContext);
catalog.createDatabase("my_db", true);

// Create table with blob-as-descriptor enabled
// Create table with write-blob-from-descriptor enabled
Schema schema = Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("video", DataTypes.BLOB())
.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
.option(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true") // This is not necessary in java api
.option(CoreOptions.WRITE_BLOB_FROM_DESCRIPTOR.key(), "true") // Enable memory-efficient writing
.build();

Identifier tableId = Identifier.create("my_db", "video_table");
@@ -543,33 +557,32 @@ public class BlobDescriptorExample {

**Reading blob data with different modes:**

The `blob-as-descriptor` option also affects how data is returned when reading:
The `read-blob-as-descriptor` option controls how data is returned when reading:

```sql
-- When blob-as-descriptor = true: Returns BlobDescriptor bytes (reference to Paimon blob file)
ALTER TABLE video_table SET ('blob-as-descriptor' = 'true');
-- When read-blob-as-descriptor = true: Returns BlobDescriptor bytes (reference to Paimon blob file)
ALTER TABLE video_table SET ('read-blob-as-descriptor' = 'true');
SELECT * FROM video_table; -- Returns serialized BlobDescriptor

-- When blob-as-descriptor = false: Returns actual blob bytes
ALTER TABLE video_table SET ('blob-as-descriptor' = 'false');
-- When read-blob-as-descriptor = false: Returns actual blob bytes
ALTER TABLE video_table SET ('read-blob-as-descriptor' = 'false');
SELECT * FROM video_table; -- Returns actual blob bytes from Paimon storage
```
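
In the Java API, the same two switches are exposed as `CoreOptions`. A minimal sketch showing that the write and read flags can be toggled independently (the wrapping `main` class is just scaffolding for the example):

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class BlobOptionToggles {
    public static void main(String[] args) {
        Options conf = new Options();
        // Write path: treat blob input as serialized BlobDescriptor bytes and stream from the URI.
        conf.set(CoreOptions.WRITE_BLOB_FROM_DESCRIPTOR, true);
        // Read path: return BlobDescriptor bytes (references) instead of the blob bytes themselves.
        conf.set(CoreOptions.READ_BLOB_AS_DESCRIPTOR, true);

        CoreOptions coreOptions = new CoreOptions(conf);
        System.out.println(coreOptions.writeBlobFromDescriptor()); // true
        System.out.println(coreOptions.readBlobAsDescriptor());    // true
    }
}
```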

## Limitations

1. **Single Blob Field**: Currently, only one blob field per table is supported.
2. **Append Table Only**: Blob type is designed for append-only tables. Primary key tables are not supported.
3. **No Predicate Pushdown**: Blob columns cannot be used in filter predicates.
4. **No Statistics**: Statistics collection is not supported for blob columns.
5. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled` must be set to `true`.
1. **Append Table Only**: Blob type is designed for append-only tables. Primary key tables are not supported.
2. **No Predicate Pushdown**: Blob columns cannot be used in filter predicates.
3. **No Statistics**: Statistics collection is not supported for blob columns.
4. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled` must be set to `true`.

## Best Practices

1. **Use Column Projection**: Always select only the columns you need. Avoid `SELECT *` if you don't need blob data.

2. **Set Appropriate Target File Size**: Configure `blob.target-file-size` based on your blob sizes. Larger values mean fewer files but larger individual files.

3. **Consider Descriptor Mode**: For very large blobs that cannot fit in memory, use `blob-as-descriptor` mode to stream data from external sources into Paimon without loading the entire blob into memory.
3. **Consider Descriptor Mode**: For very large blobs that cannot fit in memory, use `write-blob-from-descriptor` mode to stream data from external sources into Paimon without loading the entire blob into memory. Use `read-blob-as-descriptor` when you only need blob metadata without loading actual data.

4. **Use Partitioning**: Partition your blob tables by date or other dimensions to improve query performance and data management.
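
Putting items 2 and 4 together, a schema along the lines of the sketch below could be used; the `dt` partition column, the `256 mb` target size, and the `Schema.Builder#partitionKeys` call are illustrative assumptions rather than values taken from this page:

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

// Sketch: a date-partitioned blob table with an explicit blob target file size.
Schema schema = Schema.newBuilder()
        .column("id", DataTypes.INT())
        .column("dt", DataTypes.STRING())
        .column("image", DataTypes.BLOB())
        .partitionKeys("dt")
        .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
        .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
        .option("blob-field", "image")
        .option("blob.target-file-size", "256 mb")
        .build();
```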

30 changes: 18 additions & 12 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -50,12 +50,6 @@
<td>Boolean</td>
<td>Whether to create underlying storage when reading and writing the table.</td>
</tr>
<tr>
<td><h5>blob-as-descriptor</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Write blob field using blob descriptor rather than blob bytes.</td>
</tr>
<tr>
<td><h5>blob-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -152,12 +146,6 @@
<td>String</td>
<td>Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.</td>
</tr>
<tr>
<td><h5>table-read.sequence-number.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables.</td>
</tr>
<tr>
<td><h5>changelog.num-retained.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -1043,6 +1031,12 @@
<td>Boolean</td>
<td>Enable query auth to give Catalog the opportunity to perform column level and row level permission validation on queries.</td>
</tr>
<tr>
<td><h5>read-blob-as-descriptor</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Read blob field as blob descriptor (returns BlobDescriptor bytes) rather than actual blob bytes. </td>
</tr>
<tr>
<td><h5>read.batch-size</h5></td>
<td style="word-wrap: break-word;">1024</td>
@@ -1308,6 +1302,12 @@
<td>Duration</td>
<td>The delay duration of stream read when scan incremental snapshots.</td>
</tr>
<tr>
<td><h5>table-read.sequence-number.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables.</td>
</tr>
<tr>
<td><h5>tag.automatic-completion</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -1440,6 +1440,12 @@
<td>String</td>
<td>The Variant shredding schema for writing.</td>
</tr>
<tr>
<td><h5>write-blob-from-descriptor</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Write blob field from blob descriptor rather than blob bytes. </td>
</tr>
<tr>
<td><h5>write-buffer-for-append</h5></td>
<td style="word-wrap: break-word;">false</td>
26 changes: 19 additions & 7 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2143,12 +2143,21 @@ public InlineElement getDescription() {
"Specifies column names that should be stored as blob type. "
+ "This is used when you want to treat a BYTES column as a BLOB.");

public static final ConfigOption<Boolean> BLOB_AS_DESCRIPTOR =
key("blob-as-descriptor")
public static final ConfigOption<Boolean> READ_BLOB_AS_DESCRIPTOR =
key("read-blob-as-descriptor")
.booleanType()
.defaultValue(false)
.withFallbackKeys("blob-as-descriptor")
.withDescription(
"Write blob field using blob descriptor rather than blob bytes.");
"Read blob field as blob descriptor (returns BlobDescriptor bytes) rather than actual blob bytes. ");

public static final ConfigOption<Boolean> WRITE_BLOB_FROM_DESCRIPTOR =
key("write-blob-from-descriptor")
.booleanType()
.defaultValue(false)
.withFallbackKeys("blob-as-descriptor")
.withDescription(
"Write blob field from blob descriptor rather than blob bytes. ");

public static final ConfigOption<Boolean> COMMIT_DISCARD_DUPLICATE_FILES =
key("commit.discard-duplicate-files")
@@ -2666,8 +2675,7 @@ public long blobTargetFileSize() {
}

public boolean blobSplitByFileSize() {
return options.getOptional(BLOB_SPLIT_BY_FILE_SIZE)
.orElse(!options.get(BLOB_AS_DESCRIPTOR));
return options.getOptional(BLOB_SPLIT_BY_FILE_SIZE).orElse(!readBlobAsDescriptor());
}

public long compactionFileSize(boolean hasPrimaryKey) {
@@ -3383,8 +3391,12 @@ public boolean formatTablePartitionOnlyValueInPath() {
return options.get(FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH);
}

public boolean blobAsDescriptor() {
return options.get(BLOB_AS_DESCRIPTOR);
public boolean readBlobAsDescriptor() {
return options.get(READ_BLOB_AS_DESCRIPTOR);
}

public boolean writeBlobFromDescriptor() {
return options.get(WRITE_BLOB_FROM_DESCRIPTOR);
}

public boolean postponeBatchWriteFixedBucket() {
@@ -108,12 +108,12 @@ public void testBlobSplitByFileSizeDefault() {
assertThat(options.blobSplitByFileSize()).isTrue();

conf = new Options();
conf.set(CoreOptions.BLOB_AS_DESCRIPTOR, true);
conf.set(CoreOptions.READ_BLOB_AS_DESCRIPTOR, true);
options = new CoreOptions(conf);
assertThat(options.blobSplitByFileSize()).isFalse();

conf = new Options();
conf.set(CoreOptions.BLOB_AS_DESCRIPTOR, false);
conf.set(CoreOptions.READ_BLOB_AS_DESCRIPTOR, false);
options = new CoreOptions(conf);
assertThat(options.blobSplitByFileSize()).isTrue();
}
@@ -346,7 +346,7 @@ protected boolean buildForPostponeBucketCompaction(
partitionSpec,
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));

boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
boolean blobAsDescriptor = table.coreOptions().writeBlobFromDescriptor();
DataStream<InternalRow> partitioned =
FlinkStreamPartitioner.partition(
FlinkSinkBuilder.mapToInternalRow(
@@ -151,7 +151,7 @@ protected List<DataStream<Committable>> buildCompactOperator(
// 2.3 write and then reorganize the committable
// set parallelism to null, and it'll forward parallelism when doWrite()
RowAppendTableSink sink = new RowAppendTableSink(table, null, null);
boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
boolean blobAsDescriptor = table.coreOptions().writeBlobFromDescriptor();
DataStream<Committable> written =
sink.doWrite(
FlinkSinkBuilder.mapToInternalRow(
@@ -206,7 +206,7 @@ public FlinkSinkBuilder clusteringIfPossible(
public DataStreamSink<?> build() {
setParallelismIfAdaptiveConflict();
input = trySortInput(input);
boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
boolean blobAsDescriptor = table.coreOptions().writeBlobFromDescriptor();
DataStream<InternalRow> input =
mapToInternalRow(
this.input,
@@ -99,13 +99,14 @@ public CompactorSourceBuilder withPartitionIdleTime(@Nullable Duration partition
if (isContinuous) {
return new ContinuousFileStoreSource(readBuilder, compactBucketsTable.options(), null);
} else {
Options options = compactBucketsTable.coreOptions().toConfiguration();
CoreOptions coreOptions = compactBucketsTable.coreOptions();
Options options = coreOptions.toConfiguration();
return new StaticFileStoreSource(
readBuilder,
null,
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE),
options.get(CoreOptions.BLOB_AS_DESCRIPTOR));
coreOptions.readBlobAsDescriptor());
}
}

@@ -57,12 +57,7 @@ public ContinuousFileStoreSource(
@Nullable Long limit,
boolean unordered,
@Nullable NestedProjectedRowData rowData) {
super(
readBuilder,
limit,
rowData,
Boolean.parseBoolean(
options.getOrDefault(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "false")));
super(readBuilder, limit, rowData, CoreOptions.fromMap(options).readBlobAsDescriptor());
this.options = options;
this.unordered = unordered;
}
@@ -213,7 +213,7 @@ private DataStream<RowData> buildStaticFileSource() {
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE),
dynamicPartitionFilteringInfo,
outerProject(),
options.get(CoreOptions.BLOB_AS_DESCRIPTOR)));
new CoreOptions(options).readBlobAsDescriptor()));
}

private DataStream<RowData> buildContinuousFileSource() {
@@ -111,11 +111,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
splitAssignMode,
null,
rowData,
Boolean.parseBoolean(
table.options()
.getOrDefault(
CoreOptions.BLOB_AS_DESCRIPTOR.key(),
"false")));
CoreOptions.fromMap(table.options()).readBlobAsDescriptor());
}
return new PaimonDataStreamScanProvider(
source.getBoundedness() == Boundedness.BOUNDED,