Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Sep 20, 2023
1 parent b07ff6e commit 0a66171
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 282 deletions.
51 changes: 50 additions & 1 deletion mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,55 @@ type changeStreamConfig struct {
crypt driver.Crypt
}

// mergeChangeStreamOptions combines the given ChangeStreamOptions instances into a single ChangeStreamOptions in a
// last-one-wins fashion.
func mergeChangeStreamOptions(opts ...*options.ChangeStreamOptions) *options.ChangeStreamOptions {
csOpts := options.ChangeStream()
for _, cso := range opts {
if cso == nil {
continue
}
if cso.BatchSize != nil {
csOpts.BatchSize = cso.BatchSize
}
if cso.Collation != nil {
csOpts.Collation = cso.Collation
}
if cso.Comment != nil {
csOpts.Comment = cso.Comment
}
if cso.FullDocument != nil {
csOpts.FullDocument = cso.FullDocument
}
if cso.FullDocumentBeforeChange != nil {
csOpts.FullDocumentBeforeChange = cso.FullDocumentBeforeChange
}
if cso.MaxAwaitTime != nil {
csOpts.MaxAwaitTime = cso.MaxAwaitTime
}
if cso.ResumeAfter != nil {
csOpts.ResumeAfter = cso.ResumeAfter
}
if cso.ShowExpandedEvents != nil {
csOpts.ShowExpandedEvents = cso.ShowExpandedEvents
}
if cso.StartAtOperationTime != nil {
csOpts.StartAtOperationTime = cso.StartAtOperationTime
}
if cso.StartAfter != nil {
csOpts.StartAfter = cso.StartAfter
}
if cso.Custom != nil {
csOpts.Custom = cso.Custom
}
if cso.CustomPipeline != nil {
csOpts.CustomPipeline = cso.CustomPipeline
}
}

return csOpts
}

func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{},
opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
if ctx == nil {
Expand All @@ -116,7 +165,7 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
bsonOpts: config.bsonOpts,
registry: config.registry,
streamType: config.streamType,
options: options.MergeChangeStreamOptions(opts...),
options: mergeChangeStreamOptions(opts...),
selector: description.CompositeSelector([]description.ServerSelector{
description.ReadPrefSelector(config.readPreference),
description.LatencySelector(config.client.localThreshold),
Expand Down
58 changes: 58 additions & 0 deletions mongo/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/mongo/options"
)

func TestChangeStream(t *testing.T) {
Expand All @@ -27,3 +28,60 @@ func TestChangeStream(t *testing.T) {
assert.Nil(t, err, "Close error: %v", err)
})
}

func TestMergeChangeStreamOptions(t *testing.T) {
t.Parallel()

fullDocumentP := func(x options.FullDocument) *options.FullDocument { return &x }
int32P := func(x int32) *int32 { return &x }

testCases := []struct {
description string
input []*options.ChangeStreamOptions
want *options.ChangeStreamOptions
}{
{
description: "empty",
input: []*options.ChangeStreamOptions{},
want: &options.ChangeStreamOptions{},
},
{
description: "many ChangeStreamOptions with one configuration each",
input: []*options.ChangeStreamOptions{
options.ChangeStream().SetFullDocumentBeforeChange(options.Required),
options.ChangeStream().SetFullDocument(options.Required),
options.ChangeStream().SetBatchSize(10),
},
want: &options.ChangeStreamOptions{
FullDocument: fullDocumentP(options.Required),
FullDocumentBeforeChange: fullDocumentP(options.Required),
BatchSize: int32P(10),
},
},
{
description: "single ChangeStreamOptions with many configurations",
input: []*options.ChangeStreamOptions{
options.ChangeStream().
SetFullDocumentBeforeChange(options.Required).
SetFullDocument(options.Required).
SetBatchSize(10),
},
want: &options.ChangeStreamOptions{
FullDocument: fullDocumentP(options.Required),
FullDocumentBeforeChange: fullDocumentP(options.Required),
BatchSize: int32P(10),
},
},
}

for _, tc := range testCases {
tc := tc // Capture range variable.

t.Run(tc.description, func(t *testing.T) {
t.Parallel()

got := mergeChangeStreamOptions(tc.input...)
assert.Equal(t, tc.want, got, "expected and actual ChangeStreamOptions are different")
})
}
}
64 changes: 62 additions & 2 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,24 @@ func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,
}
}

