Skip to content

Commit

Permalink
fix multiple transform bug
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Jun 29, 2023
1 parent fb7904b commit dd008e0
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 51 deletions.
67 changes: 42 additions & 25 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ export abstract class AbstractCursor<
return true;
}

const doc = await next<TSchema>(this, true);
const doc = await next<TSchema>(this, true, false);

if (doc) {
this[kDocuments].unshift(doc);
Expand Down Expand Up @@ -680,48 +680,47 @@ export abstract class AbstractCursor<
}
}

function nextDocument<T>(cursor: AbstractCursor<T>): T | null {
const doc = cursor[kDocuments].shift();

if (doc && cursor[kTransform]) {
return cursor[kTransform](doc) as T;
}

return doc;
}

/**
* @param cursor - the cursor on which to call `next`
* @param blocking - a boolean indicating whether or not the cursor should `block` until data
* is available. Generally, this flag is set to `false` because if the getMore returns no documents,
* the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and
* `tryNext`, for example) blocking is necessary because a getMore returning no documents does
* not indicate the end of the cursor.
* @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists)
* @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means
* the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer.
*/
async function next<T>(cursor: AbstractCursor<T>, blocking: boolean): Promise<T | null> {
async function next<T>(
cursor: AbstractCursor<T>,
blocking: boolean,
transform = true
): Promise<T | null> {
const cursorId = cursor[kId];
if (cursor.closed) {
return null;
}

if (cursor[kDocuments].length !== 0) {
return nextDocument<T>(cursor);
const doc = cursor[kDocuments].shift();

if (doc != null && transform && cursor[kTransform]) {
return cursor[kTransform](doc);
}

return doc;
}

if (cursorId == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
const init = promisify(cb => cursor[kInit](cb));
await init();
return next(cursor, blocking);
return next(cursor, blocking, transform);
}

if (cursorIsDead(cursor)) {
try {
await cleanupCursorAsync(cursor, undefined);
// eslint-disable-next-line no-empty
} catch {}
// if the cursor is dead, we clean it up
await cleanupCursorAsync(cursor);
return null;
}

Expand All @@ -735,11 +734,8 @@ async function next<T>(cursor: AbstractCursor<T>, blocking: boolean): Promise<T
try {
response = await getMore(batchSize);
} catch (error) {
if (error || cursorIsDead(cursor)) {
try {
await cleanupCursorAsync(cursor, { error });
// eslint-disable-next-line no-empty
} catch {}
if (error) {
await cleanupCursorAsync(cursor, { error });
throw error;
}
}
Expand All @@ -756,19 +752,40 @@ async function next<T>(cursor: AbstractCursor<T>, blocking: boolean): Promise<T
cursor[kId] = cursorId;
}

if (cursorIsDead(cursor)) {
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
// we intentionally clean up the cursor to release its session back into the pool before the cursor
// is iterated. This prevents a cursor that is exhausted on the server from holding
// onto a session indefinitely until the AbstractCursor is iterated.
await cleanupCursorAsync(cursor);
}

if (cursor[kDocuments].length === 0 && blocking === false) {
return null;
}

return next(cursor, blocking);
return next(cursor, blocking, transform);
}

function cursorIsDead(cursor: AbstractCursor): boolean {
const cursorId = cursor[kId];
return !!cursorId && cursorId.isZero();
}

const cleanupCursorAsync = promisify(cleanupCursor);
const cleanupCursorAsyncInternal = promisify(cleanupCursor);

async function cleanupCursorAsync<T>(
cursor: AbstractCursor<T>,
options: { needsToEmitClosed?: boolean; error?: AnyError } = {}
): Promise<void> {
try {
await cleanupCursorAsyncInternal(cursor, options);
} catch {
// `cleanupCursor` never throws but we can't really test that.
// so this is a hack to ensure that any upstream consumers
// can safely guarantee on this wrapper never throwing.
}
}

function cleanupCursor(
cursor: AbstractCursor,
Expand Down
45 changes: 21 additions & 24 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,7 @@ describe('Cursor', function () {
expect(cursor).property('closed', false);

const willClose = once(cursor, 'close');
const willEnd = once(stream, 'end');
const willEnd = once(stream, 'close');

const dataEvents = on(stream, 'data');

Expand All @@ -1722,16 +1722,16 @@ describe('Cursor', function () {
// After 5 successful data events, destroy stream
stream.destroy();

// We should get an end event on the stream and a close event on the cursor
// We should get a a close event on the stream and a close event on the cursor
// We should **not** get an 'error' event,
// the following will throw if either stream or cursor emitted an 'error' event
await Promise.race([
willEnd,
sleep(100).then(() => Promise.reject(new Error('end event never emitted')))
sleep(100, { ref: false }).then(() => Promise.reject(new Error('end event never emitted')))
]);
await Promise.race([
willClose,
sleep(100).then(() => Promise.reject(new Error('close event never emitted')))
sleep(100, { ref: false }).then(() => Promise.reject(new Error('close event never emitted')))
]);
});

Expand Down Expand Up @@ -3589,7 +3589,7 @@ describe('Cursor', function () {
await client.close();
});

it('should return implicit session to pool when client-side cursor exhausts results after a getMore', function (done) {
it('should return implicit session to pool when client-side cursor exhausts results after a getMore', async function () {
const configuration = this.configuration;
const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 });

Expand All @@ -3604,25 +3604,22 @@ describe('Cursor', function () {
{ a: 9, b: 10 }
];

collection.insertMany(docs, err => {
expect(err).to.not.exist;
const cursor = collection.find({}, { batchSize: 3 });
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(1);
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(1);
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(1);
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(0);
cursor.close(() => {
client.close(done);
});
});
});
});
});
});
await collection.insertMany(docs);

// TODO - talk to Neal about this test
const cursor = await collection.find({}, { batchSize: 3 });
for (let i = 0; i < 3; ++i) {
await cursor.next();
expect(client.s.activeSessions.size).to.equal(1);
}

await cursor.next();
expect(client.s.activeSessions.size, 'session not checked in after cursor exhausted').to.equal(
0
);

await cursor.close();
await client.close();
});

describe('#clone', function () {
Expand Down
4 changes: 2 additions & 2 deletions test/integration/node-specific/cursor_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,10 @@ describe('Cursor Streams', function () {
stream.on('error', err => (error = err));
cursor.on('close', function () {
// NOTE: use `setImmediate` here because the stream implementation uses `nextTick` to emit the error
setImmediate(() => {
setTimeout(() => {
expect(error).to.exist;
client.close(done);
});
}, 50);
});

stream.pipe(process.stdout);
Expand Down

0 comments on commit dd008e0

Please sign in to comment.