diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index af054104a6b..2af1c983098 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -361,7 +361,7 @@ export abstract class AbstractCursor< return true; } - const doc = await next(this, true); + const doc = await next(this, true, false); if (doc) { this[kDocuments].unshift(doc); @@ -680,16 +680,6 @@ export abstract class AbstractCursor< } } -function nextDocument(cursor: AbstractCursor): T | null { - const doc = cursor[kDocuments].shift(); - - if (doc && cursor[kTransform]) { - return cursor[kTransform](doc) as T; - } - - return doc; -} - /** * @param cursor - the cursor on which to call `next` * @param blocking - a boolean indicating whether or not the cursor should `block` until data @@ -697,31 +687,40 @@ function nextDocument(cursor: AbstractCursor): T | null { * the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and * `tryNext`, for example) blocking is necessary because a getMore returning no documents does * not indicate the end of the cursor. + * @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists) * @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means * the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer. */ -async function next(cursor: AbstractCursor, blocking: boolean): Promise { +async function next( + cursor: AbstractCursor, + blocking: boolean, + transform = true +): Promise { const cursorId = cursor[kId]; if (cursor.closed) { return null; } if (cursor[kDocuments].length !== 0) { - return nextDocument(cursor); + const doc = cursor[kDocuments].shift(); + + if (doc != null && transform && cursor[kTransform]) { + return cursor[kTransform](doc); + } + + return doc; } if (cursorId == null) { // All cursors must operate within a session, one must be made implicitly if not explicitly provided const init = promisify(cb => cursor[kInit](cb)); await init(); - return next(cursor, blocking); + return next(cursor, blocking, transform); } if (cursorIsDead(cursor)) { - try { - await cleanupCursorAsync(cursor, undefined); - // eslint-disable-next-line no-empty - } catch {} + // if the cursor is dead, we clean it up + await cleanupCursorAsync(cursor); return null; } @@ -735,11 +734,8 @@ async function next(cursor: AbstractCursor, blocking: boolean): Promise(cursor: AbstractCursor, blocking: boolean): Promise( + cursor: AbstractCursor, + options: { needsToEmitClosed?: boolean; error?: AnyError } = {} +): Promise { + try { + await cleanupCursorAsyncInternal(cursor, options); + } catch { + // `cleanupCursor` never throws but we can't really test that. + // so this is a hack to ensure that any upstream consumers + // can safely guarantee on this wrapper never throwing. + } +} function cleanupCursor( cursor: AbstractCursor, diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 28544267d68..3b1e484c0c0 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1708,7 +1708,7 @@ describe('Cursor', function () { expect(cursor).property('closed', false); const willClose = once(cursor, 'close'); - const willEnd = once(stream, 'end'); + const willEnd = once(stream, 'close'); const dataEvents = on(stream, 'data'); @@ -1722,16 +1722,16 @@ describe('Cursor', function () { // After 5 successful data events, destroy stream stream.destroy(); - // We should get an end event on the stream and a close event on the cursor + // We should get a a close event on the stream and a close event on the cursor // We should **not** get an 'error' event, // the following will throw if either stream or cursor emitted an 'error' event await Promise.race([ willEnd, - sleep(100).then(() => Promise.reject(new Error('end event never emitted'))) + sleep(100, { ref: false }).then(() => Promise.reject(new Error('end event never emitted'))) ]); await Promise.race([ willClose, - sleep(100).then(() => Promise.reject(new Error('close event never emitted'))) + sleep(100, { ref: false }).then(() => Promise.reject(new Error('close event never emitted'))) ]); }); @@ -3589,7 +3589,7 @@ describe('Cursor', function () { await client.close(); }); - it('should return implicit session to pool when client-side cursor exhausts results after a getMore', function (done) { + it('should return implicit session to pool when client-side cursor exhausts results after a getMore', async function () { const configuration = this.configuration; const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 }); @@ -3604,25 +3604,22 @@ describe('Cursor', function () { { a: 9, b: 10 } ]; - collection.insertMany(docs, err => { - expect(err).to.not.exist; - const cursor = collection.find({}, { batchSize: 3 }); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(1); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(1); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(1); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(0); - cursor.close(() => { - client.close(done); - }); - }); - }); - }); - }); - }); + await collection.insertMany(docs); + + // TODO - talk to Neal about this test + const cursor = await collection.find({}, { batchSize: 3 }); + for (let i = 0; i < 3; ++i) { + await cursor.next(); + expect(client.s.activeSessions.size).to.equal(1); + } + + await cursor.next(); + expect(client.s.activeSessions.size, 'session not checked in after cursor exhausted').to.equal( + 0 + ); + + await cursor.close(); + await client.close(); }); describe('#clone', function () { diff --git a/test/integration/node-specific/cursor_stream.test.js b/test/integration/node-specific/cursor_stream.test.js index 79096838fd1..7346a6a9b82 100644 --- a/test/integration/node-specific/cursor_stream.test.js +++ b/test/integration/node-specific/cursor_stream.test.js @@ -297,10 +297,10 @@ describe('Cursor Streams', function () { stream.on('error', err => (error = err)); cursor.on('close', function () { // NOTE: use `setImmediate` here because the stream implementation uses `nextTick` to emit the error - setImmediate(() => { + setTimeout(() => { expect(error).to.exist; client.close(done); - }); + }, 50); }); stream.pipe(process.stdout);