Skip to content

Commit

Permalink
refactor(NODE-5392): refactor cursor operations to use async/await (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
malikj2000 committed Jul 14, 2023
1 parent 7ade907 commit f697ee1
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 317 deletions.
38 changes: 14 additions & 24 deletions src/operations/collections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ import { Collection } from '../collection';
import type { Db } from '../db';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback } from '../utils';
import { AbstractCallbackOperation, type OperationOptions } from './operation';
import { AbstractOperation, type OperationOptions } from './operation';

export interface CollectionsOptions extends OperationOptions {
nameOnly?: boolean;
}

/** @internal */
export class CollectionsOperation extends AbstractCallbackOperation<Collection[]> {
export class CollectionsOperation extends AbstractOperation<Collection[]> {
override options: CollectionsOptions;
db: Db;

Expand All @@ -20,31 +19,22 @@ export class CollectionsOperation extends AbstractCallbackOperation<Collection[]
this.db = db;
}

override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Collection[]>
): void {
async execute(server: Server, session: ClientSession | undefined): Promise<Collection[]> {
// Let's get the collection names
this.db
const documents = await this.db
.listCollections(
{},
{ ...this.options, nameOnly: true, readPreference: this.readPreference, session }
)
.toArray()
.then(
documents => {
const collections = [];
for (const { name } of documents) {
if (!name.includes('$')) {
// Filter collections removing any illegal ones
collections.push(new Collection(this.db, name, this.db.s.options));
}
}
// Return the collection objects
callback(undefined, collections);
},
error => callback(error)
);
.toArray();
const collections: Collection[] = [];
for (const { name } of documents) {
if (!name.includes('$')) {
// Filter collections removing any illegal ones
collections.push(new Collection(this.db, name, this.db.s.options));
}
}
// Return the collection objects
return collections;
}
}
18 changes: 9 additions & 9 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { type Callback, maybeCallback, supportsRetryableWrites } from '../utils';
import { AbstractCallbackOperation, Aspect } from './operation';
import { AbstractOperation, Aspect } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';

type ResultTypeFromOperation<TOperation> = TOperation extends AbstractCallbackOperation<infer K>
type ResultTypeFromOperation<TOperation> = TOperation extends AbstractOperation<infer K>
? K
: never;

Expand Down Expand Up @@ -61,29 +61,29 @@ export interface ExecutionResult {
* @param callback - The command result callback
*/
export function executeOperation<
T extends AbstractCallbackOperation<TResult>,
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T): Promise<TResult>;
export function executeOperation<
T extends AbstractCallbackOperation<TResult>,
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback: Callback<TResult>): void;
export function executeOperation<
T extends AbstractCallbackOperation<TResult>,
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void;
export function executeOperation<
T extends AbstractCallbackOperation<TResult>,
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void {
return maybeCallback(() => executeOperationAsync(client, operation), callback);
}

