Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.
-
-
table-read.sequence-number.enabled
-
false
-
Boolean
-
Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables.
-
changelog.num-retained.max
(none)
@@ -1308,6 +1302,12 @@
Duration
The delay duration of stream read when scan incremental snapshots.
+
+
table-read.sequence-number.enabled
+
false
+
Boolean
+
Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables.
+
tag.automatic-completion
false
@@ -1440,6 +1440,24 @@
String
The Variant shredding schema for writing.
+
+
vector-store.fields
+
(none)
+
String
+
Specify the vector store fields.
+
+
+
vector-store.format
+
(none)
+
String
+
Specify the vector store file format.
+
+
+
vector-store.target-file-size
+
(none)
+
MemorySize
+
Target size of a vector-store file. Default is 10 * TARGET_FILE_SIZE.
+
write-buffer-for-append
false
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 95f1387491f0..0ee34343e826 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2190,6 +2190,29 @@ public InlineElement getDescription() {
.withDescription(
"Whether to try upgrading the data files after overwriting a primary key table.");
+ public static final ConfigOption VECTOR_STORE_FORMAT =
+ key("vector-store.format")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Specify the vector store file format.");
+
+ public static final ConfigOption VECTOR_STORE_FIELDS =
+ key("vector-store.fields")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Specify the vector store fields.");
+
+ public static final ConfigOption VECTOR_STORE_TARGET_FILE_SIZE =
+ key("vector-store.target-file-size")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Target size of a vector-store file."
+ + " Default is 10 * TARGET_FILE_SIZE.")
+ .build());
+
private final Options options;
public CoreOptions(Map options) {
@@ -3407,6 +3430,26 @@ public boolean overwriteUpgrade() {
return options.get(OVERWRITE_UPGRADE);
}
+ public String vectorStoreFileFormatString() {
+ return normalizeFileFormat(options.get(VECTOR_STORE_FORMAT));
+ }
+
+ public List vectorStoreFieldNames() {
+ String vectorStoreFields = options.get(CoreOptions.VECTOR_STORE_FIELDS);
+ if (vectorStoreFields == null || vectorStoreFields.trim().isEmpty()) {
+ return new ArrayList<>();
+ } else {
+ return Arrays.asList(vectorStoreFields.split(","));
+ }
+ }
+
+ public long vectorStoreTargetFileSize() {
+ // Since vectors are large, it would be better to set a larger target size for vectors.
+ return options.getOptional(VECTOR_STORE_TARGET_FILE_SIZE)
+ .map(MemorySize::getBytes)
+ .orElse(10 * targetFileSize(false));
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeChecks.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeChecks.java
index 20eed4d03842..5f11c2e95d7b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeChecks.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeChecks.java
@@ -240,6 +240,11 @@ public List visit(ArrayType arrayType) {
return Collections.singletonList(arrayType.getElementType());
}
+ @Override
+ public List visit(VectorType vectorType) {
+ return Collections.singletonList(vectorType.getElementType());
+ }
+
@Override
public List visit(MultisetType multisetType) {
return Collections.singletonList(multisetType.getElementType());
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java
index b3dc8a3cd994..af680ede62e2 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java
@@ -124,6 +124,11 @@ public R visit(ArrayType arrayType) {
return defaultMethod(arrayType);
}
+ @Override
+ public R visit(VectorType vectorType) {
+ return defaultMethod(vectorType);
+ }
+
@Override
public R visit(MultisetType multisetType) {
return defaultMethod(multisetType);
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
index 13637265d45f..4079dd8c47c0 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
@@ -78,6 +78,10 @@ public static DataType parseDataType(JsonNode json, AtomicInteger fieldId) {
if (typeString.startsWith("ARRAY")) {
DataType element = parseDataType(json.get("element"), fieldId);
return new ArrayType(!typeString.contains("NOT NULL"), element);
+ } else if (typeString.startsWith("VECTOR")) {
+ DataType element = parseDataType(json.get("element"), fieldId);
+ int length = json.get("length").asInt();
+ return new VectorType(!typeString.contains("NOT NULL"), length, element);
} else if (typeString.startsWith("MULTISET")) {
DataType element = parseDataType(json.get("element"), fieldId);
return new MultisetType(!typeString.contains("NOT NULL"), element);
@@ -318,6 +322,7 @@ private enum Keyword {
SECOND,
TO,
ARRAY,
+ VECTOR,
MULTISET,
MAP,
ROW,
@@ -544,6 +549,8 @@ private DataType parseTypeByKeyword() {
return new VariantType();
case BLOB:
return new BlobType();
+ case VECTOR:
+ return parseVectorType();
default:
throw parsingError("Unsupported type: " + token().value);
}
@@ -665,5 +672,16 @@ private int parseOptionalPrecision(int defaultPrecision) {
}
return precision;
}
+
+ private DataType parseVectorType() {
+ // VECTOR
+ nextToken(TokenType.BEGIN_SUBTYPE);
+ DataType elementType = parseTypeWithNullability();
+ nextToken(TokenType.LIST_SEPARATOR);
+ nextToken(TokenType.LITERAL_INT);
+ int length = tokenAsInt();
+ nextToken(TokenType.END_SUBTYPE);
+ return DataTypes.VECTOR(length, elementType);
+ }
}
}
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java
index 1b339765986c..f55da9c4706f 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java
@@ -106,6 +106,8 @@ public enum DataTypeRoot {
ARRAY(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION),
+ VECTOR(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION),
+
MULTISET(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION),
MAP(DataTypeFamily.CONSTRUCTED, DataTypeFamily.EXTENSION),
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java
index cdeb4204b17c..6e377309f237 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java
@@ -68,6 +68,8 @@ public interface DataTypeVisitor {
R visit(ArrayType arrayType);
+ R visit(VectorType vectorType);
+
R visit(MultisetType multisetType);
R visit(MapType mapType);
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java
index 33a5c9e5e442..39b180651ef5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java
@@ -59,6 +59,10 @@ public static ArrayType ARRAY(DataType element) {
return new ArrayType(element);
}
+ public static VectorType VECTOR(int length, DataType element) {
+ return new VectorType(length, element);
+ }
+
public static CharType CHAR(int length) {
return new CharType(length);
}
@@ -221,6 +225,11 @@ public OptionalInt visit(VarBinaryType varBinaryType) {
return OptionalInt.of(varBinaryType.getLength());
}
+ @Override
+ public OptionalInt visit(VectorType vectorType) {
+ return OptionalInt.of(vectorType.getLength());
+ }
+
@Override
protected OptionalInt defaultMethod(DataType dataType) {
return OptionalInt.empty();
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java b/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java
index 2aacfeaf889a..85f104d99320 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java
@@ -38,6 +38,14 @@ public DataType visit(ArrayType arrayType) {
return new ArrayType(arrayType.isNullable(), arrayType.getElementType().accept(this));
}
+ @Override
+ public DataType visit(VectorType vectorType) {
+ return new VectorType(
+ vectorType.isNullable(),
+ vectorType.getLength(),
+ vectorType.getElementType().accept(this));
+ }
+
@Override
public DataType visit(MultisetType multisetType) {
return new MultisetType(
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/VectorType.java b/paimon-api/src/main/java/org/apache/paimon/types/VectorType.java
new file mode 100644
index 000000000000..a98bd6fa30d5
--- /dev/null
+++ b/paimon-api/src/main/java/org/apache/paimon/types/VectorType.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.types;
+
+import org.apache.paimon.annotation.Public;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Data type of fixed-size vector type. The elements are densely stored.
+ *
+ * @since 2.0.0
+ */
+@Public
+public class VectorType extends DataType {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final int MIN_LENGTH = 1;
+
+ public static final int MAX_LENGTH = Integer.MAX_VALUE;
+
+ public static final String FORMAT = "VECTOR<%s, %d>";
+
+ private final DataType elementType;
+
+ private final int length;
+
+ public VectorType(boolean isNullable, int length, DataType elementType) {
+ super(isNullable, DataTypeRoot.VECTOR);
+ // TODO: should we support nullable for vector type?
+ // Preconditions.checkArgument(!isNullable, "Nullable is not supported for VectorType.");
+ this.elementType =
+ Preconditions.checkNotNull(elementType, "Element type must not be null.");
+ Preconditions.checkArgument(
+ isValidElementType(elementType), "Invalid element type for vector: " + elementType);
+ if (length < MIN_LENGTH) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Vector length must be between %d and %d (both inclusive).",
+ MIN_LENGTH, MAX_LENGTH));
+ }
+ this.length = length;
+ }
+
+ public VectorType(int length, DataType elementType) {
+ this(false, length, elementType); // For vector type we prefer NOT NULL
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public DataType getElementType() {
+ return elementType;
+ }
+
+ public static boolean isValidElementType(DataType elementType) {
+ switch (elementType.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public int defaultSize() {
+ return elementType.defaultSize() * length;
+ }
+
+ @Override
+ public DataType copy(boolean isNullable) {
+ return new VectorType(isNullable, length, elementType.copy());
+ }
+
+ @Override
+ public String asSQLString() {
+ return withNullability(FORMAT, elementType.asSQLString(), length);
+ }
+
+ @Override
+ public void serializeJson(JsonGenerator generator) throws IOException {
+ generator.writeStartObject();
+ generator.writeStringField("type", isNullable() ? "VECTOR" : "VECTOR NOT NULL");
+ generator.writeFieldName("element");
+ elementType.serializeJson(generator);
+ generator.writeFieldName("length");
+ generator.writeNumber(length);
+ generator.writeEndObject();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ VectorType vectorType = (VectorType) o;
+ return elementType.equals(vectorType.elementType) && length == vectorType.length;
+ }
+
+ @Override
+ public boolean equalsIgnoreFieldId(DataType o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ VectorType vectorType = (VectorType) o;
+ return elementType.equalsIgnoreFieldId(vectorType.elementType)
+ && length == vectorType.length;
+ }
+
+ @Override
+ public boolean isPrunedFrom(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ VectorType vectorType = (VectorType) o;
+ return elementType.isPrunedFrom(vectorType.elementType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), elementType, length);
+ }
+
+ @Override
+ public R accept(DataTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public void collectFieldIds(Set fieldIds) {
+ elementType.collectFieldIds(fieldIds);
+ }
+}
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java
index e6bc1281a74d..33defc8f9a01 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java
@@ -42,6 +42,7 @@
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.Types;
@@ -179,6 +180,12 @@ public FieldType visit(ArrayType arrayType) {
return new FieldType(arrayType.isNullable(), Types.MinorType.LIST.getType(), null);
}
+ @Override
+ public FieldType visit(VectorType vectorType) {
+ ArrowType arrowType = new ArrowType.FixedSizeList(vectorType.getLength());
+ return new FieldType(vectorType.isNullable(), arrowType, null);
+ }
+
@Override
public FieldType visit(MultisetType multisetType) {
throw new UnsupportedOperationException("Doesn't support MultisetType.");
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index 7b387b409e1b..041927b7fbd6 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -30,6 +30,7 @@
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
@@ -137,14 +138,16 @@ public static Field toArrowField(
fieldType.getDictionary(),
Collections.singletonMap(PARQUET_FIELD_ID, String.valueOf(fieldId)));
List children = null;
- if (dataType instanceof ArrayType) {
+ if (dataType instanceof ArrayType || dataType instanceof VectorType) {
+ final DataType elementType;
+ if (dataType instanceof VectorType) {
+ elementType = ((VectorType) dataType).getElementType();
+ } else {
+ elementType = ((ArrayType) dataType).getElementType();
+ }
Field field =
toArrowField(
- ListVector.DATA_VECTOR_NAME,
- fieldId,
- ((ArrayType) dataType).getElementType(),
- depth + 1,
- visitor);
+ ListVector.DATA_VECTOR_NAME, fieldId, elementType, depth + 1, visitor);
FieldType typeInner = field.getFieldType();
field =
new Field(
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
index 4afd976c1747..f4e18c66e4ab 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
@@ -22,6 +22,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.BooleanColumnVector;
@@ -31,6 +32,7 @@
import org.apache.paimon.data.columnar.ColumnarArray;
import org.apache.paimon.data.columnar.ColumnarMap;
import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.data.columnar.ColumnarVec;
import org.apache.paimon.data.columnar.DecimalColumnVector;
import org.apache.paimon.data.columnar.DoubleColumnVector;
import org.apache.paimon.data.columnar.FloatColumnVector;
@@ -40,6 +42,7 @@
import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.ShortColumnVector;
import org.apache.paimon.data.columnar.TimestampColumnVector;
+import org.apache.paimon.data.columnar.VecColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
@@ -66,6 +69,7 @@
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
@@ -82,6 +86,7 @@
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.StructVector;
@@ -482,6 +487,53 @@ public ColumnVector getColumnVector() {
};
}
+ @Override
+ public Arrow2PaimonVectorConverter visit(VectorType vectorType) {
+ final Arrow2PaimonVectorConverter arrowVectorConvertor =
+ vectorType.getElementType().accept(this);
+
+ return vector ->
+ new VecColumnVector() {
+
+ private boolean inited = false;
+ private ColumnVector columnVector;
+
+ private void init() {
+ if (!inited) {
+ FieldVector child = ((FixedSizeListVector) vector).getDataVector();
+ this.columnVector = arrowVectorConvertor.convertVector(child);
+ inited = true;
+ }
+ }
+
+ @Override
+ public boolean isNullAt(int index) {
+ return vector.isNull(index);
+ }
+
+ @Override
+ public InternalVector getVector(int index) {
+ init();
+ FixedSizeListVector listVector = (FixedSizeListVector) vector;
+ int start = listVector.getElementStartIndex(index);
+ int end = listVector.getElementEndIndex(index);
+ return new ColumnarVec(columnVector, start, end - start);
+ }
+
+ @Override
+ public ColumnVector getColumnVector() {
+ init();
+ return columnVector;
+ }
+
+ @Override
+ public int getVectorSize() {
+ FixedSizeListVector listVector = (FixedSizeListVector) vector;
+ return listVector.getListSize();
+ }
+ };
+ }
+
@Override
public Arrow2PaimonVectorConverter visit(MultisetType multisetType) {
throw new UnsupportedOperationException("Doesn't support MultisetType.");
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
index a20e6fc4814c..ccff6d6a24f6 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
@@ -41,8 +41,10 @@
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
@@ -165,6 +167,18 @@ public ArrowFieldWriterFactory visit(ArrayType arrayType) {
isNullable);
}
+ @Override
+ public ArrowFieldWriterFactory visit(VectorType vectorType) {
+ ArrowFieldWriterFactory elementWriterFactory = vectorType.getElementType().accept(this);
+ return (fieldVector, isNullable) ->
+ new ArrowFieldWriters.VectorWriter(
+ fieldVector,
+ vectorType.getLength(),
+ elementWriterFactory.create(
+ ((FixedSizeListVector) fieldVector).getDataVector(), isNullable),
+ isNullable);
+ }
+
@Override
public ArrowFieldWriterFactory visit(MultisetType multisetType) {
throw new UnsupportedOperationException("Doesn't support MultisetType.");
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
index 409b1f9ab2ec..7cb64de7fd88 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
@@ -25,6 +25,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.BooleanColumnVector;
@@ -40,6 +41,7 @@
import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.ShortColumnVector;
import org.apache.paimon.data.columnar.TimestampColumnVector;
+import org.apache.paimon.data.columnar.VecColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.data.variant.PaimonShreddingUtils;
@@ -65,6 +67,7 @@
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
@@ -711,6 +714,93 @@ protected void doWrite(int rowIndex, DataGetters getters, int pos) {
}
}
+ /** Writer for VECTOR. */
+ public static class VectorWriter extends ArrowFieldWriter {
+
+ private final ArrowFieldWriter elementWriter;
+
+ private final int length;
+
+ public VectorWriter(
+ FieldVector fieldVector,
+ int length,
+ ArrowFieldWriter elementWriter,
+ boolean isNullable) {
+ super(fieldVector, isNullable);
+ this.length = length;
+ this.elementWriter = elementWriter;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ elementWriter.reset();
+ }
+
+ @Override
+ protected void doWrite(
+ ColumnVector columnVector,
+ @Nullable int[] pickedInColumn,
+ int startIndex,
+ int batchRows) {
+ VecColumnVector vecColumnVector = (VecColumnVector) columnVector;
+
+ if (pickedInColumn == null) {
+ elementWriter.write(
+ vecColumnVector.getColumnVector(),
+ null,
+ startIndex * length,
+ batchRows * length);
+ } else {
+ int[] childPickedInColumn = new int[batchRows * length];
+ for (int i = 0; i < batchRows; ++i) {
+ int pickedIndexInChild = pickedInColumn[startIndex + i] * length;
+ for (int j = 0; j < length; ++j) {
+ childPickedInColumn[i * length + j] = pickedIndexInChild + j;
+ }
+ }
+ elementWriter.write(
+ vecColumnVector.getColumnVector(),
+ childPickedInColumn,
+ 0,
+ batchRows * length);
+ }
+
+ // set FixedSizeListVector
+ FixedSizeListVector listVector = (FixedSizeListVector) fieldVector;
+ for (int i = 0; i < batchRows; i++) {
+ int row = getRowNumber(startIndex, i, pickedInColumn);
+ if (vecColumnVector.isNullAt(row)) {
+ listVector.setNull(i);
+ } else {
+ listVector.startNewValue(i);
+ }
+ }
+ }
+
+ @Override
+ protected void doWrite(int rowIndex, DataGetters getters, int pos) {
+ InternalVector vector = getters.getVector(pos);
+ if (vector.size() != length) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The size of vector %s is not equal to the length %s",
+ vector.size(), length));
+ }
+ FixedSizeListVector listVector = (FixedSizeListVector) fieldVector;
+ listVector.setNotNull(rowIndex);
+ final int rowBase = rowIndex * length;
+ for (int vectorIndex = 0; vectorIndex < length; ++vectorIndex) {
+ elementWriter.write(rowBase + vectorIndex, vector, vectorIndex);
+ }
+ // Ensure child value count is large enough.
+ listVector
+ .getDataVector()
+ .setValueCount(
+ Math.max(listVector.getDataVector().getValueCount(), rowBase + length));
+ }
+ }
+
/** Writer for MAP. */
public static class MapWriter extends ArrowFieldWriter {
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
index 49af9751509d..ee613a05d706 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
@@ -24,6 +24,7 @@
import org.apache.paimon.types.RowType;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.assertj.core.api.Assertions;
@@ -108,4 +109,13 @@ public void testMap() {
.getFieldType();
Assertions.assertThat(fieldType.isNullable()).isTrue();
}
+
+ @Test
+ public void testVectorType() {
+ Field field =
+ ArrowUtils.toArrowField("embed", 0, DataTypes.VECTOR(4, DataTypes.FLOAT()), 0);
+ Assertions.assertThat(field.getFieldType().getType())
+ .isEqualTo(new ArrowType.FixedSizeList(4));
+ Assertions.assertThat(field.getChildren()).hasSize(1);
+ }
}
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java
new file mode 100644
index 000000000000..4c7e2bb2838a
--- /dev/null
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow.converter;
+
+import org.apache.paimon.arrow.ArrowUtils;
+import org.apache.paimon.arrow.writer.ArrowFieldWriter;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.ColumnarVec;
+import org.apache.paimon.data.columnar.VecColumnVector;
+import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.reader.VectorizedRecordIterator;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ArrowVectorizedBatchConverter}. */
+public class ArrowVectorizedBatchConverterTest {
+
+ @Test
+ public void testVectorColumnWrite() {
+ RowType rowType = RowType.of(DataTypes.VECTOR(3, DataTypes.FLOAT()));
+ try (RootAllocator allocator = new RootAllocator()) {
+ VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
+ ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, rowType);
+
+ int length = 3;
+ int rows = 2;
+ HeapFloatVector elementVector = new HeapFloatVector(rows * length);
+ float[] values = new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f};
+ for (int i = 0; i < values.length; i++) {
+ elementVector.setFloat(i, values[i]);
+ }
+ VecColumnVector vector =
+ new VecColumnVector() {
+ @Override
+ public InternalVector getVector(int i) {
+ return new ColumnarVec(elementVector, i * length, length);
+ }
+
+ @Override
+ public ColumnVector getColumnVector() {
+ return elementVector;
+ }
+
+ @Override
+ public int getVectorSize() {
+ return length;
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return i % 2 == 1;
+ }
+ };
+
+ VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[] {vector});
+ batch.setNumRows(rows);
+
+ ArrowVectorizedBatchConverter converter =
+ new ArrowVectorizedBatchConverter(vsr, fieldWriters);
+ converter.reset(
+ new VectorizedRecordIterator() {
+ @Override
+ public VectorizedColumnBatch batch() {
+ return batch;
+ }
+
+ @Override
+ public InternalRow next() {
+ return null;
+ }
+
+ @Override
+ public void releaseBatch() {}
+ });
+ converter.next(rows);
+
+ FixedSizeListVector listVector = (FixedSizeListVector) vsr.getVector(0);
+ assertThat(listVector.isNull(0)).isFalse();
+ assertThat(listVector.isNull(1)).isTrue();
+
+ @SuppressWarnings("unchecked")
+ List row0 = (List) listVector.getObject(0);
+ assertThat(row0).containsExactly(1.0f, 2.0f, 3.0f);
+ assertThat(listVector.getObject(1)).isNull();
+
+ converter.close();
+ }
+ }
+
+ @Test
+ public void testVectorColumnWriteWithPickedInColumn() {
+ RowType rowType = RowType.of(DataTypes.VECTOR(2, DataTypes.FLOAT()));
+ try (RootAllocator allocator = new RootAllocator()) {
+ VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
+ ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, rowType);
+
+ int length = 2;
+ int rows = 4;
+ HeapFloatVector elementVector = new HeapFloatVector(rows * length);
+ float[] values = new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 8.0f};
+ for (int i = 0; i < values.length; i++) {
+ elementVector.setFloat(i, values[i]);
+ }
+
+ VecColumnVector vector =
+ new VecColumnVector() {
+ @Override
+ public InternalVector getVector(int i) {
+ return new ColumnarVec(elementVector, i * length, length);
+ }
+
+ @Override
+ public ColumnVector getColumnVector() {
+ return elementVector;
+ }
+
+ @Override
+ public int getVectorSize() {
+ return length;
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return false;
+ }
+ };
+
+ int[] pickedInColumn = new int[] {2, 0};
+ fieldWriters[0].reset();
+ fieldWriters[0].write(vector, pickedInColumn, 0, pickedInColumn.length);
+
+ FixedSizeListVector listVector = (FixedSizeListVector) vsr.getVector(0);
+ @SuppressWarnings("unchecked")
+ List row0 = (List) listVector.getObject(0);
+ assertThat(row0).containsExactly(5.0f, 6.0f);
+ @SuppressWarnings("unchecked")
+ List row1 = (List) listVector.getObject(1);
+ assertThat(row1).containsExactly(1.0f, 2.0f);
+
+ vsr.close();
+ }
+ }
+}
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index ec1fe10c46a4..a716264807ef 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -24,6 +24,7 @@
import org.apache.paimon.arrow.reader.ArrowBatchReader;
import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
@@ -140,6 +141,32 @@ public void testWrite() {
}
}
+ @Test
+ public void testWriteVector() {
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "embed", DataTypes.VECTOR(3, DataTypes.FLOAT()))));
+ float[] values = new float[] {1.0f, 2.0f, 3.0f};
+ try (ArrowFormatWriter writer = new ArrowFormatWriter(rowType, 16, true)) {
+ writer.write(GenericRow.of(1, BinaryVector.fromPrimitiveArray(values)));
+
+ writer.flush();
+ VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
+
+ ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, true);
+ Iterable rows = arrowBatchReader.readBatch(vectorSchemaRoot);
+
+ Iterator iterator = rows.iterator();
+ InternalRow row = iterator.next();
+
+ assertThat(row.getInt(0)).isEqualTo(1);
+ assertThat(row.getVector(1).toFloatArray()).isEqualTo(values);
+ vectorSchemaRoot.close();
+ }
+ }
+
@Test
public void testWriteVariant() {
RowType rowType = new RowType(Arrays.asList(new DataField(0, "v", DataTypes.VARIANT())));
diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala
index 6897e5424857..967d58ad30db 100644
--- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala
+++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala
@@ -127,10 +127,13 @@ object GenerateUtils {
s"$sortUtil.compareBinary($leftTerm, $rightTerm)"
case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DATE | TIME_WITHOUT_TIME_ZONE =>
s"($leftTerm > $rightTerm ? 1 : $leftTerm < $rightTerm ? -1 : 0)"
- case ARRAY =>
- val at = t.asInstanceOf[ArrayType]
+ case ARRAY | VECTOR =>
+ val elementType = t.getTypeRoot match {
+ case ARRAY => t.asInstanceOf[ArrayType].getElementType
+ case VECTOR => t.asInstanceOf[VectorType].getElementType
+ }
val compareFunc = newName("compareArray")
- val compareCode = generateArrayCompare(ctx, nullsIsLast = false, at, "a", "b")
+ val compareCode = generateArrayCompare(ctx, nullsIsLast = false, elementType, "a", "b")
val funcCode: String =
s"""
public int $compareFunc($ARRAY_DATA a, $ARRAY_DATA b) {
@@ -188,11 +191,10 @@ object GenerateUtils {
def generateArrayCompare(
ctx: CodeGeneratorContext,
nullsIsLast: Boolean,
- arrayType: ArrayType,
+ elementType: DataType,
leftTerm: String,
rightTerm: String): String = {
val nullIsLastRet = if (nullsIsLast) 1 else -1
- val elementType = arrayType.getElementType
val fieldA = newName("fieldA")
val isNullA = newName("isNullA")
val lengthA = newName("lengthA")
@@ -379,6 +381,7 @@ object GenerateUtils {
case DOUBLE => className[JDouble]
case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => className[Timestamp]
case ARRAY => className[InternalArray]
+ case VECTOR => className[InternalVector]
case MULTISET | MAP => className[InternalMap]
case ROW => className[InternalRow]
case VARIANT => className[Variant]
@@ -417,6 +420,8 @@ object GenerateUtils {
s"$rowTerm.getTimestamp($indexTerm, ${getPrecision(t)})"
case ARRAY =>
s"$rowTerm.getArray($indexTerm)"
+ case VECTOR =>
+ s"$rowTerm.getVector($indexTerm)"
case MULTISET | MAP =>
s"$rowTerm.getMap($indexTerm)"
case ROW =>
@@ -606,6 +611,9 @@ object GenerateUtils {
case ARRAY =>
val ser = addSerializer(t)
s"$writerTerm.writeArray($indexTerm, $fieldValTerm, $ser)"
+ case VECTOR =>
+ val ser = addSerializer(t)
+ s"$writerTerm.writeVector($indexTerm, $fieldValTerm, $ser)"
case MULTISET | MAP =>
val ser = addSerializer(t)
s"$writerTerm.writeMap($indexTerm, $fieldValTerm, $ser)"
diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala
index 5dfa4bff6835..46217b1c4cf1 100644
--- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala
+++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala
@@ -60,7 +60,13 @@ object ScalarOperatorGens {
}
// array types
else if (isArray(left.resultType) && canEqual) {
- generateArrayComparison(ctx, left, right, resultType)
+ val elementType = left.resultType.asInstanceOf[ArrayType].getElementType
+ generateArrayComparison(ctx, left, right, elementType, resultType)
+ }
+ // vector type
+ else if (isVector(left.resultType) && canEqual) {
+ val elementType = left.resultType.asInstanceOf[VectorType].getElementType
+ generateArrayComparison(ctx, left, right, elementType, resultType)
}
// map types
else if (isMap(left.resultType) && canEqual) {
@@ -196,6 +202,7 @@ object ScalarOperatorGens {
ctx: CodeGeneratorContext,
left: GeneratedExpression,
right: GeneratedExpression,
+ elementType: DataType,
resultType: DataType): GeneratedExpression = {
generateCallWithStmtIfArgsNotNull(ctx, resultType, Seq(left, right)) {
args =>
@@ -204,7 +211,6 @@ object ScalarOperatorGens {
val resultTerm = newName("compareResult")
- val elementType = left.resultType.asInstanceOf[ArrayType].getElementType
val elementCls = primitiveTypeTermForType(elementType)
val elementDefault = primitiveDefaultValue(elementType)
@@ -225,6 +231,7 @@ object ScalarOperatorGens {
rightElementExpr,
new BooleanType(elementType.isNullable))
+ // TODO: With BinaryVector available, we can use it here.
val stmt =
s"""
|boolean $resultTerm;
diff --git a/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java b/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java
index e662449858e7..7e977291e81f 100644
--- a/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java
+++ b/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java
@@ -30,6 +30,7 @@
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalVectorSerializer;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.types.DataType;
@@ -133,6 +134,16 @@ public class EqualiserCodeGeneratorTest {
castFromString("[1,2,3]", DataTypes.ARRAY(new VarCharType())),
castFromString("[4,5,6]", DataTypes.ARRAY(new VarCharType()))),
new InternalArraySerializer(DataTypes.VARCHAR(1))));
+ TEST_DATA.put(
+ DataTypeRoot.VECTOR,
+ new GeneratedData(
+ DataTypes.VECTOR(3, DataTypes.FLOAT()),
+ Pair.of(
+ castFromString(
+ "[1.1,2.2,3.3]", DataTypes.VECTOR(3, DataTypes.FLOAT())),
+ castFromString(
+ "[4.4,5.5,6.6]", DataTypes.VECTOR(3, DataTypes.FLOAT()))),
+ new InternalVectorSerializer(DataTypes.FLOAT(), 3)));
TEST_DATA.put(
DataTypeRoot.MULTISET,
new GeneratedData(
diff --git a/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java b/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java
index 01b3ae48a2da..a62464d1981f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java
@@ -25,6 +25,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
@@ -176,6 +177,13 @@ public InternalArray getArray(int pos) {
: row.getArray(partitionInfo.getRealIndex(pos));
}
+ @Override
+ public InternalVector getVector(int pos) {
+ return partitionInfo.inPartitionRow(pos)
+ ? partition.getVector(partitionInfo.getRealIndex(pos))
+ : row.getVector(partitionInfo.getRealIndex(pos));
+ }
+
@Override
public InternalMap getMap(int pos) {
return partitionInfo.inPartitionRow(pos)
diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java
index 821be67e2a55..4e95c9db8dcf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java
@@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
@@ -201,6 +202,11 @@ public InternalArray getArray(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}
+ @Override
+ public InternalVector getVector(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
@Override
public InternalMap getMap(int pos) {
return castElementGetter.getElementOrNull(array, pos);
diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java
index a60ea635cf45..76a1366d4784 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java
@@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
@@ -148,6 +149,11 @@ public InternalArray getArray(int pos) {
return castMapping[pos].getFieldOrNull(row);
}
+ @Override
+ public InternalVector getVector(int pos) {
+ return castMapping[pos].getFieldOrNull(row);
+ }
+
@Override
public InternalMap getMap(int pos) {
return castMapping[pos].getFieldOrNull(row);
diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedVector.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedVector.java
new file mode 100644
index 000000000000..c7bf0303467a
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedVector.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+
+/**
+ * An implementation of {@link InternalVector} which provides a casted view of the underlying {@link
+ * InternalVector}.
+ *
+ *
It reads data from underlying {@link InternalVector} according to source logical type and
+ * casts it with specific {@link CastExecutor}.
+ */
+public class CastedVector extends CastedArray implements InternalVector {
+
+ protected CastedVector(CastElementGetter castElementGetter) {
+ super(castElementGetter);
+ }
+
+ /**
+ * Replaces the underlying {@link InternalVector} backing this {@link CastedVector}.
+ *
+ *
This method replaces the vector in place and does not return a new object. This is done
+ * for performance reasons.
+ */
+ public static CastedVector from(CastElementGetter castElementGetter) {
+ return new CastedVector(castElementGetter);
+ }
+
+ public CastedVector replaceVector(InternalVector vector) {
+ super.replaceArray(vector);
+ return this;
+ }
+
+ @Override
+ public CastedArray replaceArray(InternalArray array) {
+ throw new IllegalArgumentException("CastedVector does not support replaceArray.");
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ throw new UnsupportedOperationException("CastedVector does not support String.");
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ throw new UnsupportedOperationException("CastedVector does not support Decimal.");
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ throw new UnsupportedOperationException("CastedVector does not support Timestamp.");
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ throw new UnsupportedOperationException("CastedVector does not support Binary.");
+ }
+
+ @Override
+ public Variant getVariant(int pos) {
+ throw new UnsupportedOperationException("CastedVector does not support Variant.");
+ }
+
+ @Override
+ public Blob getBlob(int pos) {
+ throw new UnsupportedOperationException("CastedVector does not support Blob.");
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ throw new UnsupportedOperationException("CastedVector does not support nested Array.");
+ }
+
+ @Override
+ public InternalVector getVector(int pos) {
+ throw new UnsupportedOperationException("CastedVector does not support nested VectorType.");
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ throw new UnsupportedOperationException("CastedVector does not support Map.");
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ throw new UnsupportedOperationException("CastedVector does not support nested Row.");
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
index 25b7453861dd..555f065c7031 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
@@ -25,6 +25,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.DataField;
@@ -176,6 +177,14 @@ public InternalArray getArray(int pos) {
return defaultValueRow.getArray(pos);
}
+ @Override
+ public InternalVector getVector(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getVector(pos);
+ }
+ return defaultValueRow.getVector(pos);
+ }
+
@Override
public InternalMap getMap(int pos) {
if (!row.isNullAt(pos)) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
index b4bf853230fc..b981d552876d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
@@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
@@ -174,6 +175,14 @@ public InternalArray getArray(int pos) {
return main.getArray(pos);
}
+ @Override
+ public InternalVector getVector(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getVector(mappings[pos]);
+ }
+ return main.getVector(pos);
+ }
+
@Override
public InternalMap getMap(int pos) {
if (mappings[pos] != -1 && main.isNullAt(pos)) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
index 85d044594851..bb632d5b31ed 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
@@ -21,6 +21,7 @@
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalVectorSerializer;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
@@ -91,6 +92,12 @@ public void writeArray(int pos, InternalArray input, InternalArraySerializer ser
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}
+ @Override
+ public void writeVector(int pos, InternalVector input, InternalVectorSerializer serializer) {
+ BinaryVector binary = serializer.toBinaryVector(input);
+ writeVectorToVarLenPart(pos, binary);
+ }
+
@Override
public void writeMap(int pos, InternalMap input, InternalMapSerializer serializer) {
BinaryMap binary = serializer.toBinaryMap(input);
@@ -270,6 +277,35 @@ private void writeBytesToVarLenPart(int pos, byte[] bytes, int offset, int len)
cursor += roundedSize;
}
+ private void writeVectorToVarLenPart(int pos, BinaryVector vector) {
+ // Memory layout: [numElements][segments]
+ final int numElementsWidth = 4;
+ final int size = vector.getSizeInBytes();
+
+ final int roundedSize = roundNumberOfBytesToNearestWord(size + numElementsWidth);
+
+ // grow the global buffer before writing data.
+ ensureCapacity(roundedSize);
+
+ zeroOutPaddingBytes(size + numElementsWidth);
+
+ // write numElements value first
+ segment.putInt(cursor, vector.size());
+ cursor += numElementsWidth;
+
+ // then vector values
+ if (vector.getSegments().length == 1) {
+ vector.getSegments()[0].copyTo(vector.getOffset(), segment, cursor, size);
+ } else {
+ writeMultiSegmentsToVarLenPart(vector.getSegments(), vector.getOffset(), size);
+ }
+
+ setOffsetAndSize(pos, cursor - numElementsWidth, size + numElementsWidth);
+
+ // move the cursor forward.
+ cursor += (roundedSize - numElementsWidth);
+ }
+
/** Increases the capacity to ensure that it can hold at least the minimum capacity argument. */
private void grow(int minCapacity) {
int oldCapacity = segment.size();
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java
index d8b649d4a64c..afac3f8d3d35 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java
@@ -256,6 +256,11 @@ public InternalArray getArray(int pos) {
return MemorySegmentUtils.readArrayData(segments, offset, getLong(pos));
}
+ @Override
+ public InternalVector getVector(int pos) {
+ throw new IllegalArgumentException("Unsupported type: VectorType");
+ }
+
@Override
public InternalMap getMap(int pos) {
assertIndexIsValid(pos);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
index 70d2ec7a01a8..ff5406f7b326 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
@@ -355,6 +355,12 @@ public InternalArray getArray(int pos) {
return MemorySegmentUtils.readArrayData(segments, offset, getLong(pos));
}
+ @Override
+ public InternalVector getVector(int pos) {
+ assertIndexIsValid(pos);
+ return MemorySegmentUtils.readVectorData(segments, offset, getLong(pos));
+ }
+
@Override
public InternalMap getMap(int pos) {
assertIndexIsValid(pos);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryVector.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryVector.java
new file mode 100644
index 000000000000..7ef07a85a45c
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryVector.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.annotation.Public;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentUtils;
+import org.apache.paimon.types.DataType;
+
+import java.util.Objects;
+
+import static org.apache.paimon.memory.MemorySegment.UNSAFE;
+
+/**
+ * A binary implementation of {@link InternalVector} which is backed by {@link MemorySegment}s.
+ *
+ *
Compared to {@link BinaryArray}, {@link BinaryVector} stores only primitive types, and
+ * nullable is not supported yet. Thus, the memory layout is very simple.
+ *
+ *
The binary layout of {@link BinaryVector}:
+ *
+ *
+ * [values]
+ *
+ *
+ * @since 2.0.0
+ */
+@Public
+public final class BinaryVector extends BinarySection implements InternalVector, DataSetters {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Offset for Arrays. */
+ private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+ private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class);
+ private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class);
+ private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
+ private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class);
+ private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class);
+ private static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class);
+
+ public static int getPrimitiveElementSize(DataType type) {
+ // ordered by type root definition
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ return 1;
+ case SMALLINT:
+ return 2;
+ case INTEGER:
+ case FLOAT:
+ return 4;
+ case BIGINT:
+ case DOUBLE:
+ return 8;
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + type);
+ }
+ }
+
+ // The number of elements in this vector
+ private final int size;
+
+ public BinaryVector(int size) {
+ this.size = size;
+ }
+
+ private void assertIndexIsValid(int ordinal) {
+ assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0";
+ assert ordinal < size : "ordinal (" + ordinal + ") should < " + size;
+ }
+
+ private int getElementOffset(int ordinal, int elementSize) {
+ return offset + ordinal * elementSize;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ assertIndexIsValid(pos);
+ return false;
+ }
+
+ @Override
+ public void setNullAt(int pos) {
+ assertIndexIsValid(pos);
+ throw new UnsupportedOperationException("Nullable is not supported yet for VectorType.");
+ }
+
+ @Override
+ public long getLong(int pos) {
+ assertIndexIsValid(pos);
+ return MemorySegmentUtils.getLong(segments, getElementOffset(pos, 8));
+ }
+
+ @Override
+ public void setLong(int pos, long value) {
+ assertIndexIsValid(pos);
+ MemorySegmentUtils.setLong(segments, getElementOffset(pos, 8), value);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ assertIndexIsValid(pos);
+ return MemorySegmentUtils.getInt(segments, getElementOffset(pos, 4));
+ }
+
+ @Override
+ public void setInt(int pos, int value) {
+ assertIndexIsValid(pos);
+ MemorySegmentUtils.setInt(segments, getElementOffset(pos, 4), value);
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ throw new UnsupportedOperationException("BinaryVector does not support String.");
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ throw new UnsupportedOperationException("BinaryVector does not support Decimal.");
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ throw new UnsupportedOperationException("BinaryVector does not support Timestamp.");
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ throw new UnsupportedOperationException("BinaryVector does not support Binary.");
+ }
+
+ @Override
+ public Variant getVariant(int pos) {
+ throw new UnsupportedOperationException("BinaryVector does not support Variant.");
+ }
+
+ @Override
+ public Blob getBlob(int pos) {
+ throw new UnsupportedOperationException("BinaryVector does not support Blob.");
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ throw new UnsupportedOperationException("BinaryVector does not support nested Array.");
+ }
+
+ @Override
+ public InternalVector getVector(int pos) {
+ throw new UnsupportedOperationException("BinaryVector does not support nested Vector.");
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ throw new UnsupportedOperationException("BinaryVector does not support Map.");
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ throw new UnsupportedOperationException("BinaryVector does not support nested Row.");
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ assertIndexIsValid(pos);
+ return MemorySegmentUtils.getBoolean(segments, getElementOffset(pos, 1));
+ }
+
+ @Override
+ public void setBoolean(int pos, boolean value) {
+ assertIndexIsValid(pos);
+ MemorySegmentUtils.setBoolean(segments, getElementOffset(pos, 1), value);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ assertIndexIsValid(pos);
+ return MemorySegmentUtils.getByte(segments, getElementOffset(pos, 1));
+ }
+
+ @Override
+ public void setByte(int pos, byte value) {
+ assertIndexIsValid(pos);
+ MemorySegmentUtils.setByte(segments, getElementOffset(pos, 1), value);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ assertIndexIsValid(pos);
+ return MemorySegmentUtils.getShort(segments, getElementOffset(pos, 2));
+ }
+
+ @Override
+ public void setShort(int pos, short value) {
+ assertIndexIsValid(pos);
+ MemorySegmentUtils.setShort(segments, getElementOffset(pos, 2), value);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ assertIndexIsValid(pos);
+ return MemorySegmentUtils.getFloat(segments, getElementOffset(pos, 4));
+ }
+
+ @Override
+ public void setFloat(int pos, float value) {
+ assertIndexIsValid(pos);
+ MemorySegmentUtils.setFloat(segments, getElementOffset(pos, 4), value);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ assertIndexIsValid(pos);
+ return MemorySegmentUtils.getDouble(segments, getElementOffset(pos, 8));
+ }
+
+ @Override
+ public void setDouble(int pos, double value) {
+ assertIndexIsValid(pos);
+ MemorySegmentUtils.setDouble(segments, getElementOffset(pos, 8), value);
+ }
+
+ @Override
+ public void setDecimal(int pos, Decimal value, int precision) {
+ throw new UnsupportedOperationException("BinaryVector does not support Decimal.");
+ }
+
+ @Override
+ public void setTimestamp(int pos, Timestamp value, int precision) {
+ throw new UnsupportedOperationException("BinaryVector does not support Timestamp.");
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ boolean[] values = new boolean[size];
+ MemorySegmentUtils.copyToUnsafe(segments, offset, values, BOOLEAN_ARRAY_OFFSET, size);
+ return values;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ byte[] values = new byte[size];
+ MemorySegmentUtils.copyToUnsafe(segments, offset, values, BYTE_ARRAY_BASE_OFFSET, size);
+ return values;
+ }
+
+ @Override
+ public short[] toShortArray() {
+ short[] values = new short[size];
+ MemorySegmentUtils.copyToUnsafe(segments, offset, values, SHORT_ARRAY_OFFSET, size * 2);
+ return values;
+ }
+
+ @Override
+ public int[] toIntArray() {
+ int[] values = new int[size];
+ MemorySegmentUtils.copyToUnsafe(segments, offset, values, INT_ARRAY_OFFSET, size * 4);
+ return values;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ long[] values = new long[size];
+ MemorySegmentUtils.copyToUnsafe(segments, offset, values, LONG_ARRAY_OFFSET, size * 8);
+ return values;
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ float[] values = new float[size];
+ MemorySegmentUtils.copyToUnsafe(segments, offset, values, FLOAT_ARRAY_OFFSET, size * 4);
+ return values;
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ double[] values = new double[size];
+ MemorySegmentUtils.copyToUnsafe(segments, offset, values, DOUBLE_ARRAY_OFFSET, size * 8);
+ return values;
+ }
+
+ public BinaryVector copy() {
+ return copy(new BinaryVector(size));
+ }
+
+ public BinaryVector copy(BinaryVector reuse) {
+ byte[] bytes = MemorySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
+ reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
+ return reuse;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(MemorySegmentUtils.hashByWords(segments, offset, sizeInBytes), size);
+ }
+
+ // ------------------------------------------------------------------------------------------
+ // Construction Utilities
+ // ------------------------------------------------------------------------------------------
+
+ public static BinaryVector fromPrimitiveArray(boolean[] arr) {
+ return fromPrimitiveArray(arr, BOOLEAN_ARRAY_OFFSET, arr.length, 1);
+ }
+
+ public static BinaryVector fromPrimitiveArray(byte[] arr) {
+ return fromPrimitiveArray(arr, BYTE_ARRAY_BASE_OFFSET, arr.length, 1);
+ }
+
+ public static BinaryVector fromPrimitiveArray(short[] arr) {
+ return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 2);
+ }
+
+ public static BinaryVector fromPrimitiveArray(int[] arr) {
+ return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4);
+ }
+
+ public static BinaryVector fromPrimitiveArray(long[] arr) {
+ return fromPrimitiveArray(arr, LONG_ARRAY_OFFSET, arr.length, 8);
+ }
+
+ public static BinaryVector fromPrimitiveArray(float[] arr) {
+ return fromPrimitiveArray(arr, FLOAT_ARRAY_OFFSET, arr.length, 4);
+ }
+
+ public static BinaryVector fromPrimitiveArray(double[] arr) {
+ return fromPrimitiveArray(arr, DOUBLE_ARRAY_OFFSET, arr.length, 8);
+ }
+
+ private static BinaryVector fromPrimitiveArray(
+ Object arr, int offset, int length, int elementSize) {
+ // must align by 8 bytes
+ final long valueRegionInBytes = (long) elementSize * length;
+ long totalSizeInLongs = (valueRegionInBytes + 7) / 8;
+ if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
+ throw new UnsupportedOperationException(
+ "Cannot convert this vector to unsafe format as " + "it's too big.");
+ }
+ long totalSize = totalSizeInLongs * 8;
+
+ final byte[] data = new byte[(int) totalSize];
+
+ UNSAFE.copyMemory(arr, offset, data, BYTE_ARRAY_BASE_OFFSET, valueRegionInBytes);
+
+ BinaryVector result = new BinaryVector(length);
+ result.pointTo(MemorySegment.wrap(data), 0, (int) totalSize);
+ return result;
+ }
+
+ public static BinaryVector fromInternalArray(InternalArray array, DataType elementType) {
+ switch (elementType.getTypeRoot()) {
+ case BOOLEAN:
+ return BinaryVector.fromPrimitiveArray(array.toBooleanArray());
+ case TINYINT:
+ return BinaryVector.fromPrimitiveArray(array.toByteArray());
+ case SMALLINT:
+ return BinaryVector.fromPrimitiveArray(array.toShortArray());
+ case INTEGER:
+ return BinaryVector.fromPrimitiveArray(array.toIntArray());
+ case BIGINT:
+ return BinaryVector.fromPrimitiveArray(array.toLongArray());
+ case FLOAT:
+ return BinaryVector.fromPrimitiveArray(array.toFloatArray());
+ case DOUBLE:
+ return BinaryVector.fromPrimitiveArray(array.toDoubleArray());
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported element type for vector " + elementType);
+ }
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
index 06b763a0eac1..2e0cd5701b71 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
@@ -22,6 +22,7 @@
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.serializer.InternalVectorSerializer;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.DataType;
@@ -74,6 +75,8 @@ public interface BinaryWriter {
void writeArray(int pos, InternalArray value, InternalArraySerializer serializer);
+ void writeVector(int pos, InternalVector value, InternalVectorSerializer serializer);
+
void writeMap(int pos, InternalMap value, InternalMapSerializer serializer);
void writeRow(int pos, InternalRow value, InternalRowSerializer serializer);
@@ -133,6 +136,9 @@ static void write(
case ARRAY:
writer.writeArray(pos, (InternalArray) o, (InternalArraySerializer) serializer);
break;
+ case VECTOR:
+ writer.writeVector(pos, (InternalVector) o, (InternalVectorSerializer) serializer);
+ break;
case MAP:
case MULTISET:
writer.writeMap(pos, (InternalMap) o, (InternalMapSerializer) serializer);
@@ -210,6 +216,14 @@ static ValueSetter createValueSetter(DataType elementType, Serializer> seriali
pos,
(InternalArray) value,
(InternalArraySerializer) arraySerializer);
+ case VECTOR:
+ final Serializer> vectorSerializer =
+ serializer == null ? InternalSerializers.create(elementType) : serializer;
+ return (writer, pos, value) ->
+ writer.writeVector(
+ pos,
+ (InternalVector) value,
+ (InternalVectorSerializer) vectorSerializer);
case MULTISET:
case MAP:
final Serializer> mapSerializer =
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java b/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java
index d60ca316f7f2..1043b7e3ba4f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java
@@ -84,6 +84,8 @@ public interface DataGetters {
/** Returns the array value at the given position. */
InternalArray getArray(int pos);
+ InternalVector getVector(int pos);
+
/** Returns the map value at the given position. */
InternalMap getMap(int pos);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java b/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java
index 0d0898ea4edc..8c1ba4e28ac9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java
@@ -240,6 +240,11 @@ public InternalArray getArray(int pos) {
return (InternalArray) getObject(pos);
}
+ @Override
+ public InternalVector getVector(int pos) {
+ return (InternalVector) getObject(pos);
+ }
+
@Override
public InternalMap getMap(int pos) {
return (InternalMap) getObject(pos);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java b/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java
index 37be2386b44b..10aefbafdd07 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java
@@ -202,6 +202,11 @@ public InternalArray getArray(int pos) {
return (InternalArray) this.fields[pos];
}
+ @Override
+ public InternalVector getVector(int pos) {
+ return (InternalVector) this.fields[pos];
+ }
+
@Override
public InternalMap getMap(int pos) {
return (InternalMap) this.fields[pos];
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java
index 9ec838cd273b..3bbb85f49963 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java
@@ -213,6 +213,9 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
case ARRAY:
fieldGetter = row -> row.getArray(fieldPos);
break;
+ case VECTOR:
+ fieldGetter = row -> row.getVector(fieldPos);
+ break;
case MULTISET:
case MAP:
fieldGetter = row -> row.getMap(fieldPos);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/InternalVector.java b/paimon-common/src/main/java/org/apache/paimon/data/InternalVector.java
new file mode 100644
index 000000000000..4b4cedeb0279
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalVector.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.annotation.Public;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.VectorType;
+
+/**
+ * Base interface of an internal data structure representing data of {@link VectorType}.
+ *
+ *
Note: All elements of this data structure must be internal data structures and must be of the
+ * same type. See {@link InternalRow} for more information about internal data structures.
+ *
+ * @see BinaryVector
+ * @since 2.0.0
+ */
+@Public
+public interface InternalVector extends InternalArray {
+
+ // ------------------------------------------------------------------------------------------
+ // Access Utilities
+ // ------------------------------------------------------------------------------------------
+
+ /**
+ * Creates an accessor for getting elements in an internal vector data structure at the given
+ * position.
+ *
+ * @param elementType the element type of the vector
+ */
+ static ElementGetter createElementGetter(DataType elementType) {
+ final ElementGetter elementGetter;
+ // ordered by type root definition
+ switch (elementType.getTypeRoot()) {
+ case BOOLEAN:
+ elementGetter = InternalArray::getBoolean;
+ break;
+ case TINYINT:
+ elementGetter = InternalArray::getByte;
+ break;
+ case SMALLINT:
+ elementGetter = InternalArray::getShort;
+ break;
+ case INTEGER:
+ elementGetter = InternalArray::getInt;
+ break;
+ case BIGINT:
+ elementGetter = InternalArray::getLong;
+ break;
+ case FLOAT:
+ elementGetter = InternalArray::getFloat;
+ break;
+ case DOUBLE:
+ elementGetter = InternalArray::getDouble;
+ break;
+ default:
+ String msg =
+ String.format(
+ "type %s not support in %s",
+ elementType.getTypeRoot().toString(),
+ InternalVector.class.getName());
+ throw new IllegalArgumentException(msg);
+ }
+ if (!elementType.isNullable()) {
+ return elementGetter;
+ }
+ return (vector, pos) -> {
+ if (vector.isNullAt(pos)) {
+ return null;
+ }
+ return elementGetter.getElementOrNull(vector, pos);
+ };
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
index 62aa7358082b..fee5552f8c5c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
@@ -252,6 +252,15 @@ public InternalArray getArray(int pos) {
}
}
+ @Override
+ public InternalVector getVector(int pos) {
+ if (pos < row1.getFieldCount()) {
+ return row1.getVector(pos);
+ } else {
+ return row2.getVector(pos - row1.getFieldCount());
+ }
+ }
+
@Override
public InternalMap getMap(int pos) {
if (pos < row1.getFieldCount()) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java b/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java
index 680e3be0df4d..6d2e8b141f57 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java
@@ -158,6 +158,11 @@ public InternalArray getArray(int pos) {
return (InternalArray) getField(pos);
}
+ @Override
+ public InternalVector getVector(int pos) {
+ return (InternalVector) getField(pos);
+ }
+
@Override
public InternalMap getMap(int pos) {
return (InternalMap) getField(pos);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java b/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java
index 708c2bc60ce6..afc4f0c47fb0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java
@@ -306,6 +306,11 @@ public InternalArray getArray(int pos) {
return MemorySegmentUtils.readArrayData(segments, offset, getLong(pos));
}
+ @Override
+ public InternalVector getVector(int pos) {
+ throw new IllegalArgumentException("Unsupported type: VectorType");
+ }
+
@Override
public InternalMap getMap(int pos) {
assertIndexIsValid(pos);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
index e91fa26b4e56..28221cec0db1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
@@ -26,6 +26,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.data.variant.Variant;
@@ -143,6 +144,11 @@ public InternalArray getArray(int pos) {
return ((ArrayColumnVector) data).getArray(offset + pos);
}
+ @Override
+ public InternalVector getVector(int pos) {
+ return ((VecColumnVector) data).getVector(offset + pos);
+ }
+
@Override
public InternalMap getMap(int pos) {
return ((MapColumnVector) data).getMap(offset + pos);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
index 468c44188a69..d7423608af45 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
@@ -26,6 +26,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
@@ -163,6 +164,11 @@ public InternalArray getArray(int pos) {
return vectorizedColumnBatch.getArray(rowId, pos);
}
+ @Override
+ public InternalVector getVector(int pos) {
+ return vectorizedColumnBatch.getVector(rowId, pos);
+ }
+
@Override
public InternalMap getMap(int pos) {
return vectorizedColumnBatch.getMap(rowId, pos);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarVec.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarVec.java
new file mode 100644
index 000000000000..de07351dff76
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarVec.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.DataSetters;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+
+import java.io.Serializable;
+
+/** Columnar VectorType to support access to vector column data. */
+public final class ColumnarVec implements InternalVector, DataSetters, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ColumnVector data;
+ private final int offset;
+ private final int numElements;
+
+ public ColumnarVec(ColumnVector data, int offset, int numElements) {
+ this.data = data;
+ this.offset = offset;
+ this.numElements = numElements;
+ }
+
+ @Override
+ public int size() {
+ return numElements;
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return data.isNullAt(offset + pos);
+ }
+
+ @Override
+ public void setNullAt(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return ((BooleanColumnVector) data).getBoolean(offset + pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return ((ByteColumnVector) data).getByte(offset + pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return ((ShortColumnVector) data).getShort(offset + pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return ((IntColumnVector) data).getInt(offset + pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return ((LongColumnVector) data).getLong(offset + pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return ((FloatColumnVector) data).getFloat(offset + pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return ((DoubleColumnVector) data).getDouble(offset + pos);
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public Variant getVariant(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public Blob getBlob(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public InternalVector getVector(int pos) {
+ return ((VecColumnVector) data).getVector(offset + pos);
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setBoolean(int pos, boolean value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setByte(int pos, byte value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setShort(int pos, short value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setInt(int pos, int value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setLong(int pos, long value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setFloat(int pos, float value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setDouble(int pos, double value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setDecimal(int pos, Decimal value, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setTimestamp(int pos, Timestamp value, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ boolean[] res = new boolean[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getBoolean(i);
+ }
+ return res;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ byte[] res = new byte[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getByte(i);
+ }
+ return res;
+ }
+
+ @Override
+ public short[] toShortArray() {
+ short[] res = new short[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getShort(i);
+ }
+ return res;
+ }
+
+ @Override
+ public int[] toIntArray() {
+ int[] res = new int[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getInt(i);
+ }
+ return res;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ long[] res = new long[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getLong(i);
+ }
+ return res;
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ float[] res = new float[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getFloat(i);
+ }
+ return res;
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ double[] res = new double[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getDouble(i);
+ }
+ return res;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ throw new UnsupportedOperationException(
+ "ColumnarVector do not support equals, please compare fields one by one!");
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java
index d2a378846c6c..de962ad86a39 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java
@@ -62,6 +62,7 @@
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
import java.io.Serializable;
import java.util.List;
@@ -281,6 +282,11 @@ public TypeConverter visit(ArrayType arrayType) {
});
}
+ @Override
+ public TypeConverter visit(VectorType vectorType) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public TypeConverter visit(MultisetType multisetType) {
throw new UnsupportedOperationException();
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VecColumnVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VecColumnVector.java
new file mode 100644
index 000000000000..e6df065cbf68
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VecColumnVector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.InternalVector;
+
+/** Column vector for VectorType. */
+public interface VecColumnVector extends ColumnVector {
+ InternalVector getVector(int i);
+
+ ColumnVector getColumnVector();
+
+ int getVectorSize();
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
index 18eecd29ed59..01c6037ca688 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
@@ -23,6 +23,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.BytesColumnVector.Bytes;
import org.apache.paimon.data.variant.GenericVariant;
@@ -124,6 +125,10 @@ public InternalArray getArray(int rowId, int colId) {
return ((ArrayColumnVector) columns[colId]).getArray(rowId);
}
+ public InternalVector getVector(int rowId, int colId) {
+ return ((VecColumnVector) columns[colId]).getVector(rowId);
+ }
+
public InternalRow getRow(int rowId, int colId) {
return ((RowColumnVector) columns[colId]).getRow(rowId);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
index c2ac7291c211..78d717ee1b0b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
@@ -26,6 +26,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.BytesUtils;
@@ -164,6 +165,11 @@ public InternalArray getArray(int pos) {
throw new UnsupportedOperationException();
}
+ @Override
+ public InternalVector getVector(int pos) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public InternalMap getMap(int pos) {
throw new UnsupportedOperationException();
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
index bdc864dfcb4f..2c285c30a6b4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
@@ -26,6 +26,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.BytesUtils;
@@ -186,6 +187,11 @@ private static InternalRow readNestedRow(
return new SafeBinaryRow(numFields, bytes, offset + baseOffset);
}
+ @Override
+ public InternalVector getVector(int pos) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public InternalMap getMap(int pos) {
throw new UnsupportedOperationException();
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java
index 38e9c4678e49..6669f347ff27 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java
@@ -24,6 +24,7 @@
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
import static org.apache.paimon.types.DataTypeChecks.getFieldTypes;
import static org.apache.paimon.types.DataTypeChecks.getPrecision;
@@ -75,6 +76,10 @@ private static Serializer> createInternal(DataType type) {
return new TimestampSerializer(getPrecision(type));
case ARRAY:
return new InternalArraySerializer(((ArrayType) type).getElementType());
+ case VECTOR:
+ VectorType vectorType = (VectorType) type;
+ return new InternalVectorSerializer(
+ vectorType.getElementType(), vectorType.getLength());
case MULTISET:
return new InternalMapSerializer(
((MultisetType) type).getElementType(), new IntType(false));
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalVectorSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalVectorSerializer.java
new file mode 100644
index 000000000000..6ae5f23a9547
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalVectorSerializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.serializer;
+
+import org.apache.paimon.data.BinaryVector;
+import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentUtils;
+import org.apache.paimon.types.DataType;
+
+import java.io.IOException;
+
+/** Serializer for {@link InternalVector}. */
+public class InternalVectorSerializer implements Serializer {
+ private static final long serialVersionUID = 1L;
+
+ private final DataType eleType;
+ private final Serializer