From 1229994bb0af03d59ec523cb4ea040f6a9ace34b Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 13 Feb 2026 15:44:46 +0800 Subject: [PATCH 1/2] support jni writer --- .../vfile_format_transformer_factory.cpp | 23 +++ .../vec/runtime/vjni_format_transformer.cpp | 137 ++++++++++++++++++ be/src/vec/runtime/vjni_format_transformer.h | 74 ++++++++++ be/src/vec/sink/writer/vtvf_table_writer.cpp | 29 ++-- build.sh | 4 +- .../apache/doris/common/jni/JniWriter.java | 99 +++++++++++++ fe/be-java-extensions/java-writer/pom.xml | 75 ++++++++++ .../doris/writer/LocalFileJniWriter.java | 117 +++++++++++++++ .../src/main/resources/package.xml | 41 ++++++ fe/be-java-extensions/pom.xml | 1 + .../apache/doris/planner/TVFTableSink.java | 10 ++ gensrc/thrift/DataSinks.thrift | 7 + 12 files changed, 606 insertions(+), 11 deletions(-) create mode 100644 be/src/vec/runtime/vjni_format_transformer.cpp create mode 100644 be/src/vec/runtime/vjni_format_transformer.h create mode 100644 fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java create mode 100644 fe/be-java-extensions/java-writer/pom.xml create mode 100644 fe/be-java-extensions/java-writer/src/main/java/org/apache/doris/writer/LocalFileJniWriter.java create mode 100644 fe/be-java-extensions/java-writer/src/main/resources/package.xml diff --git a/be/src/vec/runtime/vfile_format_transformer_factory.cpp b/be/src/vec/runtime/vfile_format_transformer_factory.cpp index bbd415e2452e28..69751cc637f55c 100644 --- a/be/src/vec/runtime/vfile_format_transformer_factory.cpp +++ b/be/src/vec/runtime/vfile_format_transformer_factory.cpp @@ -23,6 +23,7 @@ #include #include "vec/runtime/vcsv_transformer.h" +#include "vec/runtime/vjni_format_transformer.h" #include "vec/runtime/vorc_transformer.h" #include "vec/runtime/vparquet_transformer.h" @@ -32,6 +33,28 @@ Status create_tvf_format_transformer(const TTVFTableSink& tvf_sink, RuntimeState io::FileWriter* file_writer, const VExprContextSPtrs& output_vexpr_ctxs, std::unique_ptr* result) { + // JNI writer path: delegate to Java-side writer + if (tvf_sink.__isset.writer_type && tvf_sink.writer_type == TTVFWriterType::JNI) { + if (!tvf_sink.__isset.writer_class) { + return Status::InternalError("writer_class is required when writer_type is JNI"); + } + std::map writer_params; + if (tvf_sink.__isset.properties) { + writer_params = tvf_sink.properties; + } + writer_params["file_path"] = tvf_sink.file_path; + if (tvf_sink.__isset.column_separator) { + writer_params["column_separator"] = tvf_sink.column_separator; + } + if (tvf_sink.__isset.line_delimiter) { + writer_params["line_delimiter"] = tvf_sink.line_delimiter; + } + result->reset(new VJniFormatTransformer(state, output_vexpr_ctxs, tvf_sink.writer_class, + std::move(writer_params))); + return Status::OK(); + } + + // Native writer path TFileFormatType::type format = tvf_sink.file_format; switch (format) { case TFileFormatType::FORMAT_CSV_PLAIN: { diff --git a/be/src/vec/runtime/vjni_format_transformer.cpp b/be/src/vec/runtime/vjni_format_transformer.cpp new file mode 100644 index 00000000000000..f309a2861ccc23 --- /dev/null +++ b/be/src/vec/runtime/vjni_format_transformer.cpp @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/runtime/vjni_format_transformer.h" + +#include "runtime/runtime_state.h" +#include "vec/exec/jni_connector.h" + +namespace doris::vectorized { + +VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state, + const VExprContextSPtrs& output_vexpr_ctxs, + std::string writer_class, + std::map writer_params) + : VFileFormatTransformer(state, output_vexpr_ctxs, false), + _writer_class(std::move(writer_class)), + _writer_params(std::move(writer_params)) {} + +Status VJniFormatTransformer::_init_jni_writer(JNIEnv* env, int batch_size) { + // Load writer class via the same class loader as JniScanner + Jni::GlobalClass jni_writer_cls; + RETURN_IF_ERROR( + Jni::Util::get_jni_scanner_class(env, _writer_class.c_str(), &jni_writer_cls)); + + // Get constructor: (int batchSize, Map params) + Jni::MethodId writer_constructor; + RETURN_IF_ERROR( + jni_writer_cls.get_method(env, "", "(ILjava/util/Map;)V", &writer_constructor)); + + // Convert C++ params map to Java HashMap + Jni::LocalObject hashmap_object; + RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _writer_params, &hashmap_object)); + + // Create writer instance + RETURN_IF_ERROR(jni_writer_cls.new_object(env, writer_constructor) + .with_arg((jint)batch_size) + .with_arg(hashmap_object) + .call(&_jni_writer_obj)); + + // Resolve method IDs + RETURN_IF_ERROR(jni_writer_cls.get_method(env, "open", "()V", &_jni_writer_open)); + RETURN_IF_ERROR( + jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V", &_jni_writer_write)); + RETURN_IF_ERROR(jni_writer_cls.get_method(env, "close", "()V", &_jni_writer_close)); + RETURN_IF_ERROR(jni_writer_cls.get_method(env, "getStatistics", "()Ljava/util/Map;", + &_jni_writer_get_statistics)); + return Status::OK(); +} + +Status VJniFormatTransformer::open() { + JNIEnv* env = nullptr; + RETURN_IF_ERROR(Jni::Env::Get(&env)); + + int batch_size = _state->batch_size(); + RETURN_IF_ERROR(_init_jni_writer(env, batch_size)); + + RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, _jni_writer_open).call()); + RETURN_ERROR_IF_EXC(env); + + _opened = true; + return Status::OK(); +} + +Status VJniFormatTransformer::write(const Block& block) { + if (block.rows() == 0) { + return Status::OK(); + } + + JNIEnv* env = nullptr; + RETURN_IF_ERROR(Jni::Env::Get(&env)); + + // 1. Convert Block to Java table metadata (column addresses) + Block* mutable_block = const_cast(&block); + std::unique_ptr input_table; + RETURN_IF_ERROR(JniConnector::to_java_table(mutable_block, input_table)); + + // 2. Cache schema on first call + if (!_schema_cached) { + auto schema = JniConnector::parse_table_schema(mutable_block); + _cached_required_fields = schema.first; + _cached_columns_types = schema.second; + _schema_cached = true; + } + + // 3. Build input params map for Java writer + std::map input_params = { + {"meta_address", std::to_string((long)input_table.get())}, + {"required_fields", _cached_required_fields}, + {"columns_types", _cached_columns_types}}; + + // 4. Convert to Java Map and call writer.write(inputParams) + Jni::LocalObject input_map; + RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params, &input_map)); + + RETURN_IF_ERROR( + _jni_writer_obj.call_void_method(env, _jni_writer_write).with_arg(input_map).call()); + RETURN_ERROR_IF_EXC(env); + + _cur_written_rows += block.rows(); + return Status::OK(); +} + +Status VJniFormatTransformer::close() { + if (_closed || !_opened) { + return Status::OK(); + } + _closed = true; + + JNIEnv* env = nullptr; + RETURN_IF_ERROR(Jni::Env::Get(&env)); + + RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, _jni_writer_close).call()); + RETURN_ERROR_IF_EXC(env); + + return Status::OK(); +} + +int64_t VJniFormatTransformer::written_len() { + // JNI writer manages file size on Java side; return 0 to disable C++ auto-split. + return 0; +} + +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/vjni_format_transformer.h b/be/src/vec/runtime/vjni_format_transformer.h new file mode 100644 index 00000000000000..aa63d65e516614 --- /dev/null +++ b/be/src/vec/runtime/vjni_format_transformer.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "util/jni-util.h" +#include "vfile_format_transformer.h" + +namespace doris::vectorized { +#include "common/compile_check_begin.h" + +/** + * VJniFormatTransformer is a VFileFormatTransformer implementation that delegates + * write operations to a Java-side JniWriter via JNI. It sits alongside + * VCSVTransformer/VParquetTransformer/VOrcTransformer as a peer implementation. + * + * The Java writer class must extend org.apache.doris.common.jni.JniWriter and + * follow the same constructor signature as JniScanner: (int batchSize, Map params). + */ +class VJniFormatTransformer final : public VFileFormatTransformer { +public: + VJniFormatTransformer(RuntimeState* state, const VExprContextSPtrs& output_vexpr_ctxs, + std::string writer_class, + std::map writer_params); + + ~VJniFormatTransformer() override = default; + + Status open() override; + Status write(const Block& block) override; + Status close() override; + int64_t written_len() override; + +private: + Status _init_jni_writer(JNIEnv* env, int batch_size); + + std::string _writer_class; + std::map _writer_params; + + // JNI handles (same pattern as JniConnector) + Jni::GlobalObject _jni_writer_obj; + Jni::MethodId _jni_writer_open; + Jni::MethodId _jni_writer_write; + Jni::MethodId _jni_writer_close; + Jni::MethodId _jni_writer_get_statistics; + + // Schema cache (computed on first write, reused afterwards) + bool _schema_cached = false; + std::string _cached_required_fields; + std::string _cached_columns_types; + + bool _opened = false; + bool _closed = false; +}; + +} // namespace doris::vectorized + +#include "common/compile_check_end.h" diff --git a/be/src/vec/sink/writer/vtvf_table_writer.cpp b/be/src/vec/sink/writer/vtvf_table_writer.cpp index 7d82e8ef471d2e..df5ad05f5c82b9 100644 --- a/be/src/vec/sink/writer/vtvf_table_writer.cpp +++ b/be/src/vec/sink/writer/vtvf_table_writer.cpp @@ -87,21 +87,30 @@ Status VTVFTableWriter::close(Status status) { } Status VTVFTableWriter::_create_file_writer(const std::string& file_name) { - TFileType::type file_type = _tvf_sink.file_type; - std::map properties; - if (_tvf_sink.__isset.properties) { - properties = _tvf_sink.properties; + bool use_jni = _tvf_sink.__isset.writer_type && + _tvf_sink.writer_type == TTVFWriterType::JNI; + + if (!use_jni) { + // Native path: create file writer via FileFactory + TFileType::type file_type = _tvf_sink.file_type; + std::map properties; + if (_tvf_sink.__isset.properties) { + properties = _tvf_sink.properties; + } + + _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer( + file_type, _state->exec_env(), {}, properties, file_name, + {.write_file_cache = false, .sync_file_data = false})); } - _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer( - file_type, _state->exec_env(), {}, properties, file_name, - {.write_file_cache = false, .sync_file_data = false})); - - RETURN_IF_ERROR(create_tvf_format_transformer(_tvf_sink, _state, _file_writer_impl.get(), - _vec_output_expr_ctxs, &_vfile_writer)); + // Factory creates either JNI or native transformer + RETURN_IF_ERROR(create_tvf_format_transformer( + _tvf_sink, _state, use_jni ? nullptr : _file_writer_impl.get(), + _vec_output_expr_ctxs, &_vfile_writer)); VLOG_DEBUG << "TVF table writer created file: " << file_name << ", format=" << _tvf_sink.file_format + << ", use_jni=" << use_jni << ", query_id=" << print_id(_state->query_id()); return _vfile_writer->open(); diff --git a/build.sh b/build.sh index 580f5d3a047db4..2bb49e2673f300 100755 --- a/build.sh +++ b/build.sh @@ -584,6 +584,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then # modules+=("be-java-extensions/lakesoul-scanner") modules+=("be-java-extensions/preload-extensions") modules+=("be-java-extensions/${HADOOP_DEPS_NAME}") + modules+=("be-java-extensions/java-writer") # If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then @@ -920,6 +921,7 @@ EOF extensions_modules+=("preload-extensions") extensions_modules+=("iceberg-metadata-scanner") extensions_modules+=("${HADOOP_DEPS_NAME}") + extensions_modules+=("java-writer") if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then IFS=',' read -r -a ignore_modules <<<"${BE_EXTENSION_IGNORE}" @@ -1048,4 +1050,4 @@ if [[ -n "${DORIS_POST_BUILD_HOOK}" ]]; then eval "${DORIS_POST_BUILD_HOOK}" fi -exit 0 \ No newline at end of file +exit 0 diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java new file mode 100644 index 00000000000000..2dfefd2ac79cf7 --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.jni; + +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.VectorTable; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +/** + * JniWriter is the base class for JNI-based writers, symmetric to JniScanner. + * Constructor signature: (int batchSize, Map params) matches JniScanner + * to reuse the same class loading mechanism (Jni::Util::get_jni_scanner_class). + * + * Lifecycle: open() -> write() [repeated] -> close() + */ +public abstract class JniWriter { + protected int batchSize; + protected Map params; + protected ColumnType[] columnTypes; + protected String[] fields; + protected long writeTime = 0; + protected long readTableTime = 0; + + public JniWriter(int batchSize, Map params) { + this.batchSize = batchSize; + this.params = params; + } + + public abstract void open() throws IOException; + + /** + * JNI entry point: receives C++ Block metadata, creates a ReadableTable, + * then delegates to writeInternal. + */ + public void write(Map inputParams) throws IOException { + // Parse and cache schema on first call + if (columnTypes == null) { + String requiredFields = inputParams.get("required_fields"); + String columnsTypes = inputParams.get("columns_types"); + if (requiredFields != null && !requiredFields.isEmpty()) { + fields = requiredFields.split(","); + String[] typeStrs = columnsTypes.split("#"); + columnTypes = new ColumnType[typeStrs.length]; + for (int i = 0; i < typeStrs.length; i++) { + columnTypes[i] = ColumnType.parseType(fields[i], typeStrs[i]); + } + } else { + fields = new String[0]; + columnTypes = new ColumnType[0]; + } + } + + long startRead = System.nanoTime(); + VectorTable inputTable = VectorTable.createReadableTable(inputParams); + readTableTime += System.nanoTime() - startRead; + + long startWrite = System.nanoTime(); + writeInternal(inputTable); + writeTime += System.nanoTime() - startWrite; + } + + protected abstract void writeInternal(VectorTable inputTable) throws IOException; + + public abstract void close() throws IOException; + + /** + * Performance metrics. Key format: "metricType:metricName" + * Supported types: timer, counter, bytes (same as JniScanner). + */ + public Map getStatistics() { + return Collections.emptyMap(); + } + + public long getWriteTime() { + return writeTime; + } + + public long getReadTableTime() { + return readTableTime; + } +} diff --git a/fe/be-java-extensions/java-writer/pom.xml b/fe/be-java-extensions/java-writer/pom.xml new file mode 100644 index 00000000000000..e0bbaf20389cfc --- /dev/null +++ b/fe/be-java-extensions/java-writer/pom.xml @@ -0,0 +1,75 @@ + + + + + be-java-extensions + org.apache.doris + ${revision} + + 4.0.0 + + java-writer + + + 8 + 8 + + + + + org.apache.doris + java-common + ${project.version} + provided + + + + + java-writer + ${project.basedir}/target/ + + + org.apache.maven.plugins + maven-assembly-plugin + + + src/main/resources/package.xml + + + + + + + + + + make-assembly + package + + single + + + + + + + diff --git a/fe/be-java-extensions/java-writer/src/main/java/org/apache/doris/writer/LocalFileJniWriter.java b/fe/be-java-extensions/java-writer/src/main/java/org/apache/doris/writer/LocalFileJniWriter.java new file mode 100644 index 00000000000000..0b0bab8aa5f697 --- /dev/null +++ b/fe/be-java-extensions/java-writer/src/main/java/org/apache/doris/writer/LocalFileJniWriter.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.writer; + +import org.apache.doris.common.jni.JniWriter; +import org.apache.doris.common.jni.vec.VectorTable; + +import org.apache.log4j.Logger; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Map; + +/** + * LocalFileJniWriter writes C++ Block data to local CSV files via JNI. + * Loaded by C++ as: org/apache/doris/writer/LocalFileJniWriter + */ +public class LocalFileJniWriter extends JniWriter { + private static final Logger LOG = Logger.getLogger(LocalFileJniWriter.class); + + private String filePath; + private String columnSeparator; + private String lineDelimiter; + private BufferedWriter fileWriter; + private long writtenRows = 0; + private long writtenBytes = 0; + + public LocalFileJniWriter(int batchSize, Map params) { + super(batchSize, params); + this.filePath = params.get("file_path"); + this.columnSeparator = params.getOrDefault("column_separator", ","); + this.lineDelimiter = params.getOrDefault("line_delimiter", "\n"); + LOG.info("LocalFileJniWriter created: filePath=" + filePath + + ", columnSeparator=" + columnSeparator + + ", batchSize=" + batchSize); + } + + @Override + public void open() throws IOException { + LOG.info("LocalFileJniWriter opening file: " + filePath); + fileWriter = new BufferedWriter(new FileWriter(filePath)); + LOG.info("LocalFileJniWriter opened file successfully: " + filePath); + } + + @Override + protected void writeInternal(VectorTable inputTable) throws IOException { + int numRows = inputTable.getNumRows(); + int numCols = inputTable.getNumColumns(); + LOG.info("LocalFileJniWriter writeInternal: numRows=" + numRows + ", numCols=" + numCols); + if (numRows == 0) { + return; + } + + Object[][] data = inputTable.getMaterializedData(); + StringBuilder sb = new StringBuilder(); + + for (int row = 0; row < numRows; row++) { + for (int col = 0; col < numCols; col++) { + if (col > 0) { + sb.append(columnSeparator); + } + Object val = data[col][row]; + if (val != null) { + sb.append(val.toString()); + } else { + sb.append("\\N"); + } + } + sb.append(lineDelimiter); + } + + String output = sb.toString(); + fileWriter.write(output); + writtenRows += numRows; + writtenBytes += output.getBytes().length; + LOG.info("LocalFileJniWriter wrote " + numRows + " rows, totalWrittenRows=" + writtenRows + + ", totalWrittenBytes=" + writtenBytes); + } + + @Override + public void close() throws IOException { + LOG.info("LocalFileJniWriter closing: filePath=" + filePath + + ", totalWrittenRows=" + writtenRows + ", totalWrittenBytes=" + writtenBytes); + if (fileWriter != null) { + fileWriter.flush(); + fileWriter.close(); + fileWriter = null; + } + LOG.info("LocalFileJniWriter closed successfully: " + filePath); + } + + @Override + public Map getStatistics() { + Map stats = new java.util.HashMap<>(); + stats.put("counter:WrittenRows", String.valueOf(writtenRows)); + stats.put("bytes:WrittenBytes", String.valueOf(writtenBytes)); + stats.put("timer:WriteTime", String.valueOf(writeTime)); + stats.put("timer:ReadTableTime", String.valueOf(readTableTime)); + return stats; + } +} diff --git a/fe/be-java-extensions/java-writer/src/main/resources/package.xml b/fe/be-java-extensions/java-writer/src/main/resources/package.xml new file mode 100644 index 00000000000000..4bbb2610603363 --- /dev/null +++ b/fe/be-java-extensions/java-writer/src/main/resources/package.xml @@ -0,0 +1,41 @@ + + + + jar-with-dependencies + + jar + + false + + + / + true + true + runtime + + + **/Log4j2Plugins.dat + + + + + diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml index 8151cd179d167a..eab071c9c5ac6d 100644 --- a/fe/be-java-extensions/pom.xml +++ b/fe/be-java-extensions/pom.xml @@ -34,6 +34,7 @@ under the License. preload-extensions trino-connector-scanner hadoop-deps + java-writer diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java index 1d6d44d52aef61..c511336767c3df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java @@ -31,6 +31,7 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TTVFTableSink; +import org.apache.doris.thrift.TTVFWriterType; import com.google.common.collect.Maps; @@ -152,6 +153,15 @@ public void bindDataSink() throws AnalysisException { tSink.setHadoopConfig(backendConnectProps); } + // Set writer_type: JNI if writer_class is specified, otherwise NATIVE + String writerClass = properties.get("writer_class"); + if (writerClass != null) { + tSink.setWriterType(TTVFWriterType.JNI); + tSink.setWriterClass(writerClass); + } else { + tSink.setWriterType(TTVFWriterType.NATIVE); + } + tDataSink = new TDataSink(TDataSinkType.TVF_TABLE_SINK); tDataSink.setTvfTableSink(tSink); } diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index c5e176ee155d33..cb817b7dbe8803 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -453,6 +453,11 @@ struct TDictionarySink { struct TBlackholeSink { } +enum TTVFWriterType { + NATIVE = 0, + JNI = 1 +} + struct TTVFTableSink { 1: optional string tvf_name // "local", "s3", "hdfs" 2: optional string file_path @@ -467,6 +472,8 @@ struct TTVFTableSink { 11: optional map hadoop_config 12: optional PlanNodes.TFileCompressType compression_type 13: optional i64 backend_id // local TVF: specify BE + 14: optional TTVFWriterType writer_type // NATIVE or JNI + 15: optional string writer_class // Java class name (required when writer_type=JNI) } struct TDataSink { From b1bdfe4761251aa7e66ff5f414d183bcc03987f1 Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 14 Feb 2026 10:20:44 +0800 Subject: [PATCH 2/2] 2 --- be/src/vec/runtime/vjni_format_transformer.cpp | 3 +-- be/src/vec/sink/writer/vtvf_table_writer.cpp | 12 +++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/be/src/vec/runtime/vjni_format_transformer.cpp b/be/src/vec/runtime/vjni_format_transformer.cpp index f309a2861ccc23..ef7ed6048e3f61 100644 --- a/be/src/vec/runtime/vjni_format_transformer.cpp +++ b/be/src/vec/runtime/vjni_format_transformer.cpp @@ -33,8 +33,7 @@ VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state, Status VJniFormatTransformer::_init_jni_writer(JNIEnv* env, int batch_size) { // Load writer class via the same class loader as JniScanner Jni::GlobalClass jni_writer_cls; - RETURN_IF_ERROR( - Jni::Util::get_jni_scanner_class(env, _writer_class.c_str(), &jni_writer_cls)); + RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env, _writer_class.c_str(), &jni_writer_cls)); // Get constructor: (int batchSize, Map params) Jni::MethodId writer_constructor; diff --git a/be/src/vec/sink/writer/vtvf_table_writer.cpp b/be/src/vec/sink/writer/vtvf_table_writer.cpp index df5ad05f5c82b9..63dc3850c97623 100644 --- a/be/src/vec/sink/writer/vtvf_table_writer.cpp +++ b/be/src/vec/sink/writer/vtvf_table_writer.cpp @@ -87,8 +87,7 @@ Status VTVFTableWriter::close(Status status) { } Status VTVFTableWriter::_create_file_writer(const std::string& file_name) { - bool use_jni = _tvf_sink.__isset.writer_type && - _tvf_sink.writer_type == TTVFWriterType::JNI; + bool use_jni = _tvf_sink.__isset.writer_type && _tvf_sink.writer_type == TTVFWriterType::JNI; if (!use_jni) { // Native path: create file writer via FileFactory @@ -104,13 +103,12 @@ Status VTVFTableWriter::_create_file_writer(const std::string& file_name) { } // Factory creates either JNI or native transformer - RETURN_IF_ERROR(create_tvf_format_transformer( - _tvf_sink, _state, use_jni ? nullptr : _file_writer_impl.get(), - _vec_output_expr_ctxs, &_vfile_writer)); + RETURN_IF_ERROR(create_tvf_format_transformer(_tvf_sink, _state, + use_jni ? nullptr : _file_writer_impl.get(), + _vec_output_expr_ctxs, &_vfile_writer)); VLOG_DEBUG << "TVF table writer created file: " << file_name - << ", format=" << _tvf_sink.file_format - << ", use_jni=" << use_jni + << ", format=" << _tvf_sink.file_format << ", use_jni=" << use_jni << ", query_id=" << print_id(_state->query_id()); return _vfile_writer->open();