diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index 91b1e0bb6497b2..f2dd86d8d1c200 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -39,6 +39,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_commit_txn_eventually("ms", "commit_txn_eve BvarLatencyRecorderWithTag g_bvar_ms_abort_txn("ms", "abort_txn"); BvarLatencyRecorderWithTag g_bvar_ms_get_txn("ms", "get_txn"); BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id("ms", "get_current_max_txn_id"); +BvarLatencyRecorderWithTag g_bvar_ms_create_meta_sync_point("ms", "create_meta_sync_point"); BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms", "begin_sub_txn"); BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn"); BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms", "check_txn_conflict"); @@ -467,6 +468,8 @@ mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_counter("rpc_kv_abort_txn_del_counte mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_counter("rpc_kv_get_txn_get_counter",{"instance_id"}); // get_current_max_txn_id mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_counter("rpc_kv_get_current_max_txn_id_get_counter",{"instance_id"}); +// create_meta_sync_point +mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_counter("rpc_kv_create_meta_sync_point_del_counter",{"instance_id"}); // begin_sub_txn mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_counter("rpc_kv_begin_sub_txn_get_counter",{"instance_id"}); mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_counter("rpc_kv_begin_sub_txn_put_counter",{"instance_id"}); @@ -669,6 +672,8 @@ mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_bytes("rpc_kv_abort_txn_del_bytes",{ mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_bytes("rpc_kv_get_txn_get_bytes",{"instance_id"}); // get_current_max_txn_id mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_bytes("rpc_kv_get_current_max_txn_id_get_bytes",{"instance_id"}); +// create_meta_sync_point +mBvarInt64Adder 
g_bvar_rpc_kv_create_meta_sync_point_del_bytes("rpc_kv_create_meta_sync_point_del_bytes",{"instance_id"}); // begin_sub_txn mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_bytes("rpc_kv_begin_sub_txn_get_bytes",{"instance_id"}); mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_bytes("rpc_kv_begin_sub_txn_put_bytes",{"instance_id"}); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index e4b9789c1bfde2..695a9c0206be37 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -551,6 +551,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_commit_txn_eventually; extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id; +extern BvarLatencyRecorderWithTag g_bvar_ms_create_meta_sync_point; extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict; extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator; extern BvarLatencyRecorderWithTag g_bvar_ms_get_prepare_txn_by_coordinator; @@ -887,6 +888,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_put_counter; extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_counter; extern mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_counter; extern mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_counter; +extern mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_counter; extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_counter; extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_counter; extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_del_counter; @@ -1026,6 +1028,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_put_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_bytes; +extern mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_bytes; extern 
mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_del_bytes; diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index f26dc3ad3603c6..2e4e2790f4b9e4 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -125,6 +125,10 @@ class MetaServiceImpl : public cloud::MetaService { const GetCurrentMaxTxnRequest* request, GetCurrentMaxTxnResponse* response, ::google::protobuf::Closure* done) override; + void create_meta_sync_point(::google::protobuf::RpcController* controller, + const CreateMetaSyncPointRequest* request, + CreateMetaSyncPointResponse* response, + ::google::protobuf::Closure* done) override; void begin_sub_txn(::google::protobuf::RpcController* controller, const BeginSubTxnRequest* request, BeginSubTxnResponse* response, @@ -570,6 +574,13 @@ class MetaServiceProxy final : public MetaService { call_impl(&cloud::MetaService::get_current_max_txn_id, controller, request, response, done); } + void create_meta_sync_point(::google::protobuf::RpcController* controller, + const CreateMetaSyncPointRequest* request, + CreateMetaSyncPointResponse* response, + ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::create_meta_sync_point, controller, request, response, done); + } + void begin_sub_txn(::google::protobuf::RpcController* controller, const BeginSubTxnRequest* request, BeginSubTxnResponse* response, ::google::protobuf::Closure* done) override { diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index e707883105b412..cd7b62e7e6d589 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -46,6 +46,8 @@ using namespace std::chrono; namespace doris::cloud { +static constexpr std::string_view kMetaSyncPointDummyKey = "__meta_service_sync_point_dummy_key__"; + struct TableStats { int64_t 
updated_row_count = 0; @@ -3628,6 +3630,60 @@ void MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController* response->set_current_max_txn_id(current_max_txn_id); } +void MetaServiceImpl::create_meta_sync_point(::google::protobuf::RpcController* controller, + const CreateMetaSyncPointRequest* request, + CreateMetaSyncPointResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(create_meta_sync_point, del); + instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); + return; + } + RPC_RATE_LIMIT(create_meta_sync_point) + + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + msg = "failed to create txn"; + code = cast_as(err); + return; + } + + txn->enable_get_versionstamp(); + txn->remove(kMetaSyncPointDummyKey); + + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "txn->commit() failed, err=" << err; + msg = ss.str(); + return; + } + + int64_t committed_version = 0; + err = txn->get_committed_version(&committed_version); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "get committed version failed, err=" << err; + msg = ss.str(); + return; + } + + Versionstamp versionstamp; + err = txn->get_versionstamp(&versionstamp); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "get versionstamp failed, err=" << err; + msg = ss.str(); + return; + } + + response->set_committed_version(committed_version); + response->set_versionstamp(versionstamp.to_string()); +} + /** * 1. 
Generate a sub_txn_id * diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index f2f623cf1c1722..f815c0d3244316 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -2671,6 +2671,23 @@ TEST(MetaServiceTest, GetCurrentMaxTxnIdTest) { ASSERT_GE(max_txn_id_res.current_max_txn_id(), begin_txn_res.txn_id()); } +TEST(MetaServiceTest, CreateMetaSyncPointTest) { + auto meta_service = get_meta_service(); + const std::string cloud_unique_id = "test_cloud_unique_id"; + + brpc::Controller cntl; + CreateMetaSyncPointRequest req; + CreateMetaSyncPointResponse resp; + req.set_cloud_unique_id(cloud_unique_id); + + meta_service->create_meta_sync_point( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &resp, nullptr); + + ASSERT_EQ(resp.status().code(), MetaServiceCode::OK); + ASSERT_GT(resp.committed_version(), 0); + ASSERT_EQ(resp.versionstamp().size(), 20); +} + TEST(MetaServiceTest, AbortTxnWithCoordinatorTest) { auto meta_service = get_meta_service(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/CloudMetaSyncPoint.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/CloudMetaSyncPoint.java new file mode 100644 index 00000000000000..e226bad144d240 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/CloudMetaSyncPoint.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.persist; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class CloudMetaSyncPoint implements Writable { + @SerializedName(value = "committedVersion") + private long committedVersion; + + @SerializedName(value = "versionStamp") + private String versionStamp; + + @SerializedName(value = "createTimeMs") + private long createTimeMs; + + public CloudMetaSyncPoint() { + } + + public CloudMetaSyncPoint(long committedVersion, String versionStamp, long createTimeMs) { + this.committedVersion = committedVersion; + this.versionStamp = versionStamp; + this.createTimeMs = createTimeMs; + } + + public long getCommittedVersion() { + return committedVersion; + } + + public String getVersionStamp() { + return versionStamp; + } + + public long getCreateTimeMs() { + return createTimeMs; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static CloudMetaSyncPoint read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), CloudMetaSyncPoint.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index a27a823c7c112d..2f85ef17a8e191 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -221,6 +221,11 @@ public Cloud.GetCurrentMaxTxnResponse getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnR .getCurrentMaxTxnId(request); } + public Cloud.CreateMetaSyncPointResponse createMetaSyncPoint(Cloud.CreateMetaSyncPointRequest request) { + return blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, TimeUnit.MILLISECONDS) + .createMetaSyncPoint(request); + } + public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest request) { if (!request.hasCloudUniqueId()) { Cloud.BeginSubTxnRequest.Builder builder = Cloud.BeginSubTxnRequest.newBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index e1cb45401db6e8..90e3a9276cf328 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -387,6 +387,11 @@ public Cloud.GetCurrentMaxTxnResponse getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnR return executeWithMetrics("getCurrentMaxTxnId", (client) -> client.getCurrentMaxTxnId(request)); } + public Cloud.CreateMetaSyncPointResponse createMetaSyncPoint(Cloud.CreateMetaSyncPointRequest request) + throws RpcException { + return executeWithMetrics("createMetaSyncPoint", (client) -> client.createMetaSyncPoint(request)); + } + public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest request) throws RpcException { return executeWithMetrics("beginSubTxn", (client) -> client.beginSubTxn(request)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/MetaBackupAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/MetaBackupAction.java new file mode 100644 index 00000000000000..71a053e25fc615 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/MetaBackupAction.java @@ -0,0 +1,329 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.rest.manager; + +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.persist.CloudMetaSyncPoint; +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.MetaServiceProxy; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.httpv2.rest.RestBaseController; +import org.apache.doris.journal.Journal; +import org.apache.doris.journal.bdbje.BDBJEJournal; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.Storage; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.FrontendOptions; + +import com.fasterxml.jackson.annotation.JsonAlias; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.util.DbBackup; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import 
org.apache.commons.io.FileUtils; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.Map; + +@RestController +@RequestMapping("/rest/v2/manager/backup") +public class MetaBackupAction extends RestBaseController { + private static final String ALLOW_REDIRECT = "allow_redirect"; + + @PostMapping("/sync_cloud_meta") + public Object syncCloudMeta(HttpServletRequest request, HttpServletResponse response) { + if (!Config.isCloudMode()) { + return ResponseEntityBuilder.okWithCommonError("/sync_cloud_meta only works on the cloud mode"); + } + try { + if (needRedirect(request.getScheme())) { + return redirectToHttps(request); + } + executeCheckPassword(request, response); + checkGlobalAuth(org.apache.doris.qe.ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + Object redirectOrError = checkMasterAndRedirectIfNeeded(request, response); + if (redirectOrError != null) { + return redirectOrError; + } + + synchronized (Env.getCurrentEnv().getEditLog()) { + MetaSyncPointVersion syncVersion = createMetaSyncPoint(); + CloudMetaSyncPoint syncPoint = new CloudMetaSyncPoint(syncVersion.committedVersion, + syncVersion.versionStamp, + System.currentTimeMillis()); + long journalId = Env.getCurrentEnv().getEditLog().logMetaSyncPoint(syncPoint); + + Map data = new HashMap<>(); + data.put("journal_id", journalId); + data.put("committed_version", syncVersion.committedVersion); + data.put("versionstamp", syncVersion.versionStamp); + return ResponseEntityBuilder.ok(data); + } + } catch (Exception e) { + return ResponseEntityBuilder.okWithCommonError(e.getMessage()); + } + } + + @PostMapping("/export_meta") 
+ public Object exportMeta(@RequestBody ExportMetaRequest req, + HttpServletRequest request, HttpServletResponse response) { + if (!Config.isCloudMode()) { + return ResponseEntityBuilder.okWithCommonError("/export_meta only works on the cloud mode"); + } + try { + if (needRedirect(request.getScheme())) { + return redirectToHttps(request); + } + executeCheckPassword(request, response); + checkGlobalAuth(org.apache.doris.qe.ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + Object redirectOrError = checkMasterAndRedirectIfNeeded(request, response); + if (redirectOrError != null) { + return redirectOrError; + } + + if (req == null || Strings.isNullOrEmpty(req.getTargetDir())) { + return ResponseEntityBuilder.badRequest("target_dir is required"); + } + File targetDir = prepareTargetDir(req.getTargetDir()); + if (Env.getCurrentEnv().getCheckpointer() != null) { + Env.getCurrentEnv().getCheckpointer().getLock().readLock().lock(); + } + try { + CopiedImage copiedImage = copyLatestImageIfExists(targetDir); + copyImageMetaFiles(targetDir); + BdbExportResult bdbResult = exportBdbJe(targetDir, copiedImage.version, copiedImage.exists); + + Map data = new HashMap<>(); + data.put("target_dir", targetDir.getAbsolutePath()); + data.put("bdb_dir", new File(targetDir, "bdb").getAbsolutePath()); + data.put("bdb_file_count", bdbResult.fileCount); + data.put("image_file", copiedImage.exists ? 
copiedImage.file.getName() : null); + data.put("image_version", copiedImage.version); + data.put("image_exported", copiedImage.exists); + data.put("journal_upper_bound", bdbResult.journalUpperBound); + return ResponseEntityBuilder.ok(data); + } finally { + if (Env.getCurrentEnv().getCheckpointer() != null) { + Env.getCurrentEnv().getCheckpointer().getLock().readLock().unlock(); + } + } + } catch (Exception e) { + return ResponseEntityBuilder.okWithCommonError(e.getMessage()); + } + } + + private Object checkMasterAndRedirectIfNeeded(HttpServletRequest request, HttpServletResponse response) + throws Exception { + if (Env.getCurrentEnv().isMaster()) { + return null; + } + if (Boolean.parseBoolean(request.getParameter(ALLOW_REDIRECT))) { + return redirectToMasterOrException(request, response); + } + return ResponseEntityBuilder.okWithCommonError( + "current fe is not master, master is " + + Env.getCurrentEnv().getMasterHost() + ":" + Env.getCurrentEnv().getMasterHttpPort()); + } + + private MetaSyncPointVersion createMetaSyncPoint() throws DdlException { + Cloud.CreateMetaSyncPointRequest req = Cloud.CreateMetaSyncPointRequest.newBuilder() + .setCloudUniqueId(Config.cloud_unique_id) + .setRequestIp(FrontendOptions.getLocalHostAddressCached()) + .build(); + try { + Cloud.CreateMetaSyncPointResponse resp = MetaServiceProxy.getInstance().createMetaSyncPoint(req); + if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + throw new DdlException("create_meta_sync_point failed: " + resp.getStatus().getMsg()); + } + if (!resp.hasCommittedVersion()) { + throw new DdlException("meta service response missing committed_version"); + } + if (!resp.hasVersionstamp() || Strings.isNullOrEmpty(resp.getVersionstamp())) { + throw new DdlException("meta service response missing versionstamp"); + } + return new MetaSyncPointVersion(resp.getCommittedVersion(), resp.getVersionstamp()); + } catch (RpcException e) { + throw new DdlException("create_meta_sync_point rpc failed: " + 
e.getMessage()); + } + } + + private static class MetaSyncPointVersion { + private final long committedVersion; + private final String versionStamp; + + MetaSyncPointVersion(long committedVersion, String versionStamp) { + this.committedVersion = committedVersion; + this.versionStamp = versionStamp; + } + } + + private static File prepareTargetDir(String targetDir) throws IOException { + File dir = new File(targetDir).getCanonicalFile(); + if (dir.exists()) { + if (!dir.isDirectory()) { + throw new IOException("target_dir exists but is not a directory: " + dir.getAbsolutePath()); + } + FileUtils.cleanDirectory(dir); + } else { + FileUtils.forceMkdir(dir); + } + return dir; + } + + private BdbExportResult exportBdbJe(File targetDir, long imageVersion, boolean hasImage) throws Exception { + if (!"bdb".equalsIgnoreCase(Config.edit_log_type)) { + throw new DdlException("only bdb edit_log_type supports bdbje export"); + } + Journal journal = Env.getCurrentEnv().getEditLog().getJournal(); + if (!(journal instanceof BDBJEJournal)) { + throw new DdlException("current edit log is not BDBJEJournal"); + } + + BDBJEJournal bdbjeJournal = (BDBJEJournal) journal; + if (bdbjeJournal.getBDBEnvironment() == null) { + throw new DdlException("bdb environment is not initialized"); + } + ReplicatedEnvironment replicatedEnvironment = bdbjeJournal.getBDBEnvironment().getReplicatedEnvironment(); + if (replicatedEnvironment == null) { + throw new DdlException("bdb replicated environment is not ready"); + } + + File bdbTargetDir = new File(targetDir, "bdb"); + FileUtils.forceMkdir(bdbTargetDir); + File bdbSourceDir = new File(Env.getCurrentEnv().getBdbDir()); + + DbBackup backup = new DbBackup(replicatedEnvironment); + backup.startBackup(); + try { + long journalUpperBound = bdbjeJournal.getMaxJournalId(); + if (hasImage) { + long journalMinId = bdbjeJournal.getMinJournalId(); + if (journalMinId > 0 && journalMinId > imageVersion + 1) { + throw new DdlException("export failed: bdb min 
journal id " + journalMinId + + " is greater than image_version + 1 (" + (imageVersion + 1) + ")"); + } + if (journalUpperBound < imageVersion) { + throw new DdlException("export failed: bdb journal upper bound " + journalUpperBound + + " is smaller than image_version " + imageVersion); + } + } + String[] files = backup.getLogFilesInBackupSet(); + for (String fileName : files) { + FileUtils.copyFile(new File(bdbSourceDir, fileName), new File(bdbTargetDir, fileName)); + } + return new BdbExportResult(files.length, journalUpperBound); + } finally { + backup.endBackup(); + } + } + + private CopiedImage copyLatestImageIfExists(File targetDir) throws IOException { + File imageTargetDir = new File(targetDir, "image"); + Storage storage = new Storage(Env.getServingEnv().getImageDir()); + long imageVersion = storage.getLatestImageSeq(); + File image = storage.getImageFile(imageVersion); + if (!image.exists()) { + return CopiedImage.notFound(imageVersion); + } + File targetImage = new File(imageTargetDir, image.getName()); + linkOrCopyFile(image, targetImage); + return CopiedImage.found(targetImage, imageVersion); + } + + private void copyImageMetaFiles(File targetDir) throws IOException { + File imageTargetDir = new File(targetDir, "image"); + Storage storage = new Storage(Env.getServingEnv().getImageDir()); + File[] metaFiles = new File[] { + storage.getModeFile(), + storage.getRoleFile(), + storage.getVersionFile() + }; + for (File source : metaFiles) { + if (!source.exists()) { + continue; + } + linkOrCopyFile(source, new File(imageTargetDir, source.getName())); + } + } + + private void linkOrCopyFile(File source, File target) throws IOException { + try { + Files.createLink(target.toPath(), source.toPath()); + } catch (UnsupportedOperationException | SecurityException | FileAlreadyExistsException e) { + FileUtils.copyFile(source, target); + } catch (IOException e) { + FileUtils.copyFile(source, target); + } + } + + private static class CopiedImage { + private final File 
file; + private final long version; + private final boolean exists; + + CopiedImage(File file, long version, boolean exists) { + this.file = file; + this.version = version; + this.exists = exists; + } + + static CopiedImage found(File file, long version) { + return new CopiedImage(file, version, true); + } + + static CopiedImage notFound(long version) { + return new CopiedImage(null, version, false); + } + } + + private static class BdbExportResult { + private final int fileCount; + private final long journalUpperBound; + + BdbExportResult(int fileCount, long journalUpperBound) { + this.fileCount = fileCount; + this.journalUpperBound = journalUpperBound; + } + } + + public static class ExportMetaRequest { + @JsonAlias({"targetDir"}) + @JsonProperty("target_dir") + private String targetDir; + + public String getTargetDir() { + return targetDir; + } + + public void setTargetDir(String targetDir) { + this.targetDir = targetDir; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 76b4578f892c2b..8f5e3f178659b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.FunctionSearchDesc; import org.apache.doris.catalog.Resource; import org.apache.doris.cloud.CloudWarmUpJob; +import org.apache.doris.cloud.persist.CloudMetaSyncPoint; import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo; import org.apache.doris.cloud.snapshot.SnapshotState; import org.apache.doris.cluster.Cluster; @@ -990,6 +991,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_META_SYNC_POINT: { + data = CloudMetaSyncPoint.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index d506b474ed70b3..c7384c7e7155f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -41,6 +41,7 @@ import org.apache.doris.catalog.Resource; import org.apache.doris.cloud.CloudWarmUpJob; import org.apache.doris.cloud.catalog.CloudEnv; +import org.apache.doris.cloud.persist.CloudMetaSyncPoint; import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo; import org.apache.doris.cloud.snapshot.SnapshotState; import org.apache.doris.common.Config; @@ -1424,6 +1425,11 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { // TODO: implement break; } + case OperationType.OP_META_SYNC_POINT: { + // CloudMetaSyncPoint info = (CloudMetaSyncPoint) journal.getData(); + // This log is only used to keep FE/MS cut point in journal timeline. + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}, log id: {}", opCode, logId, e); @@ -2537,4 +2543,8 @@ public void logOperateKey(KeyOperationInfo info) { public long logBeginSnapshot(SnapshotState snapshotState) { return logEdit(OperationType.OP_BEGIN_SNAPSHOT, snapshotState); } + + public long logMetaSyncPoint(CloudMetaSyncPoint syncPoint) { + return logEdit(OperationType.OP_META_SYNC_POINT, syncPoint); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 1174d9c3874817..016903129c2e67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -426,6 +426,7 @@ public class OperationType { public static final short OP_MODIFY_CLOUD_WARM_UP_JOB = 1002; public static final short OP_BEGIN_SNAPSHOT = 1100; + public static final short 
// Request of MetaService.create_meta_sync_point.
message CreateMetaSyncPointRequest {
    optional string cloud_unique_id = 1; // For auth
    // The FE caller fills this with its own local host address.
    optional string request_ip = 2;
}

// Response of MetaService.create_meta_sync_point.
message CreateMetaSyncPointResponse {
    optional MetaServiceResponseStatus status = 1;
    // Committed version of the transaction the meta service committed for this sync point.
    optional int64 committed_version = 2;
    // Versionstamp of that transaction in string form.
    optional string versionstamp = 3;
}