Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions bin/start_fe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ OPTS="$(getopt \
-l 'image:' \
-l 'version' \
-l 'metadata_failure_recovery' \
-l 'recovery_journal_id:' \
-l 'console' \
-l 'cluster_snapshot:' \
-- "$@")"
Expand All @@ -46,6 +47,7 @@ IMAGE_PATH=''
IMAGE_TOOL=''
OPT_VERSION=''
METADATA_FAILURE_RECOVERY=''
RECOVERY_JOURNAL_ID=''
CLUSTER_SNAPSHOT=''
while true; do
case "$1" in
Expand All @@ -65,6 +67,10 @@ while true; do
METADATA_FAILURE_RECOVERY="-r"
shift
;;
--recovery_journal_id)
RECOVERY_JOURNAL_ID="--recovery_journal_id $2"
shift 2
;;
--helper)
HELPER="$2"
shift 2
Expand Down Expand Up @@ -418,12 +424,12 @@ if [[ "${IMAGE_TOOL}" -eq 1 ]]; then
echo "Internal error, USE IMAGE_TOOL like: ./start_fe.sh --image image_path"
fi
elif [[ "${RUN_DAEMON}" -eq 1 ]]; then
nohup ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} -XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" ${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} "${METADATA_FAILURE_RECOVERY}" "${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" 2>&1 </dev/null &
nohup ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} -XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" ${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} "${METADATA_FAILURE_RECOVERY}" ${RECOVERY_JOURNAL_ID:+${RECOVERY_JOURNAL_ID}} "${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" 2>&1 </dev/null &
elif [[ "${RUN_CONSOLE}" -eq 1 ]]; then
export DORIS_LOG_TO_STDERR=1
${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} -XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" ${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} ${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}" "${CLUSTER_SNAPSHOT}" "$@" </dev/null
${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} -XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" ${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} ${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}" ${RECOVERY_JOURNAL_ID:+${RECOVERY_JOURNAL_ID}} "${CLUSTER_SNAPSHOT}" "$@" </dev/null
else
${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} -XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" ${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} ${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}" "${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" 2>&1 </dev/null
${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} -XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" ${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} ${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}" ${RECOVERY_JOURNAL_ID:+${RECOVERY_JOURNAL_ID}} "${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" 2>&1 </dev/null
fi

if [[ "${OPT_VERSION}" != "" ]]; then
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -351,6 +352,9 @@ private static CommandLineOptions parseArgs(String[] args) {
options.addOption("m", "metaversion", true, "Specify the meta version to decode log value");
options.addOption("r", FeConstants.METADATA_FAILURE_RECOVERY_KEY, false,
"Check if the specified metadata recover is valid");
options.addOption(Option.builder().longOpt(FeConstants.RECOVERY_JOURNAL_ID_KEY).hasArg()
.desc("Specify the recovery truncate journal id, and journals greater than this id will be removed")
.build());
options.addOption("c", "cluster_snapshot", true, "Specify the cluster snapshot json file");

CommandLine cmd = null;
Expand Down Expand Up @@ -388,6 +392,14 @@ private static CommandLineOptions parseArgs(String[] args) {
if (cmd.hasOption('r') || cmd.hasOption(FeConstants.METADATA_FAILURE_RECOVERY_KEY)) {
System.setProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY, "true");
}
if (cmd.hasOption(FeConstants.RECOVERY_JOURNAL_ID_KEY)) {
String recoveryJournalId = cmd.getOptionValue(FeConstants.RECOVERY_JOURNAL_ID_KEY);
if (Strings.isNullOrEmpty(recoveryJournalId)) {
System.err.println("recovery_journal_id is missing");
System.exit(-1);
}
System.setProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY, recoveryJournalId.trim());
}
if (cmd.hasOption('b') || cmd.hasOption("bdb")) {
if (cmd.hasOption('l') || cmd.hasOption("listdb")) {
// list bdb je databases
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ public class FeConstants {
public static final String INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME = "cloud_cache_hotspot";

public static String METADATA_FAILURE_RECOVERY_KEY = "metadata_failure_recovery";
public static String RECOVERY_JOURNAL_ID_KEY = "recovery_journal_id";
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,20 @@
import org.apache.doris.system.Frontend;

import com.google.common.collect.ImmutableList;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Durability.ReplicaAckPolicy;
import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
Expand Down Expand Up @@ -335,6 +340,73 @@ public void removeDatabase(String dbName) {
}
}

// Remove journals whose id is greater than truncateToJournalId.
public void truncateJournalsGreaterThan(long truncateToJournalId) {
lock.writeLock().lock();
try {
List<Long> dbNames = getDatabaseNames();
if (dbNames == null || dbNames.isEmpty()) {
return;
}

Long minJournalId = dbNames.get(0);
Long targetDbName = null;
for (Long dbName : dbNames) {
if (dbName <= truncateToJournalId) {
targetDbName = dbName;
} else {
break;
}
}

if (targetDbName == null) {
throw new IllegalArgumentException("truncate journal id " + truncateToJournalId
+ " is smaller than min journal id " + minJournalId);
}

for (Long dbName : dbNames) {
if (dbName > truncateToJournalId) {
removeDatabase(dbName.toString());
}
}

long deletedCount = truncateTailInDb(targetDbName.toString(), truncateToJournalId);
LOG.info("truncate journals greater than {} finished, targetDb {}, deleted {} keys in target db",
truncateToJournalId, targetDbName, deletedCount);
} finally {
lock.writeLock().unlock();
}
}

private long truncateTailInDb(String dbName, long truncateToJournalId) {
Database db = openDatabase(dbName);
if (db == null) {
throw new IllegalStateException("failed to open target database " + dbName + " for truncate");
}

long deletedCount = 0;
TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
Cursor cursor = null;
try {
cursor = db.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) {
long journalId = idBinding.entryToObject(key);
if (journalId > truncateToJournalId) {
cursor.delete();
deletedCount++;
}
}
} finally {
if (cursor != null) {
cursor.close();
}
}

return deletedCount;
}

// get journal db names and sort the names
public List<Long> getDatabaseNames() {
// The operation before may set the current thread as interrupted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
public static final Logger LOG = LogManager.getLogger(BDBJEJournal.class);
private static final int OUTPUT_BUFFER_INIT_SIZE = 128;
private static final int RETRY_TIME = 3;
private static final long RECOVERY_JOURNAL_ID_UNSET = -1L;

private String environmentPath = null;
private String selfNodeName;
Expand Down Expand Up @@ -519,6 +520,8 @@ public synchronized void open() {
LOG.error("catch an exception when setup bdb environment. will exit.", e);
System.exit(-1);
}

truncateRecoveryJournalsIfNeeded(metadataFailureRecovery);
}

// Open a new journal database or get last existing one as current journal
Expand Down Expand Up @@ -593,6 +596,78 @@ private void reSetupBdbEnvironment(InsufficientLogException insufficientLogEx) {
NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort()));
}

