From 8acba08824115850acd64a02f4c69c32b5d78265 Mon Sep 17 00:00:00 2001
From: Preston Vasquez
Date: Wed, 17 Jan 2024 22:06:00 -0700
Subject: [PATCH] GODRIVER-2726 Merge the gridfs package into the mongo package (#1497)

---
 internal/integration/crud_helpers_test.go    |   5 +-
 internal/integration/gridfs_test.go          |  32 ++--
 internal/integration/unified/entity.go       |   9 +-
 internal/integration/unified_spec_test.go    |   9 +-
 mongo/client.go                              |   5 -
 mongo/database.go                            |  71 ++++++--
 mongo/gridfs/doc.go                          |  37 ----
 mongo/{gridfs/bucket.go => gridfs_bucket.go} | 164 ++++++------------
 .../bucket_test.go => gridfs_bucket_test.go} |  11 +-
 ...ad_stream.go => gridfs_download_stream.go} |  45 ++---
 mongo/{gridfs => }/gridfs_examples_test.go   |  36 ++--
 mongo/{gridfs => }/gridfs_test.go            |  19 +-
 ...load_stream.go => gridfs_upload_stream.go} |  45 +++--
 13 files changed, 214 insertions(+), 274 deletions(-)
 delete mode 100644 mongo/gridfs/doc.go
 rename mongo/{gridfs/bucket.go => gridfs_bucket.go} (79%)
 rename mongo/{gridfs/bucket_test.go => gridfs_bucket_test.go} (87%)
 rename mongo/{gridfs/download_stream.go => gridfs_download_stream.go} (83%)
 rename mongo/{gridfs => }/gridfs_examples_test.go (83%)
 rename mongo/{gridfs => }/gridfs_test.go (91%)
 rename mongo/{gridfs/upload_stream.go => gridfs_upload_stream.go} (79%)

diff --git a/internal/integration/crud_helpers_test.go b/internal/integration/crud_helpers_test.go
index af2c1ffe4d..a5b1525e7b 100644
--- a/internal/integration/crud_helpers_test.go
+++ b/internal/integration/crud_helpers_test.go
@@ -24,7 +24,6 @@ import (
 	"go.mongodb.org/mongo-driver/internal/integration/unified"
 	"go.mongodb.org/mongo-driver/internal/integtest"
 	"go.mongodb.org/mongo-driver/mongo"
-	"go.mongodb.org/mongo-driver/mongo/gridfs"
 	"go.mongodb.org/mongo-driver/mongo/options"
 	"go.mongodb.org/mongo-driver/mongo/readconcern"
 	"go.mongodb.org/mongo-driver/mongo/readpref"
@@ -1216,7 +1215,7 @@ func executeEstimatedDocumentCount(mt *mtest.T, sess mongo.Session, args bson.Ra
 	return mt.Coll.EstimatedDocumentCount(context.Background())
 }
 
-func executeGridFSDownload(mt *mtest.T, bucket *gridfs.Bucket, args bson.Raw) (int64, error) {
+func executeGridFSDownload(mt *mtest.T, bucket *mongo.GridFSBucket, args bson.Raw) (int64, error) {
 	mt.Helper()
 
 	var fileID primitive.ObjectID
@@ -1236,7 +1235,7 @@ func executeGridFSDownload(mt *mtest.T, bucket *gridfs.Bucket, args bson.Raw) (i
 	return bucket.DownloadToStream(context.Background(), fileID, new(bytes.Buffer))
 }
 
-func executeGridFSDownloadByName(mt *mtest.T, bucket *gridfs.Bucket, args bson.Raw) (int64, error) {
+func executeGridFSDownloadByName(mt *mtest.T, bucket *mongo.GridFSBucket, args bson.Raw) (int64, error) {
 	mt.Helper()
 
 	var file string
diff --git a/internal/integration/gridfs_test.go b/internal/integration/gridfs_test.go
index ff83eb7e32..599a02bae5 100644
--- a/internal/integration/gridfs_test.go
+++ b/internal/integration/gridfs_test.go
@@ -22,7 +22,6 @@ import (
 	"go.mongodb.org/mongo-driver/internal/integration/mtest"
 	"go.mongodb.org/mongo-driver/internal/israce"
 	"go.mongodb.org/mongo-driver/mongo"
-	"go.mongodb.org/mongo-driver/mongo/gridfs"
 	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
@@ -73,7 +72,7 @@ func TestGridFS(x *testing.T) {
 	for _, tc := range testcases {
 		mt.Run(tc.name, func(mt *mtest.T) {
-			bucket, err := gridfs.NewBucket(mt.DB, options.GridFSBucket().SetChunkSizeBytes(chunkSize))
+			bucket, err := mt.DB.GridFSBucket(options.GridFSBucket().SetChunkSizeBytes(chunkSize))
 			assert.Nil(mt, err, "NewBucket error: %v", err)
 
 			ustream, err := bucket.OpenUploadStream(context.Background(), "foo")
@@ -108,7 +107,7 @@ func TestGridFS(x *testing.T) {
 	mt.Run("index creation", func(mt *mtest.T) {
 		// Unit tests showing that UploadFromStream creates indexes on the chunks and files collections.
-		bucket, err := gridfs.NewBucket(mt.DB)
+		bucket, err := mt.DB.GridFSBucket()
 		assert.Nil(mt, err, "NewBucket error: %v", err)
 
 		byteData := []byte("Hello, world!")
@@ -187,7 +186,7 @@
 		mt.ClearEvents()
 
-		bucket, err := gridfs.NewBucket(mt.DB)
+		bucket, err := mt.DB.GridFSBucket()
 		assert.Nil(mt, err, "NewBucket error: %v", err)
 		defer func() {
 			_ = bucket.Drop(context.Background())
@@ -234,7 +233,7 @@
 		mt.ClearEvents()
 		var fileContent []byte
-		bucket, err := gridfs.NewBucket(mt.DB)
+		bucket, err := mt.DB.GridFSBucket()
 		assert.Nil(mt, err, "NewBucket error: %v", err)
 		defer func() {
 			_ = bucket.Drop(context.Background())
@@ -282,7 +281,7 @@
 		for _, tc := range testCases {
 			mt.Run(tc.name, func(mt *mtest.T) {
 				// Create a new GridFS bucket.
-				bucket, err := gridfs.NewBucket(mt.DB)
+				bucket, err := mt.DB.GridFSBucket()
 				assert.Nil(mt, err, "NewBucket error: %v", err)
 				defer func() { _ = bucket.Drop(context.Background()) }()
@@ -303,10 +302,10 @@
 			assert.Nil(mt, err, "FindOne error: %v", err)
 			uploadTime := uploadedFileDoc.Lookup("uploadDate").Time().UTC()
 
-			expectedFile := &gridfs.File{
+			expectedFile := &mongo.GridFSFile{
 				ID:         uploadedFileID,
 				Length:     int64(len(fileData)),
-				ChunkSize:  gridfs.DefaultChunkSize,
+				ChunkSize:  mongo.DefaultGridFSChunkSize,
 				UploadDate: uploadTime,
 				Name:       fileName,
 				Metadata:   rawMetadata,
@@ -332,7 +331,7 @@
 		// Test that the chunk size for a file download is determined by the chunkSize field in the files
 		// collection document, not the bucket's chunk size.
-		bucket, err := gridfs.NewBucket(mt.DB)
+		bucket, err := mt.DB.GridFSBucket()
 		assert.Nil(mt, err, "NewBucket error: %v", err)
 		defer func() { _ = bucket.Drop(context.Background()) }()
@@ -363,12 +362,13 @@
 		_, err := mt.DB.Collection("fs.files").InsertOne(context.Background(), filesDoc)
 		assert.Nil(mt, err, "InsertOne error for files collection: %v", err)
 
-		bucket, err := gridfs.NewBucket(mt.DB)
+		bucket, err := mt.DB.GridFSBucket()
 		assert.Nil(mt, err, "NewBucket error: %v", err)
 		defer func() { _ = bucket.Drop(context.Background()) }()
 
 		_, err = bucket.OpenDownloadStream(context.Background(), oid)
-		assert.Equal(mt, gridfs.ErrMissingChunkSize, err, "expected error %v, got %v", gridfs.ErrMissingChunkSize, err)
+		assert.Equal(mt, mongo.ErrMissingGridFSChunkSize, err,
+			"expected error %v, got %v", mongo.ErrMissingGridFSChunkSize, err)
 	})
 	mt.Run("cursor error during read after downloading", func(mt *mtest.T) {
 		// To simulate a cursor error we upload a file larger than the 16MB default batch size,
@@ -378,7 +378,7 @@
 		fileName := "read-error-test"
 		fileData := make([]byte, 17000000)
 
-		bucket, err := gridfs.NewBucket(mt.DB)
+		bucket, err := mt.DB.GridFSBucket()
 		assert.Nil(mt, err, "NewBucket error: %v", err)
 		defer func() { _ = bucket.Drop(context.Background()) }()
@@ -406,7 +406,7 @@
 		fileName := "skip-error-test"
 		fileData := make([]byte, 17000000)
 
-		bucket, err := gridfs.NewBucket(mt.DB)
+		bucket, err := mt.DB.GridFSBucket()
 		assert.Nil(mt, err, "NewBucket error: %v", err)
 		defer func() { _ = bucket.Drop(context.Background()) }()
@@ -446,7 +446,7 @@
 			if tc.bucketName != "" {
 				bucketOpts.SetName(tc.bucketName)
 			}
-			bucket, err := gridfs.NewBucket(mt.DB, bucketOpts)
+			bucket, err := mt.DB.GridFSBucket(bucketOpts)
 			assert.Nil(mt, err, "NewBucket error: %v", err)
 			defer func() { _ = bucket.Drop(context.Background()) }()
@@ -491,7 +491,7 @@
 			chunkSize = &temp
 		}
 
-		bucket, err := gridfs.NewBucket(mt.DB, &options.BucketOptions{
+		bucket, err := mt.DB.GridFSBucket(&options.BucketOptions{
 			ChunkSizeBytes: chunkSize,
 		})
 		assert.Nil(mt, err, "NewBucket error: %v", err)
@@ -531,7 +531,7 @@
 	// Regression test for a bug introduced in GODRIVER-2346.
 	mt.Run("Find", func(mt *mtest.T) {
-		bucket, err := gridfs.NewBucket(mt.DB)
+		bucket, err := mt.DB.GridFSBucket()
 		assert.Nil(mt, err, "NewBucket error: %v", err)
 
 		// Find the file back.
 		cursor, err := bucket.Find(context.Background(), bson.D{{"foo", "bar"}})
diff --git a/internal/integration/unified/entity.go b/internal/integration/unified/entity.go
index 61bb57fc38..32796e6308 100644
--- a/internal/integration/unified/entity.go
+++ b/internal/integration/unified/entity.go
@@ -19,7 +19,6 @@ import (
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
-	"go.mongodb.org/mongo-driver/mongo/gridfs"
 	"go.mongodb.org/mongo-driver/mongo/options"
 	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
 )
@@ -193,7 +192,7 @@ type EntityMap struct {
 	dbEntites         map[string]*mongo.Database
 	collEntities      map[string]*mongo.Collection
 	sessions          map[string]mongo.Session
-	gridfsBuckets     map[string]*gridfs.Bucket
+	gridfsBuckets     map[string]*mongo.GridFSBucket
 	bsonValues        map[string]bson.RawValue
 	eventListEntities map[string][]bson.Raw
 	bsonArrayEntities map[string][]bson.Raw // for storing errors and failures from a loop operation
@@ -220,7 +219,7 @@ func (em *EntityMap) setClosed(val bool) {
 func newEntityMap() *EntityMap {
 	em := &EntityMap{
 		allEntities:       make(map[string]struct{}),
-		gridfsBuckets:     make(map[string]*gridfs.Bucket),
+		gridfsBuckets:     make(map[string]*mongo.GridFSBucket),
 		bsonValues:        make(map[string]bson.RawValue),
 		cursorEntities:    make(map[string]cursor),
 		clientEntities:    make(map[string]*clientEntity),
@@ -367,7 +366,7 @@ func (em *EntityMap) addEntity(ctx context.Context, entityType string, entityOpt
 	return nil
 }
 
-func (em *EntityMap) gridFSBucket(id string) (*gridfs.Bucket, error) {
+func (em *EntityMap) gridFSBucket(id string) (*mongo.GridFSBucket, error) {
 	bucket, ok := em.gridfsBuckets[id]
 	if !ok {
 		return nil, newEntityNotFoundError("gridfs bucket", id)
@@ -797,7 +796,7 @@ func (em *EntityMap) addGridFSBucketEntity(entityOptions *entityOptions) error {
 		bucketOpts = entityOptions.GridFSBucketOptions.BucketOptions
 	}
 
-	bucket, err := gridfs.NewBucket(db, bucketOpts)
+	bucket, err := db.GridFSBucket(bucketOpts)
 	if err != nil {
 		return fmt.Errorf("error creating GridFS bucket: %v", err)
 	}
diff --git a/internal/integration/unified_spec_test.go b/internal/integration/unified_spec_test.go
index 16feaad37e..78e911555a 100644
--- a/internal/integration/unified_spec_test.go
+++ b/internal/integration/unified_spec_test.go
@@ -30,7 +30,6 @@ import (
 	"go.mongodb.org/mongo-driver/internal/integtest"
 	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/address"
-	"go.mongodb.org/mongo-driver/mongo/gridfs"
 	"go.mongodb.org/mongo-driver/mongo/options"
 	"go.mongodb.org/mongo-driver/mongo/readconcern"
 	"go.mongodb.org/mongo-driver/mongo/readpref"
@@ -116,7 +115,7 @@ type testCase struct {
 	// set in code if the test is a GridFS test
 	chunkSize int32
-	bucket    *gridfs.Bucket
+	bucket    *mongo.GridFSBucket
 
 	// set in code to track test context
 	testTopology *topology.Topology
@@ -360,12 +359,12 @@ func createBucket(mt *mtest.T, testFile testFile, testCase *testCase) {
 	}
 	chunkSize := testCase.chunkSize
 	if chunkSize == 0 {
-		chunkSize = gridfs.DefaultChunkSize
+		chunkSize = mongo.DefaultGridFSChunkSize
 	}
 	bucketOpts.SetChunkSizeBytes(chunkSize)
 
 	var err error
-	testCase.bucket, err = gridfs.NewBucket(mt.DB, bucketOpts)
+	testCase.bucket, err = mt.DB.GridFSBucket(bucketOpts)
 	assert.Nil(mt, err, "NewBucket error: %v", err)
 }
 
@@ -428,7 +427,7 @@ func runOperation(mt *mtest.T, testCase *testCase, op *operation, sess0, sess1 m
 	return verifyError(op.opError, err)
 }
 
-func executeGridFSOperation(mt *mtest.T, bucket *gridfs.Bucket, op *operation) error {
+func executeGridFSOperation(mt *mtest.T, bucket *mongo.GridFSBucket, op *operation) error {
 	// no results for GridFS operations
 	assert.Nil(mt, op.Result, "unexpected result for GridFS operation")
diff --git a/mongo/client.go b/mongo/client.go
index fe6071cd75..40c0b3c411 100644
--- a/mongo/client.go
+++ b/mongo/client.go
@@ -860,11 +860,6 @@ func (c *Client) NumberSessionsInProgress() int {
 	return int(c.sessionPool.CheckedOut())
 }
 
-// Timeout returns the timeout set for this client.
-func (c *Client) Timeout() *time.Duration {
-	return c.timeout
-}
-
 func (c *Client) createBaseCursorOptions() driver.CursorOptions {
 	return driver.CursorOptions{
 		CommandMonitor: c.monitor,
diff --git a/mongo/database.go b/mongo/database.go
index 4cb0011f5a..7b788354cd 100644
--- a/mongo/database.go
+++ b/mongo/database.go
@@ -544,21 +544,6 @@ func (db *Database) ListCollectionNames(ctx context.Context, filter interface{},
 	return names, nil
 }
 
-// ReadConcern returns the read concern used to configure the Database object.
-func (db *Database) ReadConcern() *readconcern.ReadConcern {
-	return db.readConcern
-}
-
-// ReadPreference returns the read preference used to configure the Database object.
-func (db *Database) ReadPreference() *readpref.ReadPref {
-	return db.readPreference
-}
-
-// WriteConcern returns the write concern used to configure the Database object.
-func (db *Database) WriteConcern() *writeconcern.WriteConcern {
-	return db.writeConcern
-}
-
 // Watch returns a change stream for all changes to the corresponding database. See
 // https://www.mongodb.com/docs/manual/changeStreams/ for more information about change streams.
 //
@@ -982,3 +967,59 @@ func (db *Database) executeCreateOperation(ctx context.Context, op *operation.Cr
 	return replaceErrors(op.Execute(ctx))
 }
+
+// GridFSBucket is used to construct a GridFS bucket which can be used as a
+// container for files.
+func (db *Database) GridFSBucket(opts ...*options.BucketOptions) (*GridFSBucket, error) {
+	b := &GridFSBucket{
+		name:      "fs",
+		chunkSize: DefaultGridFSChunkSize,
+		db:        db,
+	}
+
+	bo := options.GridFSBucket()
+	for _, opt := range opts {
+		if opt == nil {
+			continue
+		}
+		if opt.Name != nil {
+			bo.Name = opt.Name
+		}
+		if opt.ChunkSizeBytes != nil {
+			bo.ChunkSizeBytes = opt.ChunkSizeBytes
+		}
+		if opt.WriteConcern != nil {
+			bo.WriteConcern = opt.WriteConcern
+		}
+		if opt.ReadConcern != nil {
+			bo.ReadConcern = opt.ReadConcern
+		}
+		if opt.ReadPreference != nil {
+			bo.ReadPreference = opt.ReadPreference
+		}
+	}
+	if bo.Name != nil {
+		b.name = *bo.Name
+	}
+	if bo.ChunkSizeBytes != nil {
+		b.chunkSize = *bo.ChunkSizeBytes
+	}
+	if bo.WriteConcern != nil {
+		b.wc = bo.WriteConcern
+	}
+	if bo.ReadConcern != nil {
+		b.rc = bo.ReadConcern
+	}
+	if bo.ReadPreference != nil {
+		b.rp = bo.ReadPreference
+	}
+
+	var collOpts = options.Collection().SetWriteConcern(b.wc).SetReadConcern(b.rc).SetReadPreference(b.rp)
+
+	b.chunksColl = db.Collection(b.name+".chunks", collOpts)
+	b.filesColl = db.Collection(b.name+".files", collOpts)
+	b.readBuf = make([]byte, b.chunkSize)
+	b.writeBuf = make([]byte, b.chunkSize)
+
+	return b, nil
+}
diff --git a/mongo/gridfs/doc.go b/mongo/gridfs/doc.go
deleted file mode 100644
index 69f9d2f1d7..0000000000
--- a/mongo/gridfs/doc.go
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright (C) MongoDB, Inc. 2017-present.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License. You may obtain
-// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-
-// Package gridfs provides a MongoDB GridFS API. See https://www.mongodb.com/docs/manual/core/gridfs/ for more
-// information about GridFS and its use cases.
-//
-// # Buckets
-//
-// The main type defined in this package is Bucket. A Bucket wraps a mongo.Database instance and operates on two
-// collections in the database. The first is the files collection, which contains one metadata document per file stored
-// in the bucket. This collection is named "<bucket name>.files". The second is the chunks collection, which contains
-// chunks of files. This collection is named "<bucket name>.chunks".
-//
-// # Uploading a File
-//
-// Files can be uploaded in two ways:
-//
-// 1. OpenUploadStream/OpenUploadStreamWithID - These methods return an UploadStream instance. UploadStream
-// implements the io.Writer interface and the Write() method can be used to upload a file to the database.
-//
-// 2. UploadFromStream/UploadFromStreamWithID - These methods take an io.Reader, which represents the file to
-// upload. They internally create a new UploadStream and close it once the operation is complete.
-//
-// # Downloading a File
-//
-// Similar to uploads, files can be downloaded in two ways:
-//
-// 1. OpenDownloadStream/OpenDownloadStreamByName - These methods return a DownloadStream instance. DownloadStream
-// implements the io.Reader interface. A file can be read either using the Read() method or any standard library
-// methods that reads from an io.Reader such as io.Copy.
-//
-// 2. DownloadToStream/DownloadToStreamByName - These methods take an io.Writer, which represents the download
-// destination. They internally create a new DownloadStream and close it once the operation is complete.
-package gridfs
diff --git a/mongo/gridfs/bucket.go b/mongo/gridfs_bucket.go
similarity index 79%
rename from mongo/gridfs/bucket.go
rename to mongo/gridfs_bucket.go
index ff5b9e4571..bda19e2483 100644
--- a/mongo/gridfs/bucket.go
+++ b/mongo/gridfs_bucket.go
@@ -4,7 +4,7 @@
 // not use this file except in compliance with the License. You may obtain
 // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 
-package gridfs
+package mongo
 
 import (
 	"bytes"
@@ -17,7 +17,6 @@ import (
 	"go.mongodb.org/mongo-driver/bson/bsonrw"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/internal/csot"
-	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
 	"go.mongodb.org/mongo-driver/mongo/readconcern"
 	"go.mongodb.org/mongo-driver/mongo/readpref"
@@ -27,20 +26,21 @@ import (
 
 // TODO: add sessions options
 
-// DefaultChunkSize is the default size of each file chunk.
-const DefaultChunkSize int32 = 255 * 1024 // 255 KiB
+// DefaultGridFSChunkSize is the default size of each file chunk.
+const DefaultGridFSChunkSize int32 = 255 * 1024 // 255 KiB
 
 // ErrFileNotFound occurs if a user asks to download a file with a file ID that isn't found in the files collection.
 var ErrFileNotFound = errors.New("file with given parameters not found")
 
-// ErrMissingChunkSize occurs when downloading a file if the files collection document is missing the "chunkSize" field.
-var ErrMissingChunkSize = errors.New("files collection document does not contain a 'chunkSize' field")
+// ErrMissingGridFSChunkSize occurs when downloading a file if the files
+// collection document is missing the "chunkSize" field.
+var ErrMissingGridFSChunkSize = errors.New("files collection document does not contain a 'chunkSize' field")
 
-// Bucket represents a GridFS bucket.
-type Bucket struct {
-	db         *mongo.Database
-	chunksColl *mongo.Collection // collection to store file chunks
-	filesColl  *mongo.Collection // collection to store file metadata
+// GridFSBucket represents a GridFS bucket.
+type GridFSBucket struct {
+	db         *Database
+	chunksColl *Collection // collection to store file chunks
+	filesColl  *Collection // collection to store file metadata
 
 	name      string
 	chunkSize int32
@@ -53,80 +53,22 @@ type Bucket struct {
 	writeBuf []byte
 }
 
-// Upload contains options to upload a file to a bucket.
-type Upload struct {
+// upload contains options to upload a file to a bucket.
+type upload struct {
 	chunkSize int32
 	metadata  bson.D
 }
 
-// NewBucket creates a GridFS bucket.
-func NewBucket(db *mongo.Database, opts ...*options.BucketOptions) (*Bucket, error) {
-	b := &Bucket{
-		name:      "fs",
-		chunkSize: DefaultChunkSize,
-		db:        db,
-		wc:        db.WriteConcern(),
-		rc:        db.ReadConcern(),
-		rp:        db.ReadPreference(),
-	}
-
-	bo := options.GridFSBucket()
-	for _, opt := range opts {
-		if opt == nil {
-			continue
-		}
-		if opt.Name != nil {
-			bo.Name = opt.Name
-		}
-		if opt.ChunkSizeBytes != nil {
-			bo.ChunkSizeBytes = opt.ChunkSizeBytes
-		}
-		if opt.WriteConcern != nil {
-			bo.WriteConcern = opt.WriteConcern
-		}
-		if opt.ReadConcern != nil {
-			bo.ReadConcern = opt.ReadConcern
-		}
-		if opt.ReadPreference != nil {
-			bo.ReadPreference = opt.ReadPreference
-		}
-	}
-	if bo.Name != nil {
-		b.name = *bo.Name
-	}
-	if bo.ChunkSizeBytes != nil {
-		b.chunkSize = *bo.ChunkSizeBytes
-	}
-	if bo.WriteConcern != nil {
-		b.wc = bo.WriteConcern
-	}
-	if bo.ReadConcern != nil {
-		b.rc = bo.ReadConcern
-	}
-	if bo.ReadPreference != nil {
-		b.rp = bo.ReadPreference
-	}
-
-	var collOpts = options.Collection().SetWriteConcern(b.wc).SetReadConcern(b.rc).SetReadPreference(b.rp)
-
-	b.chunksColl = db.Collection(b.name+".chunks", collOpts)
-	b.filesColl = db.Collection(b.name+".files", collOpts)
-	b.readBuf = make([]byte, b.chunkSize)
-	b.writeBuf = make([]byte, b.chunkSize)
-
-	return b, nil
-}
-
 // OpenUploadStream creates a new upload stream for a file given the
 // filename.
 //
 // The context provided to this method controls the entire lifetime of an
 // upload stream io.Writer.
-func (b *Bucket) OpenUploadStream(
+func (b *GridFSBucket) OpenUploadStream(
 	ctx context.Context,
 	filename string,
 	opts ...*options.UploadOptions,
-) (*UploadStream, error) {
+) (*GridFSUploadStream, error) {
 	return b.OpenUploadStreamWithID(ctx, primitive.NewObjectID(), filename, opts...)
 }
 
@@ -135,12 +77,12 @@
 //
 // The context provided to this method controls the entire lifetime of an
 // upload stream io.Writer.
-func (b *Bucket) OpenUploadStreamWithID(
+func (b *GridFSBucket) OpenUploadStreamWithID(
 	ctx context.Context,
 	fileID interface{},
 	filename string,
 	opts ...*options.UploadOptions,
-) (*UploadStream, error) {
+) (*GridFSUploadStream, error) {
 	if err := b.checkFirstWrite(ctx); err != nil {
 		return nil, err
 	}
@@ -161,7 +103,7 @@
 //
 // The context provided to this method controls the entire lifetime of an
 // upload stream io.Writer.
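 //
 // Illustrative use (editor's sketch, not part of the original patch; bucket,
 // data, and the 30-second budget are assumed, and the context bounds the
 // whole upload as described above):
 //
 //	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 //	defer cancel()
 //	fileID, err := bucket.UploadFromStream(ctx, "example.txt", bytes.NewReader(data))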
-func (b *Bucket) UploadFromStream(
+func (b *GridFSBucket) UploadFromStream(
 	ctx context.Context,
 	filename string,
 	source io.Reader,
@@ -180,7 +122,7 @@
 //
 // The context provided to this method controls the entire lifetime of an
 // upload stream io.Writer.
-func (b *Bucket) UploadFromStreamWithID(
+func (b *GridFSBucket) UploadFromStreamWithID(
 	ctx context.Context,
 	fileID interface{},
 	filename string,
@@ -219,7 +161,7 @@
 //
 // The context provided to this method controls the entire lifetime of a
 // download stream io.Reader.
-func (b *Bucket) OpenDownloadStream(ctx context.Context, fileID interface{}) (*DownloadStream, error) {
+func (b *GridFSBucket) OpenDownloadStream(ctx context.Context, fileID interface{}) (*GridFSDownloadStream, error) {
 	return b.openDownloadStream(ctx, bson.D{{"_id", fileID}})
 }
 
@@ -233,7 +175,7 @@
 //
 // The context provided to this method controls the entire lifetime of a
 // download stream io.Reader.
-func (b *Bucket) DownloadToStream(ctx context.Context, fileID interface{}, stream io.Writer) (int64, error) {
+func (b *GridFSBucket) DownloadToStream(ctx context.Context, fileID interface{}, stream io.Writer) (int64, error) {
 	ds, err := b.OpenDownloadStream(ctx, fileID)
 	if err != nil {
 		return 0, err
@@ -247,11 +189,11 @@
 //
 // The context provided to this method controls the entire lifetime of a
 // download stream io.Reader.
-func (b *Bucket) OpenDownloadStreamByName(
+func (b *GridFSBucket) OpenDownloadStreamByName(
 	ctx context.Context,
 	filename string,
 	opts ...*options.NameOptions,
-) (*DownloadStream, error) {
+) (*GridFSDownloadStream, error) {
 	var numSkip int32 = -1
 	var sortOrder int32 = 1
 
@@ -289,7 +231,7 @@
 //
 // The context provided to this method controls the entire lifetime of a
 // download stream io.Reader.
-func (b *Bucket) DownloadToStreamByName(
+func (b *GridFSBucket) DownloadToStreamByName(
 	ctx context.Context,
 	filename string,
 	stream io.Writer,
@@ -307,12 +249,12 @@
 // delete operations with the provided context.
 //
 // Use the context parameter to time-out or cancel the delete operation. The deadline set by SetWriteDeadline is ignored.
-func (b *Bucket) Delete(ctx context.Context, fileID interface{}) error {
+func (b *GridFSBucket) Delete(ctx context.Context, fileID interface{}) error {
 	// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
 	// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
 	// be shared by both delete operations.
-	if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
-		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
+	if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().timeout != nil && !csot.IsTimeoutContext(ctx) {
+		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().timeout)
 		// Redefine ctx to be the new timeout-derived context.
 		ctx = newCtx
 		// Cancel the timeout-derived context at the end of Execute to avoid a context leak.
@@ -337,11 +279,11 @@
 //
 // Use the context parameter to time-out or cancel the find operation. The deadline set by SetReadDeadline
 // is ignored.
-func (b *Bucket) Find(
+func (b *GridFSBucket) Find(
 	ctx context.Context,
 	filter interface{},
 	opts ...*options.GridFSFindOptions,
-) (*mongo.Cursor, error) {
+) (*Cursor, error) {
 	gfsOpts := options.GridFSFind()
 	for _, opt := range opts {
 		if opt == nil {
 			continue
 		}
@@ -401,7 +343,7 @@
 // write operations on this bucket that also require a custom deadline
 //
 // Use SetWriteDeadline to set a deadline for the rename operation.
-func (b *Bucket) Rename(ctx context.Context, fileID interface{}, newFilename string) error {
+func (b *GridFSBucket) Rename(ctx context.Context, fileID interface{}, newFilename string) error {
 	res, err := b.filesColl.UpdateOne(ctx,
 		bson.D{{"_id", fileID}},
 		bson.D{{"$set", bson.D{{"filename", newFilename}}}},
@@ -421,12 +363,12 @@
 // the provided context.
 //
 // Use the context parameter to time-out or cancel the drop operation. The deadline set by SetWriteDeadline is ignored.
-func (b *Bucket) Drop(ctx context.Context) error {
+func (b *GridFSBucket) Drop(ctx context.Context) error {
 	// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
 	// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
 	// be shared by both drop operations.
-	if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
-		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
+	if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().timeout != nil && !csot.IsTimeoutContext(ctx) {
+		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().timeout)
 		// Redefine ctx to be the new timeout-derived context.
 		ctx = newCtx
 		// Cancel the timeout-derived context at the end of Execute to avoid a context leak.
@@ -442,28 +384,28 @@
 }
 
 // GetFilesCollection returns a handle to the collection that stores the file documents for this bucket.
-func (b *Bucket) GetFilesCollection() *mongo.Collection {
+func (b *GridFSBucket) GetFilesCollection() *Collection {
 	return b.filesColl
 }
 
 // GetChunksCollection returns a handle to the collection that stores the file chunks for this bucket.
-func (b *Bucket) GetChunksCollection() *mongo.Collection {
+func (b *GridFSBucket) GetChunksCollection() *Collection {
 	return b.chunksColl
 }
 
-func (b *Bucket) openDownloadStream(
+func (b *GridFSBucket) openDownloadStream(
 	ctx context.Context,
 	filter interface{},
 	opts ...*options.FindOneOptions,
-) (*DownloadStream, error) {
+) (*GridFSDownloadStream, error) {
 	result := b.filesColl.FindOne(ctx, filter, opts...)
 
-	// Unmarshal the data into a File instance, which can be passed to newDownloadStream. The _id value has to be
+	// Unmarshal the data into a GridFSFile instance, which can be passed to newGridFSDownloadStream. The _id value has to be
 	// parsed out separately because "_id" will not match the File.ID field and we want to avoid exposing BSON tags
 	// in the File type. After parsing it, use RawValue.Unmarshal to ensure File.ID is set to the appropriate value.
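 	//
 	// For reference (editor's note; shape per the GridFS spec, values
 	// illustrative): a files collection document looks like
 	//
 	//	{"_id": <id>, "length": 13, "chunkSize": 261120,
 	//	 "uploadDate": <date>, "filename": "example.txt", "metadata": {...}}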
 	var resp findFileResponse
 	if err := result.Decode(&resp); err != nil {
-		if errors.Is(err, mongo.ErrNoDocuments) {
+		if errors.Is(err, ErrNoDocuments) {
 			return nil, ErrFileNotFound
 		}
@@ -473,12 +415,12 @@
 	foundFile := newFileFromResponse(resp)
 
 	if foundFile.Length == 0 {
-		return newDownloadStream(ctx, nil, foundFile.ChunkSize, foundFile), nil
+		return newGridFSDownloadStream(ctx, nil, foundFile.ChunkSize, foundFile), nil
 	}
 
 	// For a file with non-zero length, chunkSize must exist so we know what size to expect when downloading chunks.
 	if foundFile.ChunkSize == 0 {
-		return nil, ErrMissingChunkSize
+		return nil, ErrMissingGridFSChunkSize
 	}
 
 	chunksCursor, err := b.findChunks(ctx, foundFile.ID)
@@ -487,10 +429,10 @@
 	}
 	// The chunk size can be overridden for individual files, so the expected chunk size should be the "chunkSize"
 	// field from the files collection document, not the bucket's chunk size.
-	return newDownloadStream(ctx, chunksCursor, foundFile.ChunkSize, foundFile), nil
+	return newGridFSDownloadStream(ctx, chunksCursor, foundFile.ChunkSize, foundFile), nil
 }
 
-func (b *Bucket) downloadToStream(ds *DownloadStream, stream io.Writer) (int64, error) {
+func (b *GridFSBucket) downloadToStream(ds *GridFSDownloadStream, stream io.Writer) (int64, error) {
 	copied, err := io.Copy(stream, ds)
 	if err != nil {
 		_ = ds.Close()
@@ -500,12 +442,12 @@
 	return copied, ds.Close()
 }
 
-func (b *Bucket) deleteChunks(ctx context.Context, fileID interface{}) error {
+func (b *GridFSBucket) deleteChunks(ctx context.Context, fileID interface{}) error {
 	_, err := b.chunksColl.DeleteMany(ctx, bson.D{{"files_id", fileID}})
 	return err
 }
 
-func (b *Bucket) findChunks(ctx context.Context, fileID interface{}) (*mongo.Cursor, error) {
+func (b *GridFSBucket) findChunks(ctx context.Context, fileID interface{}) (*Cursor, error) {
 	chunksCursor, err := b.chunksColl.Find(ctx,
 		bson.D{{"files_id", fileID}},
 		options.Find().SetSort(bson.D{{"n", 1}})) // sort by chunk index
@@ -559,7 +501,7 @@ func numericalIndexDocsEqual(expected, actual bsoncore.Document) (bool, error) {
 }
 
 // Create an index if it doesn't already exist
-func createNumericalIndexIfNotExists(ctx context.Context, iv mongo.IndexView, model mongo.IndexModel) error {
+func createNumericalIndexIfNotExists(ctx context.Context, iv IndexView, model IndexModel) error {
 	c, err := iv.List(ctx)
 	if err != nil {
 		return err
@@ -596,7 +538,7 @@
 }
 
 // create indexes on the files and chunks collection if needed
-func (b *Bucket) createIndexes(ctx context.Context) error {
+func (b *GridFSBucket) createIndexes(ctx context.Context) error {
 	// must use primary read pref mode to check if files coll empty
 	cloned, err := b.filesColl.Clone(options.Collection().SetReadPreference(readpref.Primary()))
 	if err != nil {
@@ -606,7 +548,7 @@
 	docRes := cloned.FindOne(ctx, bson.D{}, options.FindOne().SetProjection(bson.D{{"_id", 1}}))
 
 	_, err = docRes.Raw()
-	if err != mongo.ErrNoDocuments {
+	if err != ErrNoDocuments {
 		// nil, or error that occurred during the FindOne operation
 		return err
 	}
@@ -614,14 +556,14 @@
 	filesIv := b.filesColl.Indexes()
 	chunksIv := b.chunksColl.Indexes()
 
-	filesModel := mongo.IndexModel{
+	filesModel := IndexModel{
 		Keys: bson.D{
 			{"filename", int32(1)},
 			{"uploadDate", int32(1)},
 		},
 	}
-	chunksModel := mongo.IndexModel{
+	chunksModel := IndexModel{
 		Keys: bson.D{
 			{"files_id", int32(1)},
 			{"n", int32(1)},
@@ -635,7 +577,7 @@
 	return createNumericalIndexIfNotExists(ctx, chunksIv, chunksModel)
 }
 
-func (b *Bucket) checkFirstWrite(ctx context.Context) error {
+func (b *GridFSBucket) checkFirstWrite(ctx context.Context) error {
 	if !b.firstWriteDone {
 		// before the first write operation, must determine if files collection is empty
 		// if so, create indexes if they do not already exist
@@ -649,8 +591,8 @@
 	return nil
 }
 
-func (b *Bucket) parseUploadOptions(opts ...*options.UploadOptions) (*Upload, error) {
-	upload := &Upload{
+func (b *GridFSBucket) parseUploadOptions(opts ...*options.UploadOptions) (*upload, error) {
+	upload := &upload{
 		chunkSize: b.chunkSize, // upload chunk size defaults to bucket's value
 	}
diff --git a/mongo/gridfs/bucket_test.go b/mongo/gridfs_bucket_test.go
similarity index 87%
rename from mongo/gridfs/bucket_test.go
rename to mongo/gridfs_bucket_test.go
index b0a50520fa..ea7b2f7353 100644
--- a/mongo/gridfs/bucket_test.go
+++ b/mongo/gridfs_bucket_test.go
@@ -4,7 +4,7 @@
 // not use this file except in compliance with the License. You may obtain
 // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 
-package gridfs
+package mongo
 
 import (
 	"context"
@@ -13,7 +13,6 @@ import (
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/internal/assert"
 	"go.mongodb.org/mongo-driver/internal/integtest"
-	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
@@ -26,7 +25,7 @@ func TestBucket_openDownloadStream(t *testing.T) {
 		{
 			name:   "nil filter",
 			filter: nil,
-			err:    mongo.ErrNilDocument,
+			err:    ErrNilDocument,
 		},
 		{
 			name: "nonmatching filter",
@@ -38,14 +37,16 @@ func TestBucket_openDownloadStream(t *testing.T) {
 	cs := integtest.ConnString(t)
 	clientOpts := options.Client().ApplyURI(cs.Original)
 
-	client, err := mongo.Connect(clientOpts)
+	integtest.AddTestServerAPIVersion(clientOpts)
+
+	client, err := Connect(clientOpts)
 	assert.Nil(t, err, "Connect error: %v", err)
 
 	db := client.Database("bucket")
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			bucket, err := NewBucket(db)
+			bucket, err := db.GridFSBucket()
 			assert.NoError(t, err)
 
 			_, err = bucket.openDownloadStream(context.Background(), test.filter)
diff --git a/mongo/gridfs/download_stream.go b/mongo/gridfs_download_stream.go
similarity index 83%
rename from mongo/gridfs/download_stream.go
rename to mongo/gridfs_download_stream.go
index 41f5fb686c..4f4b9202bd 100644
--- a/mongo/gridfs/download_stream.go
+++ b/mongo/gridfs_download_stream.go
@@ -4,7 +4,7 @@
 // not use this file except in compliance with the License. You may obtain
 // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 
-package gridfs
+package mongo
 
 import (
 	"context"
@@ -14,23 +14,23 @@ import (
 	"time"
 
 	"go.mongodb.org/mongo-driver/bson"
-	"go.mongodb.org/mongo-driver/mongo"
 )
 
 // ErrMissingChunk indicates that the number of chunks read from the server is
-// less than expected.
+// less than expected. This error is specific to GridFS operations.
 var ErrMissingChunk = errors.New("EOF missing one or more chunks")
 
-// ErrWrongSize is used when the chunk retrieved from the server does not have the expected size.
+// ErrWrongSize is used when the chunk retrieved from the server does not have +// the expected size. This error is specific to GridFS operations. var ErrWrongSize = errors.New("chunk size does not match expected size") var errNoMoreChunks = errors.New("no more chunks remaining") -// DownloadStream is a io.Reader that can be used to download a file from a GridFS bucket. -type DownloadStream struct { +// GridFSDownloadStream is a io.Reader that can be used to download a file from a GridFS bucket. +type GridFSDownloadStream struct { numChunks int32 chunkSize int32 - cursor *mongo.Cursor + cursor *Cursor done bool closed bool buffer []byte // store up to 1 chunk if the user provided buffer isn't big enough @@ -40,15 +40,16 @@ type DownloadStream struct { fileLen int64 ctx context.Context - // The pointer returned by GetFile. This should not be used in the actual DownloadStream code outside of the - // newDownloadStream constructor because the values can be mutated by the user after calling GetFile. Instead, + // The pointer returned by GetFile. This should not be used in the actual GridFSDownloadStream code outside of the + // newGridFSDownloadStream constructor because the values can be mutated by the user after calling GetFile. Instead, // any values needed in the code should be stored separately and copied over in the constructor. - file *File + file *GridFSFile } -// File represents a file stored in GridFS. This type can be used to access file information when downloading using the -// DownloadStream.GetFile method. -type File struct { +// GridFSFile represents a file stored in GridFS. This type can be used to +// access file information when downloading using the +// GridFSDownloadStream.GetFile method. +type GridFSFile struct { // ID is the file's ID. This will match the file ID specified when uploading the file. If an upload helper that // does not require a file ID was used, this field will be a primitive.ObjectID. ID interface{} @@ -83,8 +84,8 @@ type findFileResponse struct { Metadata bson.Raw `bson:"metadata"` } -func newFileFromResponse(resp findFileResponse) *File { - return &File{ +func newFileFromResponse(resp findFileResponse) *GridFSFile { + return &GridFSFile{ ID: resp.ID, Length: resp.Length, ChunkSize: resp.ChunkSize, @@ -94,10 +95,10 @@ func newFileFromResponse(resp findFileResponse) *File { } } -func newDownloadStream(ctx context.Context, cursor *mongo.Cursor, chunkSize int32, file *File) *DownloadStream { +func newGridFSDownloadStream(ctx context.Context, cursor *Cursor, chunkSize int32, file *GridFSFile) *GridFSDownloadStream { numChunks := int32(math.Ceil(float64(file.Length) / float64(chunkSize))) - return &DownloadStream{ + return &GridFSDownloadStream{ numChunks: numChunks, chunkSize: chunkSize, cursor: cursor, @@ -110,7 +111,7 @@ func newDownloadStream(ctx context.Context, cursor *mongo.Cursor, chunkSize int3 } // Close closes this download stream. -func (ds *DownloadStream) Close() error { +func (ds *GridFSDownloadStream) Close() error { if ds.closed { return ErrStreamClosed } @@ -123,7 +124,7 @@ func (ds *DownloadStream) Close() error { } // Read reads the file from the server and writes it to a destination byte slice. -func (ds *DownloadStream) Read(p []byte) (int, error) { +func (ds *GridFSDownloadStream) Read(p []byte) (int, error) { if ds.closed { return 0, ErrStreamClosed } @@ -160,7 +161,7 @@ func (ds *DownloadStream) Read(p []byte) (int, error) { } // Skip skips a given number of bytes in the file. 
-func (ds *DownloadStream) Skip(skip int64) (int64, error) {
+func (ds *GridFSDownloadStream) Skip(skip int64) (int64, error) {
 	if ds.closed {
 		return 0, ErrStreamClosed
 	}
@@ -199,11 +200,11 @@ func (ds *DownloadStream) Skip(skip int64) (int64, error) {
 }
 
 // GetFile returns a GridFSFile object representing the file being downloaded.
-func (ds *DownloadStream) GetFile() *File {
+func (ds *GridFSDownloadStream) GetFile() *GridFSFile {
 	return ds.file
 }
 
-func (ds *DownloadStream) fillBuffer(ctx context.Context) error {
+func (ds *GridFSDownloadStream) fillBuffer(ctx context.Context) error {
 	if !ds.cursor.Next(ctx) {
 		ds.done = true
 		// Check for cursor error, otherwise there are no more chunks.
diff --git a/mongo/gridfs/gridfs_examples_test.go b/mongo/gridfs_examples_test.go
similarity index 83%
rename from mongo/gridfs/gridfs_examples_test.go
rename to mongo/gridfs_examples_test.go
index 7345f2f43e..c3ff7892af 100644
--- a/mongo/gridfs/gridfs_examples_test.go
+++ b/mongo/gridfs_examples_test.go
@@ -4,7 +4,7 @@
 // not use this file except in compliance with the License. You may obtain
 // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 
-package gridfs_test
+package mongo_test
 
 import (
 	"bytes"
@@ -16,13 +16,13 @@ import (
 
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
-	"go.mongodb.org/mongo-driver/mongo/gridfs"
+	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
-func ExampleBucket_OpenUploadStream() {
+func ExampleGridFSBucket_OpenUploadStream() {
 	var fileContent []byte
-	var bucket *gridfs.Bucket
+	var bucket *mongo.GridFSBucket
 
 	// Specify the Metadata option to include a "metadata" field in the files
 	// collection document.
@@ -49,9 +49,9 @@
 	}
 }
 
-func ExampleBucket_UploadFromStream() {
+func ExampleGridFSBucket_UploadFromStream() {
 	var fileContent []byte
-	var bucket *gridfs.Bucket
+	var bucket *mongo.GridFSBucket
 
 	// Specify the Metadata option to include a "metadata" field in the files
 	// collection document.
@@ -69,8 +69,8 @@
 	fmt.Printf("new file created with ID %s", fileID)
 }
 
-func ExampleBucket_OpenDownloadStream() {
-	var bucket *gridfs.Bucket
+func ExampleGridFSBucket_OpenDownloadStream() {
+	var bucket *mongo.GridFSBucket
 	var fileID primitive.ObjectID
 
 	// Use context.WithTimeout to force a timeout if the download does not succeed in
 	// 2 seconds.
@@ -94,8 +94,8 @@
 	}
 }
 
-func ExampleBucket_DownloadToStream() {
-	var bucket *gridfs.Bucket
+func ExampleGridFSBucket_DownloadToStream() {
+	var bucket *mongo.GridFSBucket
 	var fileID primitive.ObjectID
 
 	ctx := context.Background()
@@ -106,8 +106,8 @@
 	}
 }
 
-func ExampleBucket_Delete() {
-	var bucket *gridfs.Bucket
+func ExampleGridFSBucket_Delete() {
+	var bucket *mongo.GridFSBucket
 	var fileID primitive.ObjectID
 
 	if err := bucket.Delete(context.Background(), fileID); err != nil {
@@ -115,8 +115,8 @@
 	}
 }
 
-func ExampleBucket_Find() {
-	var bucket *gridfs.Bucket
+func ExampleGridFSBucket_Find() {
+	var bucket *mongo.GridFSBucket
 
 	// Specify a filter to find all files with a length greater than 1000 bytes.
 	filter := bson.D{
@@ -146,8 +146,8 @@
 	}
 }
 
-func ExampleBucket_Rename() {
-	var bucket *gridfs.Bucket
+func ExampleGridFSBucket_Rename() {
+	var bucket *mongo.GridFSBucket
 	var fileID primitive.ObjectID
 
 	ctx := context.Background()
@@ -157,8 +157,8 @@
 	}
 }
 
-func ExampleBucket_Drop() {
-	var bucket *gridfs.Bucket
+func ExampleGridFSBucket_Drop() {
+	var bucket *mongo.GridFSBucket
 
 	if err := bucket.Drop(context.Background()); err != nil {
 		log.Fatal(err)
diff --git a/mongo/gridfs/gridfs_test.go b/mongo/gridfs_test.go
similarity index 91%
rename from mongo/gridfs/gridfs_test.go
rename to mongo/gridfs_test.go
index b3af58518c..413942b5f9 100644
--- a/mongo/gridfs/gridfs_test.go
+++ b/mongo/gridfs_test.go
@@ -4,7 +4,7 @@
 // not use this file except in compliance with the License. You may obtain
 // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 
-package gridfs
+package mongo
 
 import (
 	"context"
@@ -13,14 +13,13 @@ import (
 	"go.mongodb.org/mongo-driver/event"
 	"go.mongodb.org/mongo-driver/internal/assert"
 	"go.mongodb.org/mongo-driver/internal/integtest"
-	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
 	"go.mongodb.org/mongo-driver/mongo/readpref"
 	"go.mongodb.org/mongo-driver/mongo/writeconcern"
 )
 
 var (
-	connsCheckedOut int
+	gridfsConnsCheckedOut int
 )
 
 func TestGridFS(t *testing.T) {
@@ -33,9 +32,9 @@ func TestGridFS(t *testing.T) {
 		Event: func(evt *event.PoolEvent) {
 			switch evt.Type {
 			case event.ConnectionCheckedOut:
-				connsCheckedOut++
+				gridfsConnsCheckedOut++
 			case event.ConnectionCheckedIn:
-				connsCheckedOut--
+				gridfsConnsCheckedOut--
 			}
 		},
 	}
@@ -49,12 +48,14 @@
 		// will discover the other hosts during SDAM checks.
 		SetHosts(cs.Hosts[:1])
 
-	client, err := mongo.Connect(clientOpts)
+	integtest.AddTestServerAPIVersion(clientOpts)
+
+	client, err := Connect(clientOpts)
 	assert.Nil(t, err, "Connect error: %v", err)
 	db := client.Database("gridfs")
 	defer func() {
 		sessions := client.NumberSessionsInProgress()
-		conns := connsCheckedOut
+		conns := gridfsConnsCheckedOut
 
 		_ = db.Drop(context.Background())
 		_ = client.Disconnect(context.Background())
@@ -78,13 +79,13 @@
 	for _, tt := range chunkSizeTests {
 		t.Run(tt.testName, func(t *testing.T) {
-			bucket, err := NewBucket(db, tt.bucketOpts)
+			bucket, err := db.GridFSBucket(tt.bucketOpts)
 			assert.Nil(t, err, "NewBucket error: %v", err)
 
 			us, err := bucket.OpenUploadStream(context.Background(), "filename", tt.uploadOpts)
 			assert.Nil(t, err, "OpenUploadStream error: %v", err)
 
-			expectedBucketChunkSize := DefaultChunkSize
+			expectedBucketChunkSize := DefaultGridFSChunkSize
 			if tt.bucketOpts != nil && tt.bucketOpts.ChunkSizeBytes != nil {
 				expectedBucketChunkSize = *tt.bucketOpts.ChunkSizeBytes
 			}
diff --git a/mongo/gridfs/upload_stream.go b/mongo/gridfs_upload_stream.go
similarity index 79%
rename from mongo/gridfs/upload_stream.go
rename to mongo/gridfs_upload_stream.go
index fbcd0be7ef..a83f250d01 100644
--- a/mongo/gridfs/upload_stream.go
+++ b/mongo/gridfs_upload_stream.go
@@ -4,7 +4,7 @@
 // not use this file except in compliance with the License. You may obtain
 // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 
-package gridfs
+package mongo
 
 import (
 	"errors"
@@ -16,27 +16,26 @@ import (
 
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
-	"go.mongodb.org/mongo-driver/mongo"
 )
 
-// UploadBufferSize is the size in bytes of one stream batch. Chunks will be written to the db after the sum of chunk
+// uploadBufferSize is the size in bytes of one stream batch. Chunks will be written to the db after the sum of chunk
 // lengths is equal to the batch size.
-const UploadBufferSize = 16 * 1024 * 1024 // 16 MiB
+const uploadBufferSize = 16 * 1024 * 1024 // 16 MiB
 
 // ErrStreamClosed is an error returned if an operation is attempted on a closed/aborted stream.
 var ErrStreamClosed = errors.New("stream is closed or aborted")
 
-// UploadStream is used to upload a file in chunks. This type implements the io.Writer interface and a file can be
+// GridFSUploadStream is used to upload a file in chunks. This type implements the io.Writer interface and a file can be
 // uploaded using the Write method. After an upload is complete, the Close method must be called to write file
 // metadata.
-type UploadStream struct {
-	*Upload // chunk size and metadata
+type GridFSUploadStream struct {
+	*upload // chunk size and metadata
 	FileID interface{}
 
 	chunkIndex int
-	chunksColl *mongo.Collection // collection to store file chunks
+	chunksColl *Collection // collection to store file chunks
 	filename   string
-	filesColl  *mongo.Collection // collection to store file metadata
+	filesColl  *Collection // collection to store file metadata
 	closed     bool
 	buffer     []byte
 	bufferIndex int
@@ -47,25 +46,25 @@
 // newUploadStream creates a new upload stream.
 func newUploadStream(
 	ctx context.Context,
-	upload *Upload,
+	up *upload,
 	fileID interface{},
 	filename string,
-	chunks, files *mongo.Collection,
-) *UploadStream {
-	return &UploadStream{
-		Upload: upload,
+	chunks, files *Collection,
+) *GridFSUploadStream {
+	return &GridFSUploadStream{
+		upload: up,
 		FileID: fileID,
 
 		chunksColl: chunks,
 		filename:   filename,
 		filesColl:  files,
-		buffer:     make([]byte, UploadBufferSize),
+		buffer:     make([]byte, uploadBufferSize),
 		ctx:        ctx,
 	}
 }
 
 // Close writes file metadata to the files collection and cleans up any resources associated with the GridFSUploadStream.
-func (us *UploadStream) Close() error {
+func (us *GridFSUploadStream) Close() error {
 	if us.closed {
 		return ErrStreamClosed
 	}
@@ -86,7 +85,7 @@
 
 // Write transfers the contents of a byte slice into this upload stream. If the stream's underlying buffer fills up,
 // the buffer will be uploaded as chunks to the server. Implements the io.Writer interface.
-func (us *UploadStream) Write(p []byte) (int, error) {
+func (us *GridFSUploadStream) Write(p []byte) (int, error) {
 	if us.closed {
 		return 0, ErrStreamClosed
 	}
@@ -101,7 +100,7 @@
 	p = p[n:]
 	us.bufferIndex += n
 
-	if us.bufferIndex == UploadBufferSize {
+	if us.bufferIndex == uploadBufferSize {
 		err := us.uploadChunks(us.ctx, false)
 		if err != nil {
 			return 0, err
@@ -112,7 +111,7 @@
 }
 
 // Abort closes the stream and deletes all file chunks that have already been written.
-func (us *UploadStream) Abort() error {
+func (us *GridFSUploadStream) Abort() error {
 	if us.closed {
 		return ErrStreamClosed
 	}
@@ -130,7 +129,7 @@
 // if uploadPartial is true, any data at the end of the buffer that is smaller than a chunk will be uploaded as a partial
 // chunk. if it is false, the data will be moved to the front of the buffer.
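 // Worked example (editor's note): with the default 255 KiB (261,120-byte) chunk size, a full 16 MiB buffer
 // holds 16,777,216/261,120 = 64.25 chunks, so a non-partial flush writes 64 chunks (16,711,680 bytes) and
 // moves the remaining 65,536 bytes to the front of the buffer.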
 // uploadChunks sets us.bufferIndex to the next available index in the buffer after uploading
-func (us *UploadStream) uploadChunks(ctx context.Context, uploadPartial bool) error {
+func (us *GridFSUploadStream) uploadChunks(ctx context.Context, uploadPartial bool) error {
 	chunks := float64(us.bufferIndex) / float64(us.chunkSize)
 	numChunks := int(math.Ceil(chunks))
 	if !uploadPartial {
@@ -167,14 +166,14 @@ func (us *UploadStream) uploadChunks(ctx context.Context, uploadPartial bool) er
 
 	// copy any remaining bytes to beginning of buffer and set buffer index
 	bytesUploaded := numChunks * int(us.chunkSize)
-	if bytesUploaded != UploadBufferSize && !uploadPartial {
+	if bytesUploaded != uploadBufferSize && !uploadPartial {
 		copy(us.buffer[0:], us.buffer[bytesUploaded:us.bufferIndex])
 	}
-	us.bufferIndex = UploadBufferSize - bytesUploaded
+	us.bufferIndex = uploadBufferSize - bytesUploaded
 	return nil
 }
 
-func (us *UploadStream) createFilesCollDoc(ctx context.Context) error {
+func (us *GridFSUploadStream) createFilesCollDoc(ctx context.Context) error {
 	doc := bson.D{
 		{"_id", us.FileID},
 		{"length", us.fileLen},
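
For orientation, a minimal sketch of the merged API in use (editor's addition, not part of the patch; the URI, database name, and file name are illustrative):

	package main

	import (
		"bytes"
		"context"
		"log"
		"time"

		"go.mongodb.org/mongo-driver/mongo"
		"go.mongodb.org/mongo-driver/mongo/options"
	)

	func main() {
		client, err := mongo.Connect(options.Client().ApplyURI("mongodb://localhost:27017"))
		if err != nil {
			log.Fatal(err)
		}
		defer func() { _ = client.Disconnect(context.Background()) }()

		// Previously gridfs.NewBucket(db, opts...); now a *Database method.
		bucket, err := client.Database("db").GridFSBucket()
		if err != nil {
			log.Fatal(err)
		}

		// The context passed to the upload/download helpers bounds the whole operation.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		fileID, err := bucket.UploadFromStream(ctx, "example.txt", bytes.NewReader([]byte("hello")))
		if err != nil {
			log.Fatal(err)
		}

		var buf bytes.Buffer
		if _, err := bucket.DownloadToStream(ctx, fileID, &buf); err != nil {
			log.Fatal(err)
		}
		log.Printf("downloaded %d bytes for file %v", buf.Len(), fileID)
	}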