Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2335 Preemptively cancel in progress operations when SDAM heartbeats timeout. #1423

Merged
merged 13 commits into from
Jan 5, 2024
5 changes: 3 additions & 2 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ type PoolEvent struct {
Reason string `json:"reason"`
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// can be used to distinguish between individual servers in a load balanced deployment.
ServiceID *primitive.ObjectID `json:"serviceId"`
Error error `json:"error"`
ServiceID *primitive.ObjectID `json:"serviceId"`
Interruption bool `json:"interruptInUseConnections"`
Error error `json:"error"`
}

// PoolMonitor is a function that allows the user to gain access to events occurring in the pool
Expand Down
8 changes: 8 additions & 0 deletions internal/eventtest/eventtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ func (tpm *TestPoolMonitor) IsPoolCleared() bool {
})
return len(poolClearedEvents) > 0
}

// Interruptions returns the number of interruptions in the events recorded by the testPoolMonitor.
func (tpm *TestPoolMonitor) Interruptions() int {
interruptions := tpm.Events(func(evt *event.PoolEvent) bool {
return evt.Interruption
})
return len(interruptions)
}
2 changes: 2 additions & 0 deletions internal/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
switch strings.ToLower(key) {
case "appname":
clientOpts.SetAppName(value.(string))
case "connecttimeoutms":
clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "heartbeatfrequencyms":
clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond)
case "loadbalanced":
Expand Down
4 changes: 4 additions & 0 deletions internal/integration/unified/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type EntityMap struct {
successValues map[string]int32
iterationValues map[string]int32
clientEncryptionEntities map[string]*mongo.ClientEncryption
waitChans map[string]chan error
evtLock sync.Mutex
closed atomic.Value
// keyVaultClientIDs tracks IDs of clients used as a keyVaultClient in ClientEncryption objects.
Expand Down Expand Up @@ -167,6 +168,7 @@ func newEntityMap() *EntityMap {
successValues: make(map[string]int32),
iterationValues: make(map[string]int32),
clientEncryptionEntities: make(map[string]*mongo.ClientEncryption),
waitChans: make(map[string]chan error),
keyVaultClientIDs: make(map[string]bool),
}
em.setClosed(false)
Expand Down Expand Up @@ -283,6 +285,8 @@ func (em *EntityMap) addEntity(ctx context.Context, entityType string, entityOpt
err = em.addCollectionEntity(entityOptions)
case "session":
err = em.addSessionEntity(entityOptions)
case "thread":
em.waitChans[entityOptions.ID] = make(chan error)
case "bucket":
err = em.addGridFSBucketEntity(entityOptions)
case "clientEncryption":
Expand Down
7 changes: 6 additions & 1 deletion internal/integration/unified/event_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type cmapEvent struct {
ConnectionCheckedInEvent *struct{} `bson:"connectionCheckedInEvent"`

PoolClearedEvent *struct {
HasServiceID *bool `bson:"hasServiceId"`
HasServiceID *bool `bson:"hasServiceId"`
InterruptInUseConnections *bool `bson:"interruptInUseConnections"`
} `bson:"poolClearedEvent"`
}

Expand Down Expand Up @@ -361,6 +362,10 @@ func verifyCMAPEvents(client *clientEntity, expectedEvents *expectedEvents) erro
return newEventVerificationError(idx, client, "error verifying serviceID: %v", err)
}
}
if expectInterruption := evt.PoolClearedEvent.InterruptInUseConnections; expectInterruption != nil && *expectInterruption != actual.Interruption {
return newEventVerificationError(idx, client, "expected interruptInUseConnections %v, got %v",
expectInterruption, actual.Interruption)
}
default:
return newEventVerificationError(idx, client, "no expected event set on cmapEvent instance")
}
Expand Down
35 changes: 30 additions & 5 deletions internal/integration/unified/testrunner_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func (lp *loopArgs) iterationsStored() bool {
return lp.IterationsEntityID != ""
}

