From e73e32d086fe1681831789633b76819164ae764d Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 29 Jun 2023 13:53:43 -0600 Subject: [PATCH 01/10] refactor: make cursor internal `next` function async-await --- src/cursor/abstract_cursor.ts | 97 +++++++++++++++++------------------ 1 file changed, 48 insertions(+), 49 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 2f928c25a7..7481d9b5f2 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -1,5 +1,5 @@ import { Readable, Transform } from 'stream'; -import { promisify } from 'util'; +import { callbackify, promisify } from 'util'; import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson'; import { @@ -361,7 +361,7 @@ export abstract class AbstractCursor< return true; } - const doc = await nextAsync(this, true); + const doc = await next(this, true); if (doc) { this[kDocuments].unshift(doc); @@ -377,7 +377,7 @@ export abstract class AbstractCursor< throw new MongoCursorExhaustedError(); } - return nextAsync(this, true); + return next(this, true); } /** @@ -388,7 +388,7 @@ export abstract class AbstractCursor< throw new MongoCursorExhaustedError(); } - return nextAsync(this, false); + return next(this, false); } /** @@ -690,14 +690,6 @@ function nextDocument(cursor: AbstractCursor): T | null { return doc; } -const nextAsync = promisify( - next as ( - cursor: AbstractCursor, - blocking: boolean, - callback: (e: Error, r: T | null) => void - ) => void -); - /** * @param cursor - the cursor on which to call `next` * @param blocking - a boolean indicating whether or not the cursor should `block` until data @@ -705,63 +697,70 @@ const nextAsync = promisify( * the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and * `tryNext`, for example) blocking is necessary because a getMore returning no documents does * not indicate the end of the cursor. - * @param callback - callback to return the result to the caller - * @returns + * @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means + * the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer. */ -export function next( - cursor: AbstractCursor, - blocking: boolean, - callback: Callback -): void { +async function next(cursor: AbstractCursor, blocking: boolean): Promise { const cursorId = cursor[kId]; if (cursor.closed) { - return callback(undefined, null); + return null; } if (cursor[kDocuments].length !== 0) { - callback(undefined, nextDocument(cursor)); - return; + return nextDocument(cursor); } if (cursorId == null) { // All cursors must operate within a session, one must be made implicitly if not explicitly provided - cursor[kInit](err => { - if (err) return callback(err); - return next(cursor, blocking, callback); - }); - - return; + const init = promisify(cb => cursor[kInit](cb)); + await init(); + return next(cursor, blocking); } if (cursorIsDead(cursor)) { - return cleanupCursor(cursor, undefined, () => callback(undefined, null)); + try { + await cleanupCursorAsync(cursor, undefined); + // eslint-disable-next-line no-empty + } catch {} + return null; } // otherwise need to call getMore const batchSize = cursor[kOptions].batchSize || 1000; - cursor._getMore(batchSize, (error, response) => { - if (response) { - const cursorId = - typeof response.cursor.id === 'number' - ? Long.fromNumber(response.cursor.id) - : typeof response.cursor.id === 'bigint' - ? Long.fromBigInt(response.cursor.id) - : response.cursor.id; - - cursor[kDocuments].pushMany(response.cursor.nextBatch); - cursor[kId] = cursorId; - } - + const getMore = promisify((batchSize: number, cb: Callback) => + cursor._getMore(batchSize, cb) + ); + + let response: Document | undefined; + try { + response = await getMore(batchSize); + } catch (error) { if (error || cursorIsDead(cursor)) { - return cleanupCursor(cursor, { error }, () => callback(error, nextDocument(cursor))); + try { + await cleanupCursorAsync(cursor, { error }); + // eslint-disable-next-line no-empty + } catch {} + throw error; } + } - if (cursor[kDocuments].length === 0 && blocking === false) { - return callback(undefined, null); - } + if (response) { + const cursorId = + typeof response.cursor.id === 'number' + ? Long.fromNumber(response.cursor.id) + : typeof response.cursor.id === 'bigint' + ? Long.fromBigInt(response.cursor.id) + : response.cursor.id; + + cursor[kDocuments].pushMany(response.cursor.nextBatch); + cursor[kId] = cursorId; + } + + if (cursor[kDocuments].length === 0 && blocking === false) { + return null; + } - next(cursor, blocking, callback); - }); + return next(cursor, blocking); } function cursorIsDead(cursor: AbstractCursor): boolean { @@ -881,7 +880,7 @@ class ReadableCursorStream extends Readable { } private _readNext() { - next(this._cursor, true, (err, result) => { + callbackify(next)(this._cursor, true, (err, result) => { if (err) { // NOTE: This is questionable, but we have a test backing the behavior. It seems the // desired behavior is that a stream ends cleanly when a user explicitly closes From 3683d0b50650bf1cb6f7521d31acb5f0d71a90ee Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 29 Jun 2023 14:33:42 -0600 Subject: [PATCH 02/10] readablecursorstream uses async next variant --- src/cursor/abstract_cursor.ts | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 7481d9b5f2..af054104a6 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -880,8 +880,21 @@ class ReadableCursorStream extends Readable { } private _readNext() { - callbackify(next)(this._cursor, true, (err, result) => { - if (err) { + next(this._cursor, true).then( + result => { + if (result == null) { + this.push(null); + } else if (this.destroyed) { + this._cursor.close().catch(() => null); + } else { + if (this.push(result)) { + return this._readNext(); + } + + this._readInProgress = false; + } + }, + err => { // NOTE: This is questionable, but we have a test backing the behavior. It seems the // desired behavior is that a stream ends cleanly when a user explicitly closes // a client during iteration. Alternatively, we could do the "right" thing and @@ -910,18 +923,6 @@ class ReadableCursorStream extends Readable { // See NODE-4475. return this.destroy(err); } - - if (result == null) { - this.push(null); - } else if (this.destroyed) { - this._cursor.close().catch(() => null); - } else { - if (this.push(result)) { - return this._readNext(); - } - - this._readInProgress = false; - } - }); + ); } } From 497b80ebcbfad33b3899aa22a004342eefe52332 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 27 Jun 2023 15:08:11 -0600 Subject: [PATCH 03/10] test: add integration tests for bug and better integration tests for transform cursor logic --- .../node-specific/abstract_cursor.test.ts | 295 +++++++++++++----- 1 file changed, 222 insertions(+), 73 deletions(-) diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index d89f23f3e8..5a546b0a26 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -1,114 +1,263 @@ import { expect } from 'chai'; +import { once } from 'events'; +import * as sinon from 'sinon'; import { inspect } from 'util'; -import { type Collection, MongoAPIError, type MongoClient } from '../../mongodb'; - -const falseyValues = [0, 0n, NaN, '', false, undefined]; +import { type Collection, type FindCursor, MongoAPIError, type MongoClient } from '../../mongodb'; describe('class AbstractCursor', function () { - let client: MongoClient; + describe('regression tests NODE-5372', function () { + let client: MongoClient; + let collection: Collection; + const docs = [{ count: 0 }, { count: 10 }]; + beforeEach(async function () { + client = this.configuration.newClient(); - let collection: Collection; - beforeEach(async function () { - client = this.configuration.newClient(); + collection = client.db('abstract_cursor_integration').collection('test'); - collection = client.db('abstract_cursor_integration').collection('test'); + await collection.insertMany(docs); + }); - await collection.insertMany(Array.from({ length: 5 }, (_, index) => ({ index }))); - }); + afterEach(async function () { + await collection.deleteMany({}); + await client.close(); + }); - afterEach(async function () { - await collection.deleteMany({}); - await client.close(); - }); + it('cursors can be iterated with hasNext+next', async function () { + const cursor = collection + // sort ensures that the docs in the cursor are in the same order as the docs inserted + .find({}, { sort: { count: 1 } }) + .map(doc => ({ ...doc, count: doc.count + 1 })); - context('toArray() with custom transforms', function () { - for (const value of falseyValues) { - it(`supports mapping to falsey value '${inspect(value)}'`, async function () { - const cursor = collection.find(); - cursor.map(() => value); + for (let count = 0; await cursor.hasNext(); count++) { + const received = await cursor.next(); + const actual = docs[count]; - const result = await cursor.toArray(); + expect(received.count).to.equal(actual.count + 1); + } + }); + }); - const expected = Array.from({ length: 5 }, () => value); - expect(result).to.deep.equal(expected); - }); - } + describe('cursor iteration APIs', function () { + let client: MongoClient; + let collection: Collection; + const transformSpy = sinon.spy(doc => ({ ...doc, name: doc.name.toUpperCase() })); + beforeEach(async function () { + client = this.configuration.newClient(); + + collection = client.db('abstract_cursor_integration').collection('test'); - it('throws when mapping to `null` and cleans up cursor', async function () { - const cursor = collection.find(); - cursor.map(() => null); + await collection.insertMany([{ name: 'john doe' }]); + }); - const error = await cursor.toArray().catch(e => e); + afterEach(async function () { + transformSpy.resetHistory(); - expect(error).be.instanceOf(MongoAPIError); - expect(cursor.closed).to.be.true; + await collection.deleteMany({}); + await client.close(); }); - }); - context('Symbol.asyncIterator() with custom transforms', function () { - for (const value of falseyValues) { - it(`supports mapping to falsey value '${inspect(value)}'`, async function () { - const cursor = collection.find(); - cursor.map(() => value); + describe('tryNext()', function () { + context('when there is a transform on the cursor', function () { + it('does not transform any documents', async function () { + const cursor = collection.find().map(transformSpy); - let count = 0; + await cursor.hasNext(); + expect(transformSpy.called).to.be.false; + }); + }); + }); - for await (const document of cursor) { - expect(document).to.deep.equal(value); - count++; + const operations: ReadonlyArray Promise]> = [ + [ + 'tryNext', + (cursor: FindCursor) => { + return cursor.tryNext(); + } + ], + ['next', (cursor: FindCursor) => cursor.next()], + [ + 'Symbol.asyncIterator().next', + async (cursor: FindCursor) => { + const iterator = cursor[Symbol.asyncIterator](); + const doc = await iterator.next(); + return doc.value; } + ] + ] as const; - expect(count).to.equal(5); + context('when there is a transform on the cursor', function () { + for (const [method, func] of operations) { + it(`${method}() calls the cursor transform when iterated`, async () => { + const cursor = collection.find().map(transformSpy); + + const doc = await func(cursor); + expect(transformSpy).to.have.been.calledOnce; + expect(doc.name).to.equal('JOHN DOE'); + }); + + // skipped because these tests fail after throwing uncaught exceptions + it(`when the transform throws, ${method}() propagates the error to the user`, async () => { + const cursor = collection.find().map(() => { + throw new Error('error thrown in transform'); + }); + + const error = await func(cursor).catch(e => e); + expect(error) + .to.be.instanceOf(Error) + .to.match(/error thrown in transform/); + }); + } + + it('Cursor.stream() calls the cursor transform when iterated', async function () { + const cursor = collection.find().map(transformSpy).stream(); + + const [doc] = await once(cursor, 'data'); + expect(transformSpy).to.have.been.calledOnce; + expect(doc.name).to.equal('JOHN DOE'); }); - } - it('throws when mapping to `null` and cleans up cursor', async function () { - const cursor = collection.find(); - cursor.map(() => null); + // skipped because these tests fail after throwing uncaught exceptions + it(`when the transform throws, Cursor.stream() propagates the error to the user`, async () => { + const cursor = collection + .find() + .map(() => { + throw new Error('error thrown in transform'); + }) + .stream(); - try { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const document of cursor) { - expect.fail('Expected error to be thrown'); - } - } catch (error) { - expect(error).to.be.instanceOf(MongoAPIError); - expect(cursor.closed).to.be.true; + const error = await once(cursor, 'data').catch(e => e); + expect(error) + .to.be.instanceOf(Error) + .to.match(/error thrown in transform/); + }); + }); + + context('when there is not a transform on the cursor', function () { + for (const [method, func] of operations) { + it(`${method}() returns the documents, unmodified`, async () => { + const cursor = collection.find(); + + const doc = await func(cursor); + expect(doc.name).to.equal('john doe'); + }); } + + it('Cursor.stream() returns the documents, unmodified', async function () { + const cursor = collection.find().stream(); + + const [doc] = await once(cursor, 'data'); + expect(doc.name).to.equal('john doe'); + }); }); }); - context('forEach() with custom transforms', function () { - for (const value of falseyValues) { - it(`supports mapping to falsey value '${inspect(value)}'`, async function () { + describe('custom transforms with falsy values', function () { + let client: MongoClient; + const falseyValues = [0, 0n, NaN, '', false, undefined]; + + let collection: Collection; + beforeEach(async function () { + client = this.configuration.newClient(); + + collection = client.db('abstract_cursor_integration').collection('test'); + + await collection.insertMany(Array.from({ length: 5 }, (_, index) => ({ index }))); + }); + + afterEach(async function () { + await collection.deleteMany({}); + await client.close(); + }); + + context('toArray() with custom transforms', function () { + for (const value of falseyValues) { + it(`supports mapping to falsey value '${inspect(value)}'`, async function () { + const cursor = collection.find(); + cursor.map(() => value); + + const result = await cursor.toArray(); + + const expected = Array.from({ length: 5 }, () => value); + expect(result).to.deep.equal(expected); + }); + } + + it('throws when mapping to `null` and cleans up cursor', async function () { const cursor = collection.find(); - cursor.map(() => value); + cursor.map(() => null); - let count = 0; + const error = await cursor.toArray().catch(e => e); - function transform(value) { - expect(value).to.deep.equal(value); - count++; - } + expect(error).be.instanceOf(MongoAPIError); + expect(cursor.closed).to.be.true; + }); + }); + + context('Symbol.asyncIterator() with custom transforms', function () { + for (const value of falseyValues) { + it(`supports mapping to falsey value '${inspect(value)}'`, async function () { + const cursor = collection.find(); + cursor.map(() => value); + + let count = 0; - await cursor.forEach(transform); + for await (const document of cursor) { + expect(document).to.deep.equal(value); + count++; + } - expect(count).to.equal(5); + expect(count).to.equal(5); + }); + } + + it('throws when mapping to `null` and cleans up cursor', async function () { + const cursor = collection.find(); + cursor.map(() => null); + + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const document of cursor) { + expect.fail('Expected error to be thrown'); + } + } catch (error) { + expect(error).to.be.instanceOf(MongoAPIError); + expect(cursor.closed).to.be.true; + } }); - } + }); + + context('forEach() with custom transforms', function () { + for (const value of falseyValues) { + it(`supports mapping to falsey value '${inspect(value)}'`, async function () { + const cursor = collection.find(); + cursor.map(() => value); + + let count = 0; - it('throws when mapping to `null` and cleans up cursor', async function () { - const cursor = collection.find(); - cursor.map(() => null); + function transform(value) { + expect(value).to.deep.equal(value); + count++; + } - function iterator() { - expect.fail('Expected no documents from cursor, received at least one.'); + await cursor.forEach(transform); + + expect(count).to.equal(5); + }); } - const error = await cursor.forEach(iterator).catch(e => e); - expect(error).to.be.instanceOf(MongoAPIError); - expect(cursor.closed).to.be.true; + it('throws when mapping to `null` and cleans up cursor', async function () { + const cursor = collection.find(); + cursor.map(() => null); + + function iterator() { + expect.fail('Expected no documents from cursor, received at least one.'); + } + + const error = await cursor.forEach(iterator).catch(e => e); + expect(error).to.be.instanceOf(MongoAPIError); + expect(cursor.closed).to.be.true; + }); }); }); }); From 53e8cde54a40071f874eb7cd463ec41adf44e94c Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 29 Jun 2023 15:05:03 -0600 Subject: [PATCH 04/10] fix multiple transform bug --- src/cursor/abstract_cursor.ts | 67 ++++++++++++------- test/integration/crud/misc_cursors.test.js | 45 ++++++------- .../node-specific/cursor_stream.test.js | 4 +- 3 files changed, 65 insertions(+), 51 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index af054104a6..2af1c98309 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -361,7 +361,7 @@ export abstract class AbstractCursor< return true; } - const doc = await next(this, true); + const doc = await next(this, true, false); if (doc) { this[kDocuments].unshift(doc); @@ -680,16 +680,6 @@ export abstract class AbstractCursor< } } -function nextDocument(cursor: AbstractCursor): T | null { - const doc = cursor[kDocuments].shift(); - - if (doc && cursor[kTransform]) { - return cursor[kTransform](doc) as T; - } - - return doc; -} - /** * @param cursor - the cursor on which to call `next` * @param blocking - a boolean indicating whether or not the cursor should `block` until data @@ -697,31 +687,40 @@ function nextDocument(cursor: AbstractCursor): T | null { * the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and * `tryNext`, for example) blocking is necessary because a getMore returning no documents does * not indicate the end of the cursor. + * @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists) * @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means * the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer. */ -async function next(cursor: AbstractCursor, blocking: boolean): Promise { +async function next( + cursor: AbstractCursor, + blocking: boolean, + transform = true +): Promise { const cursorId = cursor[kId]; if (cursor.closed) { return null; } if (cursor[kDocuments].length !== 0) { - return nextDocument(cursor); + const doc = cursor[kDocuments].shift(); + + if (doc != null && transform && cursor[kTransform]) { + return cursor[kTransform](doc); + } + + return doc; } if (cursorId == null) { // All cursors must operate within a session, one must be made implicitly if not explicitly provided const init = promisify(cb => cursor[kInit](cb)); await init(); - return next(cursor, blocking); + return next(cursor, blocking, transform); } if (cursorIsDead(cursor)) { - try { - await cleanupCursorAsync(cursor, undefined); - // eslint-disable-next-line no-empty - } catch {} + // if the cursor is dead, we clean it up + await cleanupCursorAsync(cursor); return null; } @@ -735,11 +734,8 @@ async function next(cursor: AbstractCursor, blocking: boolean): Promise(cursor: AbstractCursor, blocking: boolean): Promise( + cursor: AbstractCursor, + options: { needsToEmitClosed?: boolean; error?: AnyError } = {} +): Promise { + try { + await cleanupCursorAsyncInternal(cursor, options); + } catch { + // `cleanupCursor` never throws but we can't really test that. + // so this is a hack to ensure that any upstream consumers + // can safely guarantee on this wrapper never throwing. + } +} function cleanupCursor( cursor: AbstractCursor, diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 28544267d6..3b1e484c0c 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1708,7 +1708,7 @@ describe('Cursor', function () { expect(cursor).property('closed', false); const willClose = once(cursor, 'close'); - const willEnd = once(stream, 'end'); + const willEnd = once(stream, 'close'); const dataEvents = on(stream, 'data'); @@ -1722,16 +1722,16 @@ describe('Cursor', function () { // After 5 successful data events, destroy stream stream.destroy(); - // We should get an end event on the stream and a close event on the cursor + // We should get a a close event on the stream and a close event on the cursor // We should **not** get an 'error' event, // the following will throw if either stream or cursor emitted an 'error' event await Promise.race([ willEnd, - sleep(100).then(() => Promise.reject(new Error('end event never emitted'))) + sleep(100, { ref: false }).then(() => Promise.reject(new Error('end event never emitted'))) ]); await Promise.race([ willClose, - sleep(100).then(() => Promise.reject(new Error('close event never emitted'))) + sleep(100, { ref: false }).then(() => Promise.reject(new Error('close event never emitted'))) ]); }); @@ -3589,7 +3589,7 @@ describe('Cursor', function () { await client.close(); }); - it('should return implicit session to pool when client-side cursor exhausts results after a getMore', function (done) { + it('should return implicit session to pool when client-side cursor exhausts results after a getMore', async function () { const configuration = this.configuration; const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 }); @@ -3604,25 +3604,22 @@ describe('Cursor', function () { { a: 9, b: 10 } ]; - collection.insertMany(docs, err => { - expect(err).to.not.exist; - const cursor = collection.find({}, { batchSize: 3 }); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(1); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(1); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(1); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(0); - cursor.close(() => { - client.close(done); - }); - }); - }); - }); - }); - }); + await collection.insertMany(docs); + + // TODO - talk to Neal about this test + const cursor = await collection.find({}, { batchSize: 3 }); + for (let i = 0; i < 3; ++i) { + await cursor.next(); + expect(client.s.activeSessions.size).to.equal(1); + } + + await cursor.next(); + expect(client.s.activeSessions.size, 'session not checked in after cursor exhausted').to.equal( + 0 + ); + + await cursor.close(); + await client.close(); }); describe('#clone', function () { diff --git a/test/integration/node-specific/cursor_stream.test.js b/test/integration/node-specific/cursor_stream.test.js index 79096838fd..7346a6a9b8 100644 --- a/test/integration/node-specific/cursor_stream.test.js +++ b/test/integration/node-specific/cursor_stream.test.js @@ -297,10 +297,10 @@ describe('Cursor Streams', function () { stream.on('error', err => (error = err)); cursor.on('close', function () { // NOTE: use `setImmediate` here because the stream implementation uses `nextTick` to emit the error - setImmediate(() => { + setTimeout(() => { expect(error).to.exist; client.close(done); - }); + }, 50); }); stream.pipe(process.stdout); From 34424ed67c1d8b295e6eae3c5911544c5e562a17 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 29 Jun 2023 17:08:30 -0600 Subject: [PATCH 05/10] close cursor on error from transform --- src/cursor/abstract_cursor.ts | 7 ++++++- test/integration/node-specific/abstract_cursor.test.ts | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 2af1c98309..31a76ee384 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -705,7 +705,12 @@ async function next( const doc = cursor[kDocuments].shift(); if (doc != null && transform && cursor[kTransform]) { - return cursor[kTransform](doc); + try { + return cursor[kTransform](doc); + } catch (error) { + await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }); + throw error; + } } return doc; diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index 5a546b0a26..dc22cacc55 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -96,7 +96,6 @@ describe('class AbstractCursor', function () { expect(doc.name).to.equal('JOHN DOE'); }); - // skipped because these tests fail after throwing uncaught exceptions it(`when the transform throws, ${method}() propagates the error to the user`, async () => { const cursor = collection.find().map(() => { throw new Error('error thrown in transform'); @@ -106,6 +105,7 @@ describe('class AbstractCursor', function () { expect(error) .to.be.instanceOf(Error) .to.match(/error thrown in transform/); + expect(cursor.closed).to.be.true; }); } @@ -130,6 +130,7 @@ describe('class AbstractCursor', function () { expect(error) .to.be.instanceOf(Error) .to.match(/error thrown in transform/); + expect(cursor._cursor).to.have.property('closed', true); }); }); From 9a4eecc0f8d90bd7822d62d309cb907b29433d26 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Fri, 30 Jun 2023 08:18:19 -0600 Subject: [PATCH 06/10] fix lint and refactor --- src/cursor/abstract_cursor.ts | 40 ++++++++++--------- test/integration/crud/misc_cursors.test.js | 12 ++---- .../node-specific/abstract_cursor.test.ts | 1 - .../node-specific/cursor_stream.test.js | 3 +- 4 files changed, 25 insertions(+), 31 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 31a76ee384..dee8c678de 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -1,5 +1,5 @@ import { Readable, Transform } from 'stream'; -import { callbackify, promisify } from 'util'; +import { promisify } from 'util'; import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson'; import { @@ -708,7 +708,10 @@ async function next( try { return cursor[kTransform](doc); } catch (error) { - await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }); + await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => { + // `cleanupCursorAsync` should never throw, but if it does we want to throw the original + // error instead. + }); throw error; } } @@ -725,7 +728,9 @@ async function next( if (cursorIsDead(cursor)) { // if the cursor is dead, we clean it up - await cleanupCursorAsync(cursor); + // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver + // and we should surface the error + await cleanupCursorAsync(cursor, {}); return null; } @@ -740,7 +745,10 @@ async function next( response = await getMore(batchSize); } catch (error) { if (error) { - await cleanupCursorAsync(cursor, { error }); + await cleanupCursorAsync(cursor, { error }).catch(() => { + // `cleanupCursorAsync` should never throw, but if it does we want to throw the original + // error instead. + }); throw error; } } @@ -762,7 +770,10 @@ async function next( // we intentionally clean up the cursor to release its session back into the pool before the cursor // is iterated. This prevents a cursor that is exhausted on the server from holding // onto a session indefinitely until the AbstractCursor is iterated. - await cleanupCursorAsync(cursor); + // + // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver + // and we should surface the error + await cleanupCursorAsync(cursor, {}); } if (cursor[kDocuments].length === 0 && blocking === false) { @@ -777,20 +788,7 @@ function cursorIsDead(cursor: AbstractCursor): boolean { return !!cursorId && cursorId.isZero(); } -const cleanupCursorAsyncInternal = promisify(cleanupCursor); - -async function cleanupCursorAsync( - cursor: AbstractCursor, - options: { needsToEmitClosed?: boolean; error?: AnyError } = {} -): Promise { - try { - await cleanupCursorAsyncInternal(cursor, options); - } catch { - // `cleanupCursor` never throws but we can't really test that. - // so this is a hack to ensure that any upstream consumers - // can safely guarantee on this wrapper never throwing. - } -} +const cleanupCursorAsync = promisify(cleanupCursor); function cleanupCursor( cursor: AbstractCursor, @@ -802,6 +800,10 @@ function cleanupCursor( const server = cursor[kServer]; const session = cursor[kSession]; const error = options?.error; + + // Cursors only emit closed events once the client-side cursor has been exhausted fully or there + // was an error. Notably, when the server returns a cursor id of 0 and a non-empty batch, we + // cleanup the cursor but don't emit a `close` event. const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0; if (error) { diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 3b1e484c0c..4a26c861dd 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1708,7 +1708,6 @@ describe('Cursor', function () { expect(cursor).property('closed', false); const willClose = once(cursor, 'close'); - const willEnd = once(stream, 'close'); const dataEvents = on(stream, 'data'); @@ -1722,16 +1721,12 @@ describe('Cursor', function () { // After 5 successful data events, destroy stream stream.destroy(); - // We should get a a close event on the stream and a close event on the cursor - // We should **not** get an 'error' event, + // We should get a close event on the stream and a close event on the cursor + // We should **not** get an 'error' or an 'end' event, // the following will throw if either stream or cursor emitted an 'error' event - await Promise.race([ - willEnd, - sleep(100, { ref: false }).then(() => Promise.reject(new Error('end event never emitted'))) - ]); await Promise.race([ willClose, - sleep(100, { ref: false }).then(() => Promise.reject(new Error('close event never emitted'))) + sleep(100).then(() => Promise.reject(new Error('close event never emitted'))) ]); }); @@ -3606,7 +3601,6 @@ describe('Cursor', function () { await collection.insertMany(docs); - // TODO - talk to Neal about this test const cursor = await collection.find({}, { batchSize: 3 }); for (let i = 0; i < 3; ++i) { await cursor.next(); diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index dc22cacc55..9814e9037c 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -117,7 +117,6 @@ describe('class AbstractCursor', function () { expect(doc.name).to.equal('JOHN DOE'); }); - // skipped because these tests fail after throwing uncaught exceptions it(`when the transform throws, Cursor.stream() propagates the error to the user`, async () => { const cursor = collection .find() diff --git a/test/integration/node-specific/cursor_stream.test.js b/test/integration/node-specific/cursor_stream.test.js index 7346a6a9b8..d04d08cac3 100644 --- a/test/integration/node-specific/cursor_stream.test.js +++ b/test/integration/node-specific/cursor_stream.test.js @@ -1,7 +1,7 @@ 'use strict'; const { expect } = require('chai'); const { Binary } = require('../../mongodb'); -const { setTimeout, setImmediate } = require('timers'); +const { setTimeout } = require('timers'); describe('Cursor Streams', function () { let client; @@ -296,7 +296,6 @@ describe('Cursor Streams', function () { const stream = cursor.stream(); stream.on('error', err => (error = err)); cursor.on('close', function () { - // NOTE: use `setImmediate` here because the stream implementation uses `nextTick` to emit the error setTimeout(() => { expect(error).to.exist; client.close(done); From bf9e916605036e409b4abe185bf38d54fb82d1cf Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Fri, 30 Jun 2023 11:36:47 -0600 Subject: [PATCH 07/10] go back to set immediate --- test/integration/node-specific/cursor_stream.test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/integration/node-specific/cursor_stream.test.js b/test/integration/node-specific/cursor_stream.test.js index d04d08cac3..407d121780 100644 --- a/test/integration/node-specific/cursor_stream.test.js +++ b/test/integration/node-specific/cursor_stream.test.js @@ -1,7 +1,7 @@ 'use strict'; const { expect } = require('chai'); const { Binary } = require('../../mongodb'); -const { setTimeout } = require('timers'); +const { setTimeout, setImmediate } = require('timers'); describe('Cursor Streams', function () { let client; @@ -296,10 +296,10 @@ describe('Cursor Streams', function () { const stream = cursor.stream(); stream.on('error', err => (error = err)); cursor.on('close', function () { - setTimeout(() => { + setImmediate(() => { expect(error).to.exist; client.close(done); - }, 50); + }); }); stream.pipe(process.stdout); From 6e371ad50bb670ddbb6424a769f95f971dc9a94d Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Wed, 5 Jul 2023 10:53:49 -0600 Subject: [PATCH 08/10] address PR comments --- test/integration/crud/misc_cursors.test.js | 4 - .../node-specific/abstract_cursor.test.ts | 106 +++++++----------- 2 files changed, 40 insertions(+), 70 deletions(-) diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 4a26c861dd..51e244c1f0 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -3585,9 +3585,6 @@ describe('Cursor', function () { }); it('should return implicit session to pool when client-side cursor exhausts results after a getMore', async function () { - const configuration = this.configuration; - const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 }); - const db = client.db(configuration.db); const collection = db.collection('cursor_session_tests2'); @@ -3613,7 +3610,6 @@ describe('Cursor', function () { ); await cursor.close(); - await client.close(); }); describe('#clone', function () { diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index 9814e9037c..a1ba146ce5 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -57,13 +57,14 @@ describe('class AbstractCursor', function () { await client.close(); }); - describe('tryNext()', function () { + context(`hasNext()`, function () { context('when there is a transform on the cursor', function () { - it('does not transform any documents', async function () { + it(`the transform is NOT called`, async () => { const cursor = collection.find().map(transformSpy); - await cursor.hasNext(); - expect(transformSpy.called).to.be.false; + const hasNext = await cursor.hasNext(); + expect(transformSpy).not.to.have.been.called; + expect(hasNext).to.be.true; }); }); }); @@ -71,85 +72,58 @@ describe('class AbstractCursor', function () { const operations: ReadonlyArray Promise]> = [ [ 'tryNext', - (cursor: FindCursor) => { - return cursor.tryNext(); - } + (cursor: FindCursor) => cursor.tryNext() ], ['next', (cursor: FindCursor) => cursor.next()], [ 'Symbol.asyncIterator().next', async (cursor: FindCursor) => { const iterator = cursor[Symbol.asyncIterator](); - const doc = await iterator.next(); - return doc.value; + return iterator.next().then(({ value }) => value); } - ] + ], + ['Cursor.stream', (cursor: FindCursor) => { + const stream = cursor.stream(); + return once(stream, 'data').then(([doc]) => doc) + }] ] as const; - context('when there is a transform on the cursor', function () { - for (const [method, func] of operations) { - it(`${method}() calls the cursor transform when iterated`, async () => { - const cursor = collection.find().map(transformSpy); - - const doc = await func(cursor); - expect(transformSpy).to.have.been.calledOnce; - expect(doc.name).to.equal('JOHN DOE'); - }); + for (const [method, func] of operations) { + context(`${method}()`, function () { + context('when there is a transform on the cursor', function () { + it(`the transform is called`, async () => { + const cursor = collection.find().map(transformSpy); - it(`when the transform throws, ${method}() propagates the error to the user`, async () => { - const cursor = collection.find().map(() => { - throw new Error('error thrown in transform'); + const doc = await func(cursor); + expect(transformSpy).to.have.been.calledOnce; + expect(doc.name).to.equal('JOHN DOE'); }); - - const error = await func(cursor).catch(e => e); - expect(error) - .to.be.instanceOf(Error) - .to.match(/error thrown in transform/); - expect(cursor.closed).to.be.true; - }); - } - - it('Cursor.stream() calls the cursor transform when iterated', async function () { - const cursor = collection.find().map(transformSpy).stream(); - - const [doc] = await once(cursor, 'data'); - expect(transformSpy).to.have.been.calledOnce; - expect(doc.name).to.equal('JOHN DOE'); - }); - - it(`when the transform throws, Cursor.stream() propagates the error to the user`, async () => { - const cursor = collection - .find() - .map(() => { - throw new Error('error thrown in transform'); + context('when the transform throws', function () { + it(`the error is propagated to the user`, async () => { + const cursor = collection.find().map(() => { + throw new Error('error thrown in transform'); + }); + + const error = await func(cursor).catch(e => e); + expect(error) + .to.be.instanceOf(Error) + .to.match(/error thrown in transform/); + expect(cursor.closed).to.be.true; + }); }) - .stream(); + }); - const error = await once(cursor, 'data').catch(e => e); - expect(error) - .to.be.instanceOf(Error) - .to.match(/error thrown in transform/); - expect(cursor._cursor).to.have.property('closed', true); - }); - }); + context('when there is not a transform on the cursor', function () { + it(`it returns the cursor's documents unmodified`, async () => { + const cursor = collection.find(); - context('when there is not a transform on the cursor', function () { - for (const [method, func] of operations) { - it(`${method}() returns the documents, unmodified`, async () => { - const cursor = collection.find(); - - const doc = await func(cursor); - expect(doc.name).to.equal('john doe'); + const doc = await func(cursor); + expect(doc.name).to.equal('john doe'); + }); }); - } - it('Cursor.stream() returns the documents, unmodified', async function () { - const cursor = collection.find().stream(); - - const [doc] = await once(cursor, 'data'); - expect(doc.name).to.equal('john doe'); }); - }); + } }); describe('custom transforms with falsy values', function () { From 365ca273c67477fd0ea5153aa2f637cb36a944f8 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Wed, 5 Jul 2023 11:49:21 -0600 Subject: [PATCH 09/10] be explicit about arguments to next --- src/cursor/abstract_cursor.ts | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index dee8c678de..97f168faee 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -361,7 +361,7 @@ export abstract class AbstractCursor< return true; } - const doc = await next(this, true, false); + const doc = await next(this, { blocking: true, transform: false }); if (doc) { this[kDocuments].unshift(doc); @@ -377,7 +377,7 @@ export abstract class AbstractCursor< throw new MongoCursorExhaustedError(); } - return next(this, true); + return next(this, { blocking: true, transform: true }); } /** @@ -388,7 +388,7 @@ export abstract class AbstractCursor< throw new MongoCursorExhaustedError(); } - return next(this, false); + return next(this, { blocking: false, transform: true }); } /** @@ -693,8 +693,13 @@ export abstract class AbstractCursor< */ async function next( cursor: AbstractCursor, - blocking: boolean, - transform = true + { + blocking, + transform + }: { + blocking: boolean; + transform: boolean; + } ): Promise { const cursorId = cursor[kId]; if (cursor.closed) { @@ -723,7 +728,7 @@ async function next( // All cursors must operate within a session, one must be made implicitly if not explicitly provided const init = promisify(cb => cursor[kInit](cb)); await init(); - return next(cursor, blocking, transform); + return next(cursor, { blocking, transform }); } if (cursorIsDead(cursor)) { @@ -780,7 +785,7 @@ async function next( return null; } - return next(cursor, blocking, transform); + return next(cursor, { blocking, transform }); } function cursorIsDead(cursor: AbstractCursor): boolean { @@ -904,7 +909,7 @@ class ReadableCursorStream extends Readable { } private _readNext() { - next(this._cursor, true).then( + next(this._cursor, { blocking: true, transform: true }).then( result => { if (result == null) { this.push(null); From a73f192a4da2416222e3fa12011f18d9f6d397ea Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Wed, 5 Jul 2023 12:33:39 -0600 Subject: [PATCH 10/10] fix lint and failing test --- test/integration/crud/misc_cursors.test.js | 2 +- .../node-specific/abstract_cursor.test.ts | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 51e244c1f0..0214bc52f1 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -3585,7 +3585,7 @@ describe('Cursor', function () { }); it('should return implicit session to pool when client-side cursor exhausts results after a getMore', async function () { - const db = client.db(configuration.db); + const db = client.db(this.configuration.db); const collection = db.collection('cursor_session_tests2'); const docs = [ diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index a1ba146ce5..eab72617f0 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -70,10 +70,7 @@ describe('class AbstractCursor', function () { }); const operations: ReadonlyArray Promise]> = [ - [ - 'tryNext', - (cursor: FindCursor) => cursor.tryNext() - ], + ['tryNext', (cursor: FindCursor) => cursor.tryNext()], ['next', (cursor: FindCursor) => cursor.next()], [ 'Symbol.asyncIterator().next', @@ -82,10 +79,13 @@ describe('class AbstractCursor', function () { return iterator.next().then(({ value }) => value); } ], - ['Cursor.stream', (cursor: FindCursor) => { - const stream = cursor.stream(); - return once(stream, 'data').then(([doc]) => doc) - }] + [ + 'Cursor.stream', + (cursor: FindCursor) => { + const stream = cursor.stream(); + return once(stream, 'data').then(([doc]) => doc); + } + ] ] as const; for (const [method, func] of operations) { @@ -110,7 +110,7 @@ describe('class AbstractCursor', function () { .to.match(/error thrown in transform/); expect(cursor.closed).to.be.true; }); - }) + }); }); context('when there is not a transform on the cursor', function () { @@ -121,7 +121,6 @@ describe('class AbstractCursor', function () { expect(doc.name).to.equal('john doe'); }); }); - }); } });