-
Notifications
You must be signed in to change notification settings - Fork 892
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GODRIVER-2935 Use OP_QUERY in connection handshakes #1377
Changes from 2 commits
8277d88
728f277
9fec743
c343f12
25b7685
c0085e1
9569e0c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ import ( | |
"time" | ||
|
||
"go.mongodb.org/mongo-driver/bson" | ||
"go.mongodb.org/mongo-driver/bson/bsontype" | ||
"go.mongodb.org/mongo-driver/bson/primitive" | ||
"go.mongodb.org/mongo-driver/event" | ||
"go.mongodb.org/mongo-driver/internal/csot" | ||
|
@@ -629,7 +630,7 @@ func (op Operation) Execute(ctx context.Context) error { | |
} | ||
|
||
var startedInfo startedInformation | ||
*wm, startedInfo, err = op.createMsgWireMessage(ctx, maxTimeMS, (*wm)[:0], desc, conn) | ||
*wm, startedInfo, err = op.createWireMessage(ctx, (*wm)[:0], desc, maxTimeMS, conn) | ||
|
||
if err != nil { | ||
return err | ||
|
@@ -1103,6 +1104,85 @@ func (op Operation) addBatchArray(dst []byte) []byte { | |
return dst | ||
} | ||
|
||
func (op Operation) createLegacyHandshakeWireMessage( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a direct copy and paste of the code removed in this commit: 4449617 |
||
maxTimeMS uint64, | ||
dst []byte, | ||
desc description.SelectedServer, | ||
) ([]byte, startedInformation, error) { | ||
var info startedInformation | ||
flags := op.secondaryOK(desc) | ||
var wmindex int32 | ||
info.requestID = wiremessage.NextRequestID() | ||
wmindex, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpQuery) | ||
dst = wiremessage.AppendQueryFlags(dst, flags) | ||
|
||
dollarCmd := [...]byte{'.', '$', 'c', 'm', 'd'} | ||
|
||
// FullCollectionName | ||
dst = append(dst, op.Database...) | ||
dst = append(dst, dollarCmd[:]...) | ||
dst = append(dst, 0x00) | ||
dst = wiremessage.AppendQueryNumberToSkip(dst, 0) | ||
dst = wiremessage.AppendQueryNumberToReturn(dst, -1) | ||
|
||
wrapper := int32(-1) | ||
rp, err := op.createReadPref(desc, true) | ||
if err != nil { | ||
return dst, info, err | ||
} | ||
if len(rp) > 0 { | ||
wrapper, dst = bsoncore.AppendDocumentStart(dst) | ||
dst = bsoncore.AppendHeader(dst, bsontype.EmbeddedDocument, "$query") | ||
} | ||
idx, dst := bsoncore.AppendDocumentStart(dst) | ||
dst, err = op.CommandFn(dst, desc) | ||
if err != nil { | ||
return dst, info, err | ||
} | ||
|
||
if op.Batches != nil && len(op.Batches.Current) > 0 { | ||
dst = op.addBatchArray(dst) | ||
} | ||
|
||
dst, err = op.addReadConcern(dst, desc) | ||
if err != nil { | ||
return dst, info, err | ||
} | ||
|
||
dst, err = op.addWriteConcern(dst, desc) | ||
if err != nil { | ||
return dst, info, err | ||
} | ||
|
||
dst, err = op.addSession(dst, desc) | ||
if err != nil { | ||
return dst, info, err | ||
} | ||
|
||
dst = op.addClusterTime(dst, desc) | ||
dst = op.addServerAPI(dst) | ||
// If maxTimeMS is greater than 0 append it to wire message. A maxTimeMS value of 0 only explicitly | ||
// specifies the default behavior of no timeout server-side. | ||
if maxTimeMS > 0 { | ||
dst = bsoncore.AppendInt64Element(dst, "maxTimeMS", int64(maxTimeMS)) | ||
} | ||
|
||
dst, _ = bsoncore.AppendDocumentEnd(dst, idx) | ||
// Command monitoring only reports the document inside $query | ||
info.cmd = dst[idx:] | ||
|
||
if len(rp) > 0 { | ||
var err error | ||
dst = bsoncore.AppendDocumentElement(dst, "$readPreference", rp) | ||
dst, err = bsoncore.AppendDocumentEnd(dst, wrapper) | ||
if err != nil { | ||
return dst, info, err | ||
} | ||
} | ||
|
||
return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil | ||
} | ||
|
||
func (op Operation) createMsgWireMessage(ctx context.Context, maxTimeMS uint64, dst []byte, desc description.SelectedServer, | ||
conn Connection, | ||
) ([]byte, startedInformation, error) { | ||
|
@@ -1186,6 +1266,33 @@ func (op Operation) createMsgWireMessage(ctx context.Context, maxTimeMS uint64, | |
return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil | ||
} | ||
|
||
// isLegacyHandshake returns "true" if the operation is the first message of | ||
// the initial handshake and should use a legacy hello. The requirement for | ||
// using a legacy hello as defined by the specifications is as follows: | ||
// | ||
// > If server API version is not requested and loadBalanced: False, drivers | ||
// > MUST use legacy hello for the first message of the initial handshake with | ||
// > the OP_QUERY protocol | ||
func isLegacyHandshake(op Operation, desc description.SelectedServer) bool { | ||
isInitialHandshake := desc.WireVersion == nil || desc.WireVersion.Max == 0 | ||
|
||
return desc.Kind != description.LoadBalanced && op.ServerAPI == nil && isInitialHandshake | ||
} | ||
|
||
func (op Operation) createWireMessage( | ||
ctx context.Context, | ||
dst []byte, | ||
desc description.SelectedServer, | ||
maxTimeMS uint64, | ||
conn Connection, | ||
) ([]byte, startedInformation, error) { | ||
if isLegacyHandshake(op, desc) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See here for the exact logic of this:
|
||
return op.createLegacyHandshakeWireMessage(maxTimeMS, dst, desc) | ||
} | ||
|
||
return op.createMsgWireMessage(ctx, maxTimeMS, dst, desc, conn) | ||
} | ||
|
||
// addCommandFields adds the fields for a command to the wire message in dst. This assumes that the start of the document | ||
// has already been added and does not add the final 0 byte. | ||
func (op Operation) addCommandFields(ctx context.Context, dst []byte, desc description.SelectedServer) ([]byte, error) { | ||
|
@@ -1830,6 +1937,8 @@ func (op Operation) publishFinishedEvent(ctx context.Context, info finishedInfor | |
logger.KeyFailure, formattedReply)...) | ||
} | ||
|
||
//fmt.Println("->", redactFinishedInformationResponse(info)) | ||
|
||
// If the finished event cannot be published, return early. | ||
if !op.canPublishFinishedEvent(info) { | ||
return | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,9 +31,11 @@ import ( | |
"go.mongodb.org/mongo-driver/internal/require" | ||
"go.mongodb.org/mongo-driver/mongo/address" | ||
"go.mongodb.org/mongo-driver/mongo/description" | ||
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore" | ||
"go.mongodb.org/mongo-driver/x/mongo/driver" | ||
"go.mongodb.org/mongo-driver/x/mongo/driver/auth" | ||
"go.mongodb.org/mongo-driver/x/mongo/driver/drivertest" | ||
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage" | ||
) | ||
|
||
type channelNetConnDialer struct{} | ||
|
@@ -1207,12 +1209,41 @@ func TestServer_ProcessError(t *testing.T) { | |
func includesClientMetadata(t *testing.T, wm []byte) bool { | ||
t.Helper() | ||
|
||
doc, err := drivertest.GetCommandFromMsgWireMessage(wm) | ||
assert.NoError(t, err) | ||
var ok bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a direct copy and paste of the code removed in this commit: 4449617 |
||
_, _, _, _, wm, ok = wiremessage.ReadHeader(wm) | ||
if !ok { | ||
t.Fatal("could not read header") | ||
} | ||
_, wm, ok = wiremessage.ReadQueryFlags(wm) | ||
if !ok { | ||
t.Fatal("could not read flags") | ||
} | ||
_, wm, ok = wiremessage.ReadQueryFullCollectionName(wm) | ||
if !ok { | ||
t.Fatal("could not read fullCollectionName") | ||
} | ||
_, wm, ok = wiremessage.ReadQueryNumberToSkip(wm) | ||
if !ok { | ||
t.Fatal("could not read numberToSkip") | ||
} | ||
_, wm, ok = wiremessage.ReadQueryNumberToReturn(wm) | ||
if !ok { | ||
t.Fatal("could not read numberToReturn") | ||
} | ||
var query bsoncore.Document | ||
query, wm, ok = wiremessage.ReadQueryQuery(wm) | ||
if !ok { | ||
t.Fatal("could not read query") | ||
} | ||
|
||
_, err = doc.LookupErr("client") | ||
if _, err := query.LookupErr("client"); err == nil { | ||
return true | ||
} | ||
if _, err := query.LookupErr("$query", "client"); err == nil { | ||
return true | ||
} | ||
|
||
return err == nil | ||
return false | ||
} | ||
|
||
// processErrorTestConn is a driver.Connection implementation used by tests | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a direct copy and paste of the code removed in this commit: 4449617