Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(NODE-5352): refactor AbstractOperation to use async #3729

Merged
merged 23 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import type { CollationOptions, CommandOperationOptions } from '../operations/co
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
import { AbstractOperation, type Hint } from '../operations/operation';
import { AbstractCallbackOperation, type Hint } from '../operations/operation';
import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../operations/update';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
Expand Down Expand Up @@ -881,14 +881,18 @@ export interface BulkWriteOptions extends CommandOperationOptions {
* We would like this logic to simply live inside the BulkWriteOperation class
* @internal
*/
class BulkWriteShimOperation extends AbstractOperation {
class BulkWriteShimOperation extends AbstractCallbackOperation {
bulkOperation: BulkOperationBase;
constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) {
super(options);
this.bulkOperation = bulkOperation;
}

execute(server: Server, session: ClientSession | undefined, callback: Callback<any>): void {
executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<any>
): void {
if (this.options.session == null) {
// An implicit session could have been created by 'executeOperation'
// So if we stick it on finalOptions here, each bulk operation
Expand Down
7 changes: 6 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,12 @@ export type {
export type { InsertManyResult, InsertOneOptions, InsertOneResult } from './operations/insert';
export type { CollectionInfo, ListCollectionsOptions } from './operations/list_collections';
export type { ListDatabasesOptions, ListDatabasesResult } from './operations/list_databases';
export type { AbstractOperation, Hint, OperationOptions } from './operations/operation';
export type {
AbstractCallbackOperation,
AbstractOperation,
Hint,
OperationOptions
} from './operations/operation';
export type { ProfilingLevelOptions } from './operations/profiling_level';
export type { RemoveUserOptions } from './operations/remove_user';
export type { RenameOptions } from './operations/rename';
Expand Down
2 changes: 1 addition & 1 deletion src/operations/add_user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class AddUserOperation extends CommandOperation<Document> {
this.options = options ?? {};
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
Expand Down
2 changes: 1 addition & 1 deletion src/operations/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
this.pipeline.push(stage);
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<T>
Expand Down
6 changes: 3 additions & 3 deletions src/operations/bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback } from '../utils';
import { AbstractOperation, Aspect, defineAspects } from './operation';
import { AbstractCallbackOperation, Aspect, defineAspects } from './operation';

/** @internal */
export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
export class BulkWriteOperation extends AbstractCallbackOperation<BulkWriteResult> {
override options: BulkWriteOptions;
collection: Collection;
operations: AnyBulkWriteOperation[];
Expand All @@ -27,7 +27,7 @@ export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
this.operations = operations;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<BulkWriteResult>
Expand Down
6 changes: 3 additions & 3 deletions src/operations/collections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ import type { Db } from '../db';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback } from '../utils';
import { AbstractOperation, type OperationOptions } from './operation';
import { AbstractCallbackOperation, type OperationOptions } from './operation';

export interface CollectionsOptions extends OperationOptions {
nameOnly?: boolean;
}

/** @internal */
export class CollectionsOperation extends AbstractOperation<Collection[]> {
export class CollectionsOperation extends AbstractCallbackOperation<Collection[]> {
override options: CollectionsOptions;
db: Db;

Expand All @@ -20,7 +20,7 @@ export class CollectionsOperation extends AbstractOperation<Collection[]> {
this.db = db;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Collection[]>
Expand Down
4 changes: 2 additions & 2 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
} from '../utils';
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
import type { ReadConcernLike } from './../read_concern';
import { AbstractOperation, Aspect, type OperationOptions } from './operation';
import { AbstractCallbackOperation, Aspect, type OperationOptions } from './operation';

/** @public */
export interface CollationOptions {
Expand Down Expand Up @@ -68,7 +68,7 @@ export interface OperationParent {
}

/** @internal */
export abstract class CommandOperation<T> extends AbstractOperation<T> {
export abstract class CommandOperation<T> extends AbstractCallbackOperation<T> {
override options: CommandOperationOptions;
readConcern?: ReadConcern;
writeConcern?: WriteConcern;
Expand Down
2 changes: 1 addition & 1 deletion src/operations/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class CountOperation extends CommandOperation<number> {
this.query = filter;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<number>
Expand Down
4 changes: 2 additions & 2 deletions src/operations/count_documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ export class CountDocumentsOperation extends AggregateOperation<number> {
super(collection.s.namespace, pipeline, options);
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<number>
): void {
super.execute(server, session, (err, result) => {
super.executeCallback(server, session, (err, result) => {
if (err || !result) {
callback(err);
return;
Expand Down
6 changes: 2 additions & 4 deletions src/operations/create_collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
this.name = name;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Collection>
Expand Down Expand Up @@ -170,9 +170,7 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
if (encryptedFields) {
// Create the required index for queryable encryption support.
const createIndexOp = new CreateIndexOperation(db, name, { __safeContent__: 1 }, {});
await new Promise<void>((resolve, reject) => {
createIndexOp.execute(server, session, err => (err ? reject(err) : resolve()));
});
await createIndexOp.execute(server, session);
}

return coll;
Expand Down
14 changes: 9 additions & 5 deletions src/operations/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ export class DeleteOperation extends CommandOperation<DeleteResult> {
return this.statements.every(op => (op.limit != null ? op.limit > 0 : true));
}

override execute(server: Server, session: ClientSession | undefined, callback: Callback): void {
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback
): void {
const options = this.options ?? {};
const ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
const command: Document = {
Expand Down Expand Up @@ -97,12 +101,12 @@ export class DeleteOneOperation extends DeleteOperation {
super(collection.s.namespace, [makeDeleteStatement(filter, { ...options, limit: 1 })], options);
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<DeleteResult>
): void {
super.execute(server, session, (err, res) => {
super.executeCallback(server, session, (err, res) => {
if (err || res == null) return callback(err);
if (res.code) return callback(new MongoServerError(res));
if (res.writeErrors) return callback(new MongoServerError(res.writeErrors[0]));
Expand All @@ -121,12 +125,12 @@ export class DeleteManyOperation extends DeleteOperation {
super(collection.s.namespace, [makeDeleteStatement(filter, options)], options);
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<DeleteResult>
): void {
super.execute(server, session, (err, res) => {
super.executeCallback(server, session, (err, res) => {
if (err || res == null) return callback(err);
if (res.code) return callback(new MongoServerError(res));
if (res.writeErrors) return callback(new MongoServerError(res.writeErrors[0]));
Expand Down
2 changes: 1 addition & 1 deletion src/operations/distinct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class DistinctOperation extends CommandOperation<any[]> {
this.query = query;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<any[]>
Expand Down
4 changes: 2 additions & 2 deletions src/operations/drop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export class DropCollectionOperation extends CommandOperation<boolean> {
this.name = name;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<boolean>
Expand Down Expand Up @@ -102,7 +102,7 @@ export class DropDatabaseOperation extends CommandOperation<boolean> {
super(db, options);
this.options = options;
}
override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<boolean>
Expand Down
2 changes: 1 addition & 1 deletion src/operations/estimated_document_count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class EstimatedDocumentCountOperation extends CommandOperation<number> {
this.collectionName = collection.collectionName;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<number>
Expand Down
2 changes: 1 addition & 1 deletion src/operations/eval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class EvalOperation extends CommandOperation<Document> {
});
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
Expand Down
26 changes: 13 additions & 13 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { type Callback, maybeCallback, supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';
import { AbstractCallbackOperation, Aspect } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';

type ResultTypeFromOperation<TOperation> = TOperation extends AbstractOperation<infer K>
type ResultTypeFromOperation<TOperation> = TOperation extends AbstractCallbackOperation<infer K>
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
? K
: never;

Expand Down Expand Up @@ -61,29 +61,29 @@ export interface ExecutionResult {
* @param callback - The command result callback
*/
export function executeOperation<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T): Promise<TResult>;
export function executeOperation<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback: Callback<TResult>): void;
export function executeOperation<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void;
export function executeOperation<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void {
return maybeCallback(() => executeOperationAsync(client, operation), callback);
}

async function executeOperationAsync<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T): Promise<TResult> {
if (!(operation instanceof AbstractOperation)) {
if (!(operation instanceof AbstractCallbackOperation)) {
// TODO(NODE-3483): Extend MongoRuntimeError
throw new MongoRuntimeError('This method requires a valid operation instance');
}
Expand Down Expand Up @@ -152,13 +152,13 @@ async function executeOperationAsync<

if (session == null) {
// No session also means it is not retryable, early exit
return operation.executeAsync(server, undefined);
return operation.execute(server, undefined);
}

if (!operation.hasAspect(Aspect.RETRYABLE)) {
// non-retryable operation, early exit
try {
return await operation.executeAsync(server, session);
return await operation.execute(server, session);
} finally {
if (session?.owner != null && session.owner === owner) {
await session.endSession().catch(() => null);
Expand All @@ -184,7 +184,7 @@ async function executeOperationAsync<
}

try {
return await operation.executeAsync(server, session);
return await operation.execute(server, session);
} catch (operationError) {
if (willRetry && operationError instanceof MongoError) {
return await retryOperation(operation, operationError, {
Expand All @@ -209,7 +209,7 @@ type RetryOptions = {
};

async function retryOperation<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
Expand Down Expand Up @@ -257,7 +257,7 @@ async function retryOperation<
}

try {
return await operation.executeAsync(server, session);
return await operation.execute(server, session);
} catch (retryError) {
if (
retryError instanceof MongoError &&
Expand Down
2 changes: 1 addition & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export class FindOperation extends CommandOperation<Document> {
this.filter = filter != null && filter._bsontype === 'ObjectId' ? { _id: filter } : filter;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
Expand Down
2 changes: 1 addition & 1 deletion src/operations/find_and_modify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class FindAndModifyOperation extends CommandOperation<Document> {
this.query = query;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
Expand Down
Loading