Skip to content

Commit

Permalink
GODRIVER-2520 Fix merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
prestonvasquez committed Oct 16, 2023
2 parents 0483971 + 71f65e3 commit e465fdf
Show file tree
Hide file tree
Showing 94 changed files with 1,246 additions and 2,688 deletions.
27 changes: 0 additions & 27 deletions benchmark/canary.go

This file was deleted.

21 changes: 3 additions & 18 deletions benchmark/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,10 @@ func BenchmarkClientWrite(b *testing.B) {
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
client, err := mongo.NewClient(bm.opt)
client, err := mongo.Connect(context.Background(), bm.opt)
if err != nil {
b.Fatalf("error creating client: %v", err)
}
ctx := context.Background()
err = client.Connect(ctx)
if err != nil {
b.Fatalf("error connecting: %v", err)
}
defer func() { _ = client.Disconnect(context.Background()) }()
coll := client.Database("test").Collection("test")
_, err = coll.DeleteMany(context.Background(), bson.D{})
Expand Down Expand Up @@ -76,15 +71,10 @@ func BenchmarkClientBulkWrite(b *testing.B) {
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
client, err := mongo.NewClient(bm.opt)
client, err := mongo.Connect(context.Background(), bm.opt)
if err != nil {
b.Fatalf("error creating client: %v", err)
}
ctx := context.Background()
err = client.Connect(ctx)
if err != nil {
b.Fatalf("error connecting: %v", err)
}
defer func() { _ = client.Disconnect(context.Background()) }()
coll := client.Database("test").Collection("test")
_, err = coll.DeleteMany(context.Background(), bson.D{})
Expand Down Expand Up @@ -125,15 +115,10 @@ func BenchmarkClientRead(b *testing.B) {
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
client, err := mongo.NewClient(bm.opt)
client, err := mongo.Connect(context.Background(), bm.opt)
if err != nil {
b.Fatalf("error creating client: %v", err)
}
ctx := context.Background()
err = client.Connect(ctx)
if err != nil {
b.Fatalf("error connecting: %v", err)
}
defer func() { _ = client.Disconnect(context.Background()) }()
coll := client.Database("test").Collection("test")
_, err = coll.DeleteMany(context.Background(), bson.D{})
Expand Down
5 changes: 1 addition & 4 deletions benchmark/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,10 @@ func getClientDB(ctx context.Context) (*mongo.Database, error) {
if err != nil {
return nil, err
}
client, err := mongo.NewClient(options.Client().ApplyURI(cs.String()))
client, err := mongo.Connect(ctx, options.Client().ApplyURI(cs.String()))
if err != nil {
return nil, err
}
if err = client.Connect(ctx); err != nil {
return nil, err
}

db := client.Database(integtest.GetDBName(cs))
return db, nil
Expand Down
44 changes: 13 additions & 31 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ type CommandStartedEvent struct {
CommandName string
RequestID int64
ConnectionID string
// ServerConnectionID contains the connection ID from the server of the operation. If the server does not return
// this value (e.g. on MDB < 4.2), it is unset. If the server connection ID would cause an int32 overflow, then
// then this field will be nil.
//
// Deprecated: Use ServerConnectionID64.
ServerConnectionID *int32
// ServerConnectionID64 contains the connection ID from the server of the operation. If the server does not
// return this value (e.g. on MDB < 4.2), it is unset.
ServerConnectionID64 *int64
Expand All @@ -39,19 +33,11 @@ type CommandStartedEvent struct {

// CommandFinishedEvent represents a generic command finishing.
type CommandFinishedEvent struct {
// Deprecated: Use Duration instead.
DurationNanos int64
Duration time.Duration
CommandName string
DatabaseName string
RequestID int64
ConnectionID string
// ServerConnectionID contains the connection ID from the server of the operation. If the server does not return
// this value (e.g. on MDB < 4.2), it is unset.If the server connection ID would cause an int32 overflow, then
// this field will be nil.
//
// Deprecated: Use ServerConnectionID64.
ServerConnectionID *int32
Duration time.Duration
CommandName string
DatabaseName string
RequestID int64
ConnectionID string
// ServerConnectionID64 contains the connection ID from the server of the operation. If the server does not
// return this value (e.g. on MDB < 4.2), it is unset.
ServerConnectionID64 *int64
Expand Down Expand Up @@ -174,22 +160,18 @@ type ServerHeartbeatStartedEvent struct {

// ServerHeartbeatSucceededEvent is an event generated when the heartbeat succeeds.
type ServerHeartbeatSucceededEvent struct {
// Deprecated: Use Duration instead.
DurationNanos int64
Duration time.Duration
Reply description.Server
ConnectionID string // The address this heartbeat was sent to with a unique identifier
Awaited bool // If this heartbeat was awaitable
Duration time.Duration
Reply description.Server
ConnectionID string // The address this heartbeat was sent to with a unique identifier
Awaited bool // If this heartbeat was awaitable
}

// ServerHeartbeatFailedEvent is an event generated when the heartbeat fails.
type ServerHeartbeatFailedEvent struct {
// Deprecated: Use Duration instead.
DurationNanos int64
Duration time.Duration
Failure error
ConnectionID string // The address this heartbeat was sent to with a unique identifier
Awaited bool // If this heartbeat was awaitable
Duration time.Duration
Failure error
ConnectionID string // The address this heartbeat was sent to with a unique identifier
Awaited bool // If this heartbeat was awaitable
}

// ServerMonitor represents a monitor that is triggered for different server events. The client
Expand Down
18 changes: 10 additions & 8 deletions examples/documentation_examples/examples.go
Original file line number Diff line number Diff line change
Expand Up @@ -1763,7 +1763,7 @@ func UpdateEmployeeInfo(ctx context.Context, client *mongo.Client) error {
return client.UseSession(ctx, func(sctx mongo.SessionContext) error {
err := sctx.StartTransaction(options.Transaction().
SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
SetWriteConcern(writeconcern.Majority()),
)
if err != nil {
return err
Expand Down Expand Up @@ -1921,7 +1921,7 @@ func TransactionsExamples(ctx context.Context, client *mongo.Client) error {

err := sctx.StartTransaction(options.Transaction().
SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
SetWriteConcern(writeconcern.Majority()),
)
if err != nil {
return err
Expand Down Expand Up @@ -1971,7 +1971,8 @@ func WithTransactionExample(ctx context.Context) error {
defer func() { _ = client.Disconnect(ctx) }()

// Prereq: Create collections.
wcMajority := writeconcern.New(writeconcern.WMajority(), writeconcern.WTimeout(1*time.Second))
wcMajority := writeconcern.Majority()
wcMajority.WTimeout = 1 * time.Second
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)
Expand Down Expand Up @@ -2550,9 +2551,11 @@ func CausalConsistencyExamples(client *mongo.Client) error {

// Start Causal Consistency Example 1

rc := readconcern.Majority()
wc := writeconcern.Majority()
wc.WTimeout = 1000
// Use a causally-consistent session to run some operations
opts := options.Session().SetDefaultReadConcern(readconcern.Majority()).SetDefaultWriteConcern(
writeconcern.New(writeconcern.WMajority(), writeconcern.WTimeout(1000)))
opts := options.Session().SetDefaultReadConcern(rc).SetDefaultWriteConcern(wc)
session1, err := client.StartSession(opts)
if err != nil {
return err
Expand Down Expand Up @@ -2584,9 +2587,8 @@ func CausalConsistencyExamples(client *mongo.Client) error {
// Start Causal Consistency Example 2

// Make a new session that is causally consistent with session1 so session2 reads what session1 writes
opts = options.Session().SetDefaultReadPreference(readpref.Secondary()).SetDefaultReadConcern(
readconcern.Majority()).SetDefaultWriteConcern(writeconcern.New(writeconcern.WMajority(),
writeconcern.WTimeout(1000)))
opts = options.Session().SetDefaultReadPreference(readpref.Secondary()).
SetDefaultReadConcern(rc).SetDefaultWriteConcern(wc)
session2, err := client.StartSession(opts)
if err != nil {
return err
Expand Down
51 changes: 50 additions & 1 deletion mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,55 @@ type changeStreamConfig struct {
crypt driver.Crypt
}

// mergeChangeStreamOptions combines the given ChangeStreamOptions instances into a single ChangeStreamOptions in a
// last-property-wins fashion.
func mergeChangeStreamOptions(opts ...*options.ChangeStreamOptions) *options.ChangeStreamOptions {
csOpts := options.ChangeStream()
for _, cso := range opts {
if cso == nil {
continue
}
if cso.BatchSize != nil {
csOpts.BatchSize = cso.BatchSize
}
if cso.Collation != nil {
csOpts.Collation = cso.Collation
}
if cso.Comment != nil {
csOpts.Comment = cso.Comment
}
if cso.FullDocument != nil {
csOpts.FullDocument = cso.FullDocument
}
if cso.FullDocumentBeforeChange != nil {
csOpts.FullDocumentBeforeChange = cso.FullDocumentBeforeChange
}
if cso.MaxAwaitTime != nil {
csOpts.MaxAwaitTime = cso.MaxAwaitTime
}
if cso.ResumeAfter != nil {
csOpts.ResumeAfter = cso.ResumeAfter
}
if cso.ShowExpandedEvents != nil {
csOpts.ShowExpandedEvents = cso.ShowExpandedEvents
}
if cso.StartAtOperationTime != nil {
csOpts.StartAtOperationTime = cso.StartAtOperationTime
}
if cso.StartAfter != nil {
csOpts.StartAfter = cso.StartAfter
}
if cso.Custom != nil {
csOpts.Custom = cso.Custom
}
if cso.CustomPipeline != nil {
csOpts.CustomPipeline = cso.CustomPipeline
}
}

return csOpts
}

func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{},
opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
if ctx == nil {
Expand All @@ -116,7 +165,7 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
bsonOpts: config.bsonOpts,
registry: config.registry,
streamType: config.streamType,
options: options.MergeChangeStreamOptions(opts...),
options: mergeChangeStreamOptions(opts...),
selector: description.CompositeSelector([]description.ServerSelector{
description.ReadPrefSelector(config.readPreference),
description.LatencySelector(config.client.localThreshold),
Expand Down
63 changes: 63 additions & 0 deletions mongo/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/mongo/options"
)

func TestChangeStream(t *testing.T) {
Expand All @@ -27,3 +28,65 @@ func TestChangeStream(t *testing.T) {
assert.Nil(t, err, "Close error: %v", err)
})
}

func TestMergeChangeStreamOptions(t *testing.T) {
t.Parallel()

fullDocumentP := func(x options.FullDocument) *options.FullDocument { return &x }
int32P := func(x int32) *int32 { return &x }

testCases := []struct {
description string
input []*options.ChangeStreamOptions
want *options.ChangeStreamOptions
}{
{
description: "nil",
input: nil,
want: &options.ChangeStreamOptions{},
},
{
description: "empty",
input: []*options.ChangeStreamOptions{},
want: &options.ChangeStreamOptions{},
},
{
description: "many ChangeStreamOptions with one configuration each",
input: []*options.ChangeStreamOptions{
options.ChangeStream().SetFullDocumentBeforeChange(options.Required),
options.ChangeStream().SetFullDocument(options.Required),
options.ChangeStream().SetBatchSize(10),
},
want: &options.ChangeStreamOptions{
FullDocument: fullDocumentP(options.Required),
FullDocumentBeforeChange: fullDocumentP(options.Required),
BatchSize: int32P(10),
},
},
{
description: "single ChangeStreamOptions with many configurations",
input: []*options.ChangeStreamOptions{
options.ChangeStream().
SetFullDocumentBeforeChange(options.Required).
SetFullDocument(options.Required).
SetBatchSize(10),
},
want: &options.ChangeStreamOptions{
FullDocument: fullDocumentP(options.Required),
FullDocumentBeforeChange: fullDocumentP(options.Required),
BatchSize: int32P(10),
},
},
}

for _, tc := range testCases {
tc := tc // Capture range variable.

t.Run(tc.description, func(t *testing.T) {
t.Parallel()

got := mergeChangeStreamOptions(tc.input...)
assert.Equal(t, tc.want, got, "expected and actual ChangeStreamOptions are different")
})
}
}
Loading

0 comments on commit e465fdf

Please sign in to comment.