fix(NODE-4788)!: use implementer Writable methods for GridFSBucketWriteStream (#3808)

Co-authored-by: Durran Jordan <durran@gmail.com>
nbbeeken and durran authored Aug 15, 2023
1 parent 2fbb715 commit 7955610
Showing 5 changed files with 215 additions and 575 deletions.
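
The breaking "!" in the title marks the shift from hand-rolled NodeJS.ReadableStream bookkeeping to the implementer methods of Node's stream classes; the download.ts diff below applies the same treatment to GridFSBucketReadStream, so errors and teardown now flow through destroy() instead of manual emit() calls. A minimal consumer sketch, assuming a local mongod, a 'test' database, and a placeholder file id:

import { MongoClient, GridFSBucket, ObjectId } from 'mongodb';

// A minimal sketch, assuming a local mongod; the URI, database name,
// and file id below are placeholders.
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const bucket = new GridFSBucket(client.db('test'));

const stream = bucket.openDownloadStream(new ObjectId()); // placeholder id
stream
  .on('file', doc => console.log('file doc:', doc.filename)) // GridFS-specific event, unchanged
  .on('data', chunk => console.log('read', chunk.length, 'bytes'))
  .on('error', err => console.error('failed:', err.message)) // now raised via destroy(error)
  .on('close', () => console.log('stream and cursor torn down'));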
72 changes: 16 additions & 56 deletions src/gridfs/download.ts
@@ -78,35 +78,15 @@ export interface GridFSBucketReadStreamPrivate {
  * Do not instantiate this class directly. Use `openDownloadStream()` instead.
  * @public
  */
-export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableStream {
+export class GridFSBucketReadStream extends Readable {
   /** @internal */
   s: GridFSBucketReadStreamPrivate;
 
-  /**
-   * An error occurred
-   * @event
-   */
-  static readonly ERROR = 'error' as const;
   /**
    * Fires when the stream loaded the file document corresponding to the provided id.
    * @event
    */
   static readonly FILE = 'file' as const;
-  /**
-   * Emitted when a chunk of data is available to be consumed.
-   * @event
-   */
-  static readonly DATA = 'data' as const;
-  /**
-   * Fired when the stream is exhausted (no more data events).
-   * @event
-   */
-  static readonly END = 'end' as const;
-  /**
-   * Fired when the stream is exhausted and the underlying cursor is killed
-   * @event
-   */
-  static readonly CLOSE = 'close' as const;
 
   /**
    * @param chunks - Handle for chunks collection
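
The deleted constants ERROR, DATA, END, and CLOSE were string aliases for events every Node Readable already emits, so only the GridFS-specific FILE constant survives. A stand-in sketch using a plain Readable from Node's stream module (no driver involved):

import { Readable } from 'stream';

// Stand-in for a download stream: 'data', 'end' and 'close' are
// ordinary Readable events; no class constants are needed.
const standIn = Readable.from([Buffer.from('hello')]);
standIn.on('data', chunk => console.log('data:', chunk.toString()));
standIn.on('end', () => console.log('end: no more data'));
standIn.on('close', () => console.log('close: resources released'));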
@@ -122,7 +102,7 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableStream {
     filter: Document,
     options?: GridFSBucketReadStreamOptions
   ) {
-    super();
+    super({ emitClose: true });
     this.s = {
       bytesToTrim: 0,
       bytesToSkip: 0,
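
Passing { emitClose: true } tells the Readable base class to emit 'close' once the stream is destroyed, covering what the removed manual CLOSE emits used to do. A small sketch of the option in isolation, using only Node's stream module:

import { Readable } from 'stream';

// With emitClose: true, destroy() is followed by a 'close' event,
// mirroring the behaviour of the removed manual emits.
const r = new Readable({ emitClose: true, read() {} });
r.on('close', () => console.log('close fired after destroy()'));
r.destroy();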
@@ -185,20 +165,8 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableStream {
    */
   async abort(): Promise<void> {
     this.push(null);
-    this.destroyed = true;
-    if (this.s.cursor) {
-      try {
-        await this.s.cursor.close();
-      } finally {
-        this.emit(GridFSBucketReadStream.CLOSE);
-      }
-    } else {
-      if (!this.s.init) {
-        // If not initialized, fire close event because we will never
-        // get a cursor
-        this.emit(GridFSBucketReadStream.CLOSE);
-      }
-    }
+    this.destroy();
+    await this.s.cursor?.close();
   }
 }
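
abort() now pushes the end-of-stream marker, hands teardown to Readable.destroy(), and closes the server-side cursor. A hedged usage sketch; the bucket and file id are assumed to come from the caller:

import { GridFSBucket, ObjectId } from 'mongodb';

// Hedged sketch: stop a download once the first chunk arrives.
function abortAfterFirstChunk(bucket: GridFSBucket, id: ObjectId): void {
  const stream = bucket.openDownloadStream(id);
  stream.once('data', () => {
    void stream.abort(); // push(null) + destroy() + cursor close
  });
  stream.on('close', () => console.log('download torn down'));
}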

@@ -221,19 +189,15 @@ function doRead(stream: GridFSBucketReadStream): void {
     return;
   }
   if (error) {
-    stream.emit(GridFSBucketReadStream.ERROR, error);
+    stream.destroy(error);
     return;
   }
   if (!doc) {
     stream.push(null);
 
     stream.s.cursor?.close().then(
-      () => {
-        stream.emit(GridFSBucketReadStream.CLOSE);
-      },
-      error => {
-        stream.emit(GridFSBucketReadStream.ERROR, error);
-      }
+      () => null,
+      error => stream.destroy(error)
     );
     return;
   }
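
Swapping stream.emit('error', …) for stream.destroy(error) hands delivery to the stream machinery: the stream is marked destroyed, 'error' fires, and (with emitClose) 'close' follows. A sketch with a bare Readable:

import { Readable } from 'stream';

// destroy(err) surfaces the failure through the normal 'error' event
// and then emits 'close', instead of a bare emit('error', err).
const r = new Readable({ read() {} });
r.on('error', err => console.error('error:', err.message));
r.on('close', () => console.log('closed'));
r.destroy(new Error('boom'));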
@@ -244,17 +208,15 @@ function doRead(stream: GridFSBucketReadStream): void {
   const expectedN = stream.s.expected++;
   const expectedLength = Math.min(stream.s.file.chunkSize, bytesRemaining);
   if (doc.n > expectedN) {
-    return stream.emit(
-      GridFSBucketReadStream.ERROR,
+    return stream.destroy(
       new MongoGridFSChunkError(
         `ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}`
       )
     );
   }
 
   if (doc.n < expectedN) {
-    return stream.emit(
-      GridFSBucketReadStream.ERROR,
+    return stream.destroy(
       new MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`)
     );
   }
@@ -263,16 +225,14 @@ function doRead(stream: GridFSBucketReadStream): void {

   if (buf.byteLength !== expectedLength) {
     if (bytesRemaining <= 0) {
-      return stream.emit(
-        GridFSBucketReadStream.ERROR,
+      return stream.destroy(
         new MongoGridFSChunkError(
           `ExtraChunk: Got unexpected n: ${doc.n}, expected file length ${stream.s.file.length} bytes but already read ${stream.s.bytesRead} bytes`
         )
       );
     }
 
-    return stream.emit(
-      GridFSBucketReadStream.ERROR,
+    return stream.destroy(
       new MongoGridFSChunkError(
         `ChunkIsWrongSize: Got unexpected length: ${buf.byteLength}, expected: ${expectedLength}`
       )
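
For consumers, each of these integrity failures (missing, extra, or wrong-sized chunks) now arrives as an ordinary 'error' event carrying a MongoGridFSChunkError rather than a manually emitted one. A hedged sketch of spotting them:

import { GridFSBucketReadStream, MongoGridFSChunkError } from 'mongodb';

// Hedged sketch: chunk-integrity failures surface via destroy(error),
// so the standard 'error' event is the single place to catch them.
function watchChunkErrors(stream: GridFSBucketReadStream): void {
  stream.on('error', err => {
    if (err instanceof MongoGridFSChunkError) {
      console.error('corrupt GridFS file:', err.message);
    }
  });
}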
@@ -332,7 +292,7 @@ function init(stream: GridFSBucketReadStream): void {
       doc
     }: { error: Error; doc: null } | { error: null; doc: any }) => {
       if (error) {
-        return stream.emit(GridFSBucketReadStream.ERROR, error);
+        return stream.destroy(error);
       }
 
       if (!doc) {
@@ -343,7 +303,7 @@ function init(stream: GridFSBucketReadStream): void {
         // TODO(NODE-3483)
         const err = new MongoRuntimeError(errmsg);
         err.code = 'ENOENT'; // TODO: NODE-3338 set property as part of constructor
-        return stream.emit(GridFSBucketReadStream.ERROR, err);
+        return stream.destroy(err);
       }
 
       // If document is empty, kill the stream immediately and don't
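
A lookup for a nonexistent file id still produces a MongoRuntimeError whose code is 'ENOENT'; it is simply delivered via destroy(err) now. A hedged detection sketch:

import { GridFSBucketReadStream } from 'mongodb';

// Hedged sketch: file-not-found arrives on 'error' with code 'ENOENT'.
function watchMissingFile(stream: GridFSBucketReadStream): void {
  stream.on('error', err => {
    if ((err as NodeJS.ErrnoException).code === 'ENOENT') {
      console.error('no such file in this bucket');
    }
  });
}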
@@ -357,14 +317,14 @@ function init(stream: GridFSBucketReadStream): void {
       // If user destroys the stream before we have a cursor, wait
       // until the query is done to say we're 'closed' because we can't
       // cancel a query.
-      stream.emit(GridFSBucketReadStream.CLOSE);
+      stream.destroy();
       return;
     }
 
     try {
       stream.s.bytesToSkip = handleStartOption(stream, doc, stream.s.options);
     } catch (error) {
-      return stream.emit(GridFSBucketReadStream.ERROR, error);
+      return stream.destroy(error);
     }
 
     const filter: Document = { files_id: doc._id };
@@ -390,7 +350,7 @@ function init(stream: GridFSBucketReadStream): void {
     try {
       stream.s.bytesToTrim = handleEndOption(stream, doc, stream.s.cursor, stream.s.options);
     } catch (error) {
-      return stream.emit(GridFSBucketReadStream.ERROR, error);
+      return stream.destroy(error);
     }
 
     stream.emit(GridFSBucketReadStream.FILE, doc);
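
The GridFS-specific 'file' event is unchanged: it still fires once the files-collection document has been fetched, before any 'data'. A hedged sketch that reads the metadata up front:

import { GridFSBucketReadStream } from 'mongodb';

// Hedged sketch: 'file' delivers the file document before data flows.
function logFileDoc(stream: GridFSBucketReadStream): void {
  stream.once(GridFSBucketReadStream.FILE, doc => {
    console.log('length:', doc.length, 'chunkSize:', doc.chunkSize);
  });
}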