Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_commit_txn_eventually("ms", "commit_txn_eve
BvarLatencyRecorderWithTag g_bvar_ms_abort_txn("ms", "abort_txn");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn("ms", "get_txn");
BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id("ms", "get_current_max_txn_id");
BvarLatencyRecorderWithTag g_bvar_ms_create_meta_sync_point("ms", "create_meta_sync_point");
BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms", "begin_sub_txn");
BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn");
BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms", "check_txn_conflict");
Expand Down Expand Up @@ -467,6 +468,8 @@ mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_counter("rpc_kv_abort_txn_del_counte
mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_counter("rpc_kv_get_txn_get_counter",{"instance_id"});
// get_current_max_txn_id
mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_counter("rpc_kv_get_current_max_txn_id_get_counter",{"instance_id"});
// create_meta_sync_point
mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_counter("rpc_kv_create_meta_sync_point_del_counter",{"instance_id"});
// begin_sub_txn
mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_counter("rpc_kv_begin_sub_txn_get_counter",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_counter("rpc_kv_begin_sub_txn_put_counter",{"instance_id"});
Expand Down Expand Up @@ -669,6 +672,8 @@ mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_bytes("rpc_kv_abort_txn_del_bytes",{
mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_bytes("rpc_kv_get_txn_get_bytes",{"instance_id"});
// get_current_max_txn_id
mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_bytes("rpc_kv_get_current_max_txn_id_get_bytes",{"instance_id"});
// create_meta_sync_point
mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_bytes("rpc_kv_create_meta_sync_point_del_bytes",{"instance_id"});
// begin_sub_txn
mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_bytes("rpc_kv_begin_sub_txn_get_bytes",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_bytes("rpc_kv_begin_sub_txn_put_bytes",{"instance_id"});
Expand Down
3 changes: 3 additions & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_commit_txn_eventually;
extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_create_meta_sync_point;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict;
extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_prepare_txn_by_coordinator;
Expand Down Expand Up @@ -887,6 +888,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_del_counter;
Expand Down Expand Up @@ -1026,6 +1028,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_del_bytes;
Expand Down
11 changes: 11 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ class MetaServiceImpl : public cloud::MetaService {
const GetCurrentMaxTxnRequest* request,
GetCurrentMaxTxnResponse* response,
::google::protobuf::Closure* done) override;
void create_meta_sync_point(::google::protobuf::RpcController* controller,
const CreateMetaSyncPointRequest* request,
CreateMetaSyncPointResponse* response,
::google::protobuf::Closure* done) override;

void begin_sub_txn(::google::protobuf::RpcController* controller,
const BeginSubTxnRequest* request, BeginSubTxnResponse* response,
Expand Down Expand Up @@ -570,6 +574,13 @@ class MetaServiceProxy final : public MetaService {
call_impl(&cloud::MetaService::get_current_max_txn_id, controller, request, response, done);
}

void create_meta_sync_point(::google::protobuf::RpcController* controller,
const CreateMetaSyncPointRequest* request,
CreateMetaSyncPointResponse* response,
::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::create_meta_sync_point, controller, request, response, done);
}

void begin_sub_txn(::google::protobuf::RpcController* controller,
const BeginSubTxnRequest* request, BeginSubTxnResponse* response,
::google::protobuf::Closure* done) override {
Expand Down
56 changes: 56 additions & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ using namespace std::chrono;

namespace doris::cloud {

static constexpr std::string_view kMetaSyncPointDummyKey = "__meta_service_sync_point_dummy_key__";

struct TableStats {
int64_t updated_row_count = 0;

Expand Down Expand Up @@ -3628,6 +3630,60 @@ void MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController*
response->set_current_max_txn_id(current_max_txn_id);
}

void MetaServiceImpl::create_meta_sync_point(::google::protobuf::RpcController* controller,
const CreateMetaSyncPointRequest* request,
CreateMetaSyncPointResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(create_meta_sync_point, del);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
RPC_RATE_LIMIT(create_meta_sync_point)

TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
msg = "failed to create txn";
code = cast_as<ErrCategory::CREATE>(err);
return;
}

txn->enable_get_versionstamp();
txn->remove(kMetaSyncPointDummyKey);

err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "txn->commit() failed, err=" << err;
msg = ss.str();
return;
}

int64_t committed_version = 0;
err = txn->get_committed_version(&committed_version);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "get committed version failed, err=" << err;
msg = ss.str();
return;
}

Versionstamp versionstamp;
err = txn->get_versionstamp(&versionstamp);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "get versionstamp failed, err=" << err;
msg = ss.str();
return;
}

response->set_committed_version(committed_version);
response->set_versionstamp(versionstamp.to_string());
}

/**
* 1. Generate a sub_txn_id
*
Expand Down
17 changes: 17 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2671,6 +2671,23 @@ TEST(MetaServiceTest, GetCurrentMaxTxnIdTest) {
ASSERT_GE(max_txn_id_res.current_max_txn_id(), begin_txn_res.txn_id());
}

TEST(MetaServiceTest, CreateMetaSyncPointTest) {
auto meta_service = get_meta_service();
const std::string cloud_unique_id = "test_cloud_unique_id";

brpc::Controller cntl;
CreateMetaSyncPointRequest req;
CreateMetaSyncPointResponse resp;
req.set_cloud_unique_id(cloud_unique_id);

meta_service->create_meta_sync_point(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &resp, nullptr);

ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
ASSERT_GT(resp.committed_version(), 0);
ASSERT_EQ(resp.versionstamp().size(), 20);
}

TEST(MetaServiceTest, AbortTxnWithCoordinatorTest) {
auto meta_service = get_meta_service();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.persist;

import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CloudMetaSyncPoint implements Writable {
@SerializedName(value = "committedVersion")
private long committedVersion;

@SerializedName(value = "versionStamp")
private String versionStamp;

@SerializedName(value = "createTimeMs")
private long createTimeMs;

public CloudMetaSyncPoint() {
}

public CloudMetaSyncPoint(long committedVersion, String versionStamp, long createTimeMs) {
this.committedVersion = committedVersion;
this.versionStamp = versionStamp;
this.createTimeMs = createTimeMs;
}

public long getCommittedVersion() {
return committedVersion;
}

public String getVersionStamp() {
return versionStamp;
}

public long getCreateTimeMs() {
return createTimeMs;
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

public static CloudMetaSyncPoint read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), CloudMetaSyncPoint.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ public Cloud.GetCurrentMaxTxnResponse getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnR
.getCurrentMaxTxnId(request);
}

public Cloud.CreateMetaSyncPointResponse createMetaSyncPoint(Cloud.CreateMetaSyncPointRequest request) {
return blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, TimeUnit.MILLISECONDS)
.createMetaSyncPoint(request);
}

public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest request) {
if (!request.hasCloudUniqueId()) {
Cloud.BeginSubTxnRequest.Builder builder = Cloud.BeginSubTxnRequest.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,11 @@ public Cloud.GetCurrentMaxTxnResponse getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnR
return executeWithMetrics("getCurrentMaxTxnId", (client) -> client.getCurrentMaxTxnId(request));
}

public Cloud.CreateMetaSyncPointResponse createMetaSyncPoint(Cloud.CreateMetaSyncPointRequest request)
throws RpcException {
return executeWithMetrics("createMetaSyncPoint", (client) -> client.createMetaSyncPoint(request));
}

public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest request)
throws RpcException {
return executeWithMetrics("beginSubTxn", (client) -> client.beginSubTxn(request));
Expand Down
Loading
Loading