func executeTestRunnerOperation(ctx context.Context, operation *operation, loopDone <-chan struct{}) error {
args := operation.Arguments
func executeTestRunnerOperation(ctx context.Context, op *operation, loopDone <-chan struct{}) error {
args := op.Arguments

switch operation.Name {
switch op.Name {
case "failPoint":
clientID := lookupString(args, "client")
client, err := entities(ctx).client(clientID)
Expand Down Expand Up @@ -187,9 +187,34 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD
}
}
return nil
case "runOnThread":
operationRaw, err := args.LookupErr("operation")
if err != nil {
return fmt.Errorf("'operation' argument not found in runOnThread operation")
}
threadOp := new(operation)
if err := operationRaw.Unmarshal(threadOp); err != nil {
return fmt.Errorf("error unmarshaling 'operation' argument: %v", err)
}
ch := entities(ctx).waitChans[lookupString(args, "thread")]
go func(op *operation) {
err := op.execute(ctx, loopDone)
ch <- err
}(threadOp)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This style of running operations on a "thread" doesn't guarantee that operations will be run in the specified order. The runOnThread operation in the unified test format spec fails to mention that property, but my understanding is that unified spec test runners must preserve the order of ops run in a "thread".

The legacy "runOnThread" implementation here uses a job queue per "thread". Consider using a similar approach and/or copying the code from the legacy spec test runner.

return nil
case "waitForThread":
if ch, ok := entities(ctx).waitChans[lookupString(args, "thread")]; ok {
select {
case err := <-ch:
return err
case <-time.After(10 * time.Second):
return fmt.Errorf("timed out after 10 seconds")
}
}
return nil
case "waitForEvent":
var wfeArgs waitForEventArguments
if err := bson.Unmarshal(operation.Arguments, &wfeArgs); err != nil {
if err := bson.Unmarshal(op.Arguments, &wfeArgs); err != nil {
return fmt.Errorf("error unmarshalling event to waitForEventArguments: %v", err)
}

Expand All @@ -198,7 +223,7 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD

return waitForEvent(wfeCtx, wfeArgs)
default:
return fmt.Errorf("unrecognized testRunner operation %q", operation.Name)
return fmt.Errorf("unrecognized testRunner operation %q", op.Name)
}
}

Expand Down
5 changes: 5 additions & 0 deletions internal/integration/unified/unified_spec_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ var (
// TODO(GODRIVER-2843): Fix and unskip these test cases.
"Find operation with snapshot": "Test fails frequently. See GODRIVER-2843",
"Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. See GODRIVER-2843",
// The current logic, which was implemented with GODRIVER-2577, only clears pools and cancels in-progress ops if
// the heartbeat fails twice. Therefore, we skip the following spec tests, which requires canceling ops immediately.
"Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
"Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout",
"Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",
}

logMessageValidatorTimeout = 10 * time.Millisecond
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"version": 1,
"style": "unit",
"description": "Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)",
"poolOptions": {
"backgroundThreadIntervalMS": 10000
},
"operations": [
{
"name": "ready"
},
{
"name": "checkOut"
},
{
"name": "checkOut",
"label": "conn"
},
{
"name": "clear",
"interruptInUseConnections": true
},
{
"name": "waitForEvent",
"event": "ConnectionPoolCleared",
"count": 1,
"timeout": 1000
},
{
"name": "waitForEvent",
"event": "ConnectionClosed",
"count": 2,
"timeout": 1000
},
{
"name": "close"
}
],
"events": [
{
"type": "ConnectionCheckedOut",
"connectionId": 1,
"address": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 2,
"address": 42
},
{
"type": "ConnectionPoolCleared",
"interruptInUseConnections": true
},
{
"type": "ConnectionClosed",
"reason": "stale",
"address": 42
},
{
"type": "ConnectionClosed",
"reason": "stale",
"address": 42
},
{
"type": "ConnectionPoolClosed",
"address": 42
}
],
"ignore": [
"ConnectionCreated",
"ConnectionPoolReady",
"ConnectionReady",
"ConnectionCheckOutStarted",
"ConnectionPoolCreated",
"ConnectionCheckedIn"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
version: 1
style: unit
description: Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)
poolOptions:
# ensure it's not involved by default
backgroundThreadIntervalMS: 10000
operations:
- name: ready
- name: checkOut
- name: checkOut
label: conn
- name: clear
interruptInUseConnections: true
- name: waitForEvent
event: ConnectionPoolCleared
count: 1
timeout: 1000
- name: waitForEvent
event: ConnectionClosed
count: 2
timeout: 1000
- name: close
events:
- type: ConnectionCheckedOut
connectionId: 1
address: 42
- type: ConnectionCheckedOut
connectionId: 2
address: 42
- type: ConnectionPoolCleared
interruptInUseConnections: true
- type: ConnectionClosed
reason: stale
address: 42
- type: ConnectionClosed
reason: stale
address: 42
- type: ConnectionPoolClosed
address: 42
ignore:
- ConnectionCreated
- ConnectionPoolReady
- ConnectionReady
- ConnectionCheckOutStarted
- ConnectionPoolCreated
- ConnectionCheckedIn
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"version": 1,
"style": "integration",
"description": "clear with interruptInUseConnections = true closes pending connections",
"runOn": [
{
"minServerVersion": "4.9.0"
}
],
"failPoint": {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": [
"isMaster",
"hello"
],
"closeConnection": false,
"blockConnection": true,
"blockTimeMS": 1000
}
},
"poolOptions": {
"minPoolSize": 0
},
"operations": [
{
"name": "ready"
},
{
"name": "start",
"target": "thread1"
},
{
"name": "checkOut",
"thread": "thread1"
},
{
"name": "waitForEvent",
"event": "ConnectionCreated",
"count": 1
},
{
"name": "clear",
"interruptInUseConnections": true
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutFailed",
"count": 1
}
],
"events": [
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCreated"
},
{
"type": "ConnectionPoolCleared",
"interruptInUseConnections": true
},
{
"type": "ConnectionClosed"
},
{
"type": "ConnectionCheckOutFailed"
}
],
"ignore": [
"ConnectionCheckedIn",
"ConnectionCheckedOut",
"ConnectionPoolCreated",
"ConnectionPoolReady"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: 1
style: integration
description: clear with interruptInUseConnections = true closes pending connections
runOn:
-
minServerVersion: "4.9.0"
failPoint:
configureFailPoint: failCommand
mode: "alwaysOn"
data:
failCommands: ["isMaster","hello"]
closeConnection: false
blockConnection: true
blockTimeMS: 1000
poolOptions:
minPoolSize: 0
operations:
- name: ready
- name: start
target: thread1
- name: checkOut
thread: thread1
- name: waitForEvent
event: ConnectionCreated
count: 1
- name: clear
interruptInUseConnections: true
- name: waitForEvent
event: ConnectionCheckOutFailed
count: 1
events:
- type: ConnectionCheckOutStarted
- type: ConnectionCreated
- type: ConnectionPoolCleared
interruptInUseConnections: true
- type: ConnectionClosed
- type: ConnectionCheckOutFailed
ignore:
- ConnectionCheckedIn
- ConnectionCheckedOut
- ConnectionPoolCreated
- ConnectionPoolReady
Loading
Loading