diff --git a/src/gridfs/download.ts b/src/gridfs/download.ts
index 0053ecc9a0..6736b968f1 100644
--- a/src/gridfs/download.ts
+++ b/src/gridfs/download.ts
@@ -78,35 +78,15 @@ export interface GridFSBucketReadStreamPrivate {
  * Do not instantiate this class directly. Use `openDownloadStream()` instead.
  * @public
  */
-export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableStream {
+export class GridFSBucketReadStream extends Readable {
   /** @internal */
   s: GridFSBucketReadStreamPrivate;
 
-  /**
-   * An error occurred
-   * @event
-   */
-  static readonly ERROR = 'error' as const;
   /**
    * Fires when the stream loaded the file document corresponding to the provided id.
    * @event
    */
   static readonly FILE = 'file' as const;
-  /**
-   * Emitted when a chunk of data is available to be consumed.
-   * @event
-   */
-  static readonly DATA = 'data' as const;
-  /**
-   * Fired when the stream is exhausted (no more data events).
-   * @event
-   */
-  static readonly END = 'end' as const;
-  /**
-   * Fired when the stream is exhausted and the underlying cursor is killed
-   * @event
-   */
-  static readonly CLOSE = 'close' as const;
 
   /**
    * @param chunks - Handle for chunks collection
@@ -122,7 +102,7 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableS
     filter: Document,
     options?: GridFSBucketReadStreamOptions
   ) {
-    super();
+    super({ emitClose: true });
     this.s = {
       bytesToTrim: 0,
       bytesToSkip: 0,
@@ -185,20 +165,8 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableS
    */
   async abort(): Promise<void> {
     this.push(null);
-    this.destroyed = true;
-    if (this.s.cursor) {
-      try {
-        await this.s.cursor.close();
-      } finally {
-        this.emit(GridFSBucketReadStream.CLOSE);
-      }
-    } else {
-      if (!this.s.init) {
-        // If not initialized, fire close event because we will never
-        // get a cursor
-        this.emit(GridFSBucketReadStream.CLOSE);
-      }
-    }
+    this.destroy();
+    await this.s.cursor?.close();
   }
 }
 
@@ -221,19 +189,15 @@ function doRead(stream: GridFSBucketReadStream): void {
       return;
     }
     if (error) {
-      stream.emit(GridFSBucketReadStream.ERROR, error);
+      stream.destroy(error);
       return;
     }
     if (!doc) {
       stream.push(null);
       stream.s.cursor?.close().then(
-        () => {
-          stream.emit(GridFSBucketReadStream.CLOSE);
-        },
-        error => {
-          stream.emit(GridFSBucketReadStream.ERROR, error);
-        }
+        () => null,
+        error => stream.destroy(error)
       );
       return;
     }
@@ -244,8 +208,7 @@ function doRead(stream: GridFSBucketReadStream): void {
     const expectedN = stream.s.expected++;
     const expectedLength = Math.min(stream.s.file.chunkSize, bytesRemaining);
     if (doc.n > expectedN) {
-      return stream.emit(
-        GridFSBucketReadStream.ERROR,
+      return stream.destroy(
         new MongoGridFSChunkError(
          `ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}`
         )
@@ -253,8 +216,7 @@
     }
 
     if (doc.n < expectedN) {
-      return stream.emit(
-        GridFSBucketReadStream.ERROR,
+      return stream.destroy(
         new MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`)
       );
     }
@@ -263,16 +225,14 @@ function doRead(stream: GridFSBucketReadStream): void {
 
     if (buf.byteLength !== expectedLength) {
       if (bytesRemaining <= 0) {
-        return stream.emit(
-          GridFSBucketReadStream.ERROR,
+        return stream.destroy(
           new MongoGridFSChunkError(
            `ExtraChunk: Got unexpected n: ${doc.n}, expected file length ${stream.s.file.length} bytes but already read ${stream.s.bytesRead} bytes`
           )
         );
       }
 
-      return stream.emit(
-        GridFSBucketReadStream.ERROR,
+      return stream.destroy(
         new MongoGridFSChunkError(
          `ChunkIsWrongSize: Got unexpected length: ${buf.byteLength}, expected: ${expectedLength}`
         )
@@ -332,7 +292,7 @@ function init(stream: GridFSBucketReadStream): void {
       doc
     }: { error: Error; doc: null } | { error: null; doc: any }) => {
       if (error) {
-        return stream.emit(GridFSBucketReadStream.ERROR, error);
+        return stream.destroy(error);
       }
 
       if (!doc) {
@@ -343,7 +303,7 @@ function init(stream: GridFSBucketReadStream): void {
         // TODO(NODE-3483)
         const err = new MongoRuntimeError(errmsg);
         err.code = 'ENOENT'; // TODO: NODE-3338 set property as part of constructor
-        return stream.emit(GridFSBucketReadStream.ERROR, err);
+        return stream.destroy(err);
       }
 
       // If document is empty, kill the stream immediately and don't
@@ -357,14 +317,14 @@ function init(stream: GridFSBucketReadStream): void {
         // If user destroys the stream before we have a cursor, wait
        // until the query is done to say we're 'closed' because we can't
         // cancel a query.
-        stream.emit(GridFSBucketReadStream.CLOSE);
+        stream.destroy();
         return;
       }
 
       try {
         stream.s.bytesToSkip = handleStartOption(stream, doc, stream.s.options);
       } catch (error) {
-        return stream.emit(GridFSBucketReadStream.ERROR, error);
+        return stream.destroy(error);
       }
 
       const filter: Document = { files_id: doc._id };
@@ -390,7 +350,7 @@ function init(stream: GridFSBucketReadStream): void {
       try {
         stream.s.bytesToTrim = handleEndOption(stream, doc, stream.s.cursor, stream.s.options);
       } catch (error) {
-        return stream.emit(GridFSBucketReadStream.ERROR, error);
+        return stream.destroy(error);
       }
 
       stream.emit(GridFSBucketReadStream.FILE, doc);
diff --git a/src/gridfs/upload.ts b/src/gridfs/upload.ts
index 8ef76e1a57..9feb814158 100644
--- a/src/gridfs/upload.ts
+++ b/src/gridfs/upload.ts
@@ -3,7 +3,7 @@ import { Writable } from 'stream';
 import type { Document } from '../bson';
 import { ObjectId } from '../bson';
 import type { Collection } from '../collection';
-import { type AnyError, MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error';
+import { MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error';
 import type { Callback } from '../utils';
 import type { WriteConcernOptions } from '../write_concern';
 import { WriteConcern } from './../write_concern';
@@ -38,36 +38,58 @@ export interface GridFSBucketWriteStreamOptions extends WriteConcernOptions {
  * Do not instantiate this class directly. Use `openUploadStream()` instead.
  * @public
  */
-export class GridFSBucketWriteStream extends Writable implements NodeJS.WritableStream {
+export class GridFSBucketWriteStream extends Writable {
   bucket: GridFSBucket;
+  /** A Collection instance where the file's chunks are stored */
   chunks: Collection<GridFSChunk>;
-  filename: string;
+  /** A Collection instance where the file's GridFSFile document is stored */
   files: Collection<GridFSFile>;
+  /** The name of the file */
+  filename: string;
+  /** Options controlling the metadata inserted along with the file */
   options: GridFSBucketWriteStreamOptions;
+  /** Indicates the stream is finished uploading */
   done: boolean;
+  /** The ObjectId used for the `_id` field on the GridFSFile document */
   id: ObjectId;
+  /** The number of bytes that each chunk will be limited to */
   chunkSizeBytes: number;
+  /** Space used to store a chunk currently being inserted */
   bufToStore: Buffer;
+  /** Accumulates the number of bytes inserted as the stream uploads chunks */
   length: number;
+  /** Accumulates the number of chunks inserted as the stream uploads file contents */
   n: number;
+  /** Tracks the current offset into the buffered bytes being uploaded */
   pos: number;
+  /** Contains a number of properties indicating the current state of the stream */
   state: {
+    /** If set the stream has ended */
     streamEnd: boolean;
+    /** Indicates the number of chunks that still need to be inserted to exhaust the current buffered data */
     outstandingRequests: number;
+    /** If set an error occurred during insertion */
     errored: boolean;
+    /** If set the stream was intentionally aborted */
     aborted: boolean;
   };
+  /** The write concern setting to be used with every insert operation */
   writeConcern?: WriteConcern;
-
-  /** @event */
-  static readonly CLOSE = 'close';
-  /** @event */
-  static readonly ERROR = 'error';
   /**
-   * `end()` was called and the write stream successfully wrote the file metadata and all the chunks to MongoDB.
-   * @event
+   * The document containing information about the inserted file.
+   * This property is defined _after_ the finish event has been emitted.
+   * It will remain `null` if an error occurs.
+   *
+   * @example
+   * ```ts
+   * fs.createReadStream('file.txt')
+   *   .pipe(bucket.openUploadStream('file.txt'))
+   *   .on('finish', function () {
+   *     console.log(this.gridFSFile)
+   *   })
+   * ```
    */
-  static readonly FINISH = 'finish';
+  gridFSFile: GridFSFile | null = null;
 
   /**
    * @param bucket - Handle for this stream's corresponding bucket
@@ -116,29 +138,40 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
   }
 
   /**
+   * @internal
+   *
+   * The stream is considered constructed when the indexes are done being created
+   */
+  override _construct(callback: (error?: Error | null) => void): void {
+    if (this.bucket.s.checkedIndexes) {
+      return process.nextTick(callback);
+    }
+    this.bucket.once('index', callback);
+  }
+
+  /**
+   * @internal
    * Write a buffer to the stream.
    *
    * @param chunk - Buffer to write
-   * @param encodingOrCallback - Optional encoding for the buffer
+   * @param encoding - Optional encoding for the buffer
    * @param callback - Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
-   * @returns False if this write required flushing a chunk to MongoDB. True otherwise.
    */
-  override write(chunk: Buffer | string): boolean;
-  override write(chunk: Buffer | string, callback: Callback): boolean;
-  override write(chunk: Buffer | string, encoding: BufferEncoding | undefined): boolean;
-  override write(
+  override _write(
     chunk: Buffer | string,
-    encoding: BufferEncoding | undefined,
+    encoding: BufferEncoding,
     callback: Callback
-  ): boolean;
-  override write(
-    chunk: Buffer | string,
-    encodingOrCallback?: Callback | BufferEncoding,
-    callback?: Callback
-  ): boolean {
-    const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
-    callback = typeof encodingOrCallback === 'function' ? encodingOrCallback : callback;
-    return waitForIndexes(this, () => doWrite(this, chunk, encoding, callback));
+  ): void {
+    doWrite(this, chunk, encoding, callback);
+  }
+
+  /** @internal */
+  override _final(callback: (error?: Error | null) => void): void {
+    if (this.state.streamEnd) {
+      return process.nextTick(callback);
+    }
+    this.state.streamEnd = true;
+    writeRemnant(this, callback);
   }
 
   /**
@@ -159,76 +192,15 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
     this.state.aborted = true;
     await this.chunks.deleteMany({ files_id: this.id });
   }
-
-  /**
-   * Tells the stream that no more data will be coming in. The stream will
-   * persist the remaining data to MongoDB, write the files document, and
-   * then emit a 'finish' event.
-   *
-   * @param chunk - Buffer to write
-   * @param encoding - Optional encoding for the buffer
-   * @param callback - Function to call when all files and chunks have been persisted to MongoDB
-   */
-  override end(): this;
-  override end(chunk: Buffer): this;
-  override end(callback: Callback): this;
-  override end(chunk: Buffer, callback: Callback): this;
-  override end(chunk: Buffer, encoding: BufferEncoding): this;
-  override end(
-    chunk: Buffer,
-    encoding: BufferEncoding | undefined,
-    callback: Callback
-  ): this;
-  override end(
-    chunkOrCallback?: Buffer | Callback,
-    encodingOrCallback?: BufferEncoding | Callback,
-    callback?: Callback
-  ): this {
-    const chunk = typeof chunkOrCallback === 'function' ? undefined : chunkOrCallback;
-    const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
-    callback =
-      typeof chunkOrCallback === 'function'
-        ? chunkOrCallback
-        : typeof encodingOrCallback === 'function'
-        ? encodingOrCallback
-        : callback;
-
-    if (this.state.streamEnd || checkAborted(this, callback)) return this;
-
-    this.state.streamEnd = true;
-
-    if (callback) {
-      this.once(GridFSBucketWriteStream.FINISH, (result: GridFSFile) => {
-        if (callback) callback(undefined, result);
-      });
-    }
-
-    if (!chunk) {
-      waitForIndexes(this, () => !!writeRemnant(this));
-      return this;
-    }
-
-    this.write(chunk, encoding, () => {
-      writeRemnant(this);
-    });
-
-    return this;
-  }
 }
 
-function __handleError(
-  stream: GridFSBucketWriteStream,
-  error: AnyError,
-  callback?: Callback
-): void {
+function handleError(stream: GridFSBucketWriteStream, error: Error, callback: Callback): void {
   if (stream.state.errored) {
+    process.nextTick(callback);
     return;
   }
   stream.state.errored = true;
-  if (callback) {
-    return callback(error);
-  }
-  stream.emit(GridFSBucketWriteStream.ERROR, error);
+  process.nextTick(callback, error);
 }
 
 function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk {
@@ -271,13 +243,16 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise
   }
 }
 
-function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
-  if (stream.done) return true;
+function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
+  if (stream.done) {
+    return process.nextTick(callback);
+  }
+
   if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
     // Set done so we do not trigger duplicate createFilesDoc
     stream.done = true;
     // Create a new files doc
-    const filesDoc = createFilesDoc(
+    const gridFSFile = createFilesDoc(
       stream.id,
       stream.length,
       stream.chunkSizeBytes,
@@ -287,24 +262,21 @@ function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolea
       stream.options.metadata
     );
 
-    if (checkAborted(stream, callback)) {
-      return false;
+    if (isAborted(stream, callback)) {
+      return;
     }
 
-    stream.files.insertOne(filesDoc, { writeConcern: stream.writeConcern }).then(
+    stream.files.insertOne(gridFSFile, { writeConcern: stream.writeConcern }).then(
       () => {
-        stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
-        stream.emit(GridFSBucketWriteStream.CLOSE);
+        stream.gridFSFile = gridFSFile;
+        callback();
       },
-      error => {
-        return __handleError(stream, error, callback);
-      }
+      error => handleError(stream, error, callback)
     );
-
-    return true;
+    return;
   }
 
-  return false;
+  process.nextTick(callback);
 }
 
 async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> {
@@ -377,11 +349,11 @@ function createFilesDoc(
 function doWrite(
   stream: GridFSBucketWriteStream,
   chunk: Buffer | string,
-  encoding?: BufferEncoding,
-  callback?: Callback
-): boolean {
-  if (checkAborted(stream, callback)) {
-    return false;
+  encoding: BufferEncoding,
+  callback: Callback
+): void {
+  if (isAborted(stream, callback)) {
+    return;
   }
 
   const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
@@ -392,13 +364,8 @@ function doWrite(
   if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
     inputBuf.copy(stream.bufToStore, stream.pos);
     stream.pos += inputBuf.length;
-
-    callback && callback();
-
-    // Note that we reverse the typical semantics of write's return value
-    // to be compatible with node's `.pipe()` function.
-    // True means client can keep writing.
-    return true;
+    process.nextTick(callback);
+    return;
   }
 
   // Otherwise, buffer is too big for current chunk, so we need to flush
@@ -418,8 +385,8 @@ function doWrite(
     ++stream.state.outstandingRequests;
     ++outstandingRequests;
 
-    if (checkAborted(stream, callback)) {
-      return false;
+    if (isAborted(stream, callback)) {
+      return;
     }
 
     stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(
@@ -428,14 +395,10 @@ function doWrite(
         --outstandingRequests;
 
         if (!outstandingRequests) {
-          stream.emit('drain', doc);
-          callback && callback();
-          checkDone(stream);
+          checkDone(stream, callback);
         }
       },
-      error => {
-        return __handleError(stream, error);
-      }
+      error => handleError(stream, error, callback)
     );
 
     spaceRemaining = stream.chunkSizeBytes;
@@ -445,29 +408,9 @@ function doWrite(
     inputBufRemaining -= numToCopy;
     numToCopy = Math.min(spaceRemaining, inputBufRemaining);
   }
-
-  // Note that we reverse the typical semantics of write's return value
-  // to be compatible with node's `.pipe()` function.
-  // False means the client should wait for the 'drain' event.
-  return false;
-}
-
-function waitForIndexes(
-  stream: GridFSBucketWriteStream,
-  callback: (res: boolean) => boolean
-): boolean {
-  if (stream.bucket.s.checkedIndexes) {
-    return callback(false);
-  }
-
-  stream.bucket.once('index', () => {
-    callback(true);
-  });
-
-  return true;
 }
 
-function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
+function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void {
   // Buffer is empty, so don't bother to insert
   if (stream.pos === 0) {
     return checkDone(stream, callback);
@@ -482,28 +425,22 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boo
   const doc = createChunkDoc(stream.id, stream.n, remnant);
 
   // If the stream was aborted, do not write remnant
-  if (checkAborted(stream, callback)) {
-    return false;
+  if (isAborted(stream, callback)) {
+    return;
   }
 
   stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(
     () => {
       --stream.state.outstandingRequests;
-      checkDone(stream);
+      checkDone(stream, callback);
     },
-    error => {
-      return __handleError(stream, error);
-    }
+    error => handleError(stream, error, callback)
   );
-  return true;
 }
 
-function checkAborted(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
+function isAborted(stream: GridFSBucketWriteStream, callback: Callback): boolean {
   if (stream.state.aborted) {
-    if (typeof callback === 'function') {
-      // TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
-      callback(new MongoAPIError('Stream has been aborted'));
-    }
+    process.nextTick(callback, new MongoAPIError('Stream has been aborted'));
     return true;
   }
   return false;
diff --git a/test/integration/gridfs/gridfs.spec.test.js b/test/integration/gridfs/gridfs.spec.test.js
index 22028dab22..6bde9ff6e5 100644
--- a/test/integration/gridfs/gridfs.spec.test.js
+++ b/test/integration/gridfs/gridfs.spec.test.js
@@ -1,71 +1,54 @@
 'use strict';
 
 const { EJSON } = require('bson');
-const { setupDatabase } = require('./../shared');
 const { expect } = require('chai');
+const { once } = require('node:events');
 const { GridFSBucket } = require('../../mongodb');
 
 describe('GridFS spec', function () {
-  before(function () {
-    return setupDatabase(this.configuration);
+  let client;
+  let db;
+
+  beforeEach(async function () {
+    client = this.configuration.newClient();
+    db = client.db('gridfs_spec_tests');
+  });
+
+  afterEach(async function () {
+    await db.dropDatabase().catch(() => null);
+    await client.close();
   });
 
   const UPLOAD_SPEC = require('../../spec/gridfs/gridfs-upload.json');
-  UPLOAD_SPEC.tests.forEach(function (specTest) {
-    (function (testSpec) {
-      it(testSpec.description, {
-        metadata: { requires: { topology: ['single'] } },
-        test(done) {
-          const configuration = this.configuration;
-          const client = configuration.newClient(configuration.writeConcernMax(), {
-            maxPoolSize: 1
-          });
-          client.connect(function (err, client) {
-            const db = client.db(configuration.db);
-            db.dropDatabase(function (error) {
-              expect(error).to.not.exist;
-
-              const bucket = new GridFSBucket(db, { bucketName: 'expected' });
-              const res = bucket.openUploadStream(
-                testSpec.act.arguments.filename,
-                testSpec.act.arguments.options
-              );
-              const buf = Buffer.from(testSpec.act.arguments.source.$hex, 'hex');
-
-              res.on('error', function (err) {
-                expect(err).to.not.exist;
-              });
+  for (const testSpec of UPLOAD_SPEC.tests) {
+    it(testSpec.description, async function () {
+      const bucket = new GridFSBucket(db, { bucketName: 'expected' });
 
-              res.on('finish', function () {
-                const data = testSpec.assert.data;
-                let num = data.length;
-                data.forEach(function (data) {
-                  const collection = data.insert;
-                  db.collection(collection)
-                    .find({})
-                    .toArray(function (error, docs) {
-                      expect(data.documents.length).to.equal(docs.length);
-
-                      for (let i = 0; i < docs.length; ++i) {
-                        testResultDoc(data.documents[i], docs[i], res.id);
-                      }
-
-                      if (--num === 0) {
-                        client.close(done);
-                      }
-                    });
-                });
-              });
+      const uploadStream = bucket.openUploadStream(
+        testSpec.act.arguments.filename,
+        testSpec.act.arguments.options
+      );
 
-              res.write(buf);
-              res.end();
-            });
-          });
+      const buf = Buffer.from(testSpec.act.arguments.source.$hex, 'hex');
+
+      const finished = once(uploadStream, 'finish');
+
+      uploadStream.write(buf);
+      uploadStream.end();
+
+      await finished;
+
+      for (const data of testSpec.assert.data) {
+        const docs = await db.collection(data.insert).find({}).toArray();
+
+        expect(data.documents.length).to.equal(docs.length);
+        for (let i = 0; i < docs.length; ++i) {
+          testResultDoc(data.documents[i], docs[i], uploadStream.id);
         }
-      });
-    })(specTest);
-  });
+      }
+    });
+  }
 
   const DOWNLOAD_SPEC = require('../../spec/gridfs/gridfs-download.json');
   DOWNLOAD_SPEC.tests.forEach(function (specTest) {
diff --git a/test/integration/gridfs/gridfs_stream.test.js b/test/integration/gridfs/gridfs_stream.test.js
index 414805d223..cd36983e20 100644
--- a/test/integration/gridfs/gridfs_stream.test.js
+++ b/test/integration/gridfs/gridfs_stream.test.js
@@ -4,17 +4,20 @@ const { Double } = require('bson');
 const stream = require('stream');
 const fs = require('fs');
 const { expect } = require('chai');
-const { GridFSBucket, ObjectId } = require('../../mongodb');
-const sinon = require('sinon');
-const { sleep } = require('../../tools/utils');
+const { promisify } = require('node:util');
+const { once } = require('node:events');
+const { GridFSBucket, ObjectId, MongoAPIError } = require('../../mongodb');
 
 describe('GridFS Stream', function () {
   let client;
+  let db;
 
   beforeEach(async function () {
     client = this.configuration.newClient();
+    db = client.db('gridfs_stream_tests');
   });
 
   afterEach(async function () {
+    await db.dropDatabase().catch(() => null);
     await client.close();
   });
 
@@ -357,33 +360,32 @@ describe('GridFS Stream', function () {
     }
   });
 
-  it('should emit close after all chunks are received', {
-    metadata: { requires: { topology: ['single'] } },
+  it('emits end and close after all chunks are received', async function () {
+    const bucket = new GridFSBucket(db, { bucketName: 'gridfsdownload', chunkSizeBytes: 6000 });
 
-    test(done) {
-      const db = client.db();
-      const bucket = new GridFSBucket(db, {
-        bucketName: 'gridfsdownload',
-        chunkSizeBytes: 6000
-      });
+    const readStream = fs.createReadStream('./LICENSE.md');
+    const uploadStream = bucket.openUploadStream('LICENSE.md');
 
-      const readStream = fs.createReadStream('./LICENSE.md');
-      const uploadStream = bucket.openUploadStream('teststart.dat');
-      uploadStream.once('finish', function () {
-        const downloadStream = bucket.openDownloadStreamByName('teststart.dat');
-
-        const events = [];
-        downloadStream.on('data', () => events.push('data'));
-        downloadStream.on('close', () => events.push('close'));
-        downloadStream.on('end', () => {
-          expect(events).to.deep.equal(['data', 'data', 'close']);
-          expect(downloadStream).to.exist;
-          client.close(done);
-        });
-      });
+    const finishedUpload = once(uploadStream, 'finish');
+    readStream.pipe(uploadStream);
+    await finishedUpload;
 
-      readStream.pipe(uploadStream);
+    const downloadStream = bucket.openDownloadStreamByName('LICENSE.md');
+
+    const closeEvent = once(downloadStream, 'close');
+    const endEvent = once(downloadStream, 'end');
+
+    // This always comes in two chunks because
+    // our LICENSE is 11323 characters and we set chunkSize to 6000
+    const chunks = [];
+    for await (const data of downloadStream) {
+      chunks.push(data);
     }
+
+    await endEvent;
+    await closeEvent;
+
+    expect(chunks).to.have.lengthOf(2);
   });
 
   /**
@@ -432,165 +434,49 @@ describe('GridFS Stream', function () {
     }
   });
 
-  /**
-   * Aborting an upload
-   *
-   * @example-class GridFSBucketWriteStream
-   * @example-method abort
-   */
-  it('Aborting an upload', {
+  it('writing to an aborted stream throws API error', {
     metadata: { requires: { topology: ['single'] } },
 
-    test(done) {
-      const configuration = this.configuration;
-      const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 });
-      client.connect(function (err, client) {
-        const db = client.db(configuration.db);
-        const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
-        const CHUNKS_COLL = 'gridfsabort.chunks';
-        const uploadStream = bucket.openUploadStream('test.dat');
-
-        const id = uploadStream.id;
-        const query = { files_id: id };
-        uploadStream.write('a', 'utf8', function (error) {
-          expect(error).to.not.exist;
+    async test() {
+      const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
+      const chunks = db.collection('gridfsabort.chunks');
+      const uploadStream = bucket.openUploadStream('test.dat');
 
-          db.collection(CHUNKS_COLL).count(query, function (error, c) {
-            expect(error).to.not.exist;
-            expect(c).to.equal(1);
-            uploadStream.abort(function (error) {
-              expect(error).to.not.exist;
-              db.collection(CHUNKS_COLL).count(query, function (error, c) {
-                expect(error).to.not.exist;
-                expect(c).to.equal(0);
-                uploadStream.write('b', 'utf8', function (error) {
-                  expect(error.toString()).to.equal('MongoAPIError: Stream has been aborted');
-                  uploadStream.end('c', 'utf8', function (error) {
-                    expect(error.toString()).to.equal('MongoAPIError: Stream has been aborted');
-                    // Fail if user tries to abort an aborted stream
-                    uploadStream.abort().then(null, function (error) {
-                      expect(error.toString()).to.equal(
-                        // TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
-                        'MongoAPIError: Cannot call abort() on a stream twice'
-                      );
-                      client.close(done);
-                    });
-                  });
-                });
-              });
-            });
-          });
-        });
-      });
-    }
-  });
+      const willError = once(uploadStream, 'error');
 
-  /**
-   * Aborting an upload
-   */
-  it('Destroy an upload', {
-    metadata: { requires: { topology: ['single'] } },
+      const id = uploadStream.id;
+      const query = { files_id: id };
 
-    test(done) {
-      const configuration = this.configuration;
-      const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 });
-      client.connect(function (err, client) {
-        const db = client.db(configuration.db);
-        const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
-        const CHUNKS_COLL = 'gridfsabort.chunks';
-        const uploadStream = bucket.openUploadStream('test.dat');
+      const writeAsync = promisify(uploadStream.write.bind(uploadStream));
 
-        const id = uploadStream.id;
-        const query = { files_id: id };
-        uploadStream.write('a', 'utf8', function (error) {
-          expect(error).to.not.exist;
+      await writeAsync('a', 'utf8');
 
-          db.collection(CHUNKS_COLL).count(query, function (error, c) {
-            expect(error).to.not.exist;
-            expect(c).to.equal(1);
-            uploadStream.abort(function (error) {
-              expect(error).to.not.exist;
-              db.collection(CHUNKS_COLL).count(query, function (error, c) {
-                expect(error).to.not.exist;
-                expect(c).to.equal(0);
-                uploadStream.write('b', 'utf8', function (error) {
-                  expect(error.toString()).to.equal('MongoAPIError: Stream has been aborted');
-                  uploadStream.end('c', 'utf8', function (error) {
-                    expect(error.toString()).to.equal('MongoAPIError: Stream has been aborted');
-                    // Fail if user tries to abort an aborted stream
-                    uploadStream.abort().then(null, function (error) {
-                      expect(error.toString()).to.equal(
-                        // TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
-                        'MongoAPIError: Cannot call abort() on a stream twice'
-                      );
-                      client.close(done);
-                    });
-                  });
-                });
-              });
-            });
-          });
-        });
-      });
-    }
-  });
+      expect(await chunks.countDocuments(query)).to.equal(1);
 
-  /**
-   * Calling abort() on a GridFSBucketReadStream
-   *
-   * @example-class GridFSBucketReadStream
-   * @example-method abort
-   */
-  it('Destroying a download stream', {
-    metadata: { requires: { topology: ['single'], apiVersion: false } },
+      await uploadStream.abort();
 
-    test(done) {
-      const configuration = this.configuration;
-      const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 });
-      client.connect(function (err, client) {
-        const db = client.db(configuration.db);
-        const bucket = new GridFSBucket(db, { bucketName: 'gridfsdestroy', chunkSizeBytes: 10 });
-        const readStream = fs.createReadStream('./LICENSE.md');
-        const uploadStream = bucket.openUploadStream('test.dat');
+      expect(await chunks.countDocuments(query)).to.equal(0);
 
-        // Wait for stream to finish
-        uploadStream.once('finish', function () {
-          const id = uploadStream.id;
-          const downloadStream = bucket.openDownloadStream(id);
-          const finished = {};
-          downloadStream.on('data', function () {
-            expect.fail('Should be unreachable');
-          });
-
-          downloadStream.on('error', function () {
-            expect.fail('Should be unreachable');
-          });
-
-          downloadStream.on('end', function () {
-            expect(downloadStream.s.cursor).to.not.exist;
-            if (finished.close) {
-              client.close(done);
-              return;
-            }
-            finished.end = true;
-          });
+      expect(await writeAsync('b', 'utf8').catch(e => e)).to.be.instanceOf(MongoAPIError);
+      expect(await uploadStream.abort().catch(e => e)).to.be.instanceOf(MongoAPIError);
+      expect((await willError)[0]).to.be.instanceOf(MongoAPIError);
+    }
+  });
 
-          downloadStream.on('close', function () {
-            if (finished.end) {
-              client.close(done);
-              return;
-            }
-            finished.close = true;
-          });
+  it('aborting a download stream emits close and cleans up cursor', async () => {
+    const bucket = new GridFSBucket(db, { bucketName: 'gridfsdestroy', chunkSizeBytes: 10 });
+    const readStream = fs.createReadStream('./LICENSE.md');
+    const uploadStream = bucket.openUploadStream('LICENSE.md');
+    const finishUpload = once(uploadStream, 'finish');
+    readStream.pipe(uploadStream);
+    await finishUpload;
+    const downloadStream = bucket.openDownloadStream(uploadStream.gridFSFile._id);
 
-          downloadStream.abort(function (error) {
-            expect(error).to.not.exist;
-          });
-        });
+    const downloadClose = once(downloadStream, 'close');
+    await downloadStream.abort();
 
-        readStream.pipe(uploadStream);
-      });
-    }
+    await downloadClose;
+    expect(downloadStream.s.cursor).to.not.exist;
   });
 
   /**
@@ -874,134 +760,6 @@ describe('GridFS Stream', function () {
     }
   });
 
-  /**
-   * NODE-822 GridFSBucketWriteStream end method does not handle optional parameters
-   */
-  it('should correctly handle calling end function with only a callback', {
-    metadata: { requires: { topology: ['single'] } },
-    test(done) {
-      const configuration = this.configuration;
-      const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 });
-      client.connect(function (err, client) {
-        const db = client.db(configuration.db);
-        const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
-        const CHUNKS_COLL = 'gridfsabort.chunks';
-        const uploadStream = bucket.openUploadStream('test.dat');
-
-        const id = uploadStream.id;
-        const query = { files_id: id };
-        uploadStream.write('a', 'utf8', function (error) {
-          expect(error).to.not.exist;
-
-          db.collection(CHUNKS_COLL).count(query, function (error, c) {
-            expect(error).to.not.exist;
-            expect(c).to.equal(1);
-
-            uploadStream.abort(function (error) {
-              expect(error).to.not.exist;
-
-              db.collection(CHUNKS_COLL).count(query, function (error, c) {
-                expect(error).to.not.exist;
-                expect(c).to.equal(0);
-
-                uploadStream.write('b', 'utf8', function (error) {
-                  expect(error.toString()).to.equal('MongoAPIError: Stream has been aborted');
-
-                  uploadStream.end(function (error) {
-                    expect(error.toString()).to.equal('MongoAPIError: Stream has been aborted');
-
-                    // Fail if user tries to abort an aborted stream
-                    uploadStream.abort().then(null, function (error) {
-                      expect(error.toString()).to.equal(
-                        // TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
-                        'MongoAPIError: Cannot call abort() on a stream twice'
-                      );
-                      client.close(done);
-                    });
-                  });
-                });
-              });
-            });
-          });
-        });
-      });
-    }
-  });
-
-  describe('upload stream end()', () => {
-    let client, db;
-
-    afterEach(async () => {
-      sinon.restore();
-      await client.close();
-    });
-
-    it('should not call the callback on repeat calls to end', {
-      metadata: { requires: { topology: ['single'] } },
-
-      async test() {
-        const configuration = this.configuration;
-        client = configuration.newClient(configuration.writeConcernMax(), {
-          maxPoolSize: 1
-        });
-        await client.connect();
-        db = client.db(configuration.db);
-        const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
-        const uploadStream = bucket.openUploadStream('test.dat');
-
-        const endPromise = new Promise(resolve => {
-          uploadStream.end('1', resolve);
-        });
-
-        const endPromise2 = new Promise((resolve, reject) => {
-          uploadStream.end('2', () => {
-            reject(new Error('Expected callback to not be called on duplicate end'));
-          });
-        });
-
-        await endPromise;
-        // in the fail case, the callback would be called when the actual write is finished,
-        // so we need to give it a moment
-        await Promise.race([endPromise2, sleep(100)]);
       }
     });
 
-    it('should not write a chunk on repeat calls to end', {
-      metadata: { requires: { topology: ['single'] } },
-
-      async test() {
-        const configuration = this.configuration;
-        client = configuration.newClient(configuration.writeConcernMax(), {
-          maxPoolSize: 1
-        });
-        await client.connect();
-        db = client.db(this.configuration.db);
-        const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
-        const uploadStream = bucket.openUploadStream('test.dat');
-        const spy = sinon.spy(uploadStream, 'write');
-
-        const endPromise = new Promise(resolve => {
-          uploadStream.end('1', resolve);
-        });
-
-        await endPromise;
-        expect(spy).to.have.been.calledWith('1');
-
-        uploadStream.end('2');
-
-        // wait for potential async calls to happen before we close the client
-        // so that we don't get a client not connected failure in the afterEach
-        // in the failure case since it would be confusing and unnecessary
-        // given the assertions we already have for this case
-        await sleep(100);
-
-        expect(spy).not.to.have.been.calledWith('2');
-        expect(spy.calledOnce).to.be.true;
-      }
-    });
-  });
-
   it('should return only end - start bytes when the end is within a chunk', {
     metadata: { requires: { topology: ['single'] } },
     test(done) {
diff --git a/test/tools/unified-spec-runner/operations.ts b/test/tools/unified-spec-runner/operations.ts
index 16a07a9bbb..b5e5752aea 100644
--- a/test/tools/unified-spec-runner/operations.ts
+++ b/test/tools/unified-spec-runner/operations.ts
@@ -1,5 +1,7 @@
 /* eslint-disable @typescript-eslint/no-unused-vars */
 /* eslint-disable @typescript-eslint/no-non-null-assertion */
+import { once, Writable } from 'node:stream';
+
 import { expect } from 'chai';
 
 import {
@@ -417,15 +419,15 @@ operations.set('upload', async ({ entities, operation }) => {
   const bucket = entities.getEntity('bucket', operation.object);
 
   const stream = bucket.openUploadStream(operation.arguments!.filename, {
-    chunkSizeBytes: operation.arguments!.chunkSizeBytes
+    chunkSizeBytes: operation.arguments?.chunkSizeBytes
  });
 
-  return new Promise((resolve, reject) => {
-    stream.end(Buffer.from(operation.arguments!.source.$$hexBytes, 'hex'), (error, file) => {
-      if (error) reject(error);
-      resolve((file as GridFSFile)._id as ObjectId);
-    });
-  });
+  const data = Buffer.from(operation.arguments!.source.$$hexBytes, 'hex');
+  const willFinish = once(stream, 'finish');
+  stream.end(data);
+  await willFinish;
+
+  return stream.gridFSFile?._id;
 });
 
 operations.set('wait', async ({ operation }) => {
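
The sketch below is not part of the patch; it is a minimal consumer-side illustration of the behavior this diff introduces, using only what the change itself shows (the standard `finish`/`error` stream events and the new `gridFSFile` property in place of the removed `FINISH`/`ERROR`/`CLOSE` constants). The `roundTrip` helper, the `example` database name, and `file.txt` are assumed placeholders.

```ts
// Assumed usage sketch: round-trips a file through GridFS with the refactored streams.
import * as fs from 'node:fs';
import { once } from 'node:events';
import { GridFSBucket, MongoClient } from 'mongodb';

export async function roundTrip(client: MongoClient): Promise<void> {
  const db = client.db('example');
  const bucket = new GridFSBucket(db, { bucketName: 'fs' });

  // Upload: completion is signalled by the stream's own 'finish' event, and the
  // inserted files document is exposed as `gridFSFile` (null until finish).
  const uploadStream = bucket.openUploadStream('file.txt');
  const finished = once(uploadStream, 'finish');
  fs.createReadStream('./file.txt').pipe(uploadStream);
  await finished;
  console.log('uploaded file _id:', uploadStream.gridFSFile?._id);

  // Download: failures now surface through destroy(), i.e. the regular 'error'
  // event, instead of a custom ERROR constant.
  const downloadStream = bucket.openDownloadStreamByName('file.txt');
  downloadStream.on('error', error => console.error('download failed:', error));
  for await (const chunk of downloadStream) {
    console.log('read chunk of', (chunk as Buffer).byteLength, 'bytes');
  }
}
```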