bwo := options.MergeBulkWriteOptions(opts...)
bwo := options.BulkWrite()
for _, opt := range opts {
if opt == nil {
continue
}
if opt.Comment != nil {
bwo.Comment = opt.Comment
}
if opt.Ordered != nil {
bwo.Ordered = opt.Ordered
}
if opt.BypassDocumentValidation != nil {
bwo.BypassDocumentValidation = opt.BypassDocumentValidation
}
if opt.Let != nil {
bwo.Let = opt.Let
}
}

op := bulkWrite{
comment: bwo.Comment,
Expand Down Expand Up @@ -796,6 +813,49 @@ func (coll *Collection) Aggregate(ctx context.Context, pipeline interface{},
return aggregate(a)
}

// mergeAggregateOptions combines the given AggregateOptions instances into a single AggregateOptions in a last-one-wins
// fashion.
func mergeAggregateOptions(opts ...*options.AggregateOptions) *options.AggregateOptions {
aggOpts := options.Aggregate()
for _, ao := range opts {
if ao == nil {
continue
}
if ao.AllowDiskUse != nil {
aggOpts.AllowDiskUse = ao.AllowDiskUse
}
if ao.BatchSize != nil {
aggOpts.BatchSize = ao.BatchSize
}
if ao.BypassDocumentValidation != nil {
aggOpts.BypassDocumentValidation = ao.BypassDocumentValidation
}
if ao.Collation != nil {
aggOpts.Collation = ao.Collation
}
if ao.MaxTime != nil {
aggOpts.MaxTime = ao.MaxTime
}
if ao.MaxAwaitTime != nil {
aggOpts.MaxAwaitTime = ao.MaxAwaitTime
}
if ao.Comment != nil {
aggOpts.Comment = ao.Comment
}
if ao.Hint != nil {
aggOpts.Hint = ao.Hint
}
if ao.Let != nil {
aggOpts.Let = ao.Let
}
if ao.Custom != nil {
aggOpts.Custom = ao.Custom
}
}

return aggOpts
}

// aggregate is the helper method for Aggregate
func aggregate(a aggregateParams) (cur *Cursor, err error) {
if a.ctx == nil {
Expand Down Expand Up @@ -840,7 +900,7 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) {
selector = makeOutputAggregateSelector(sess, a.readPreference, a.client.localThreshold)
}

ao := options.MergeAggregateOptions(a.opts...)
ao := mergeAggregateOptions(a.opts...)

cursorOpts := a.client.createBaseCursorOptions()

Expand Down
10 changes: 9 additions & 1 deletion mongo/integration/mtest/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,15 @@ func setupClient(opts *options.ClientOptions) (*mongo.Client, error) {
// Setup initializes the current testing context.
// This function must only be called one time and must be called before any tests run.
func Setup(setupOpts ...*SetupOptions) error {
opts := MergeSetupOptions(setupOpts...)
opts := NewSetupOptions()
for _, opt := range setupOpts {
if opt == nil {
continue
}
if opt.URI != nil {
opts.URI = opt.URI
}
}

var uri string
var err error
Expand Down
17 changes: 0 additions & 17 deletions mongo/integration/mtest/setup_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,3 @@ func (so *SetupOptions) SetURI(uri string) *SetupOptions {
so.URI = &uri
return so
}

// MergeSetupOptions combines the given *SetupOptions into a single *Options in a last one wins fashion.
//
// Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a
// single options struct instead.
func MergeSetupOptions(opts ...*SetupOptions) *SetupOptions {
op := NewSetupOptions()
for _, opt := range opts {
if opt == nil {
continue
}
if opt.URI != nil {
op.URI = opt.URI
}
}
return op
}
18 changes: 0 additions & 18 deletions mongo/integration/unified/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,3 @@ func (op *Options) SetRunKillAllSessions(killAllSessions bool) *Options {
op.RunKillAllSessions = &killAllSessions
return op
}

// MergeOptions combines the given *Options into a single *Options in a last one wins fashion.
//
// Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a
// single options struct instead.
func MergeOptions(opts ...*Options) *Options {
op := NewOptions()
for _, opt := range opts {
if opt == nil {
continue
}
if opt.RunKillAllSessions != nil {
op.RunKillAllSessions = opt.RunKillAllSessions
}
}

return op
}
11 changes: 10 additions & 1 deletion mongo/integration/unified/unified_spec_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,16 @@ func parseTestFile(testJSON []byte, opts ...*Options) ([]mtest.RunOnBlock, []*Te
return nil, nil, err
}

op := MergeOptions(opts...)
op := NewOptions()
for _, opt := range opts {
if opt == nil {
continue
}
if opt.RunKillAllSessions != nil {
op.RunKillAllSessions = opt.RunKillAllSessions
}
}

for _, testCase := range testFile.TestCases {
testCase.initialData = testFile.InitialData
testCase.createEntities = testFile.CreateEntities
Expand Down
46 changes: 0 additions & 46 deletions mongo/options/aggregateoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,49 +136,3 @@ func (ao *AggregateOptions) SetCustom(c bson.M) *AggregateOptions {
ao.Custom = c
return ao
}

// MergeAggregateOptions combines the given AggregateOptions instances into a single AggregateOptions in a last-one-wins
// fashion.
//
// Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a
// single options struct instead.
func MergeAggregateOptions(opts ...*AggregateOptions) *AggregateOptions {
aggOpts := Aggregate()
for _, ao := range opts {
if ao == nil {
continue
}
if ao.AllowDiskUse != nil {
aggOpts.AllowDiskUse = ao.AllowDiskUse
}
if ao.BatchSize != nil {
aggOpts.BatchSize = ao.BatchSize
}
if ao.BypassDocumentValidation != nil {
aggOpts.BypassDocumentValidation = ao.BypassDocumentValidation
}
if ao.Collation != nil {
aggOpts.Collation = ao.Collation
}
if ao.MaxTime != nil {
aggOpts.MaxTime = ao.MaxTime
}
if ao.MaxAwaitTime != nil {
aggOpts.MaxAwaitTime = ao.MaxAwaitTime
}
if ao.Comment != nil {
aggOpts.Comment = ao.Comment
}
if ao.Hint != nil {
aggOpts.Hint = ao.Hint
}
if ao.Let != nil {
aggOpts.Let = ao.Let
}
if ao.Custom != nil {
aggOpts.Custom = ao.Custom
}
}

return aggOpts
}
46 changes: 0 additions & 46 deletions mongo/options/autoencryptionoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,49 +162,3 @@ func (a *AutoEncryptionOptions) SetBypassQueryAnalysis(bypass bool) *AutoEncrypt
a.BypassQueryAnalysis = &bypass
return a
}

// MergeAutoEncryptionOptions combines the argued AutoEncryptionOptions in a last-one wins fashion.
//
// Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a
// single options struct instead.
func MergeAutoEncryptionOptions(opts ...*AutoEncryptionOptions) *AutoEncryptionOptions {
aeo := AutoEncryption()
for _, opt := range opts {
if opt == nil {
continue
}

if opt.KeyVaultClientOptions != nil {
aeo.KeyVaultClientOptions = opt.KeyVaultClientOptions
}
if opt.KeyVaultNamespace != "" {
aeo.KeyVaultNamespace = opt.KeyVaultNamespace
}
if opt.KmsProviders != nil {
aeo.KmsProviders = opt.KmsProviders
}
if opt.SchemaMap != nil {
aeo.SchemaMap = opt.SchemaMap
}
if opt.BypassAutoEncryption != nil {
aeo.BypassAutoEncryption = opt.BypassAutoEncryption
}
if opt.ExtraOptions != nil {
aeo.ExtraOptions = opt.ExtraOptions
}
if opt.TLSConfig != nil {
aeo.TLSConfig = opt.TLSConfig
}
if opt.EncryptedFieldsMap != nil {
aeo.EncryptedFieldsMap = opt.EncryptedFieldsMap
}
if opt.BypassQueryAnalysis != nil {
aeo.BypassQueryAnalysis = opt.BypassQueryAnalysis
}
if opt.HTTPClient != nil {
aeo.HTTPClient = opt.HTTPClient
}
}

return aeo
}
Loading

0 comments on commit 0a66171

Please sign in to comment.