private void truncateRecoveryJournalsIfNeeded(boolean metadataFailureRecovery) {
if (!metadataFailureRecovery) {
return;
}

long recoveryJournalId = getRecoveryJournalIdOrUnset();
if (recoveryJournalId == RECOVERY_JOURNAL_ID_UNSET) {
return;
}

long maxJournalId = getMaxJournalIdWithoutCheck();
if (maxJournalId < 0) {
String msg = String.format("invalid metadata recovery truncate target %d, no journals in bdb",
recoveryJournalId);
LOG.error(msg);
LogUtils.stderr(msg);
System.exit(-1);
}

if (recoveryJournalId >= maxJournalId) {
String msg = String.format("metadata recovery truncate target %d >= max journal id %d, no-op",
recoveryJournalId, maxJournalId);
LOG.info(msg);
LogUtils.stdout(msg);
return;
}

long minJournalId = getMinJournalId();
if (minJournalId < 0 || recoveryJournalId < minJournalId) {
String msg = String.format("invalid metadata recovery truncate target %d, min journal id is %d",
recoveryJournalId, minJournalId);
LOG.error(msg);
LogUtils.stderr(msg);
System.exit(-1);
}

try {
bdbEnvironment.truncateJournalsGreaterThan(recoveryJournalId);
} catch (Exception e) {
String msg = String.format("failed to truncate journals greater than %d in metadata recovery mode",
recoveryJournalId);
LOG.error(msg, e);
LogUtils.stderr(msg + ", reason: " + e.getMessage());
System.exit(-1);
}
String msg = String.format("metadata recovery truncate finished, kept journals <= %d", recoveryJournalId);
LOG.info(msg);
LogUtils.stdout(msg);
}

private long getRecoveryJournalIdOrUnset() {
String journalIdStr = System.getProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY);
if (journalIdStr == null || journalIdStr.trim().isEmpty()) {
return RECOVERY_JOURNAL_ID_UNSET;
}

String trimmedJournalId = journalIdStr.trim();
try {
long journalId = Long.parseLong(trimmedJournalId);
if (journalId < 0) {
throw new NumberFormatException("recovery_journal_id must not be negative");
}
return journalId;
} catch (NumberFormatException e) {
String msg = String.format("invalid recovery_journal_id: %s", trimmedJournalId);
LOG.error(msg, e);
LogUtils.stderr(msg);
System.exit(-1);
}
return RECOVERY_JOURNAL_ID_UNSET;
}

@Override
public long getJournalNum() {
return currentJournalDB.count();
Expand Down
Loading