Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions be/src/vec/runtime/vfile_format_transformer_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <vector>

#include "vec/runtime/vcsv_transformer.h"
#include "vec/runtime/vjni_format_transformer.h"
#include "vec/runtime/vorc_transformer.h"
#include "vec/runtime/vparquet_transformer.h"

Expand All @@ -32,6 +33,28 @@ Status create_tvf_format_transformer(const TTVFTableSink& tvf_sink, RuntimeState
io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
std::unique_ptr<VFileFormatTransformer>* result) {
// JNI writer path: delegate to Java-side writer
if (tvf_sink.__isset.writer_type && tvf_sink.writer_type == TTVFWriterType::JNI) {
if (!tvf_sink.__isset.writer_class) {
return Status::InternalError("writer_class is required when writer_type is JNI");
}
std::map<std::string, std::string> writer_params;
if (tvf_sink.__isset.properties) {
writer_params = tvf_sink.properties;
}
writer_params["file_path"] = tvf_sink.file_path;
if (tvf_sink.__isset.column_separator) {
writer_params["column_separator"] = tvf_sink.column_separator;
}
if (tvf_sink.__isset.line_delimiter) {
writer_params["line_delimiter"] = tvf_sink.line_delimiter;
}
result->reset(new VJniFormatTransformer(state, output_vexpr_ctxs, tvf_sink.writer_class,
std::move(writer_params)));
return Status::OK();
}

// Native writer path
TFileFormatType::type format = tvf_sink.file_format;
switch (format) {
case TFileFormatType::FORMAT_CSV_PLAIN: {
Expand Down
136 changes: 136 additions & 0 deletions be/src/vec/runtime/vjni_format_transformer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/runtime/vjni_format_transformer.h"

#include "runtime/runtime_state.h"
#include "vec/exec/jni_connector.h"

namespace doris::vectorized {

VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state,
const VExprContextSPtrs& output_vexpr_ctxs,
std::string writer_class,
std::map<std::string, std::string> writer_params)
: VFileFormatTransformer(state, output_vexpr_ctxs, false),
_writer_class(std::move(writer_class)),
_writer_params(std::move(writer_params)) {}

Status VJniFormatTransformer::_init_jni_writer(JNIEnv* env, int batch_size) {
// Load writer class via the same class loader as JniScanner
Jni::GlobalClass jni_writer_cls;
RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env, _writer_class.c_str(), &jni_writer_cls));

// Get constructor: (int batchSize, Map<String,String> params)
Jni::MethodId writer_constructor;
RETURN_IF_ERROR(
jni_writer_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", &writer_constructor));

// Convert C++ params map to Java HashMap
Jni::LocalObject hashmap_object;
RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _writer_params, &hashmap_object));

// Create writer instance
RETURN_IF_ERROR(jni_writer_cls.new_object(env, writer_constructor)
.with_arg((jint)batch_size)
.with_arg(hashmap_object)
.call(&_jni_writer_obj));

// Resolve method IDs
RETURN_IF_ERROR(jni_writer_cls.get_method(env, "open", "()V", &_jni_writer_open));
RETURN_IF_ERROR(
jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V", &_jni_writer_write));
RETURN_IF_ERROR(jni_writer_cls.get_method(env, "close", "()V", &_jni_writer_close));
RETURN_IF_ERROR(jni_writer_cls.get_method(env, "getStatistics", "()Ljava/util/Map;",
&_jni_writer_get_statistics));
return Status::OK();
}

Status VJniFormatTransformer::open() {
JNIEnv* env = nullptr;
RETURN_IF_ERROR(Jni::Env::Get(&env));

int batch_size = _state->batch_size();
RETURN_IF_ERROR(_init_jni_writer(env, batch_size));

RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, _jni_writer_open).call());
RETURN_ERROR_IF_EXC(env);

_opened = true;
return Status::OK();
}

Status VJniFormatTransformer::write(const Block& block) {
if (block.rows() == 0) {
return Status::OK();
}

JNIEnv* env = nullptr;
RETURN_IF_ERROR(Jni::Env::Get(&env));

// 1. Convert Block to Java table metadata (column addresses)
Block* mutable_block = const_cast<Block*>(&block);
std::unique_ptr<long[]> input_table;
RETURN_IF_ERROR(JniConnector::to_java_table(mutable_block, input_table));

// 2. Cache schema on first call
if (!_schema_cached) {
auto schema = JniConnector::parse_table_schema(mutable_block);
_cached_required_fields = schema.first;
_cached_columns_types = schema.second;
_schema_cached = true;
}

// 3. Build input params map for Java writer
std::map<std::string, std::string> input_params = {
{"meta_address", std::to_string((long)input_table.get())},
{"required_fields", _cached_required_fields},
{"columns_types", _cached_columns_types}};

// 4. Convert to Java Map and call writer.write(inputParams)
Jni::LocalObject input_map;
RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params, &input_map));

RETURN_IF_ERROR(
_jni_writer_obj.call_void_method(env, _jni_writer_write).with_arg(input_map).call());
RETURN_ERROR_IF_EXC(env);

