Skip to content

Commit

Permalink
feat(NODE-6329): client bulk write happy path
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Sep 16, 2024
1 parent 20396e1 commit c1363a8
Show file tree
Hide file tree
Showing 22 changed files with 2,408 additions and 5 deletions.
26 changes: 26 additions & 0 deletions src/cmap/wire_protocol/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,29 @@ export class ExplainedCursorResponse extends CursorResponse {
return this.toObject(options);
}
}

/**
* Client bulk writes have some extra metadata at the top level that needs to be
* included in the result returned to the user.
*/
export class ClientBulkWriteCursorResponse extends CursorResponse {
get insertedCount() {
return this.get('nInserted', BSONType.int, true);
}

get upsertedCount() {
return this.get('nUpserted', BSONType.int, true);
}

get matchedCount() {
return this.get('nMatched', BSONType.int, true);
}

get modifiedCount() {
return this.get('nModified', BSONType.int, true);
}

get deletedCount() {
return this.get('nDeleted', BSONType.int, true);
}
}
64 changes: 64 additions & 0 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import type { Document } from '../bson';
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
import { executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import { mergeOptions, MongoDBNamespace } from '../utils';
import {
AbstractCursor,
type AbstractCursorOptions,
type InitialCursorResponse
} from './abstract_cursor';

/** @public */
export interface ClientBulkWriteCursorOptions
extends AbstractCursorOptions,
ClientBulkWriteOptions {}

/**
* @public
*/
export class ClientBulkWriteCursor extends AbstractCursor {
public readonly command: Document;
/** @internal */
private cursorResponse?: ClientBulkWriteCursorResponse;
/** @internal */
private clientBulkWriteOptions: ClientBulkWriteOptions;

/** @internal */
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
super(client, new MongoDBNamespace('admin'), options);

this.command = command;
this.clientBulkWriteOptions = options;
}

get response(): ClientBulkWriteCursorResponse {
if (this.cursorResponse) return this.response;
throw new Error('no cursor response');
}

clone(): ClientBulkWriteCursor {
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
delete clonedOptions.session;
return new ClientBulkWriteCursor(this.client, this.command, {
...clonedOptions
});
}

/** @internal */
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
...this.clientBulkWriteOptions,
...this.cursorOptions,
session
});

const response = await executeOperation(this.client, clientBulkWriteOperation);
this.cursorResponse = response;

return { server: clientBulkWriteOperation.server, session, response };
}
}
12 changes: 12 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,18 @@ export type {
AggregateOptions,
DB_AGGREGATE_COLLECTION
} from './operations/aggregate';
export type {
AnyClientBulkWriteModel,
ClientBulkWriteOptions,
ClientBulkWriteResult,
ClientDeleteManyModel,
ClientDeleteOneModel,
ClientInsertOneModel,
ClientReplaceOneModel,
ClientUpdateManyModel,
ClientUpdateOneModel,
ClientWriteModel
} from './operations/client_bulk_write/common';
export type {
CollationOptions,
CommandOperation,
Expand Down
18 changes: 18 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import {
SeverityLevel
} from './mongo_logger';
import { TypedEventEmitter } from './mongo_types';
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
import { executeOperation } from './operations/execute_operation';
import { RunAdminCommandOperation } from './operations/run_command';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
Expand Down Expand Up @@ -477,6 +483,18 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
return this.s.bsonOptions;
}

/**
* Executes a client bulk write operation, available on server 8.0+.
* @param models - The client bulk write models.
* @param options - The client bulk write options.
*/
async bulkWrite(
models: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult> {
return await new ClientBulkWriteExecutor(this, models, options).execute();
}

/**
* Connect to MongoDB using a url
*
Expand Down
41 changes: 41 additions & 0 deletions src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { type Document } from 'bson';

import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import type { Server } from '../../sdam/server';
import type { ClientSession } from '../../sessions';
import { AbstractOperation, Aspect, defineAspects } from '../operation';
import { type ClientBulkWriteOptions } from './common';

export class ClientBulkWriteOperation extends AbstractOperation<ClientBulkWriteCursorResponse> {
command: Document;
override options: ClientBulkWriteOptions;

override get commandName() {
return 'bulkWrite' as const;
}

constructor(command: Document, options: ClientBulkWriteOptions) {
super(options);
this.command = command;
this.options = options;
}

override async execute(
server: Server,
session: ClientSession | undefined
): Promise<ClientBulkWriteCursorResponse> {
return await server.command(
this.ns,
this.command,
{
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session
},
ClientBulkWriteCursorResponse
);
}
}

defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION]);
74 changes: 74 additions & 0 deletions src/operations/client_bulk_write/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,77 @@ export type AnyClientBulkWriteModel =
| ClientUpdateManyModel
| ClientDeleteOneModel
| ClientDeleteManyModel;

/** @public */
export interface ClientBulkWriteResult {
/**
* The total number of documents inserted across all insert operations.
*/
insertedCount: number;
/**
* The total number of documents upserted across all update operations.
*/
upsertedCount: number;
/**
* The total number of documents matched across all update operations.
*/
matchedCount: number;
/**
* The total number of documents modified across all update operations.
*/
modifiedCount: number;
/**
* The total number of documents deleted across all delete operations.
*/
deletedCount: number;
}

export interface VerboseClientBulkWriteResult extends ClientBulkWriteResult {
/**
* The results of each individual insert operation that was successfully performed.
*/
insertResults: Map<number, ClientInsertOneResult>;
/**
* The results of each individual update operation that was successfully performed.
*/
updateResults: Map<number, ClientUpdateResult>;
/**
* The results of each individual delete operation that was successfully performed.
*/
deleteResults: Map<number, ClientDeleteResult>;
}

export interface ClientInsertOneResult {
/**
* The _id of the inserted document.
*/
insertedId: any;
}

export interface ClientUpdateResult {
/**
* The number of documents that matched the filter.
*/
matchedCount: number;

/**
* The number of documents that were modified.
*/
modifiedCount: number;

/**
* The _id field of the upserted document if an upsert occurred.
*
* It MUST be possible to discern between a BSON Null upserted ID value and this field being
* unset. If necessary, drivers MAY add a didUpsert boolean field to differentiate between
* these two cases.
*/
upsertedId?: any;
}

export interface ClientDeleteResult {
/**
* The number of documents that were deleted.
*/
deletedCount: number;
}
42 changes: 42 additions & 0 deletions src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
import { type MongoClient } from '../../mongo_client';
import { ClientBulkWriteCommandBuilder } from './command_builder';
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
} from './common';
import { ClientBulkWriteResultsMerger } from './results_merger';

