Skip to content

Commit

Permalink
refactor(NODE-6230): executeOperation to use iterative retry mechanism (
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James committed Jul 8, 2024
1 parent 5abf5fc commit 9a5e611
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 131 deletions.
2 changes: 1 addition & 1 deletion src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
if (this.hasAspect(Aspect.EXPLAINABLE)) {
return this.explain == null;
}
return true;
return super.canRetryWrite;
}

public async executeCommand<T extends MongoDBResponseConstructor>(
Expand Down
260 changes: 132 additions & 128 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
} from '../sdam/server_selection';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { squashError, supportsRetryableWrites } from '../utils';
import { supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
Expand All @@ -45,10 +45,9 @@ type ResultTypeFromOperation<TOperation> = TOperation extends AbstractOperation<
* not provided.
*
* The expectation is that this function:
* - Connects the MongoClient if it has not already been connected
* - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
* - Creates a session if none is provided and cleans up the session it creates
* - Selects a server based on readPreference or various factors
* - Retries an operation if it fails for certain errors, see {@link retryOperation}
* - Tries an operation and retries under certain conditions, see {@link tryOperation}
*
* @typeParam T - The operation's type
* @typeParam TResult - The type of the operation's result, calculated from T
Expand All @@ -65,23 +64,7 @@ export async function executeOperation<
throw new MongoRuntimeError('This method requires a valid operation instance');
}

if (client.topology == null) {
// Auto connect on operation
if (client.s.hasBeenClosed) {
throw new MongoNotConnectedError('Client must be connected before running operations');
}
client.s.options[Symbol.for('@@mdb.skipPingOnConnect')] = true;
try {
await client.connect();
} finally {
delete client.s.options[Symbol.for('@@mdb.skipPingOnConnect')];
}
}

const { topology } = client;
if (topology == null) {
throw new MongoRuntimeError('client.connect did not create a topology but also did not throw');
}
const topology = await autoConnect(client);

// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
Expand All @@ -108,7 +91,6 @@ export async function executeOperation<
const inTransaction = !!session?.inTransaction();

const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);

if (
inTransaction &&
Expand All @@ -124,6 +106,73 @@ export async function executeOperation<
session.unpin();
}

try {
return await tryOperation(operation, {
topology,
session,
readPreference
});
} finally {
if (session?.owner != null && session.owner === owner) {
await session.endSession();
}
}
}

/**
* Connects a client if it has not yet been connected
* @internal
*/
async function autoConnect(client: MongoClient): Promise<Topology> {
if (client.topology == null) {
if (client.s.hasBeenClosed) {
throw new MongoNotConnectedError('Client must be connected before running operations');
}
client.s.options[Symbol.for('@@mdb.skipPingOnConnect')] = true;
try {
await client.connect();
if (client.topology == null) {
throw new MongoRuntimeError(
'client.connect did not create a topology but also did not throw'
);
}
return client.topology;
} finally {
delete client.s.options[Symbol.for('@@mdb.skipPingOnConnect')];
}
}
return client.topology;
}

/** @internal */
type RetryOptions = {
session: ClientSession | undefined;
readPreference: ReadPreference;
topology: Topology;
};

/**
* Executes an operation and retries as appropriate
* @internal
*
* @remarks
* Implements behaviour described in [Retryable Reads](https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md) and [Retryable
* Writes](https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.md) specification
*
* This function:
* - performs initial server selection
* - attempts to execute an operation
* - retries the operation if it meets the criteria for a retryable read or a retryable write
*
* @typeParam T - The operation's type
* @typeParam TResult - The type of the operation's result, calculated from T
*
* @param operation - The operation to execute
* */
async function tryOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(operation: T, { topology, session, readPreference }: RetryOptions): Promise<TResult> {
let selector: ReadPreference | ServerSelector;

if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
Expand All @@ -139,30 +188,14 @@ export async function executeOperation<
selector = readPreference;
}

