GODRIVER-2962 Remove setters from Aggregate.
matthewdale committed Sep 20, 2023
1 parent 4b0c121 commit f5ee31a
Showing 4 changed files with 237 additions and 392 deletions.
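
The pattern is the same across the files below: the fluent setter chain on operation.Aggregate is replaced by a struct literal with exported fields, and optional values become pointer fields whose nil value means "not set". A minimal, self-contained sketch of the before/after shape — the Aggregate type here is a stand-in for illustration, not the driver's actual definition, and the commented-out setter chain only approximates the old API:

package main

import "fmt"

// Aggregate is an illustrative stand-in: required values are plain fields,
// optional values are pointer fields whose nil value means "not set".
type Aggregate struct {
    Database   string
    Collection string
    BatchSize  *int32 // optional
}

func main() {
    // Before: configuration went through chained setters, roughly
    //   op := operation.NewAggregate(pipeline).Database("db").Collection("coll").BatchSize(2)
    // After: configuration is a plain struct literal.
    var batchSize int32 = 2
    op := &Aggregate{
        Database:   "db",
        Collection: "coll",
        BatchSize:  &batchSize,
    }
    fmt.Println(op.Database, op.Collection, *op.BatchSize)
}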
42 changes: 26 additions & 16 deletions mongo/change_stream.go
@@ -133,17 +133,26 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
return nil, cs.Err()
}

cs.aggregate = operation.NewAggregate(nil).
ReadPreference(config.readPreference).ReadConcern(config.readConcern).
Deployment(cs.client.deployment).ClusterClock(cs.client.clock).
CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone).
ServerAPI(cs.client.serverAPI).Crypt(config.crypt).Timeout(cs.client.timeout)
retry := driver.RetryNone
cs.aggregate = &operation.Aggregate{
ReadPreference: config.readPreference,
ReadConcern: config.readConcern,
Deployment: cs.client.deployment,
Clock: cs.client.clock,
Monitor: cs.client.monitor,
Session: cs.sess,
Selector: cs.selector,
Retry: &retry,
ServerAPI: cs.client.serverAPI,
Crypt: config.crypt,
Timeout: cs.client.timeout,
}

if cs.options.Collation != nil {
cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument()))
cs.aggregate.Collation = bsoncore.Document(cs.options.Collation.ToDocument())
}
if comment := cs.options.Comment; comment != nil {
cs.aggregate.Comment(*comment)
cs.aggregate.Comment = comment

commentVal, err := marshalValue(comment, cs.bsonOpts, cs.registry)
if err != nil {
@@ -152,7 +161,7 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
cs.cursorOptions.Comment = commentVal
}
if cs.options.BatchSize != nil {
cs.aggregate.BatchSize(*cs.options.BatchSize)
cs.aggregate.BatchSize = cs.options.BatchSize
cs.cursorOptions.BatchSize = *cs.options.BatchSize
}
if cs.options.MaxAwaitTime != nil {
@@ -172,7 +181,7 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
customOptions[optionName] = optionValueBSON
}
cs.aggregate.CustomOptions(customOptions)
cs.aggregate.CustomOptions = customOptions
}
if cs.options.CustomPipeline != nil {
// Marshal all custom pipeline options before building pipeline slice. Return
@@ -192,11 +201,12 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in

switch cs.streamType {
case ClientStream:
cs.aggregate.Database("admin")
cs.aggregate.Database = "admin"
case DatabaseStream:
cs.aggregate.Database(config.databaseName)
cs.aggregate.Database = config.databaseName
case CollectionStream:
cs.aggregate.Collection(config.collectionName).Database(config.databaseName)
cs.aggregate.Collection = config.collectionName
cs.aggregate.Database = config.databaseName
default:
closeImplicitSession(cs.sess)
return nil, fmt.Errorf("must supply a valid StreamType in config, instead of %v", cs.streamType)
@@ -223,7 +233,7 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
}
var pipelineArr bsoncore.Document
pipelineArr, cs.err = cs.pipelineToBSON()
cs.aggregate.Pipeline(pipelineArr)
cs.aggregate.Pipeline = pipelineArr

if cs.err = cs.executeOperation(ctx, false); cs.err != nil {
closeImplicitSession(cs.sess)
@@ -254,7 +264,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
defer conn.Close()
cs.wireVersion = conn.Description().WireVersion

cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
cs.aggregate.Deployment = cs.createOperationDeployment(server, conn)

if resuming {
cs.replaceOptions(cs.wireVersion)
@@ -274,7 +284,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
if plArr, cs.err = cs.pipelineToBSON(); cs.err != nil {
return cs.Err()
}
cs.aggregate.Pipeline(plArr)
cs.aggregate.Pipeline = plArr
}

// If no deadline is set on the passed-in context, cs.client.timeout is set, and context is not already
@@ -333,7 +343,7 @@ AggregateExecuteLoop:
cs.wireVersion = conn.Description().WireVersion

// Reset deployment.
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
cs.aggregate.Deployment = cs.createOperationDeployment(server, conn)
default:
// Do not retry if error is not a driver error.
break AggregateExecuteLoop
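
One Go detail worth noting in the hunks above: the new code introduces a local (retry := driver.RetryNone) and assigns its address to the Retry field rather than writing &driver.RetryNone directly — you cannot take the address of a constant in Go, and driver.RetryNone is presumably declared as one. A small stand-alone illustration, with RetryMode and RetryNone as stand-ins for the driver's types:

package main

import "fmt"

type RetryMode int

const RetryNone RetryMode = 0

// Aggregate is again a stand-in; Retry is optional, so it is a pointer.
type Aggregate struct {
    Retry *RetryMode
}

func main() {
    // &RetryNone would not compile: constants are not addressable.
    retry := RetryNone
    op := &Aggregate{Retry: &retry}
    fmt.Println(*op.Retry)
}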
87 changes: 50 additions & 37 deletions mongo/collection.go
@@ -846,42 +846,43 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) {

cursorOpts.MarshalValueEncoderFn = newEncoderFn(a.bsonOpts, a.registry)

op := operation.NewAggregate(pipelineArr).
Session(sess).
WriteConcern(wc).
ReadConcern(rc).
ReadPreference(a.readPreference).
CommandMonitor(a.client.monitor).
ServerSelector(selector).
ClusterClock(a.client.clock).
Database(a.db).
Collection(a.col).
Deployment(a.client.deployment).
Crypt(a.client.cryptFLE).
ServerAPI(a.client.serverAPI).
HasOutputStage(hasOutputStage).
Timeout(a.client.timeout).
MaxTime(ao.MaxTime)

if ao.AllowDiskUse != nil {
op.AllowDiskUse(*ao.AllowDiskUse)
op := &operation.Aggregate{
Pipeline: pipelineArr,
Session: sess,
WriteConcern: wc,
ReadConcern: rc,
ReadPreference: a.readPreference,
Monitor: a.client.monitor,
Selector: selector,
Clock: a.client.clock,
Database: a.db,
Collection: a.col,
Deployment: a.client.deployment,
Crypt: a.client.cryptFLE,
ServerAPI: a.client.serverAPI,
HasOutputStage: hasOutputStage,
Timeout: a.client.timeout,
MaxTime: ao.MaxTime,
AllowDiskUse: ao.AllowDiskUse,
Comment: ao.Comment,
}

// ignore batchSize of 0 with $out
if ao.BatchSize != nil && !(*ao.BatchSize == 0 && hasOutputStage) {
op.BatchSize(*ao.BatchSize)
op.BatchSize = ao.BatchSize
cursorOpts.BatchSize = *ao.BatchSize
}
if ao.BypassDocumentValidation != nil && *ao.BypassDocumentValidation {
op.BypassDocumentValidation(*ao.BypassDocumentValidation)
op.BypassDocumentValidation = ao.BypassDocumentValidation
}
if ao.Collation != nil {
op.Collation(bsoncore.Document(ao.Collation.ToDocument()))
op.Collation = bsoncore.Document(ao.Collation.ToDocument())
}
if ao.MaxAwaitTime != nil {
cursorOpts.MaxTimeMS = int64(*ao.MaxAwaitTime / time.Millisecond)
}
if ao.Comment != nil {
op.Comment(*ao.Comment)
op.Comment = ao.Comment

commentVal, err := marshalValue(ao.Comment, a.bsonOpts, a.registry)
if err != nil {
@@ -897,14 +898,14 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) {
if err != nil {
return nil, err
}
op.Hint(hintVal)
op.Hint = hintVal
}
if ao.Let != nil {
let, err := marshal(ao.Let, a.bsonOpts, a.registry)
if err != nil {
return nil, err
}
op.Let(let)
op.Let = let
}
if ao.Custom != nil {
// Marshal all custom options before passing to the aggregate operation. Return
@@ -918,14 +919,14 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) {
optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
customOptions[optionName] = optionValueBSON
}
op.CustomOptions(customOptions)
op.CustomOptions = customOptions
}

retry := driver.RetryNone
if a.retryRead && !hasOutputStage {
retry = driver.RetryOncePerCommand
}
op = op.Retry(retry)
op.Retry = &retry

err = op.Execute(a.ctx)
if err != nil {
@@ -980,15 +981,26 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
}

selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
op := operation.NewAggregate(pipelineArr).Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference).
CommandMonitor(coll.client.monitor).ServerSelector(selector).ClusterClock(coll.client.clock).Database(coll.db.name).
Collection(coll.name).Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).ServerAPI(coll.client.serverAPI).
Timeout(coll.client.timeout).MaxTime(countOpts.MaxTime)
if countOpts.Collation != nil {
op.Collation(bsoncore.Document(countOpts.Collation.ToDocument()))
op := &operation.Aggregate{
Pipeline: pipelineArr,
Session: sess,
ReadConcern: rc,
ReadPreference: coll.readPreference,
Monitor: coll.client.monitor,
Selector: selector,
Clock: coll.client.clock,
Database: coll.db.name,
Collection: coll.name,
Deployment: coll.client.deployment,
Crypt: coll.client.cryptFLE,
ServerAPI: coll.client.serverAPI,
Timeout: coll.client.timeout,
MaxTime: countOpts.MaxTime,
Comment: countOpts.Comment,
}
if countOpts.Comment != nil {
op.Comment(*countOpts.Comment)

if countOpts.Collation != nil {
op.Collation = bsoncore.Document(countOpts.Collation.ToDocument())
}
if countOpts.Hint != nil {
if isUnorderedMap(countOpts.Hint) {
@@ -998,13 +1010,14 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
if err != nil {
return 0, err
}
op.Hint(hintVal)
op.Hint = hintVal
}

retry := driver.RetryNone
if coll.client.retryReads {
retry = driver.RetryOncePerCommand
}
op = op.Retry(retry)
op.Retry = &retry

err = op.Execute(ctx)
if err != nil {
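
Moving optional values into pointer fields also removes caller-side guards: the old code had to nil-check and dereference before calling a setter (e.g. the removed if ao.AllowDiskUse != nil { op.AllowDiskUse(*ao.AllowDiskUse) }), whereas assigning a possibly-nil pointer is always safe and a nil field simply means the option is omitted. The operation code that consumes these fields is the fourth changed file, not expanded in this view; presumably it checks each field while building the command, roughly along these lines — a sketch under that assumption, not the driver's actual code:

package main

import "fmt"

// Aggregate is a stand-in carrying two of the optional fields from the diff.
type Aggregate struct {
    AllowDiskUse *bool
    BatchSize    *int32
}

// commandFields sketches how nil pointer fields translate into omitted
// command fields. (In the real aggregate command, batchSize is nested under
// a cursor subdocument; this toy flattens it.)
func commandFields(op *Aggregate) map[string]interface{} {
    cmd := make(map[string]interface{})
    if op.AllowDiskUse != nil {
        cmd["allowDiskUse"] = *op.AllowDiskUse
    }
    if op.BatchSize != nil {
        cmd["batchSize"] = *op.BatchSize
    }
    return cmd
}

func main() {
    allow := true
    fmt.Println(commandFields(&Aggregate{AllowDiskUse: &allow})) // map[allowDiskUse:true]
}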
54 changes: 38 additions & 16 deletions x/mongo/driver/integration/aggregate_test.go
@@ -81,9 +81,17 @@ func TestAggregate(t *testing.T) {
noerr(t, err)

clearChannels(started, succeeded, failed)
op := operation.NewAggregate(bsoncore.BuildDocumentFromElements(nil)).
Collection(collName).Database(dbName).Deployment(top).ServerSelector(description.WriteSelector()).
CommandMonitor(monitor).BatchSize(2)

var batchSize int32 = 2
op := &operation.Aggregate{
Pipeline: bsoncore.BuildDocumentFromElements(nil),
Collection: collName,
Database: dbName,
Deployment: top,
Selector: description.WriteSelector(),
Monitor: monitor,
BatchSize: &batchSize,
}
err = op.Execute(context.Background())
noerr(t, err)
batchCursor, err := op.Result(driver.CursorOptions{MaxTimeMS: 10, BatchSize: 2, CommandMonitor: monitor})
@@ -124,21 +132,28 @@ func TestAggregate(t *testing.T) {
wc := writeconcern.New(writeconcern.WMajority())
autoInsertDocs(t, wc, ds...)

op := operation.NewAggregate(bsoncore.BuildArray(nil,
bsoncore.BuildDocumentValue(
bsoncore.BuildDocumentElement(nil,
"$match", bsoncore.BuildDocumentElement(nil,
"_id", bsoncore.AppendInt32Element(nil, "$gt", 2),
var batchSize int32 = 2
op := &operation.Aggregate{
Pipeline: bsoncore.BuildArray(nil,
bsoncore.BuildDocumentValue(
bsoncore.BuildDocumentElement(nil,
"$match", bsoncore.BuildDocumentElement(nil,
"_id", bsoncore.AppendInt32Element(nil, "$gt", 2),
),
),
),
),
bsoncore.BuildDocumentValue(
bsoncore.BuildDocumentElement(nil,
"$sort", bsoncore.AppendInt32Element(nil, "_id", 1),
bsoncore.BuildDocumentValue(
bsoncore.BuildDocumentElement(nil,
"$sort", bsoncore.AppendInt32Element(nil, "_id", 1),
),
),
),
)).Collection(integtest.ColName(t)).Database(dbName).Deployment(integtest.Topology(t)).
ServerSelector(description.WriteSelector()).BatchSize(2)
Collection: integtest.ColName(t),
Database: dbName,
Deployment: integtest.Topology(t),
Selector: description.WriteSelector(),
BatchSize: &batchSize,
}
err := op.Execute(context.Background())
noerr(t, err)
cursor, err := op.Result(driver.CursorOptions{BatchSize: 2})
@@ -172,8 +187,15 @@ func TestAggregate(t *testing.T) {
wc := writeconcern.New(writeconcern.WMajority())
autoInsertDocs(t, wc, ds...)

op := operation.NewAggregate(bsoncore.BuildArray(nil)).Collection(integtest.ColName(t)).Database(dbName).
Deployment(integtest.Topology(t)).ServerSelector(description.WriteSelector()).AllowDiskUse(true)
allowDiskUse := true
op := &operation.Aggregate{
Pipeline: bsoncore.BuildArray(nil),
Collection: integtest.ColName(t),
Database: dbName,
Deployment: integtest.Topology(t),
Selector: description.WriteSelector(),
AllowDiskUse: &allowDiskUse,
}
err := op.Execute(context.Background())
if err != nil {
t.Errorf("Expected no error from allowing disk use, but got %v", err)
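
The test changes show the one ergonomic cost of the new API: optional values need addressable locals (var batchSize int32 = 2; allowDiskUse := true). Since Go 1.18, a small generic helper can shorten that at call sites; it is not part of this commit and is shown only as an illustration:

package main

import "fmt"

// ptr returns a pointer to its argument, handy for filling optional pointer
// fields inline. (Hypothetical helper, not part of the driver.)
func ptr[T any](v T) *T { return &v }

// Aggregate is a stand-in with two optional fields.
type Aggregate struct {
    BatchSize    *int32
    AllowDiskUse *bool
}

func main() {
    op := &Aggregate{
        BatchSize:    ptr(int32(2)),
        AllowDiskUse: ptr(true),
    }
    fmt.Println(*op.BatchSize, *op.AllowDiskUse)
}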