Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5374): do not apply cursor transform in Cursor.hasNext #3746

Merged
merged 11 commits into from
Jul 5, 2023
162 changes: 93 additions & 69 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ export abstract class AbstractCursor<
return true;
}

const doc = await nextAsync<TSchema>(this, true);
const doc = await next<TSchema>(this, true, false);

if (doc) {
this[kDocuments].unshift(doc);
Expand All @@ -377,7 +377,7 @@ export abstract class AbstractCursor<
throw new MongoCursorExhaustedError();
}

return nextAsync(this, true);
return next(this, true);
}

/**
Expand All @@ -388,7 +388,7 @@ export abstract class AbstractCursor<
throw new MongoCursorExhaustedError();
}

return nextAsync(this, false);
return next(this, false);
}

/**
Expand Down Expand Up @@ -680,88 +680,107 @@ export abstract class AbstractCursor<
}
}

function nextDocument<T>(cursor: AbstractCursor<T>): T | null {
const doc = cursor[kDocuments].shift();

if (doc && cursor[kTransform]) {
return cursor[kTransform](doc) as T;
}

return doc;
}

const nextAsync = promisify(
next as <T>(
cursor: AbstractCursor<T>,
blocking: boolean,
callback: (e: Error, r: T | null) => void
) => void
);

/**
* @param cursor - the cursor on which to call `next`
* @param blocking - a boolean indicating whether or not the cursor should `block` until data
* is available. Generally, this flag is set to `false` because if the getMore returns no documents,
* the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and
* `tryNext`, for example) blocking is necessary because a getMore returning no documents does
* not indicate the end of the cursor.
* @param callback - callback to return the result to the caller
* @returns
* @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists)
* @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means
* the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer.
*/
export function next<T>(
async function next<T>(
cursor: AbstractCursor<T>,
blocking: boolean,
callback: Callback<T | null>
): void {
transform = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we are defaulting to true here as more methods would transform vs. methods that don't (hasNext)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that was my logic. I also considered two other options:

  • providing no default
  • combining blocking and transform into an options object and requiring all callsites to explicitly set them for clarity (i.e., next(this, { blocking: true, transform: false })

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I lean towards providing no default, for internal I prefer to make each call site define its inputs and because this isn't saving us a lot of refactors. I don't feel strongly though, feel free to resolve if we agree to leave it as is.

): Promise<T | null> {
const cursorId = cursor[kId];
if (cursor.closed) {
return callback(undefined, null);
return null;
}

if (cursor[kDocuments].length !== 0) {
callback(undefined, nextDocument<T>(cursor));
return;
const doc = cursor[kDocuments].shift();

if (doc != null && transform && cursor[kTransform]) {
try {
return cursor[kTransform](doc);
} catch (error) {
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
});
throw error;
}
}

return doc;
}

if (cursorId == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
cursor[kInit](err => {
if (err) return callback(err);
return next(cursor, blocking, callback);
});

return;
const init = promisify(cb => cursor[kInit](cb));
await init();
return next(cursor, blocking, transform);
durran marked this conversation as resolved.
Show resolved Hide resolved
}

if (cursorIsDead(cursor)) {
return cleanupCursor(cursor, undefined, () => callback(undefined, null));
// if the cursor is dead, we clean it up
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
return null;
}

// otherwise need to call getMore
const batchSize = cursor[kOptions].batchSize || 1000;
cursor._getMore(batchSize, (error, response) => {
if (response) {
const cursorId =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;
const getMore = promisify((batchSize: number, cb: Callback<Document | undefined>) =>
cursor._getMore(batchSize, cb)
);

cursor[kDocuments].pushMany(response.cursor.nextBatch);
cursor[kId] = cursorId;
let response: Document | undefined;
try {
response = await getMore(batchSize);
} catch (error) {
if (error) {
await cleanupCursorAsync(cursor, { error }).catch(() => {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
});
throw error;
}
}

if (error || cursorIsDead(cursor)) {
return cleanupCursor(cursor, { error }, () => callback(error, nextDocument<T>(cursor)));
}
if (response) {
const cursorId =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;

if (cursor[kDocuments].length === 0 && blocking === false) {
return callback(undefined, null);
}
cursor[kDocuments].pushMany(response.cursor.nextBatch);
cursor[kId] = cursorId;
}

next(cursor, blocking, callback);
});
if (cursorIsDead(cursor)) {
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
// we intentionally clean up the cursor to release its session back into the pool before the cursor
// is iterated. This prevents a cursor that is exhausted on the server from holding
// onto a session indefinitely until the AbstractCursor is iterated.
//
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
}

if (cursor[kDocuments].length === 0 && blocking === false) {
return null;
}

return next(cursor, blocking, transform);
}

function cursorIsDead(cursor: AbstractCursor): boolean {
Expand All @@ -781,6 +800,10 @@ function cleanupCursor(
const server = cursor[kServer];
const session = cursor[kSession];
const error = options?.error;

// Cursors only emit closed events once the client-side cursor has been exhausted fully or there
// was an error. Notably, when the server returns a cursor id of 0 and a non-empty batch, we
// cleanup the cursor but don't emit a `close` event.
const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0;

if (error) {
Expand Down Expand Up @@ -881,8 +904,21 @@ class ReadableCursorStream extends Readable {
}

private _readNext() {
next(this._cursor, true, (err, result) => {
if (err) {
next(this._cursor, true).then(
result => {
if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().catch(() => null);
} else {
if (this.push(result)) {
return this._readNext();
}

this._readInProgress = false;
}
},
err => {
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
// desired behavior is that a stream ends cleanly when a user explicitly closes
// a client during iteration. Alternatively, we could do the "right" thing and
Expand Down Expand Up @@ -911,18 +947,6 @@ class ReadableCursorStream extends Readable {
// See NODE-4475.
return this.destroy(err);
}

if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().catch(() => null);
} else {
if (this.push(result)) {
return this._readNext();
}

this._readInProgress = false;
}
});
);
}
}
47 changes: 17 additions & 30 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,6 @@ describe('Cursor', function () {
expect(cursor).property('closed', false);

const willClose = once(cursor, 'close');
const willEnd = once(stream, 'end');

const dataEvents = on(stream, 'data');

Expand All @@ -1722,13 +1721,9 @@ describe('Cursor', function () {
// After 5 successful data events, destroy stream
stream.destroy();

// We should get an end event on the stream and a close event on the cursor
// We should **not** get an 'error' event,
// We should get a close event on the stream and a close event on the cursor
// We should **not** get an 'error' or an 'end' event,
// the following will throw if either stream or cursor emitted an 'error' event
await Promise.race([
willEnd,
sleep(100).then(() => Promise.reject(new Error('end event never emitted')))
]);
await Promise.race([
willClose,
sleep(100).then(() => Promise.reject(new Error('close event never emitted')))
Expand Down Expand Up @@ -3589,10 +3584,7 @@ describe('Cursor', function () {
await client.close();
});

it('should return implicit session to pool when client-side cursor exhausts results after a getMore', function (done) {
const configuration = this.configuration;
const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 });

it('should return implicit session to pool when client-side cursor exhausts results after a getMore', async function () {
durran marked this conversation as resolved.
Show resolved Hide resolved
const db = client.db(configuration.db);
const collection = db.collection('cursor_session_tests2');

Expand All @@ -3604,25 +3596,20 @@ describe('Cursor', function () {
{ a: 9, b: 10 }
];

collection.insertMany(docs, err => {
expect(err).to.not.exist;
const cursor = collection.find({}, { batchSize: 3 });
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(1);
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(1);
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(1);
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(0);
cursor.close(() => {
client.close(done);
});
});
});
});
});
});
addaleax marked this conversation as resolved.
Show resolved Hide resolved
await collection.insertMany(docs);

const cursor = await collection.find({}, { batchSize: 3 });
for (let i = 0; i < 3; ++i) {
await cursor.next();
expect(client.s.activeSessions.size).to.equal(1);
}

await cursor.next();
expect(client.s.activeSessions.size, 'session not checked in after cursor exhausted').to.equal(
0
);

await cursor.close();
});

describe('#clone', function () {
Expand Down
Loading