_cur_written_rows += block.rows();
return Status::OK();
}

Status VJniFormatTransformer::close() {
if (_closed || !_opened) {
return Status::OK();
}
_closed = true;

JNIEnv* env = nullptr;
RETURN_IF_ERROR(Jni::Env::Get(&env));

RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, _jni_writer_close).call());
RETURN_ERROR_IF_EXC(env);

return Status::OK();
}

int64_t VJniFormatTransformer::written_len() {
// JNI writer manages file size on Java side; return 0 to disable C++ auto-split.
return 0;
}

} // namespace doris::vectorized
74 changes: 74 additions & 0 deletions be/src/vec/runtime/vjni_format_transformer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <map>
#include <string>

#include "util/jni-util.h"
#include "vfile_format_transformer.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

/**
* VJniFormatTransformer is a VFileFormatTransformer implementation that delegates
* write operations to a Java-side JniWriter via JNI. It sits alongside
* VCSVTransformer/VParquetTransformer/VOrcTransformer as a peer implementation.
*
* The Java writer class must extend org.apache.doris.common.jni.JniWriter and
* follow the same constructor signature as JniScanner: (int batchSize, Map<String,String> params).
*/
class VJniFormatTransformer final : public VFileFormatTransformer {
public:
VJniFormatTransformer(RuntimeState* state, const VExprContextSPtrs& output_vexpr_ctxs,
std::string writer_class,
std::map<std::string, std::string> writer_params);

~VJniFormatTransformer() override = default;

Status open() override;
Status write(const Block& block) override;
Status close() override;
int64_t written_len() override;

private:
Status _init_jni_writer(JNIEnv* env, int batch_size);

std::string _writer_class;
std::map<std::string, std::string> _writer_params;

// JNI handles (same pattern as JniConnector)
Jni::GlobalObject _jni_writer_obj;
Jni::MethodId _jni_writer_open;
Jni::MethodId _jni_writer_write;
Jni::MethodId _jni_writer_close;
Jni::MethodId _jni_writer_get_statistics;

// Schema cache (computed on first write, reused afterwards)
bool _schema_cached = false;
std::string _cached_required_fields;
std::string _cached_columns_types;

bool _opened = false;
bool _closed = false;
};

} // namespace doris::vectorized

#include "common/compile_check_end.h"
27 changes: 17 additions & 10 deletions be/src/vec/sink/writer/vtvf_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,28 @@ Status VTVFTableWriter::close(Status status) {
}

Status VTVFTableWriter::_create_file_writer(const std::string& file_name) {
TFileType::type file_type = _tvf_sink.file_type;
std::map<std::string, std::string> properties;
if (_tvf_sink.__isset.properties) {
properties = _tvf_sink.properties;
bool use_jni = _tvf_sink.__isset.writer_type && _tvf_sink.writer_type == TTVFWriterType::JNI;

if (!use_jni) {
// Native path: create file writer via FileFactory
TFileType::type file_type = _tvf_sink.file_type;
std::map<std::string, std::string> properties;
if (_tvf_sink.__isset.properties) {
properties = _tvf_sink.properties;
}

_file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
file_type, _state->exec_env(), {}, properties, file_name,
{.write_file_cache = false, .sync_file_data = false}));
}

_file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
file_type, _state->exec_env(), {}, properties, file_name,
{.write_file_cache = false, .sync_file_data = false}));

RETURN_IF_ERROR(create_tvf_format_transformer(_tvf_sink, _state, _file_writer_impl.get(),
// Factory creates either JNI or native transformer
RETURN_IF_ERROR(create_tvf_format_transformer(_tvf_sink, _state,
use_jni ? nullptr : _file_writer_impl.get(),
_vec_output_expr_ctxs, &_vfile_writer));

VLOG_DEBUG << "TVF table writer created file: " << file_name
<< ", format=" << _tvf_sink.file_format
<< ", format=" << _tvf_sink.file_format << ", use_jni=" << use_jni
<< ", query_id=" << print_id(_state->query_id());

return _vfile_writer->open();
Expand Down
4 changes: 3 additions & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
# modules+=("be-java-extensions/lakesoul-scanner")
modules+=("be-java-extensions/preload-extensions")
modules+=("be-java-extensions/${HADOOP_DEPS_NAME}")
modules+=("be-java-extensions/java-writer")

# If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES
if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
Expand Down Expand Up @@ -920,6 +921,7 @@ EOF
extensions_modules+=("preload-extensions")
extensions_modules+=("iceberg-metadata-scanner")
extensions_modules+=("${HADOOP_DEPS_NAME}")
extensions_modules+=("java-writer")

if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
IFS=',' read -r -a ignore_modules <<<"${BE_EXTENSION_IGNORE}"
Expand Down Expand Up @@ -1048,4 +1050,4 @@ if [[ -n "${DORIS_POST_BUILD_HOOK}" ]]; then
eval "${DORIS_POST_BUILD_HOOK}"
fi

exit 0
exit 0
Loading
Loading