Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Sep 26, 2023
1 parent f35a93a commit 78b17ff
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 159 deletions.
5 changes: 0 additions & 5 deletions mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,6 @@ func NewClient(opts ...*options.ClientOptions) (*Client, error) {
client.cryptFLE = clientOpt.Crypt
}

// Deployment
if clientOpt.Deployment != nil {
client.deployment = clientOpt.Deployment
}

// Set default options
if clientOpt.MaxPoolSize == nil {
clientOpt.SetMaxPoolSize(defaultMaxPoolSize)
Expand Down
78 changes: 74 additions & 4 deletions mongo/gridfs/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,27 @@ func NewBucket(db *mongo.Database, opts ...*options.BucketOptions) (*Bucket, err
rp: db.ReadPreference(),
}

bo := options.MergeBucketOptions(opts...)
bo := options.GridFSBucket()
for _, opt := range opts {
if opt == nil {
continue
}
if opt.Name != nil {
bo.Name = opt.Name
}
if opt.ChunkSizeBytes != nil {
bo.ChunkSizeBytes = opt.ChunkSizeBytes
}
if opt.WriteConcern != nil {
bo.WriteConcern = opt.WriteConcern
}
if opt.ReadConcern != nil {
bo.ReadConcern = opt.ReadConcern
}
if opt.ReadPreference != nil {
bo.ReadPreference = opt.ReadPreference
}
}
if bo.Name != nil {
b.name = *bo.Name
}
Expand Down Expand Up @@ -210,7 +230,17 @@ func (b *Bucket) OpenDownloadStreamByName(filename string, opts ...*options.Name
var numSkip int32 = -1
var sortOrder int32 = 1

nameOpts := options.MergeNameOptions(opts...)
nameOpts := options.GridFSName()
nameOpts.Revision = &options.DefaultRevision

for _, opt := range opts {
if opt == nil {
continue
}
if opt.Revision != nil {
nameOpts.Revision = opt.Revision
}
}
if nameOpts.Revision != nil {
numSkip = *nameOpts.Revision
}
Expand Down Expand Up @@ -302,7 +332,33 @@ func (b *Bucket) Find(filter interface{}, opts ...*options.GridFSFindOptions) (*
// Use the context parameter to time-out or cancel the find operation. The deadline set by SetReadDeadline
// is ignored.
func (b *Bucket) FindContext(ctx context.Context, filter interface{}, opts ...*options.GridFSFindOptions) (*mongo.Cursor, error) {
gfsOpts := options.MergeGridFSFindOptions(opts...)
gfsOpts := options.GridFSFind()
for _, opt := range opts {
if opt == nil {
continue
}
if opt.AllowDiskUse != nil {
gfsOpts.AllowDiskUse = opt.AllowDiskUse
}
if opt.BatchSize != nil {
gfsOpts.BatchSize = opt.BatchSize
}
if opt.Limit != nil {
gfsOpts.Limit = opt.Limit
}
if opt.MaxTime != nil {
gfsOpts.MaxTime = opt.MaxTime
}
if opt.NoCursorTimeout != nil {
gfsOpts.NoCursorTimeout = opt.NoCursorTimeout
}
if opt.Skip != nil {
gfsOpts.Skip = opt.Skip
}
if opt.Sort != nil {
gfsOpts.Sort = opt.Sort
}
}
find := options.Find()
if gfsOpts.AllowDiskUse != nil {
find.SetAllowDiskUse(*gfsOpts.AllowDiskUse)
Expand Down Expand Up @@ -642,7 +698,21 @@ func (b *Bucket) parseUploadOptions(opts ...*options.UploadOptions) (*Upload, er
chunkSize: b.chunkSize, // upload chunk size defaults to bucket's value
}

uo := options.MergeUploadOptions(opts...)
uo := options.GridFSUpload()
for _, opt := range opts {
if opt == nil {
continue
}
if opt.ChunkSizeBytes != nil {
uo.ChunkSizeBytes = opt.ChunkSizeBytes
}
if opt.Metadata != nil {
uo.Metadata = opt.Metadata
}
if opt.Registry != nil {
uo.Registry = opt.Registry
}
}
if uo.ChunkSizeBytes != nil {
upload.chunkSize = *uo.ChunkSizeBytes
}
Expand Down
47 changes: 19 additions & 28 deletions mongo/integration/mtest/mongotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (t *T) createTestClient() {
clientOpts = options.Client().SetWriteConcern(MajorityWc).SetReadPreference(PrimaryRp)
}
// set ServerAPIOptions to latest version if required
if clientOpts.Deployment == nil && t.clientType != Mock && clientOpts.ServerAPIOptions == nil && testContext.requireAPIVersion {
if t.clientType != Mock && clientOpts.ServerAPIOptions == nil && testContext.requireAPIVersion {
clientOpts.SetServerAPIOptions(options.ServerAPI(driver.TestServerAPIVersion))
}

Expand Down Expand Up @@ -663,25 +663,23 @@ func (t *T) createTestClient() {
t.failed = append(t.failed, cfe)
},
})
// only specify connection pool monitor if no deployment is given
if clientOpts.Deployment == nil {
previousPoolMonitor := clientOpts.PoolMonitor

clientOpts.SetPoolMonitor(&event.PoolMonitor{
Event: func(evt *event.PoolEvent) {
if previousPoolMonitor != nil {
previousPoolMonitor.Event(evt)
}

switch evt.Type {
case event.GetSucceeded:
atomic.AddInt64(&t.connsCheckedOut, 1)
case event.ConnectionReturned:
atomic.AddInt64(&t.connsCheckedOut, -1)
}
},
})
}

// specify connection pool monitor
previousPoolMonitor := clientOpts.PoolMonitor
clientOpts.SetPoolMonitor(&event.PoolMonitor{
Event: func(evt *event.PoolEvent) {
if previousPoolMonitor != nil {
previousPoolMonitor.Event(evt)
}

switch evt.Type {
case event.GetSucceeded:
atomic.AddInt64(&t.connsCheckedOut, 1)
case event.ConnectionReturned:
atomic.AddInt64(&t.connsCheckedOut, -1)
}
},
})

var err error
switch t.clientType {
Expand All @@ -694,7 +692,6 @@ func (t *T) createTestClient() {
// clear pool monitor to avoid configuration error
clientOpts.PoolMonitor = nil
t.mockDeployment = newMockDeployment()
clientOpts.Deployment = t.mockDeployment
t.Client, err = mongo.NewClient(clientOpts)
case Proxy:
t.proxyDialer = newProxyDialer()
Expand All @@ -705,13 +702,7 @@ func (t *T) createTestClient() {
case Default:
// Use a different set of options to specify the URI because clientOpts may already have a URI or host seedlist
// specified.
var uriOpts *options.ClientOptions
if clientOpts.Deployment == nil {
// Only specify URI if the deployment is not set to avoid setting topology/server options along with the
// deployment.
uriOpts = options.Client().ApplyURI(testContext.connString.Original)
}

uriOpts := options.Client().ApplyURI(testContext.connString.Original)
// Pass in uriOpts first so clientOpts wins if there are any conflicting settings.
t.Client, err = mongo.NewClient(uriOpts, clientOpts)
}
Expand Down
9 changes: 0 additions & 9 deletions mongo/options/clientoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,6 @@ type ClientOptions struct {
// changed or removed in any release.
Crypt driver.Crypt

// Deployment specifies a custom deployment to use for the new Client.
//
// Deprecated: This option is for internal use only and should not be set. It may be changed or removed in any
// release.
Deployment driver.Deployment

// SocketTimeout specifies the timeout to be used for the Client's socket reads and writes.
//
// NOTE(benjirewis): SocketTimeout will be deprecated in a future release. The more general Timeout option
Expand Down Expand Up @@ -1080,9 +1074,6 @@ func MergeClientOptions(opts ...*ClientOptions) *ClientOptions {
if opt.AutoEncryptionOptions != nil {
c.AutoEncryptionOptions = opt.AutoEncryptionOptions
}
if opt.Deployment != nil {
c.Deployment = opt.Deployment
}
if opt.DisableOCSPEndpointCheck != nil {
c.DisableOCSPEndpointCheck = opt.DisableOCSPEndpointCheck
}
Expand Down
113 changes: 0 additions & 113 deletions mongo/options/gridfsoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,37 +84,6 @@ func (b *BucketOptions) SetReadPreference(rp *readpref.ReadPref) *BucketOptions
return b
}

// MergeBucketOptions combines the given BucketOptions instances into a single BucketOptions in a last-one-wins fashion.
//
// Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a
// single options struct instead.
func MergeBucketOptions(opts ...*BucketOptions) *BucketOptions {
b := GridFSBucket()

for _, opt := range opts {
if opt == nil {
continue
}
if opt.Name != nil {
b.Name = opt.Name
}
if opt.ChunkSizeBytes != nil {
b.ChunkSizeBytes = opt.ChunkSizeBytes
}
if opt.WriteConcern != nil {
b.WriteConcern = opt.WriteConcern
}
if opt.ReadConcern != nil {
b.ReadConcern = opt.ReadConcern
}
if opt.ReadPreference != nil {
b.ReadPreference = opt.ReadPreference
}
}

return b
}

// UploadOptions represents options that can be used to configure a GridFS upload operation.
type UploadOptions struct {
// The number of bytes in each chunk in the bucket. The default value is DefaultChunkSize (255 KiB).
Expand Down Expand Up @@ -146,31 +115,6 @@ func (u *UploadOptions) SetMetadata(doc interface{}) *UploadOptions {
return u
}

// MergeUploadOptions combines the given UploadOptions instances into a single UploadOptions in a last-one-wins fashion.
//
// Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a
// single options struct instead.
func MergeUploadOptions(opts ...*UploadOptions) *UploadOptions {
u := GridFSUpload()

for _, opt := range opts {
if opt == nil {
continue
}
if opt.ChunkSizeBytes != nil {
u.ChunkSizeBytes = opt.ChunkSizeBytes
}
if opt.Metadata != nil {
u.Metadata = opt.Metadata
}
if opt.Registry != nil {
u.Registry = opt.Registry
}
}

return u
}

// NameOptions represents options that can be used to configure a GridFS DownloadByName operation.
type NameOptions struct {
// Specifies the revision of the file to retrieve. Revision numbers are defined as follows:
Expand All @@ -197,26 +141,6 @@ func (n *NameOptions) SetRevision(r int32) *NameOptions {
return n
}

// MergeNameOptions combines the given NameOptions instances into a single *NameOptions in a last-one-wins fashion.
//
// Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a
// single options struct instead.
func MergeNameOptions(opts ...*NameOptions) *NameOptions {
n := GridFSName()
n.Revision = &DefaultRevision

for _, opt := range opts {
if opt == nil {
continue
}
if opt.Revision != nil {
n.Revision = opt.Revision
}
}

return n
}

// GridFSFindOptions represents options that can be used to configure a GridFS Find operation.
type GridFSFindOptions struct {
// If true, the server can write temporary data to disk while executing the find operation. The default value
Expand Down Expand Up @@ -302,40 +226,3 @@ func (f *GridFSFindOptions) SetSort(sort interface{}) *GridFSFindOptions {
f.Sort = sort
return f
}

// MergeGridFSFindOptions combines the given GridFSFindOptions instances into a single GridFSFindOptions in a
// last-one-wins fashion.
//
// Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a
// single options struct instead.
func MergeGridFSFindOptions(opts ...*GridFSFindOptions) *GridFSFindOptions {
fo := GridFSFind()
for _, opt := range opts {
if opt == nil {
continue
}
if opt.AllowDiskUse != nil {
fo.AllowDiskUse = opt.AllowDiskUse
}
if opt.BatchSize != nil {
fo.BatchSize = opt.BatchSize
}
if opt.Limit != nil {
fo.Limit = opt.Limit
}
if opt.MaxTime != nil {
fo.MaxTime = opt.MaxTime
}
if opt.NoCursorTimeout != nil {
fo.NoCursorTimeout = opt.NoCursorTimeout
}
if opt.Skip != nil {
fo.Skip = opt.Skip
}
if opt.Sort != nil {
fo.Sort = opt.Sort
}
}

return fo
}

0 comments on commit 78b17ff

Please sign in to comment.