/**
* Responsible for executing a client bulk write.
* @internal
*/
export class ClientBulkWriteExecutor {
client: MongoClient;
options: ClientBulkWriteOptions;
operations: AnyClientBulkWriteModel[];

constructor(
client: MongoClient,
operations: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
) {
this.client = client;
this.options = options || {};
this.operations = operations;
}

async execute(): Promise<ClientBulkWriteResult> {
const commmandBuilder = new ClientBulkWriteCommandBuilder(this.operations, this.options);
const commands = commmandBuilder.buildCommands();
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
for (const command of commands) {
const cursor = new ClientBulkWriteCursor(this.client, command, this.options);
for (const docs of await cursor.toArray()) {
resultsMerger.merge(cursor.response, docs);
}
}
return resultsMerger.result;
}
}
52 changes: 52 additions & 0 deletions src/operations/client_bulk_write/results_merger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { type Document } from '../../bson';
import { type ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import {
type ClientBulkWriteOptions,
type ClientBulkWriteResult,
type VerboseClientBulkWriteResult
} from './common';

/**
* Merges client bulk write cursor responses together into a single result.
* @internal
*/
export class ClientBulkWriteResultsMerger {
result: ClientBulkWriteResult | VerboseClientBulkWriteResult;
options: ClientBulkWriteOptions;

/**
* Instantiate the merger.
* @param options - The options.
*/
constructor(options: ClientBulkWriteOptions) {
this.options = options;
this.result = {
insertedCount: 0,
upsertedCount: 0,
matchedCount: 0,
modifiedCount: 0,
deletedCount: 0
};
}

/**
* Merge the results in the cursor to the existing result.
* @param response - The cursor response.
* @param documents - The documents in the cursor.
* @returns The current result.
*/
merge(response: ClientBulkWriteCursorResponse, documents: Document[]): ClientBulkWriteResult {
// Update the counts from the cursor response.
this.result.insertedCount += response.insertedCount;
this.result.upsertedCount += response.upsertedCount;
this.result.matchedCount += response.matchedCount;
this.result.modifiedCount += response.modifiedCount;
this.result.deletedCount += response.deletedCount;

// Iterate all the documents in the cursor and update the result.
for (const document of documents) {
document;
}
return this.result;
}
}
5 changes: 1 addition & 4 deletions test/integration/crud/crud.spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,14 @@ import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

const clientBulkWriteTests = new RegExp(
[
'client bulk write delete with collation',
'client bulk write delete with hint',
'client bulkWrite operations support errorResponse assertions',
'an individual operation fails during an ordered bulkWrite',
'an individual operation fails during an unordered bulkWrite',
'detailed results are omitted from error when verboseResults is false',
'a top-level failure occurs during a bulkWrite',
'a bulk write with only errors does not report a partial result',
'an empty list of write models is a client-side error',
'a write concern error occurs during a bulkWrite',
'client bulkWrite'
'a write concern error occurs during a bulkWrite'
].join('|')
);

Expand Down
1 change: 1 addition & 0 deletions test/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export * from '../src/operations/aggregate';
export * from '../src/operations/bulk_write';
export * from '../src/operations/client_bulk_write/command_builder';
export * from '../src/operations/client_bulk_write/common';
export * from '../src/operations/client_bulk_write/results_merger';
export * from '../src/operations/collections';
export * from '../src/operations/command';
export * from '../src/operations/count';
Expand Down
Loading

0 comments on commit c1363a8

Please sign in to comment.