Skip to content

Commit

Permalink
GODRIVER-2935 Update sent_message logic to include OP_QUERY for hello
Browse files Browse the repository at this point in the history
  • Loading branch information
prestonvasquez committed Sep 6, 2023
1 parent 8277d88 commit 728f277
Showing 1 changed file with 65 additions and 0 deletions.
65 changes: 65 additions & 0 deletions mongo/integration/mtest/sent_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type sentMsgParseFn func([]byte) (*SentMessage, error)

func getSentMessageParser(opcode wiremessage.OpCode) (sentMsgParseFn, bool) {
switch opcode {
case wiremessage.OpQuery:
return parseOpQuery, true
case wiremessage.OpMsg:
return parseSentOpMsg, true
case wiremessage.OpCompressed:
Expand All @@ -46,6 +48,69 @@ func getSentMessageParser(opcode wiremessage.OpCode) (sentMsgParseFn, bool) {
}
}

func parseOpQuery(wm []byte) (*SentMessage, error) {
var ok bool

if _, wm, ok = wiremessage.ReadQueryFlags(wm); !ok {
return nil, errors.New("failed to read query flags")
}
if _, wm, ok = wiremessage.ReadQueryFullCollectionName(wm); !ok {
return nil, errors.New("failed to read full collection name")
}
if _, wm, ok = wiremessage.ReadQueryNumberToSkip(wm); !ok {
return nil, errors.New("failed to read number to skip")
}
if _, wm, ok = wiremessage.ReadQueryNumberToReturn(wm); !ok {
return nil, errors.New("failed to read number to return")
}

query, wm, ok := wiremessage.ReadQueryQuery(wm)
if !ok {
return nil, errors.New("failed to read query")
}

// If there is no read preference document, the command document is query.
// Otherwise, query is in the format {$query: <command document>, $readPreference: <read preference document>}.
commandDoc := query
var rpDoc bsoncore.Document

dollarQueryVal, err := query.LookupErr("$query")
if err == nil {
commandDoc = dollarQueryVal.Document()

rpVal, err := query.LookupErr("$readPreference")
if err != nil {
return nil, fmt.Errorf("query %s contains $query but not $readPreference fields", query)
}
rpDoc = rpVal.Document()
}

// For OP_QUERY, inserts, updates, and deletes are sent as a BSON array of documents inside the main command
// document. Pull these sequences out into an ArrayStyle DocumentSequence.
var docSequence *bsoncore.DocumentSequence
cmdElems, _ := commandDoc.Elements()
for _, elem := range cmdElems {
switch elem.Key() {
case "documents", "updates", "deletes":
docSequence = &bsoncore.DocumentSequence{
Style: bsoncore.ArrayStyle,
Data: elem.Value().Array(),
}
}
if docSequence != nil {
// There can only be one of these arrays in a well-formed command, so we exit the loop once one is found.
break
}
}

sm := &SentMessage{
Command: commandDoc,
ReadPreference: rpDoc,
DocumentSequence: docSequence,
}
return sm, nil
}

func parseSentMessage(wm []byte) (*SentMessage, error) {
// Re-assign the wire message to "remaining" so "wm" continues to point to the entire message after parsing.
_, requestID, _, opcode, remaining, ok := wiremessage.ReadHeader(wm)
Expand Down

0 comments on commit 728f277

Please sign in to comment.