const server = await topology.selectServer(selector, {
let server = await topology.selectServer(selector, {
session,
operationName: operation.commandName
});

if (session == null) {
// No session also means it is not retryable, early exit
return await operation.execute(server, undefined);
}

if (!operation.hasAspect(Aspect.RETRYABLE)) {
// non-retryable operation, early exit
try {
return await operation.execute(server, session);
} finally {
if (session?.owner != null && session.owner === owner) {
try {
await session.endSession();
} catch (error) {
squashError(error);
}
}
}
}
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);
const inTransaction = session?.inTransaction() ?? false;

const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;

Expand All @@ -172,105 +205,76 @@ export async function executeOperation<
supportsRetryableWrites(server) &&
operation.canRetryWrite;

const willRetry = (hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite);
const willRetry =
operation.hasAspect(Aspect.RETRYABLE) &&
session != null &&
((hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite));

if (hasWriteAspect && willRetryWrite) {
if (hasWriteAspect && willRetryWrite && session != null) {
operation.options.willRetryWrite = true;
session.incrementTransactionNumber();
}

try {
return await operation.execute(server, session);
} catch (operationError) {
if (willRetry && operationError instanceof MongoError) {
return await retryOperation(operation, operationError, {
session,
topology,
selector,
previousServer: server.description
});
}
throw operationError;
} finally {
if (session?.owner != null && session.owner === owner) {
try {
await session.endSession();
} catch (error) {
squashError(error);
}
}
}
}
// TODO(NODE-6231): implement infinite retry within CSOT timeout here
const maxTries = willRetry ? 2 : 1;
let previousOperationError: MongoError | undefined;
let previousServer: ServerDescription | undefined;

/** @internal */
type RetryOptions = {
session: ClientSession;
topology: Topology;
selector: ReadPreference | ServerSelector;
previousServer: ServerDescription;
};
// TODO(NODE-6231): implement infinite retry within CSOT timeout here
for (let tries = 0; tries < maxTries; tries++) {
if (previousOperationError) {
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
throw new MongoServerError({
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
originalError: previousOperationError
});
}

async function retryOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
originalError: MongoError,
{ session, topology, selector, previousServer }: RetryOptions
): Promise<TResult> {
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
throw previousOperationError;

if (isWriteOperation && originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
throw new MongoServerError({
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
originalError
});
}
if (hasReadAspect && !isRetryableReadError(previousOperationError))
throw previousOperationError;

if (isWriteOperation && !isRetryableWriteError(originalError)) {
throw originalError;
}

if (isReadOperation && !isRetryableReadError(originalError)) {
throw originalError;
}
if (
previousOperationError instanceof MongoNetworkError &&
operation.hasAspect(Aspect.CURSOR_CREATING) &&
session != null &&
session.isPinned &&
!session.inTransaction()
) {
session.unpin({ force: true, forceClear: true });
}

if (
originalError instanceof MongoNetworkError &&
session.isPinned &&
!session.inTransaction() &&
operation.hasAspect(Aspect.CURSOR_CREATING)
) {
// If we have a cursor and the initial command fails with a network error,
// we can retry it on another connection. So we need to check it back in, clear the
// pool for the service id, and retry again.
session.unpin({ force: true, forceClear: true });
}
server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
previousServer
});

// select a new server, and attempt to retry the operation
const server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
previousServer
});
if (hasWriteAspect && !supportsRetryableWrites(server)) {
throw new MongoUnexpectedServerResponseError(
'Selected server does not support retryable writes'
);
}
}

if (isWriteOperation && !supportsRetryableWrites(server)) {
throw new MongoUnexpectedServerResponseError(
'Selected server does not support retryable writes'
);
}
try {
return await operation.execute(server, session);
} catch (operationError) {
if (!(operationError instanceof MongoError)) throw operationError;

try {
return await operation.execute(server, session);
} catch (retryError) {
if (
retryError instanceof MongoError &&
retryError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
) {
throw originalError;
if (
previousOperationError != null &&
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
) {
throw previousOperationError;
}
previousServer = server.description;
previousOperationError = operationError;
}
throw retryError;
}

throw previousOperationError;
}
4 changes: 2 additions & 2 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ export abstract class AbstractOperation<TResult = any> {
}

get canRetryRead(): boolean {
return true;
return this.hasAspect(Aspect.RETRYABLE) && this.hasAspect(Aspect.READ_OPERATION);
}

get canRetryWrite(): boolean {
return true;
return this.hasAspect(Aspect.RETRYABLE) && this.hasAspect(Aspect.WRITE_OPERATION);
}
}

Expand Down

0 comments on commit 9a5e611

Please sign in to comment.