Skip to content

Commit e70fdc9

Browse files
feat(NODE-7122): exponential backoff between retries in convenient transaction API (#4765)
Co-authored-by: Durran Jordan <durran@gmail.com>
1 parent a4211e7 commit e70fdc9

File tree

2 files changed

+187
-44
lines changed

2 files changed

+187
-44
lines changed

src/sessions.ts

Lines changed: 100 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { setTimeout } from 'timers/promises';
2+
13
import { Binary, type Document, Long, type Timestamp } from './bson';
24
import type { CommandOptions, Connection } from './cmap/connection';
35
import { ConnectionPoolMetrics } from './cmap/metrics';
@@ -732,17 +734,61 @@ export class ClientSession
732734
: processTimeMS();
733735

734736
let committed = false;
735-
let result: any;
737+
let result: T;
738+
739+
let lastError: Error | null = null;
736740

737741
try {
738-
while (!committed) {
739-
// 2. Invoke startTransaction on the session
740-
// 3. If `startTransaction` reported an error, propagate that error to the caller of `withTransaction` and return immediately.
742+
retryTransaction: for (
743+
// 2. Set `transactionAttempt` to `0`.
744+
let transactionAttempt = 0, isRetry = false;
745+
!committed;
746+
++transactionAttempt, isRetry = transactionAttempt > 0
747+
) {
748+
// 2. If `transactionAttempt` > 0:
749+
if (isRetry) {
750+
// 2.i If elapsed time + `backoffMS` > `TIMEOUT_MS`, then raise the previously encountered error. If the elapsed time of
751+
// `withTransaction` is less than TIMEOUT_MS, calculate the backoffMS to be
752+
// `jitter * min(BACKOFF_INITIAL * 1.5 ** (transactionAttempt - 1), BACKOFF_MAX)`. sleep for `backoffMS`.
753+
// 2.i.i jitter is a random float between \[0, 1)
754+
// 2.i.ii `transactionAttempt` is the variable defined in step 1.
755+
// 2.i.iii `BACKOFF_INITIAL` is 5ms
756+
// 2.i.iv `BACKOFF_MAX` is 500ms
757+
const BACKOFF_INITIAL_MS = 5;
758+
const BACKOFF_MAX_MS = 500;
759+
const BACKOFF_GROWTH = 1.5;
760+
const jitter = Math.random();
761+
const backoffMS =
762+
jitter *
763+
Math.min(
764+
BACKOFF_INITIAL_MS * BACKOFF_GROWTH ** (transactionAttempt - 1),
765+
BACKOFF_MAX_MS
766+
);
767+
768+
const willExceedTransactionDeadline =
769+
(this.timeoutContext?.csotEnabled() &&
770+
backoffMS > this.timeoutContext.remainingTimeMS) ||
771+
processTimeMS() + backoffMS > startTime + MAX_TIMEOUT;
772+
773+
if (willExceedTransactionDeadline) {
774+
throw (
775+
lastError ??
776+
new MongoRuntimeError(
777+
`Transaction retry did not record an error: should never occur. Please file a bug.`
778+
)
779+
);
780+
}
781+
782+
await setTimeout(backoffMS);
783+
}
784+
785+
// 3. Invoke startTransaction on the session
786+
// 4. If `startTransaction` reported an error, propagate that error to the caller of `withTransaction` and return immediately.
741787
this.startTransaction(options); // may throw on error
742788

743789
try {
744-
// 4. Invoke the callback.
745-
// 5. Control returns to withTransaction. (continued below)
790+
// 5. Invoke the callback.
791+
// 6. Control returns to withTransaction. (continued below)
746792
const promise = fn(this);
747793
if (!isPromiseLike(promise)) {
748794
throw new MongoInvalidArgumentError(
@@ -752,18 +798,18 @@ export class ClientSession
752798

753799
result = await promise;
754800

755-
// 5. (cont.) Determine the current state of the ClientSession (continued below)
801+
// 6. (cont.) Determine the current state of the ClientSession (continued below)
756802
if (
757803
this.transaction.state === TxnState.NO_TRANSACTION ||
758804
this.transaction.state === TxnState.TRANSACTION_COMMITTED ||
759805
this.transaction.state === TxnState.TRANSACTION_ABORTED
760806
) {
761-
// 7. If the ClientSession is in the "no transaction", "transaction aborted", or "transaction committed" state,
807+
// 8. If the ClientSession is in the "no transaction", "transaction aborted", or "transaction committed" state,
762808
// assume the callback intentionally aborted or committed the transaction and return immediately.
763809
return result;
764810
}
765811
// 5. (cont.) and whether the callback reported an error
766-
// 6. If the callback reported an error:
812+
// 7. If the callback reported an error:
767813
} catch (fnError) {
768814
if (!(fnError instanceof MongoError) || fnError instanceof MongoInvalidArgumentError) {
769815
// This first preemptive abort regardless of TxnState isn't spec,
@@ -776,70 +822,80 @@ export class ClientSession
776822
this.transaction.state === TxnState.STARTING_TRANSACTION ||
777823
this.transaction.state === TxnState.TRANSACTION_IN_PROGRESS
778824
) {
779-
// 6.i If the ClientSession is in the "starting transaction" or "transaction in progress" state,
825+
// 7.i If the ClientSession is in the "starting transaction" or "transaction in progress" state,
780826
// invoke abortTransaction on the session
781827
await this.abortTransaction();
782828
}
783829

784830
if (
785831
fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
786-
(this.timeoutContext != null || processTimeMS() - startTime < MAX_TIMEOUT)
832+
(this.timeoutContext?.csotEnabled() || processTimeMS() - startTime < MAX_TIMEOUT)
787833
) {
788-
// 6.ii If the callback's error includes a "TransientTransactionError" label and the elapsed time of `withTransaction`
834+
// 7.ii If the callback's error includes a "TransientTransactionError" label and the elapsed time of `withTransaction`
789835
// is less than 120 seconds, jump back to step two.
790-
continue;
836+
lastError = fnError;
837+
continue retryTransaction;
791838
}
792839

793-
// 6.iii If the callback's error includes a "UnknownTransactionCommitResult" label, the callback must have manually committed a transaction,
840+
// 7.iii If the callback's error includes a "UnknownTransactionCommitResult" label, the callback must have manually committed a transaction,
794841
// propagate the callback's error to the caller of withTransaction and return immediately.
795-
// The 6.iii check is redundant with 6.iv, so we don't write code for it
796-
// 6.iv Otherwise, propagate the callback's error to the caller of withTransaction and return immediately.
842+
// The 7.iii check is redundant with 6.iv, so we don't write code for it
843+
// 7.iv Otherwise, propagate the callback's error to the caller of withTransaction and return immediately.
797844
throw fnError;
798845
}
799846

800-
while (!committed) {
847+
retryCommit: while (!committed) {
801848
try {
802849
/*
803850
* We will rely on ClientSession.commitTransaction() to
804851
* apply a majority write concern if commitTransaction is
805852
* being retried (see: DRIVERS-601)
806853
*/
807-
// 8. Invoke commitTransaction on the session.
854+
// 9. Invoke commitTransaction on the session.
808855
await this.commitTransaction();
809856
committed = true;
810-
// 9. If commitTransaction reported an error:
857+
// 10. If commitTransaction reported an error:
811858
} catch (commitError) {
812-
/*
813-
* Note: a maxTimeMS error will have the MaxTimeMSExpired
814-
* code (50) and can be reported as a top-level error or
815-
* inside writeConcernError, ex.
816-
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
817-
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
818-
*/
819-
if (
820-
!isMaxTimeMSExpiredError(commitError) &&
821-
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) &&
822-
(this.timeoutContext != null || processTimeMS() - startTime < MAX_TIMEOUT)
823-
) {
824-
// 9.i If the `commitTransaction` error includes a "UnknownTransactionCommitResult" label and the error is not
825-
// MaxTimeMSExpired and the elapsed time of `withTransaction` is less than 120 seconds, jump back to step eight.
826-
continue;
859+
// If CSOT is enabled, we repeatedly retry until timeoutMS expires. This is enforced by providing a
860+
// timeoutContext to each async API, which know how to cancel themselves (i.e., the next retry will
861+
// abort the withTransaction call).
862+
// If CSOT is not enabled, do we still have time remaining or have we timed out?
863+
const hasTimedOut =
864+
!this.timeoutContext?.csotEnabled() && processTimeMS() - startTime >= MAX_TIMEOUT;
865+
866+
if (!hasTimedOut) {
867+
/*
868+
* Note: a maxTimeMS error will have the MaxTimeMSExpired
869+
* code (50) and can be reported as a top-level error or
870+
* inside writeConcernError, ex.
871+
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
872+
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
873+
*/
874+
if (
875+
!isMaxTimeMSExpiredError(commitError) &&
876+
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)
877+
) {
878+
// 10.i If the `commitTransaction` error includes a "UnknownTransactionCommitResult" label and the error is not
879+
// MaxTimeMSExpired and the elapsed time of `withTransaction` is less than 120 seconds, jump back to step eight.
880+
continue retryCommit;
881+
}
882+
883+
if (commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
884+
// 10.ii If the commitTransaction error includes a "TransientTransactionError" label
885+
// and the elapsed time of withTransaction is less than 120 seconds, jump back to step two.
886+
lastError = commitError;
887+
888+
continue retryTransaction;
889+
}
827890
}
828891

829-
if (
830-
commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
831-
(this.timeoutContext != null || processTimeMS() - startTime < MAX_TIMEOUT)
832-
) {
833-
// 9.ii If the commitTransaction error includes a "TransientTransactionError" label
834-
// and the elapsed time of withTransaction is less than 120 seconds, jump back to step two.
835-
break;
836-
}
837-
838-
// 9.iii Otherwise, propagate the commitTransaction error to the caller of withTransaction and return immediately.
892+
// 10.iii Otherwise, propagate the commitTransaction error to the caller of withTransaction and return immediately.
839893
throw commitError;
840894
}
841895
}
842896
}
897+
898+
// @ts-expect-error Result is always defined if we reach here, the for-loop above convinces TS it is not.
843899
return result;
844900
} finally {
845901
this.timeoutContext = null;
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { expect } from 'chai';
2+
import { test } from 'mocha';
3+
import * as sinon from 'sinon';
4+
5+
import { type ClientSession, type Collection, type MongoClient } from '../../../src';
6+
import { configureFailPoint, type FailCommandFailPoint, measureDuration } from '../../tools/utils';
7+
8+
const failCommand: FailCommandFailPoint = {
9+
configureFailPoint: 'failCommand',
10+
mode: {
11+
times: 13
12+
},
13+
data: {
14+
failCommands: ['commitTransaction'],
15+
errorCode: 251 // no such transaction
16+
}
17+
};
18+
19+
describe('Retry Backoff is Enforced', function () {
20+
// 1. let client be a MongoClient
21+
let client: MongoClient;
22+
23+
// 2. let coll be a collection
24+
let collection: Collection;
25+
26+
beforeEach(async function () {
27+
client = this.configuration.newClient();
28+
collection = client.db('foo').collection('bar');
29+
});
30+
31+
afterEach(async function () {
32+
sinon.restore();
33+
await client?.close();
34+
});
35+
36+
test(
37+
'works',
38+
{
39+
requires: {
40+
mongodb: '>=4.4', // failCommand
41+
topology: '!single' // transactions can't run on standalone servers
42+
}
43+
},
44+
async function () {
45+
const randomStub = sinon.stub(Math, 'random');
46+
47+
// 3.i Configure the random number generator used for jitter to always return 0
48+
randomStub.returns(0);
49+
50+
// 3.ii Configure a fail point that forces 13 retries
51+
await configureFailPoint(this.configuration, failCommand);
52+
53+
// 3.iii
54+
const callback = async (s: ClientSession) => {
55+
await collection.insertOne({}, { session: s });
56+
};
57+
58+
// 3.iv Let no_backoff_time be the duration of the withTransaction API call
59+
const { duration: noBackoffTime } = await measureDuration(() => {
60+
return client.withSession(async s => {
61+
await s.withTransaction(callback);
62+
});
63+
});
64+
65+
// 4.i Configure the random number generator used for jitter to always return 1.
66+
randomStub.returns(1);
67+
68+
// 4.ii Configure a fail point that forces 13 retries like in step 3.2.
69+
await configureFailPoint(this.configuration, failCommand);
70+
71+
// 4.iii Use the same callback defined in 3.3.
72+
// 4.iv Let with_backoff_time be the duration of the withTransaction API call
73+
const { duration: fullBackoffDuration } = await measureDuration(() => {
74+
return client.withSession(async s => {
75+
await s.withTransaction(callback);
76+
});
77+
});
78+
79+
// 5. Compare the two time between the two runs.
80+
// The sum of 13 backoffs is roughly 2.2 seconds. There is a 1-second window to account for potential variance between the two runs.
81+
expect(fullBackoffDuration).to.be.within(
82+
noBackoffTime + 2200 - 1000,
83+
noBackoffTime + 2200 + 1000
84+
);
85+
}
86+
);
87+
});

0 commit comments

Comments
 (0)