From bb1a2bb6183f7d7fdce6ad53c6e962833e0a5837 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Thu, 19 Sep 2024 16:00:55 +0200 Subject: [PATCH] feat(NODE-6337): implement client bulk write batching --- src/operations/client_bulk_write/executor.ts | 6 +- .../client_bulk_write/results_merger.ts | 12 +- .../client_bulk_write/results_merger.test.ts | 396 +++++++++++------- 3 files changed, 262 insertions(+), 152 deletions(-) diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index bbd356b45a..d82709e950 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -70,10 +70,14 @@ async function executeAcknowledged( ): Promise { const resultsMerger = new ClientBulkWriteResultsMerger(options); // For each command will will create and exhaust a cursor for the results. + let currentBatchOffset = 0; for (const command of commands) { const cursor = new ClientBulkWriteCursor(client, command, options); const docs = await cursor.toArray(); - resultsMerger.merge(command.ops.documents, cursor.response, docs); + const operations = command.ops.documents; + resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); + // Set the new batch index so we can back back to the index in the original models. + currentBatchOffset += operations.length; } return resultsMerger.result; } diff --git a/src/operations/client_bulk_write/results_merger.ts b/src/operations/client_bulk_write/results_merger.ts index b66e709a71..9816d50707 100644 --- a/src/operations/client_bulk_write/results_merger.ts +++ b/src/operations/client_bulk_write/results_merger.ts @@ -44,11 +44,13 @@ export class ClientBulkWriteResultsMerger { /** * Merge the results in the cursor to the existing result. + * @param currentBatchOffset - The offset index to the original models. * @param response - The cursor response. * @param documents - The documents in the cursor. * @returns The current result. */ merge( + currentBatchOffset: number, operations: Document[], response: ClientBulkWriteCursorResponse, documents: Document[] @@ -69,7 +71,9 @@ export class ClientBulkWriteResultsMerger { const operation = operations[document.idx]; // Handle insert results. if ('insert' in operation) { - this.result.insertResults?.set(document.idx, { insertedId: operation.document._id }); + this.result.insertResults?.set(document.idx + currentBatchOffset, { + insertedId: operation.document._id + }); } // Handle update results. if ('update' in operation) { @@ -80,11 +84,13 @@ export class ClientBulkWriteResultsMerger { if (document.upserted) { result.upsertedId = document.upserted._id; } - this.result.updateResults?.set(document.idx, result); + this.result.updateResults?.set(document.idx + currentBatchOffset, result); } // Handle delete results. if ('delete' in operation) { - this.result.deleteResults?.set(document.idx, { deletedCount: document.n }); + this.result.deleteResults?.set(document.idx + currentBatchOffset, { + deletedCount: document.n + }); } } } diff --git a/test/unit/operations/client_bulk_write/results_merger.test.ts b/test/unit/operations/client_bulk_write/results_merger.test.ts index 661a15d4a1..3d046c0456 100644 --- a/test/unit/operations/client_bulk_write/results_merger.test.ts +++ b/test/unit/operations/client_bulk_write/results_merger.test.ts @@ -25,178 +25,278 @@ describe('ClientBulkWriteResultsMerger', function () { describe('#merge', function () { context('when the bulk write is acknowledged', function () { - context('when requesting verbose results', function () { - // An example verbose response from the server without errors: - // { - // cursor: { - // id: Long('0'), - // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], - // ns: 'admin.$cmd.bulkWrite' - // }, - // nErrors: 0, - // nInserted: 2, - // nMatched: 0, - // nModified: 0, - // nUpserted: 0, - // nDeleted: 0, - // ok: 1 - // } - context('when there are no errors', function () { - const operations = [ - { insert: 0, document: { _id: 1 } }, - { update: 0 }, - { update: 0 }, - { delete: 0 } - ]; - const documents = [ - { ok: 1, idx: 0, n: 1 }, // Insert - { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match - { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert - { ok: 1, idx: 3, n: 1 } // Delete - ]; - const serverResponse = { - cursor: { - id: new Long('0'), - firstBatch: documents, - ns: 'admin.$cmd.bulkWrite' - }, - nErrors: 0, - nInserted: 1, - nMatched: 1, - nModified: 1, - nUpserted: 1, - nDeleted: 1, - ok: 1 - }; - const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); - const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); - let result: ClientBulkWriteResult; - - before(function () { - result = merger.merge(operations, response, documents); - }); + context('when merging on the first batch', function () { + context('when requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = [ + { ok: 1, idx: 0, n: 1 }, // Insert + { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match + { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert + { ok: 1, idx: 3, n: 1 } // Delete + ]; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); + let result: ClientBulkWriteResult; - it('merges the inserted count', function () { - expect(result.insertedCount).to.equal(1); - }); + before(function () { + result = merger.merge(0, operations, response, documents); + }); - it('sets insert results', function () { - expect(result.insertResults.get(0).insertedId).to.equal(1); - }); + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); - it('merges the upserted count', function () { - expect(result.upsertedCount).to.equal(1); - }); + it('sets insert results', function () { + expect(result.insertResults.get(0).insertedId).to.equal(1); + }); - it('merges the matched count', function () { - expect(result.matchedCount).to.equal(1); - }); + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); - it('merges the modified count', function () { - expect(result.modifiedCount).to.equal(1); - }); + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); - it('sets the update results', function () { - expect(result.updateResults.get(1)).to.deep.equal({ - matchedCount: 1, - modifiedCount: 1 + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); }); - }); - it('sets the upsert results', function () { - expect(result.updateResults.get(2)).to.deep.equal({ - matchedCount: 0, - modifiedCount: 0, - upsertedId: 1 + it('sets the update results', function () { + expect(result.updateResults.get(1)).to.deep.equal({ + matchedCount: 1, + modifiedCount: 1 + }); + }); + + it('sets the upsert results', function () { + expect(result.updateResults.get(2)).to.deep.equal({ + matchedCount: 0, + modifiedCount: 0, + upsertedId: 1 + }); }); - }); - it('merges the deleted count', function () { - expect(result.deletedCount).to.equal(1); + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets the delete results', function () { + expect(result.deleteResults.get(3).deletedCount).to.equal(1); + }); }); + }); + + context('when not requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = []; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: false }); + let result: ClientBulkWriteResult; + + before(function () { + result = merger.merge(0, operations, response, documents); + }); - it('sets the delete results', function () { - expect(result.deleteResults.get(3).deletedCount).to.equal(1); + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); + + it('sets no insert results', function () { + expect(result).to.not.have.property('insertResults'); + }); + + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); + + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); + + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); + }); + + it('sets no update results', function () { + expect(result).to.not.have.property('updateResults'); + }); + + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets no delete results', function () { + expect(result).to.not.have.property('deleteResults'); + }); }); }); }); - context('when not requesting verbose results', function () { - // An example verbose response from the server without errors: - // { - // cursor: { - // id: Long('0'), - // firstBatch: [], - // ns: 'admin.$cmd.bulkWrite' - // }, - // nErrors: 0, - // nInserted: 2, - // nMatched: 0, - // nModified: 0, - // nUpserted: 0, - // nDeleted: 0, - // ok: 1 - // } - context('when there are no errors', function () { - const operations = [ - { insert: 0, document: { _id: 1 } }, - { update: 0 }, - { update: 0 }, - { delete: 0 } - ]; - const documents = []; - const serverResponse = { - cursor: { - id: new Long('0'), - firstBatch: documents, - ns: 'admin.$cmd.bulkWrite' - }, - nErrors: 0, - nInserted: 1, - nMatched: 1, - nModified: 1, - nUpserted: 1, - nDeleted: 1, - ok: 1 - }; - const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); - const merger = new ClientBulkWriteResultsMerger({ verboseResults: false }); - let result: ClientBulkWriteResult; - - before(function () { - result = merger.merge(operations, response, documents); - }); + context('when merging on a later batch', function () { + context('when requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = [ + { ok: 1, idx: 0, n: 1 }, // Insert + { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match + { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert + { ok: 1, idx: 3, n: 1 } // Delete + ]; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); + let result: ClientBulkWriteResult; - it('merges the inserted count', function () { - expect(result.insertedCount).to.equal(1); - }); + before(function () { + result = merger.merge(20, operations, response, documents); + }); - it('sets no insert results', function () { - expect(result).to.not.have.property('insertResults'); - }); + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); - it('merges the upserted count', function () { - expect(result.upsertedCount).to.equal(1); - }); + it('sets insert results', function () { + expect(result.insertResults.get(20).insertedId).to.equal(1); + }); - it('merges the matched count', function () { - expect(result.matchedCount).to.equal(1); - }); + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); - it('merges the modified count', function () { - expect(result.modifiedCount).to.equal(1); - }); + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); - it('sets no update results', function () { - expect(result).to.not.have.property('updateResults'); - }); + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); + }); - it('merges the deleted count', function () { - expect(result.deletedCount).to.equal(1); - }); + it('sets the update results', function () { + expect(result.updateResults.get(21)).to.deep.equal({ + matchedCount: 1, + modifiedCount: 1 + }); + }); + + it('sets the upsert results', function () { + expect(result.updateResults.get(22)).to.deep.equal({ + matchedCount: 0, + modifiedCount: 0, + upsertedId: 1 + }); + }); - it('sets no delete results', function () { - expect(result).to.not.have.property('deleteResults'); + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets the delete results', function () { + expect(result.deleteResults.get(23).deletedCount).to.equal(1); + }); }); }); });