diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 31a76ee384..dee8c678de 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -1,5 +1,5 @@ import { Readable, Transform } from 'stream'; -import { callbackify, promisify } from 'util'; +import { promisify } from 'util'; import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson'; import { @@ -708,7 +708,10 @@ async function next( try { return cursor[kTransform](doc); } catch (error) { - await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }); + await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => { + // `cleanupCursorAsync` should never throw, but if it does we want to throw the original + // error instead. + }); throw error; } } @@ -725,7 +728,9 @@ async function next( if (cursorIsDead(cursor)) { // if the cursor is dead, we clean it up - await cleanupCursorAsync(cursor); + // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver + // and we should surface the error + await cleanupCursorAsync(cursor, {}); return null; } @@ -740,7 +745,10 @@ async function next( response = await getMore(batchSize); } catch (error) { if (error) { - await cleanupCursorAsync(cursor, { error }); + await cleanupCursorAsync(cursor, { error }).catch(() => { + // `cleanupCursorAsync` should never throw, but if it does we want to throw the original + // error instead. + }); throw error; } } @@ -762,7 +770,10 @@ async function next( // we intentionally clean up the cursor to release its session back into the pool before the cursor // is iterated. This prevents a cursor that is exhausted on the server from holding // onto a session indefinitely until the AbstractCursor is iterated. - await cleanupCursorAsync(cursor); + // + // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver + // and we should surface the error + await cleanupCursorAsync(cursor, {}); } if (cursor[kDocuments].length === 0 && blocking === false) { @@ -777,20 +788,7 @@ function cursorIsDead(cursor: AbstractCursor): boolean { return !!cursorId && cursorId.isZero(); } -const cleanupCursorAsyncInternal = promisify(cleanupCursor); - -async function cleanupCursorAsync( - cursor: AbstractCursor, - options: { needsToEmitClosed?: boolean; error?: AnyError } = {} -): Promise { - try { - await cleanupCursorAsyncInternal(cursor, options); - } catch { - // `cleanupCursor` never throws but we can't really test that. - // so this is a hack to ensure that any upstream consumers - // can safely guarantee on this wrapper never throwing. - } -} +const cleanupCursorAsync = promisify(cleanupCursor); function cleanupCursor( cursor: AbstractCursor, @@ -802,6 +800,10 @@ function cleanupCursor( const server = cursor[kServer]; const session = cursor[kSession]; const error = options?.error; + + // Cursors only emit closed events once the client-side cursor has been exhausted fully or there + // was an error. Notably, when the server returns a cursor id of 0 and a non-empty batch, we + // cleanup the cursor but don't emit a `close` event. const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0; if (error) { diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 3b1e484c0c..4a26c861dd 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1708,7 +1708,6 @@ describe('Cursor', function () { expect(cursor).property('closed', false); const willClose = once(cursor, 'close'); - const willEnd = once(stream, 'close'); const dataEvents = on(stream, 'data'); @@ -1722,16 +1721,12 @@ describe('Cursor', function () { // After 5 successful data events, destroy stream stream.destroy(); - // We should get a a close event on the stream and a close event on the cursor - // We should **not** get an 'error' event, + // We should get a close event on the stream and a close event on the cursor + // We should **not** get an 'error' or an 'end' event, // the following will throw if either stream or cursor emitted an 'error' event - await Promise.race([ - willEnd, - sleep(100, { ref: false }).then(() => Promise.reject(new Error('end event never emitted'))) - ]); await Promise.race([ willClose, - sleep(100, { ref: false }).then(() => Promise.reject(new Error('close event never emitted'))) + sleep(100).then(() => Promise.reject(new Error('close event never emitted'))) ]); }); @@ -3606,7 +3601,6 @@ describe('Cursor', function () { await collection.insertMany(docs); - // TODO - talk to Neal about this test const cursor = await collection.find({}, { batchSize: 3 }); for (let i = 0; i < 3; ++i) { await cursor.next(); diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index dc22cacc55..9814e9037c 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -117,7 +117,6 @@ describe('class AbstractCursor', function () { expect(doc.name).to.equal('JOHN DOE'); }); - // skipped because these tests fail after throwing uncaught exceptions it(`when the transform throws, Cursor.stream() propagates the error to the user`, async () => { const cursor = collection .find() diff --git a/test/integration/node-specific/cursor_stream.test.js b/test/integration/node-specific/cursor_stream.test.js index 7346a6a9b8..d04d08cac3 100644 --- a/test/integration/node-specific/cursor_stream.test.js +++ b/test/integration/node-specific/cursor_stream.test.js @@ -1,7 +1,7 @@ 'use strict'; const { expect } = require('chai'); const { Binary } = require('../../mongodb'); -const { setTimeout, setImmediate } = require('timers'); +const { setTimeout } = require('timers'); describe('Cursor Streams', function () { let client; @@ -296,7 +296,6 @@ describe('Cursor Streams', function () { const stream = cursor.stream(); stream.on('error', err => (error = err)); cursor.on('close', function () { - // NOTE: use `setImmediate` here because the stream implementation uses `nextTick` to emit the error setTimeout(() => { expect(error).to.exist; client.close(done);