async function executeOperationAsync<
T extends AbstractCallbackOperation<TResult>,
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T): Promise<TResult> {
if (!(operation instanceof AbstractCallbackOperation)) {
if (!(operation instanceof AbstractOperation)) {
// TODO(NODE-3483): Extend MongoRuntimeError
throw new MongoRuntimeError('This method requires a valid operation instance');
}
Expand Down Expand Up @@ -209,7 +209,7 @@ type RetryOptions = {
};

async function retryOperation<
T extends AbstractCallbackOperation<TResult>,
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
Expand Down
39 changes: 17 additions & 22 deletions src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ import {
type MongoDBNamespace,
normalizeHintField
} from '../utils';
import {
type CollationOptions,
CommandCallbackOperation,
type CommandOperationOptions
} from './command';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
import { Aspect, defineAspects, type Hint } from './operation';

/**
Expand Down Expand Up @@ -75,7 +71,7 @@ export interface FindOptions<TSchema extends Document = Document>
}

/** @internal */
export class FindOperation extends CommandCallbackOperation<Document> {
export class FindOperation extends CommandOperation<Document> {
/**
* @remarks WriteConcern can still be present on the options because
* we inherit options from the client/db/collection. The
Expand Down Expand Up @@ -106,11 +102,7 @@ export class FindOperation extends CommandCallbackOperation<Document> {
this.filter = filter != null && filter._bsontype === 'ObjectId' ? { _id: filter } : filter;
}

override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
): void {
override async execute(server: Server, session: ClientSession | undefined): Promise<Document> {
this.server = server;

const options = this.options;
Expand All @@ -120,17 +112,20 @@ export class FindOperation extends CommandCallbackOperation<Document> {
findCommand = decorateWithExplain(findCommand, this.explain);
}

server.command(
this.ns,
findCommand,
{
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session
},
callback
);
return server.commandAsync(this.ns, findCommand, {
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session
});
}

protected executeCallback(
_server: Server,
_session: ClientSession | undefined,
_callback: Callback<Document>
): void {
throw new Error('Method not implemented.');
}
}

Expand Down
27 changes: 8 additions & 19 deletions src/operations/get_more.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,8 @@ import type { Document, Long } from '../bson';
import { MongoRuntimeError } from '../error';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type Callback, maxWireVersion, type MongoDBNamespace } from '../utils';
import {
AbstractCallbackOperation,
Aspect,
defineAspects,
type OperationOptions
} from './operation';
import { maxWireVersion, type MongoDBNamespace } from '../utils';
import { AbstractOperation, Aspect, defineAspects, type OperationOptions } from './operation';

/** @internal */
export interface GetMoreOptions extends OperationOptions {
Expand Down Expand Up @@ -40,7 +35,7 @@ export interface GetMoreCommand {
}

/** @internal */
export class GetMoreOperation extends AbstractCallbackOperation {
export class GetMoreOperation extends AbstractOperation {
cursorId: Long;
override options: GetMoreOptions;

Expand All @@ -57,26 +52,20 @@ export class GetMoreOperation extends AbstractCallbackOperation {
* Although there is a server already associated with the get more operation, the signature
* for execute passes a server so we will just use that one.
*/
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
): void {
async execute(server: Server, _session: ClientSession | undefined): Promise<Document> {
if (server !== this.server) {
return callback(
new MongoRuntimeError('Getmore must run on the same server operation began on')
);
throw new MongoRuntimeError('Getmore must run on the same server operation began on');
}

if (this.cursorId == null || this.cursorId.isZero()) {
return callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
throw new MongoRuntimeError('Unable to iterate cursor with no id');
}

const collection = this.ns.collection;
if (collection == null) {
// Cursors should have adopted the namespace returned by MongoDB
// which should always defined a collection name (even a pseudo one, ex. db.aggregate())
return callback(new MongoRuntimeError('A collection name must be determined before getMore'));
throw new MongoRuntimeError('A collection name must be determined before getMore');
}

const getMoreCmd: GetMoreCommand = {
Expand Down Expand Up @@ -104,7 +93,7 @@ export class GetMoreOperation extends AbstractCallbackOperation {
...this.options
};

server.command(this.ns, getMoreCmd, commandOptions, callback);
return server.commandAsync(this.ns, getMoreCmd, commandOptions);
}
}

Expand Down
19 changes: 12 additions & 7 deletions src/operations/indexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { type Callback, isObject, maxWireVersion, type MongoDBNamespace } from '
import {
type CollationOptions,
CommandCallbackOperation,
CommandOperation,
type CommandOperationOptions,
type OperationParent
} from './command';
Expand Down Expand Up @@ -396,7 +397,7 @@ export interface ListIndexesOptions extends Omit<CommandOperationOptions, 'write
}

/** @internal */
export class ListIndexesOperation extends CommandCallbackOperation<Document> {
export class ListIndexesOperation extends CommandOperation<Document> {
/**
* @remarks WriteConcern can still be present on the options because
* we inherit options from the client/db/collection. The
Expand All @@ -415,11 +416,7 @@ export class ListIndexesOperation extends CommandCallbackOperation<Document> {
this.collectionNamespace = collection.s.namespace;
}

override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
): void {
override async execute(server: Server, session: ClientSession | undefined): Promise<Document> {
const serverWireVersion = maxWireVersion(server);

const cursor = this.options.batchSize ? { batchSize: this.options.batchSize } : {};
Expand All @@ -432,7 +429,15 @@ export class ListIndexesOperation extends CommandCallbackOperation<Document> {
command.comment = this.options.comment;
}

super.executeCommandCallback(server, session, command, callback);
return super.executeCommand(server, session, command);
}

protected executeCallback(
_server: Server,
_session: ClientSession | undefined,
_callback: Callback<Document>
): void {
throw new Error('Method not implemented.');
}
}

Expand Down
32 changes: 11 additions & 21 deletions src/operations/kill_cursors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,8 @@ import type { Long } from '../bson';
import { MongoRuntimeError } from '../error';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback, MongoDBNamespace } from '../utils';
import {
AbstractCallbackOperation,
Aspect,
defineAspects,
type OperationOptions
} from './operation';
import type { MongoDBNamespace } from '../utils';
import { AbstractOperation, Aspect, defineAspects, type OperationOptions } from './operation';

/**
* https://www.mongodb.com/docs/manual/reference/command/killCursors/
Expand All @@ -20,7 +15,7 @@ interface KillCursorsCommand {
comment?: unknown;
}

export class KillCursorsOperation extends AbstractCallbackOperation {
export class KillCursorsOperation extends AbstractOperation {
cursorId: Long;

constructor(cursorId: Long, ns: MongoDBNamespace, server: Server, options: OperationOptions) {
Expand All @@ -30,32 +25,27 @@ export class KillCursorsOperation extends AbstractCallbackOperation {
this.server = server;
}

executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<void>
): void {
async execute(server: Server, session: ClientSession | undefined): Promise<void> {
if (server !== this.server) {
return callback(
new MongoRuntimeError('Killcursor must run on the same server operation began on')
);
throw new MongoRuntimeError('Killcursor must run on the same server operation began on');
}

const killCursors = this.ns.collection;
if (killCursors == null) {
// Cursors should have adopted the namespace returned by MongoDB
// which should always defined a collection name (even a pseudo one, ex. db.aggregate())
return callback(
new MongoRuntimeError('A collection name must be determined before killCursors')
);
throw new MongoRuntimeError('A collection name must be determined before killCursors');
}

const killCursorsCommand: KillCursorsCommand = {
killCursors,
cursors: [this.cursorId]
};

server.command(this.ns, killCursorsCommand, { session }, () => callback());
try {
await server.commandAsync(this.ns, killCursorsCommand, { session });
} catch {
// The driver should never emit errors from killCursors, this is spec-ed behavior
}
}
}

Expand Down
Loading

0 comments on commit f697ee1

Please sign in to comment.