Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2572 Add log messages to Server selection spec #1325

Merged
merged 56 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
7b46fe0
update windows distro to vsCurrent-latest
prestonvasquez Jun 2, 2023
7bde5ae
Merge branch 'master' of github.com:mongodb/mongo-go-driver
prestonvasquez Jun 5, 2023
f7abd3c
GODRIVER-2742 set implementation
prestonvasquez Jun 8, 2023
92cb1bf
GODRIVER-2743 Add CompareUint32Ptr function
prestonvasquez Jun 13, 2023
4c95947
GODRIVER-2585 Initialize SDAM logging
prestonvasquez Jun 15, 2023
faca1cc
GODRIVER-2585 initialize heartbeat logging
prestonvasquez Jun 18, 2023
d7005b5
GODRIVER-2585 merge master
prestonvasquez Jun 21, 2023
5f4f342
GODRIVER-2585 Add remaining SDAM logging USTs
prestonvasquez Jun 22, 2023
9a9122d
GODRIVER-2585 Seperate server and topology component structs
prestonvasquez Jun 22, 2023
a0dfef3
GODRIER-2585 Revert changes to the internal logger
prestonvasquez Jun 22, 2023
c0c767f
GODRIVER-2585 Extend reduced HB frequency logic to operational clients
prestonvasquez Jun 23, 2023
85dbb56
GODRIVER-2585 Clean up comments
prestonvasquez Jun 23, 2023
2f73b7c
Merge branch 'master' into GODRIVER-2585
prestonvasquez Jun 23, 2023
1c05011
GODRIVER-2585 Remove empty space from comment
prestonvasquez Jun 23, 2023
4d75016
GODRIVER-2585 Fix static analysis failures
prestonvasquez Jun 23, 2023
d8eaf6c
GODRIVER-2585 add missing licenses
prestonvasquez Jun 28, 2023
27d61c0
Merge branch 'master' into GODRIVER-2742
prestonvasquez Jun 28, 2023
a7d6685
GODRIVER-2742 Add mongodcryptd process runner
prestonvasquez Jun 29, 2023
66bec31
GODRIVER-2742 Use internal compare function in server desc equal
prestonvasquez Jun 29, 2023
4299250
GODRIVER-2742 fix failures with session timout selection
prestonvasquez Jun 30, 2023
af250a1
GODRIVER-2742 Fix topology test failures
prestonvasquez Jun 30, 2023
5848ad5
GODRIVER-2742 More test failure fixes
prestonvasquez Jun 30, 2023
a333d3e
GODRIVER-2742 Convert mongocryptd prose test to mtest
prestonvasquez Jun 30, 2023
0696575
GODRIVER-2742 Use mtest and fix typos
prestonvasquez Jul 5, 2023
dcb8380
GODRIVER-2742 update mtest usage
prestonvasquez Jul 5, 2023
bdd4e8c
GODRIVER-2585 Add component tests
prestonvasquez Jul 6, 2023
8d4cd4c
GODRIVER-2585 Add missing licenses
prestonvasquez Jul 6, 2023
02b7657
GODRIVER-2585 Add comment to Print ack the message duplication
prestonvasquez Jul 7, 2023
8d91a00
Merge branch 'GODRIVER-2742' into GODRIVER-2572
prestonvasquez Jul 7, 2023
e291f7c
GODRIVER-2572 Add UST
prestonvasquez Jul 7, 2023
b2bff57
GODRIVER-2572 Merge SDAM logging for schema update
prestonvasquez Jul 7, 2023
f2e8a83
GODRIVER-2572 Initialize server selection logging
prestonvasquez Jul 10, 2023
fa90017
GODRIVER-2572 First round infrastructure changes
prestonvasquez Jul 15, 2023
5d96f7e
GODRIVER-2572 Start updating UST runner to include event matching
prestonvasquez Jul 18, 2023
46b62be
GODRIVER-2742 Review changes
prestonvasquez Jul 19, 2023
ccc4072
Update x/mongo/driver/topology/sdam_spec_test.go
prestonvasquez Jul 19, 2023
990f508
Update x/mongo/driver/topology/sdam_spec_test.go
prestonvasquez Jul 19, 2023
6ae5e38
GODRIVER-2572 Merge master
prestonvasquez Jul 19, 2023
1541fee
GODRIVER-2572 Remove safety guards that mask errors
prestonvasquez Jul 19, 2023
4a4bb38
GODRIVER-2572 Merge 2572
prestonvasquez Jul 19, 2023
735b497
GODRIVER-2572 Convert port when logging closed event
prestonvasquez Jul 19, 2023
43e4c3a
GODRIVER-2572 Clean up test use-case
prestonvasquez Jul 20, 2023
7e86550
GODRIVER-2572 Add the opereration names
prestonvasquez Jul 21, 2023
b8b84a9
GODRIVER-2572 Merge master
prestonvasquez Jul 26, 2023
ebf07b4
GODRIVER-2572 Remove uint32 comparison logic
prestonvasquez Jul 26, 2023
0fd270a
GODRIVER-2572 Clean up comments
prestonvasquez Jul 26, 2023
eaad735
GODRIVER-2572 Resolve merge conflicts
prestonvasquez Aug 2, 2023
2a1771b
GODRIVER-2572 Fix static analysis errors
prestonvasquez Aug 4, 2023
a474c8c
GODRIVER-2572 PR requests
prestonvasquez Aug 8, 2023
2bc53da
GODRIVER-2572 Resolve misaligned server selectors
prestonvasquez Aug 8, 2023
adf86c7
GODRIVER-2572 Fix static analysis errors
prestonvasquez Aug 8, 2023
cb497c0
GODRIVER-2572 Add licenses
prestonvasquez Aug 8, 2023
3da9902
GODRIVER-2572 Clean up code per review
prestonvasquez Aug 14, 2023
79894b8
GODRIVER-2572 Resolve merge conflict
prestonvasquez Aug 14, 2023
906d534
GODRIVER-2572 Update topology options to default load empty log options
prestonvasquez Aug 15, 2023
ffbfe15
GODRIVER-2572 Add correct options to log constructor
prestonvasquez Aug 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions internal/logger/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const (
ConnectionCheckoutFailed = "Connection checkout failed"
ConnectionCheckedOut = "Connection checked out"
ConnectionCheckedIn = "Connection checked in"
ServerSelectionFailed = "Server selection failed"
ServerSelectionStarted = "Server selection started"
ServerSelectionSucceeded = "Server selection succeeded"
ServerSelectionWaiting = "Waiting for suitable server to become available"
TopologyClosed = "Stopped topology monitoring"
TopologyDescriptionChanged = "Topology description changed"
TopologyOpening = "Starting topology monitoring"
Expand All @@ -53,19 +57,32 @@ const (
KeyMessage = "message"
KeyMinPoolSize = "minPoolSize"
KeyNewDescription = "newDescription"
KeyOperation = "operation"
KeyOperationID = "operationId"
KeyPreviousDescription = "previousDescription"
KeyRemainingTimeMS = "remainingTimeMS"
KeyReason = "reason"
KeyReply = "reply"
KeyRequestID = "requestId"
KeySelector = "selector"
KeyServerConnectionID = "serverConnectionId"
KeyServerHost = "serverHost"
KeyServerPort = "serverPort"
KeyServiceID = "serviceId"
KeyTimestamp = "timestamp"
KeyTopologyDescription = "topologyDescription"
KeyTopologyID = "topologyId"
)

// ContextKey is a custom type used to prevent key collisions when using the
// context package.
type ContextKey string

const (
ContextKeyOperation ContextKey = KeyOperation
ContextKeyOperationID ContextKey = KeyOperationID
)
prestonvasquez marked this conversation as resolved.
Show resolved Hide resolved

type KeyValues []interface{}

func (kvs *KeyValues) Add(key string, value interface{}) {
Expand Down Expand Up @@ -252,6 +269,36 @@ func SerializeServer(srv Server, extraKV ...interface{}) KeyValues {
return keysAndValues
}

// ServerSelection contains data that all server selection messages MUST
// contain.
type ServerSelection struct {
Selector string
OperationID interface{}
Operation interface{}
prestonvasquez marked this conversation as resolved.
Show resolved Hide resolved
TopologyDescription string
}

// SerializeServerSelection serializes a Topology message into a slice of keys
// and values that can be passed to a logger.
func SerializeServerSelection(srvSelection ServerSelection, extraKV ...interface{}) KeyValues {
keysAndValues := KeyValues{
KeySelector, srvSelection.Selector,
KeyOperation, srvSelection.Operation,
KeyTopologyDescription, srvSelection.TopologyDescription,
}

if srvSelection.OperationID != nil {
keysAndValues.Add(KeyOperationID, srvSelection.OperationID)
}

// Add the optional keys and values.
for i := 0; i < len(extraKV); i += 2 {
keysAndValues.Add(extraKV[i].(string), extraKV[i+1])
}

return keysAndValues
}

// Topology contains data that all topology messages MAY contain.
type Topology struct {
ID primitive.ObjectID // Driver's unique ID for this topology
Expand Down
38 changes: 33 additions & 5 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1868,9 +1868,26 @@ func (coll *Collection) drop(ctx context.Context) error {
return nil
}

// makePinnedSelector makes a selector for a pinned session with a pinned server. Will attempt to do server selection on
// the pinned server but if that fails it will go through a list of default selectors
func makePinnedSelector(sess *session.Client, defaultSelector description.ServerSelector) description.ServerSelectorFunc {
type pinnedServerSelector struct {
stringer fmt.Stringer
fn description.ServerSelectorFunc
prestonvasquez marked this conversation as resolved.
Show resolved Hide resolved
}

func (pss pinnedServerSelector) String() string {
return pss.stringer.String()
}

func (pss pinnedServerSelector) SelectServer(
t description.Topology,
s []description.Server,
) ([]description.Server, error) {
return pss.fn(t, s)
}

// pinnedSelectorFunc makes a selector for a pinned session with a pinned
// server. It attempts to do server selection on the pinned server, but if that
// fails, it will go through a list of default selectors.
func pinnedSelectorFunc(sess *session.Client, defaultSelector description.ServerSelector) description.ServerSelectorFunc {
return func(t description.Topology, svrs []description.Server) ([]description.Server, error) {
if sess != nil && sess.PinnedServer != nil {
// If there is a pinned server, try to find it in the list of candidates.
Expand All @@ -1887,7 +1904,18 @@ func makePinnedSelector(sess *session.Client, defaultSelector description.Server
}
}

func makeReadPrefSelector(sess *session.Client, selector description.ServerSelector, localThreshold time.Duration) description.ServerSelectorFunc {
func makePinnedSelector(sess *session.Client, defaultSelector description.ServerSelector) description.ServerSelector {
if srvSelectorStringer, ok := defaultSelector.(fmt.Stringer); ok {
return pinnedServerSelector{
stringer: srvSelectorStringer,
fn: pinnedSelectorFunc(sess, defaultSelector),
}
}

return pinnedSelectorFunc(sess, defaultSelector)
}

func makeReadPrefSelector(sess *session.Client, selector description.ServerSelector, localThreshold time.Duration) description.ServerSelector {
if sess != nil && sess.TransactionRunning() {
selector = description.CompositeSelector([]description.ServerSelector{
description.ReadPrefSelector(sess.CurrentRp),
Expand All @@ -1898,7 +1926,7 @@ func makeReadPrefSelector(sess *session.Client, selector description.ServerSelec
return makePinnedSelector(sess, selector)
}

func makeOutputAggregateSelector(sess *session.Client, rp *readpref.ReadPref, localThreshold time.Duration) description.ServerSelectorFunc {
func makeOutputAggregateSelector(sess *session.Client, rp *readpref.ReadPref, localThreshold time.Duration) description.ServerSelector {
if sess != nil && sess.TransactionRunning() {
// Use current transaction's read preference if available
rp = sess.CurrentRp
Expand Down
150 changes: 124 additions & 26 deletions mongo/description/server_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package description

import (
"encoding/json"
"fmt"
"math"
"time"
Expand All @@ -30,10 +31,52 @@ func (ssf ServerSelectorFunc) SelectServer(t Topology, s []Server) ([]Server, er
return ssf(t, s)
}

// serverSelectorInfo contains metadata concerning the server selector for the
// purpose of publication.
type serverSelectorInfo struct {
Type string
CustomSelector bool
prestonvasquez marked this conversation as resolved.
Show resolved Hide resolved
Data string `json:",omitempty"`
Selectors []serverSelectorInfo `json:",omitempty"`
}

// String returns the JSON string representation of the serverSelectorInfo.
func (sss serverSelectorInfo) String() string {
bytes, _ := json.Marshal(sss)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned that the nested JSON created here will be difficult to use when output by a structured logger. Is there another format we can use to combine multiple points of info that works better when nested in JSON? Or maybe only produce JSON when the output is unstructured?

Copy link
Collaborator Author

@prestonvasquez prestonvasquez Aug 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate on what you mean by this: "difficult to use when output by a structured logger" ? The output is the type of selector, associated data, and the child selectors (which is only relevant in the composite case). Here is an example:

{
  "Type": "compositeSelector",
  "Selectors": [
    {
      "Type": "writeSelector"
    },
    {
      "Type": "latencySelector"
    }
  ]
}

We could omit child selectors. I would argue that, in the context of logging, more information is better.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is actually that the server selector information is a string containing escaped JSON, rather than a JSON object. Consider the example:

{
    "message": "Server selection started",
    "operation": "find",
    "operationId": 1,
    "selector": "{\"Type\":\"compositeSelector\",\"Selectors\":[{\"Type\":\"readPrefSelector\",\"Data\":\"primary\"},{\"Type\":\"latencySelector\"}]}",
    "timestamp": 1692071928112600000,
    "topologyDescription": "Type: Unknown, Servers: [{ Addr: localhost:27017, Type: Unknown, State: Connected, Average RTT: 0s, Min RTT: 0s }, { Addr: localhost:27018, Type: Unknown, State: Connected, Average RTT: 0s, Min RTT: 0s }, { Addr: localhost:27019, Type: Unknown, State: Connected, Average RTT: 0s, Min RTT: 0s }, ]"
}

That selector information would be more usable and readable as a JSON object.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specifications require this field to be a string. This representation seems reasonable to me. However, if you want to unescape the string we can build this data directly, similar to what we do with the Server.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the spec requires it to be a string, the escaped JSON seems like a reasonable solution.


return string(bytes)
}

// serverSelectorInfoGetter is an interface that defines an info() method to
// get the serverSelectorInfo.
type serverSelectorInfoGetter interface {
info() serverSelectorInfo
}

type compositeSelector struct {
selectors []ServerSelector
}

func (cs *compositeSelector) info() serverSelectorInfo {
cssInfo := &serverSelectorInfo{
Type: "compositeSelector",
CustomSelector: true,
}

for _, sel := range cs.selectors {
if getter, ok := sel.(serverSelectorInfoGetter); ok {
cssInfo.Selectors = append(cssInfo.Selectors, getter.info())
}
}

return *cssInfo
}

// String returns the JSON string representation of the compositeSelector.
func (cs *compositeSelector) String() string {
return cs.info().String()
}

// CompositeSelector combines multiple selectors into a single selector by applying them in order to the candidates
// list.
//
Expand Down Expand Up @@ -63,13 +106,21 @@ type latencySelector struct {
latency time.Duration
}

func (latencySelector) info() serverSelectorInfo {
return serverSelectorInfo{Type: "latencySelector", CustomSelector: true}
}

func (selector latencySelector) String() string {
return selector.info().String()
}

// LatencySelector creates a ServerSelector which selects servers based on their average RTT values.
func LatencySelector(latency time.Duration) ServerSelector {
return &latencySelector{latency: latency}
}

func (ls *latencySelector) SelectServer(t Topology, candidates []Server) ([]Server, error) {
if ls.latency < 0 {
func (selector *latencySelector) SelectServer(t Topology, candidates []Server) ([]Server, error) {
if selector.latency < 0 {
return candidates, nil
}
if t.Kind == LoadBalanced {
Expand All @@ -94,7 +145,7 @@ func (ls *latencySelector) SelectServer(t Topology, candidates []Server) ([]Serv
return candidates, nil
}

max := min + ls.latency
max := min + selector.latency

viableIndexes := make([]int, 0, len(candidates))
for i, candidate := range candidates {
Expand All @@ -115,37 +166,66 @@ func (ls *latencySelector) SelectServer(t Topology, candidates []Server) ([]Serv
}
}

// WriteSelector selects all the writable servers.
func WriteSelector() ServerSelector {
return ServerSelectorFunc(func(t Topology, candidates []Server) ([]Server, error) {
switch t.Kind {
case Single, LoadBalanced:
return candidates, nil
default:
result := []Server{}
for _, candidate := range candidates {
switch candidate.Kind {
case Mongos, RSPrimary, Standalone:
result = append(result, candidate)
}
type writeServerSelector struct {
fn ServerSelectorFunc
}

func (writeServerSelector) info() serverSelectorInfo {
return serverSelectorInfo{Type: "writeSelector", CustomSelector: true}
}

func (selector writeServerSelector) String() string {
return selector.info().String()
}

func (selector writeServerSelector) SelectServer(t Topology, s []Server) ([]Server, error) {
return selector.fn(t, s)
}

func writeSelectorFunc(t Topology, candidates []Server) ([]Server, error) {
switch t.Kind {
case Single, LoadBalanced:
return candidates, nil
default:
result := []Server{}
for _, candidate := range candidates {
switch candidate.Kind {
case Mongos, RSPrimary, Standalone:
result = append(result, candidate)
}
return result, nil
}
})
return result, nil
}
}

// ReadPrefSelector selects servers based on the provided read preference.
func ReadPrefSelector(rp *readpref.ReadPref) ServerSelector {
return readPrefSelector(rp, false)
// WriteSelector selects all the writable servers.
func WriteSelector() ServerSelector {
return writeServerSelector{
fn: writeSelectorFunc,
}
}

// OutputAggregateSelector selects servers based on the provided read preference given that the underlying operation is
// aggregate with an output stage.
func OutputAggregateSelector(rp *readpref.ReadPref) ServerSelector {
return readPrefSelector(rp, true)
type readPrefServerSelector struct {
stringer fmt.Stringer
fn ServerSelectorFunc
prestonvasquez marked this conversation as resolved.
Show resolved Hide resolved
}

func (selector readPrefServerSelector) info() serverSelectorInfo {
return serverSelectorInfo{
Type: "readPrefSelector",
Data: selector.stringer.String(),
}
}

func (selector readPrefServerSelector) String() string {
return selector.info().String()
}

func readPrefSelector(rp *readpref.ReadPref, isOutputAggregate bool) ServerSelector {
func (selector readPrefServerSelector) SelectServer(t Topology, s []Server) ([]Server, error) {
return selector.fn(t, s)
}

func readPrefSelectorFunc(rp *readpref.ReadPref, isOutputAggregate bool) ServerSelectorFunc {
return ServerSelectorFunc(func(t Topology, candidates []Server) ([]Server, error) {
if t.Kind == LoadBalanced {
// In LoadBalanced mode, there should only be one server in the topology and it must be selected. We check
Expand All @@ -167,6 +247,24 @@ func readPrefSelector(rp *readpref.ReadPref, isOutputAggregate bool) ServerSelec
})
}

// ReadPrefSelector selects servers based on the provided read preference.
func ReadPrefSelector(rp *readpref.ReadPref) ServerSelector {
return readPrefServerSelector{
stringer: rp,
fn: readPrefSelectorFunc(rp, false),
}

}

// OutputAggregateSelector selects servers based on the provided read preference
// given that the underlying operation is aggregate with an output stage.
func OutputAggregateSelector(rp *readpref.ReadPref) ServerSelector {
return readPrefServerSelector{
stringer: rp,
fn: readPrefSelectorFunc(rp, true),
}
}

func selectForReplicaSet(rp *readpref.ReadPref, isOutputAggregate bool, t Topology, candidates []Server) ([]Server, error) {
if err := verifyMaxStaleness(rp, t); err != nil {
return nil, err
Expand Down
Loading