Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2335 Preemptively cancel in progress operations when SDAM heartbeats timeout. #1423

Merged
merged 13 commits into from
Jan 5, 2024
5 changes: 3 additions & 2 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ type PoolEvent struct {
Reason string `json:"reason"`
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// can be used to distinguish between individual servers in a load balanced deployment.
ServiceID *primitive.ObjectID `json:"serviceId"`
Error error `json:"error"`
ServiceID *primitive.ObjectID `json:"serviceId"`
Interruption bool `json:"interruptInUseConnections"`
Error error `json:"error"`
}

// PoolMonitor is a function that allows the user to gain access to events occurring in the pool
Expand Down
2 changes: 2 additions & 0 deletions mongo/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
switch strings.ToLower(key) {
case "appname":
clientOpts.SetAppName(value.(string))
case "connecttimeoutms":
clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Microsecond)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this conversion be time.Millisecond? `connect

case "heartbeatfrequencyms":
clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond)
case "loadbalanced":
Expand Down
4 changes: 4 additions & 0 deletions mongo/integration/unified/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type EntityMap struct {
successValues map[string]int32
iterationValues map[string]int32
clientEncryptionEntities map[string]*mongo.ClientEncryption
waitChans map[string]chan error
evtLock sync.Mutex
closed atomic.Value
// keyVaultClientIDs tracks IDs of clients used as a keyVaultClient in ClientEncryption objects.
Expand Down Expand Up @@ -167,6 +168,7 @@ func newEntityMap() *EntityMap {
successValues: make(map[string]int32),
iterationValues: make(map[string]int32),
clientEncryptionEntities: make(map[string]*mongo.ClientEncryption),
waitChans: make(map[string]chan error),
keyVaultClientIDs: make(map[string]bool),
}
em.setClosed(false)
Expand Down Expand Up @@ -283,6 +285,8 @@ func (em *EntityMap) addEntity(ctx context.Context, entityType string, entityOpt
err = em.addCollectionEntity(entityOptions)
case "session":
err = em.addSessionEntity(entityOptions)
case "thread":
em.waitChans[entityOptions.ID] = make(chan error)
case "bucket":
err = em.addGridFSBucketEntity(entityOptions)
case "clientEncryption":
Expand Down
7 changes: 6 additions & 1 deletion mongo/integration/unified/event_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type cmapEvent struct {
ConnectionCheckedInEvent *struct{} `bson:"connectionCheckedInEvent"`

PoolClearedEvent *struct {
HasServiceID *bool `bson:"hasServiceId"`
HasServiceID *bool `bson:"hasServiceId"`
InterruptInUseConnections *bool `bson:"interruptInUseConnections"`
} `bson:"poolClearedEvent"`
}

Expand Down Expand Up @@ -361,6 +362,10 @@ func verifyCMAPEvents(client *clientEntity, expectedEvents *expectedEvents) erro
return newEventVerificationError(idx, client, "error verifying serviceID: %v", err)
}
}
if expectInterruption := evt.PoolClearedEvent.InterruptInUseConnections; expectInterruption != nil && *expectInterruption != actual.Interruption {
return newEventVerificationError(idx, client, "expected interruptInUseConnections %v, got %v",
expectInterruption, actual.Interruption)
}
default:
return newEventVerificationError(idx, client, "no expected event set on cmapEvent instance")
}
Expand Down
35 changes: 30 additions & 5 deletions mongo/integration/unified/testrunner_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func (lp *loopArgs) iterationsStored() bool {
return lp.IterationsEntityID != ""
}

func executeTestRunnerOperation(ctx context.Context, operation *operation, loopDone <-chan struct{}) error {
args := operation.Arguments
func executeTestRunnerOperation(ctx context.Context, op *operation, loopDone <-chan struct{}) error {
args := op.Arguments

switch operation.Name {
switch op.Name {
case "failPoint":
clientID := lookupString(args, "client")
client, err := entities(ctx).client(clientID)
Expand Down Expand Up @@ -187,9 +187,34 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD
}
}
return nil
case "runOnThread":
operationRaw, err := args.LookupErr("operation")
if err != nil {
return fmt.Errorf("'operation' argument not found in runOnThread operation")
}
threadOp := new(operation)
if err := operationRaw.Unmarshal(threadOp); err != nil {
return fmt.Errorf("error unmarshalling 'operation' argument: %v", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("error unmarshalling 'operation' argument: %v", err)
return fmt.Errorf("error unmarshaling 'operation' argument: %v", err)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is what confuses me. I believe we've talked about its spelling in en-US vs en-GB, but I still see a lot "marshalling" in our code. Maybe they are worth an orthography PR.

}
ch := entities(ctx).waitChans[lookupString(args, "thread")]
go func(op *operation) {
err := op.execute(ctx, loopDone)
ch <- err
}(threadOp)
return nil
case "waitForThread":
if ch, ok := entities(ctx).waitChans[lookupString(args, "thread")]; ok {
select {
case <-ch:
return nil
case <-time.After(10 * time.Second):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the significance of 10 seconds here? Can we constantize this value akin to waitForEventTimeout ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the specs:

If the waitForThread operation is not satisfied after 10 seconds, this operation MUST cause a test failure.

return fmt.Errorf("timed out after 10 seconds")
}
}
return nil
case "waitForEvent":
var wfeArgs waitForEventArguments
if err := bson.Unmarshal(operation.Arguments, &wfeArgs); err != nil {
if err := bson.Unmarshal(op.Arguments, &wfeArgs); err != nil {
return fmt.Errorf("error unmarshalling event to waitForEventArguments: %v", err)
}

Expand All @@ -198,7 +223,7 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD

return waitForEvent(wfeCtx, wfeArgs)
default:
return fmt.Errorf("unrecognized testRunner operation %q", operation.Name)
return fmt.Errorf("unrecognized testRunner operation %q", op.Name)
}
}

Expand Down
4 changes: 4 additions & 0 deletions mongo/integration/unified/unified_spec_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ var (
"Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. See GODRIVER-2843",
// TODO(GODRIVER-2943): Fix and unskip this test case.
"Topology lifecycle": "Test times out. See GODRIVER-2943",

"Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment above these tests with a TODO to resolve or a more concise explanation as to why we are skipping them?

Not running these tests avoids ever executing the runOnThread logic.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qingyang-hu So is the reason we skip this is because these cases don't apply to the Go Driver?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we skip these spec tests, the new "cancel in-progress operations" feature seems untested by any new or existing tests. We should add tests that make sure the feature works.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to update the test cases for GODRIVER-2577 in server_test.go to sync up with the changes.

"Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout",
"Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",
}

logMessageValidatorTimeout = 10 * time.Millisecond
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"version": 1,
"style": "unit",
"description": "Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)",
"poolOptions": {
"backgroundThreadIntervalMS": 10000
},
"operations": [
{
"name": "ready"
},
{
"name": "checkOut"
},
{
"name": "checkOut",
"label": "conn"
},
{
"name": "clear",
"interruptInUseConnections": true
},
{
"name": "waitForEvent",
"event": "ConnectionPoolCleared",
"count": 1,
"timeout": 1000
},
{
"name": "waitForEvent",
"event": "ConnectionClosed",
"count": 2,
"timeout": 1000
},
{
"name": "close"
}
],
"events": [
{
"type": "ConnectionCheckedOut",
"connectionId": 1,
"address": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 2,
"address": 42
},
{
"type": "ConnectionPoolCleared",
"interruptInUseConnections": true
},
{
"type": "ConnectionClosed",
"reason": "stale",
"address": 42
},
{
"type": "ConnectionClosed",
"reason": "stale",
"address": 42
},
{
"type": "ConnectionPoolClosed",
"address": 42
}
],
"ignore": [
"ConnectionCreated",
"ConnectionPoolReady",
"ConnectionReady",
"ConnectionCheckOutStarted",
"ConnectionPoolCreated",
"ConnectionCheckedIn"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
version: 1
style: unit
description: Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)
poolOptions:
# ensure it's not involved by default
backgroundThreadIntervalMS: 10000
operations:
- name: ready
- name: checkOut
- name: checkOut
label: conn
- name: clear
interruptInUseConnections: true
- name: waitForEvent
event: ConnectionPoolCleared
count: 1
timeout: 1000
- name: waitForEvent
event: ConnectionClosed
count: 2
timeout: 1000
- name: close
events:
- type: ConnectionCheckedOut
connectionId: 1
address: 42
- type: ConnectionCheckedOut
connectionId: 2
address: 42
- type: ConnectionPoolCleared
interruptInUseConnections: true
- type: ConnectionClosed
reason: stale
address: 42
- type: ConnectionClosed
reason: stale
address: 42
- type: ConnectionPoolClosed
address: 42
ignore:
- ConnectionCreated
- ConnectionPoolReady
- ConnectionReady
- ConnectionCheckOutStarted
- ConnectionPoolCreated
- ConnectionCheckedIn
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"version": 1,
"style": "integration",
"description": "clear with interruptInUseConnections = true closes pending connections",
"runOn": [
{
"minServerVersion": "4.9.0"
}
],
"failPoint": {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": [
"isMaster",
"hello"
],
"closeConnection": false,
"blockConnection": true,
"blockTimeMS": 1000
}
},
"poolOptions": {
"minPoolSize": 0
},
"operations": [
{
"name": "ready"
},
{
"name": "start",
"target": "thread1"
},
{
"name": "checkOut",
"thread": "thread1"
},
{
"name": "waitForEvent",
"event": "ConnectionCreated",
"count": 1
},
{
"name": "clear",
"interruptInUseConnections": true
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutFailed",
"count": 1
}
],
"events": [
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCreated"
},
{
"type": "ConnectionPoolCleared",
"interruptInUseConnections": true
},
{
"type": "ConnectionClosed"
},
{
"type": "ConnectionCheckOutFailed"
}
],
"ignore": [
"ConnectionCheckedIn",
"ConnectionCheckedOut",
"ConnectionPoolCreated",
"ConnectionPoolReady"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: 1
style: integration
description: clear with interruptInUseConnections = true closes pending connections
runOn:
-
minServerVersion: "4.9.0"
failPoint:
configureFailPoint: failCommand
mode: "alwaysOn"
data:
failCommands: ["isMaster","hello"]
closeConnection: false
blockConnection: true
blockTimeMS: 1000
poolOptions:
minPoolSize: 0
operations:
- name: ready
- name: start
target: thread1
- name: checkOut
thread: thread1
- name: waitForEvent
event: ConnectionCreated
count: 1
- name: clear
interruptInUseConnections: true
- name: waitForEvent
event: ConnectionCheckOutFailed
count: 1
events:
- type: ConnectionCheckOutStarted
- type: ConnectionCreated
- type: ConnectionPoolCleared
interruptInUseConnections: true
- type: ConnectionClosed
- type: ConnectionCheckOutFailed
ignore:
- ConnectionCheckedIn
- ConnectionCheckedOut
- ConnectionPoolCreated
- ConnectionPoolReady
Loading
Loading