Skip to content

Commit

Permalink
GODRIVER-3043 [master] Use default write/read concerns in the index s…
Browse files Browse the repository at this point in the history
…earch commands. (#1609)

Co-authored-by: Qingyang Hu <103950869+qingyang-hu@users.noreply.github.com>
Co-authored-by: Qingyang Hu <qingyang.hu@mongodb.com>
  • Loading branch information
3 people authored Apr 19, 2024
1 parent 5051b66 commit 70403cd
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 138 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ evg-test-load-balancers:

.PHONY: evg-test-search-index
evg-test-search-index:
go test ./internal/integration -run TestSearchIndexProse -v -timeout $(TEST_TIMEOUT)s >> test.suite
# Double the timeout to wait for the responses from the server.
go test ./internal/integration -run TestSearchIndexProse -v -timeout $(shell echo "$$(( $(TEST_TIMEOUT) * 2))")s >> test.suite

.PHONY: evg-test-ocsp
evg-test-ocsp:
Expand Down
115 changes: 88 additions & 27 deletions internal/integration/search_index_prose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"bytes"
"context"
"os"
"sync"
Expand All @@ -20,6 +21,8 @@ import (
"go.mongodb.org/mongo-driver/internal/uuid"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
)

func TestSearchIndexProse(t *testing.T) {
Expand Down Expand Up @@ -61,15 +64,16 @@ func TestSearchIndexProse(t *testing.T) {
if !cursor.Next(ctx) {
break
}
if cursor.Current.Lookup("queryable").Boolean() {
name := cursor.Current.Lookup("name").StringValue()
queryable := cursor.Current.Lookup("queryable").Boolean()
if name == searchName && queryable {
doc = cursor.Current
} else {
t.Logf("cursor: %s, sleep 5 seconds...", cursor.Current.String())
time.Sleep(5 * time.Second)
}
}
require.NotNil(mt, doc, "got empty document")
assert.Equal(mt, searchName, doc.Lookup("name").StringValue(), "unmatched name")
expected, err := bson.Marshal(definition)
require.NoError(mt, err, "failed to marshal definition")
actual := doc.Lookup("latestDefinition").Value
Expand Down Expand Up @@ -110,7 +114,9 @@ func TestSearchIndexProse(t *testing.T) {
if !cursor.Next(ctx) {
return nil
}
if cursor.Current.Lookup("queryable").Boolean() {
name := cursor.Current.Lookup("name").StringValue()
queryable := cursor.Current.Lookup("queryable").Boolean()
if name == *opts.Name && queryable {
return cursor.Current
}
t.Logf("cursor: %s, sleep 5 seconds...", cursor.Current.String())
Expand All @@ -126,7 +132,6 @@ func TestSearchIndexProse(t *testing.T) {

doc := getDocument(opts)
require.NotNil(mt, doc, "got empty document")
assert.Equal(mt, *opts.Name, doc.Lookup("name").StringValue(), "unmatched name")
expected, err := bson.Marshal(definition)
require.NoError(mt, err, "failed to marshal definition")
actual := doc.Lookup("latestDefinition").Value
Expand Down Expand Up @@ -162,15 +167,16 @@ func TestSearchIndexProse(t *testing.T) {
if !cursor.Next(ctx) {
break
}
if cursor.Current.Lookup("queryable").Boolean() {
name := cursor.Current.Lookup("name").StringValue()
queryable := cursor.Current.Lookup("queryable").Boolean()
if name == searchName && queryable {
doc = cursor.Current
} else {
t.Logf("cursor: %s, sleep 5 seconds...", cursor.Current.String())
time.Sleep(5 * time.Second)
}
}
require.NotNil(mt, doc, "got empty document")
require.Equal(mt, searchName, doc.Lookup("name").StringValue(), "unmatched name")

err = view.DropOne(ctx, searchName)
require.NoError(mt, err, "failed to drop index")
Expand Down Expand Up @@ -204,37 +210,49 @@ func TestSearchIndexProse(t *testing.T) {
require.NoError(mt, err, "failed to create index")
require.Equal(mt, searchName, index, "unmatched name")

getDocument := func() bson.Raw {
for {
cursor, err := view.List(ctx, opts)
require.NoError(mt, err, "failed to list")
var doc bson.Raw
for doc == nil {
cursor, err := view.List(ctx, opts)
require.NoError(mt, err, "failed to list")

if !cursor.Next(ctx) {
return nil
}
if cursor.Current.Lookup("queryable").Boolean() {
return cursor.Current
}
if !cursor.Next(ctx) {
break
}
name := cursor.Current.Lookup("name").StringValue()
queryable := cursor.Current.Lookup("queryable").Boolean()
if name == searchName && queryable {
doc = cursor.Current
} else {
t.Logf("cursor: %s, sleep 5 seconds...", cursor.Current.String())
time.Sleep(5 * time.Second)
}
}

doc := getDocument()
require.NotNil(mt, doc, "got empty document")
require.Equal(mt, searchName, doc.Lookup("name").StringValue(), "unmatched name")

definition = bson.D{{"mappings", bson.D{{"dynamic", true}}}}
err = view.UpdateOne(ctx, searchName, definition)
require.NoError(mt, err, "failed to drop index")
doc = getDocument()
require.NotNil(mt, doc, "got empty document")
assert.Equal(mt, searchName, doc.Lookup("name").StringValue(), "unmatched name")
assert.Equal(mt, "READY", doc.Lookup("status").StringValue(), "unexpected status")
expected, err := bson.Marshal(definition)
require.NoError(mt, err, "failed to marshal definition")
actual := doc.Lookup("latestDefinition").Value
assert.Equal(mt, expected, actual, "unmatched definition")
err = view.UpdateOne(ctx, searchName, definition)
require.NoError(mt, err, "failed to update index")
for doc == nil {
cursor, err := view.List(ctx, opts)
require.NoError(mt, err, "failed to list")

if !cursor.Next(ctx) {
break
}
name := cursor.Current.Lookup("name").StringValue()
queryable := cursor.Current.Lookup("queryable").Boolean()
status := cursor.Current.Lookup("status").StringValue()
latestDefinition := doc.Lookup("latestDefinition").Value
if name == searchName && queryable && status == "READY" && bytes.Equal(latestDefinition, expected) {
doc = cursor.Current
} else {
t.Logf("cursor: %s, sleep 5 seconds...", cursor.Current.String())
time.Sleep(5 * time.Second)
}
}
require.NotNil(mt, doc, "got empty document")
})

mt.Run("case 5: dropSearchIndex suppresses namespace not found errors", func(mt *mtest.T) {
Expand All @@ -250,4 +268,47 @@ func TestSearchIndexProse(t *testing.T) {
err = collection.SearchIndexes().DropOne(ctx, "foo")
require.NoError(mt, err)
})

mt.RunOpts("case 6: Driver can successfully create and list search indexes with non-default readConcern and writeConcern",
mtest.NewOptions().CollectionOptions(options.Collection().SetWriteConcern(writeconcern.W1()).SetReadConcern(readconcern.Majority())),
func(mt *mtest.T) {
ctx := context.Background()

_, err := mt.Coll.InsertOne(ctx, bson.D{})
require.NoError(mt, err, "failed to insert")

view := mt.Coll.SearchIndexes()

definition := bson.D{{"mappings", bson.D{{"dynamic", false}}}}
const searchName = "test-search-index-case6"
opts := options.SearchIndexes().SetName(searchName)
index, err := view.CreateOne(ctx, mongo.SearchIndexModel{
Definition: definition,
Options: opts,
})
require.NoError(mt, err, "failed to create index")
require.Equal(mt, searchName, index, "unmatched name")
var doc bson.Raw
for doc == nil {
cursor, err := view.List(ctx, opts)
require.NoError(mt, err, "failed to list")

if !cursor.Next(ctx) {
break
}
name := cursor.Current.Lookup("name").StringValue()
queryable := cursor.Current.Lookup("queryable").Boolean()
if name == searchName && queryable {
doc = cursor.Current
} else {
t.Logf("cursor: %s, sleep 5 seconds...", cursor.Current.String())
time.Sleep(5 * time.Second)
}
}
require.NotNil(mt, doc, "got empty document")
expected, err := bson.Marshal(definition)
require.NoError(mt, err, "failed to marshal definition")
actual := doc.Lookup("latestDefinition").Value
assert.Equal(mt, expected, actual, "unmatched definition")
})
}
5 changes: 4 additions & 1 deletion mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2175,8 +2175,11 @@ func (coll *Collection) Indexes() IndexView {

// SearchIndexes returns a SearchIndexView instance that can be used to perform operations on the search indexes for the collection.
func (coll *Collection) SearchIndexes() SearchIndexView {
c, _ := coll.Clone() // Clone() always return a nil error.
c.readConcern = nil
c.writeConcern = nil
return SearchIndexView{
coll: coll,
coll: c,
}
}

Expand Down
39 changes: 8 additions & 31 deletions mongo/search_index_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,13 @@ func (siv SearchIndexView) CreateMany(
return nil, err
}

wc := siv.coll.writeConcern
if sess.TransactionRunning() {
wc = nil
}
if !wc.Acknowledged() {
sess = nil
}

selector := makePinnedSelector(sess, siv.coll.writeSelector)

op := operation.NewCreateSearchIndexes(indexes).
Session(sess).WriteConcern(wc).ClusterClock(siv.coll.client.clock).
Database(siv.coll.db.name).Collection(siv.coll.name).CommandMonitor(siv.coll.client.monitor).
Deployment(siv.coll.client.deployment).ServerSelector(selector).ServerAPI(siv.coll.client.serverAPI).
Session(sess).CommandMonitor(siv.coll.client.monitor).
ServerSelector(selector).ClusterClock(siv.coll.client.clock).
Collection(siv.coll.name).Database(siv.coll.db.name).
Deployment(siv.coll.client.deployment).ServerAPI(siv.coll.client.serverAPI).
Timeout(siv.coll.client.timeout)

err = op.Execute(ctx)
Expand Down Expand Up @@ -195,20 +188,12 @@ func (siv SearchIndexView) DropOne(
return err
}

wc := siv.coll.writeConcern
if sess.TransactionRunning() {
wc = nil
}
if !wc.Acknowledged() {
sess = nil
}

selector := makePinnedSelector(sess, siv.coll.writeSelector)

op := operation.NewDropSearchIndex(name).
Session(sess).WriteConcern(wc).CommandMonitor(siv.coll.client.monitor).
Session(sess).CommandMonitor(siv.coll.client.monitor).
ServerSelector(selector).ClusterClock(siv.coll.client.clock).
Database(siv.coll.db.name).Collection(siv.coll.name).
Collection(siv.coll.name).Database(siv.coll.db.name).
Deployment(siv.coll.client.deployment).ServerAPI(siv.coll.client.serverAPI).
Timeout(siv.coll.client.timeout)

Expand Down Expand Up @@ -257,20 +242,12 @@ func (siv SearchIndexView) UpdateOne(
return err
}

wc := siv.coll.writeConcern
if sess.TransactionRunning() {
wc = nil
}
if !wc.Acknowledged() {
sess = nil
}

selector := makePinnedSelector(sess, siv.coll.writeSelector)

op := operation.NewUpdateSearchIndex(name, indexDefinition).
Session(sess).WriteConcern(wc).CommandMonitor(siv.coll.client.monitor).
Session(sess).CommandMonitor(siv.coll.client.monitor).
ServerSelector(selector).ClusterClock(siv.coll.client.clock).
Database(siv.coll.db.name).Collection(siv.coll.name).
Collection(siv.coll.name).Database(siv.coll.db.name).
Deployment(siv.coll.client.deployment).ServerAPI(siv.coll.client.serverAPI).
Timeout(siv.coll.client.timeout)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
]
},
{
"description": "createSearchIndex ignores read and write concern",
"description": "createSearchIndexes ignores read and write concern",
"operations": [
{
"name": "createSearchIndexes",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ tests:
writeConcern: { $$exists: false }
readConcern: { $$exists: false }

- description: "createSearchIndex ignores read and write concern"
- description: "createSearchIndexes ignores read and write concern"
operations:
- name: createSearchIndexes
object: *collection0
arguments:
arguments:
models: []
expectError:
# This test always errors in a non-Atlas environment. The test functions as a unit test by asserting
Expand Down
42 changes: 18 additions & 24 deletions x/mongo/driver/operation/create_search_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,25 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/mongo/description"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
)

// CreateSearchIndexes performs a createSearchIndexes operation.
type CreateSearchIndexes struct {
indexes bsoncore.Document
session *session.Client
clock *session.ClusterClock
collection string
monitor *event.CommandMonitor
crypt driver.Crypt
database string
deployment driver.Deployment
selector description.ServerSelector
writeConcern *writeconcern.WriteConcern
result CreateSearchIndexesResult
serverAPI *driver.ServerAPIOptions
timeout *time.Duration
indexes bsoncore.Document
session *session.Client
clock *session.ClusterClock
collection string
monitor *event.CommandMonitor
crypt driver.Crypt
database string
deployment driver.Deployment
selector description.ServerSelector
result CreateSearchIndexesResult
serverAPI *driver.ServerAPIOptions
timeout *time.Duration
}

// CreateSearchIndexResult represents a single search index result in CreateSearchIndexesResult.
Expand Down Expand Up @@ -109,9 +107,15 @@ func (csi *CreateSearchIndexes) Execute(ctx context.Context) error {
return driver.Operation{
CommandFn: csi.command,
ProcessResponseFn: csi.processResponse,
Client: csi.session,
Clock: csi.clock,
CommandMonitor: csi.monitor,
Crypt: csi.crypt,
Database: csi.database,
Deployment: csi.deployment,
Selector: csi.selector,
ServerAPI: csi.serverAPI,
Timeout: csi.timeout,
}.Execute(ctx)

}
Expand Down Expand Up @@ -214,16 +218,6 @@ func (csi *CreateSearchIndexes) ServerSelector(selector description.ServerSelect
return csi
}

// WriteConcern sets the write concern for this operation.
func (csi *CreateSearchIndexes) WriteConcern(writeConcern *writeconcern.WriteConcern) *CreateSearchIndexes {
if csi == nil {
csi = new(CreateSearchIndexes)
}

csi.writeConcern = writeConcern
return csi
}

// ServerAPI sets the server API version for this operation.
func (csi *CreateSearchIndexes) ServerAPI(serverAPI *driver.ServerAPIOptions) *CreateSearchIndexes {
if csi == nil {
Expand Down
Loading

0 comments on commit 70403cd

Please sign in to comment.