Skip to content

Commit

Permalink
fix lint and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Jun 30, 2023
1 parent 34424ed commit 9a4eecc
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 31 deletions.
40 changes: 21 additions & 19 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Readable, Transform } from 'stream';
import { callbackify, promisify } from 'util';
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
import {
Expand Down Expand Up @@ -708,7 +708,10 @@ async function next<T>(
try {
return cursor[kTransform](doc);
} catch (error) {
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true });
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
});
throw error;
}
}
Expand All @@ -725,7 +728,9 @@ async function next<T>(

if (cursorIsDead(cursor)) {
// if the cursor is dead, we clean it up
await cleanupCursorAsync(cursor);
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
return null;
}

Expand All @@ -740,7 +745,10 @@ async function next<T>(
response = await getMore(batchSize);
} catch (error) {
if (error) {
await cleanupCursorAsync(cursor, { error });
await cleanupCursorAsync(cursor, { error }).catch(() => {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
});
throw error;
}
}
Expand All @@ -762,7 +770,10 @@ async function next<T>(
// we intentionally clean up the cursor to release its session back into the pool before the cursor
// is iterated. This prevents a cursor that is exhausted on the server from holding
// onto a session indefinitely until the AbstractCursor is iterated.
await cleanupCursorAsync(cursor);
//
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
}

if (cursor[kDocuments].length === 0 && blocking === false) {
Expand All @@ -777,20 +788,7 @@ function cursorIsDead(cursor: AbstractCursor): boolean {
return !!cursorId && cursorId.isZero();
}

const cleanupCursorAsyncInternal = promisify(cleanupCursor);

async function cleanupCursorAsync<T>(
cursor: AbstractCursor<T>,
options: { needsToEmitClosed?: boolean; error?: AnyError } = {}
): Promise<void> {
try {
await cleanupCursorAsyncInternal(cursor, options);
} catch {
// `cleanupCursor` never throws but we can't really test that.
// so this is a hack to ensure that any upstream consumers
// can safely guarantee on this wrapper never throwing.
}
}
const cleanupCursorAsync = promisify(cleanupCursor);

function cleanupCursor(
cursor: AbstractCursor,
Expand All @@ -802,6 +800,10 @@ function cleanupCursor(
const server = cursor[kServer];
const session = cursor[kSession];
const error = options?.error;

// Cursors only emit closed events once the client-side cursor has been exhausted fully or there
// was an error. Notably, when the server returns a cursor id of 0 and a non-empty batch, we
// cleanup the cursor but don't emit a `close` event.
const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0;

if (error) {
Expand Down
12 changes: 3 additions & 9 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,6 @@ describe('Cursor', function () {
expect(cursor).property('closed', false);

const willClose = once(cursor, 'close');
const willEnd = once(stream, 'close');

const dataEvents = on(stream, 'data');

Expand All @@ -1722,16 +1721,12 @@ describe('Cursor', function () {
// After 5 successful data events, destroy stream
stream.destroy();

// We should get a a close event on the stream and a close event on the cursor
// We should **not** get an 'error' event,
// We should get a close event on the stream and a close event on the cursor
// We should **not** get an 'error' or an 'end' event,
// the following will throw if either stream or cursor emitted an 'error' event
await Promise.race([
willEnd,
sleep(100, { ref: false }).then(() => Promise.reject(new Error('end event never emitted')))
]);
await Promise.race([
willClose,
sleep(100, { ref: false }).then(() => Promise.reject(new Error('close event never emitted')))
sleep(100).then(() => Promise.reject(new Error('close event never emitted')))
]);
});

Expand Down Expand Up @@ -3606,7 +3601,6 @@ describe('Cursor', function () {

await collection.insertMany(docs);

// TODO - talk to Neal about this test
const cursor = await collection.find({}, { batchSize: 3 });
for (let i = 0; i < 3; ++i) {
await cursor.next();
Expand Down
1 change: 0 additions & 1 deletion test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ describe('class AbstractCursor', function () {
expect(doc.name).to.equal('JOHN DOE');
});

// skipped because these tests fail after throwing uncaught exceptions
it(`when the transform throws, Cursor.stream() propagates the error to the user`, async () => {
const cursor = collection
.find()
Expand Down
3 changes: 1 addition & 2 deletions test/integration/node-specific/cursor_stream.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';
const { expect } = require('chai');
const { Binary } = require('../../mongodb');
const { setTimeout, setImmediate } = require('timers');
const { setTimeout } = require('timers');

describe('Cursor Streams', function () {
let client;
Expand Down Expand Up @@ -296,7 +296,6 @@ describe('Cursor Streams', function () {
const stream = cursor.stream();
stream.on('error', err => (error = err));
cursor.on('close', function () {
// NOTE: use `setImmediate` here because the stream implementation uses `nextTick` to emit the error
setTimeout(() => {
expect(error).to.exist;
client.close(done);
Expand Down

0 comments on commit 9a4eecc

Please sign in to comment.