Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2335 Preemptively cancel in progress operations when SDAM heartbeats timeout. #1423

Merged
merged 13 commits into from
Jan 5, 2024
5 changes: 3 additions & 2 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ type PoolEvent struct {
Reason string `json:"reason"`
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// can be used to distinguish between individual servers in a load balanced deployment.
ServiceID *primitive.ObjectID `json:"serviceId"`
Error error `json:"error"`
ServiceID *primitive.ObjectID `json:"serviceId"`
Interruption bool `json:"interruptInUseConnections"`
Error error `json:"error"`
}

// PoolMonitor is a function that allows the user to gain access to events occurring in the pool
Expand Down
8 changes: 8 additions & 0 deletions internal/eventtest/eventtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ func (tpm *TestPoolMonitor) IsPoolCleared() bool {
})
return len(poolClearedEvents) > 0
}

// Interruptions reports how many of the pool events recorded by the
// TestPoolMonitor have their Interruption flag set.
func (tpm *TestPoolMonitor) Interruptions() int {
	// Predicate selecting only the events flagged as interruptions.
	isInterruption := func(evt *event.PoolEvent) bool {
		return evt.Interruption
	}

	return len(tpm.Events(isInterruption))
}
2 changes: 2 additions & 0 deletions internal/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
switch strings.ToLower(key) {
case "appname":
clientOpts.SetAppName(value.(string))
case "connecttimeoutms":
clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "heartbeatfrequencyms":
clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond)
case "loadbalanced":
Expand Down
69 changes: 69 additions & 0 deletions internal/integration/unified/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,70 @@ func newCollectionEntityOptions(id string, databaseID string, collectionName str
return options
}

// task is a named unit of work queued on a backgroundRoutine.
type task struct {
	// name identifies the task in error messages.
	name string
	// execute performs the work and reports its outcome.
	execute func() error
}

// backgroundRoutine runs queued tasks one at a time on a dedicated goroutine
// and remembers the first failure. It backs the "thread" entity used by the
// runOnThread and waitForThread test runner operations.
type backgroundRoutine struct {
	// tasks is the queue of pending work; closing it ends the worker loop.
	tasks chan *task
	// wg tracks the worker goroutine launched by start.
	wg sync.WaitGroup
	// err holds the first failure. It is written only by the worker goroutine
	// and read by stop after wg.Wait has returned.
	err error
	// stopOnce guards close(tasks) so that stop may be called more than once.
	stopOnce sync.Once
}

// start launches the worker goroutine. The worker executes queued tasks
// sequentially until the queue is closed by stop. Once a task fails or times
// out, the remaining tasks are consumed without being run.
func (b *backgroundRoutine) start() {
	// Maximum time the worker waits for a single task to finish.
	const taskTimeout = 10 * time.Second

	b.wg.Add(1)

	go func() {
		defer b.wg.Done()

		for t := range b.tasks {
			// After the first failure keep consuming the queue, but do not
			// execute anything else.
			if b.err != nil {
				continue
			}

			// The channel is buffered so the task goroutine can always
			// deliver its result and exit, even when the worker has already
			// given up waiting because of the timeout. With an unbuffered
			// channel that goroutine would block on the send forever.
			ch := make(chan error, 1)
			go func(tk *task) {
				ch <- tk.execute()
			}(t)

			timer := time.NewTimer(taskTimeout)
			select {
			case err := <-ch:
				if err != nil {
					b.err = fmt.Errorf("error running operation %s: %v", t.name, err)
				}
			case <-timer.C:
				b.err = fmt.Errorf("timed out after 10 seconds")
			}
			// Release the timer right away instead of letting it linger until
			// it fires.
			timer.Stop()
		}
	}()
}

// stop closes the task queue, waits for the worker goroutine to finish and
// returns the first error recorded while running tasks (nil if there was
// none). It is safe to call stop repeatedly; later calls return the same
// result instead of panicking on a second close of the queue.
func (b *backgroundRoutine) stop() error {
	b.stopOnce.Do(func() {
		close(b.tasks)
	})
	b.wg.Wait()
	return b.err
}

// addTask enqueues a task without blocking. It returns true if the task was
// queued and false if the queue is full, in which case the task is dropped.
// addTask must not be called after stop, because the queue is closed by then.
func (b *backgroundRoutine) addTask(name string, execute func() error) bool {
	select {
	case b.tasks <- &task{
		name:    name,
		execute: execute,
	}:
		return true
	default:
		return false
	}
}

// newBackgroundRoutine returns a backgroundRoutine whose queue can hold up to
// 10 pending tasks. The worker is not running until start is called.
func newBackgroundRoutine() *backgroundRoutine {
	routine := &backgroundRoutine{
		tasks: make(chan *task, 10),
	}

	return routine
}

type clientEncryptionOpts struct {
KeyVaultClient string `bson:"keyVaultClient"`
KeyVaultNamespace string `bson:"keyVaultNamespace"`
Expand All @@ -136,6 +200,7 @@ type EntityMap struct {
successValues map[string]int32
iterationValues map[string]int32
clientEncryptionEntities map[string]*mongo.ClientEncryption
routinesMap sync.Map // maps thread name to *backgroundRoutine
evtLock sync.Mutex
closed atomic.Value
// keyVaultClientIDs tracks IDs of clients used as a keyVaultClient in ClientEncryption objects.
Expand Down Expand Up @@ -283,6 +348,10 @@ func (em *EntityMap) addEntity(ctx context.Context, entityType string, entityOpt
err = em.addCollectionEntity(entityOptions)
case "session":
err = em.addSessionEntity(entityOptions)
case "thread":
routine := newBackgroundRoutine()
em.routinesMap.Store(entityOptions.ID, routine)
routine.start()
case "bucket":
err = em.addGridFSBucketEntity(entityOptions)
case "clientEncryption":
Expand Down
7 changes: 6 additions & 1 deletion internal/integration/unified/event_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type cmapEvent struct {
ConnectionCheckedInEvent *struct{} `bson:"connectionCheckedInEvent"`

PoolClearedEvent *struct {
HasServiceID *bool `bson:"hasServiceId"`
HasServiceID *bool `bson:"hasServiceId"`
InterruptInUseConnections *bool `bson:"interruptInUseConnections"`
} `bson:"poolClearedEvent"`
}

Expand Down Expand Up @@ -361,6 +362,10 @@ func verifyCMAPEvents(client *clientEntity, expectedEvents *expectedEvents) erro
return newEventVerificationError(idx, client, "error verifying serviceID: %v", err)
}
}
if expectInterruption := evt.PoolClearedEvent.InterruptInUseConnections; expectInterruption != nil && *expectInterruption != actual.Interruption {
return newEventVerificationError(idx, client, "expected interruptInUseConnections %v, got %v",
expectInterruption, actual.Interruption)
}
default:
return newEventVerificationError(idx, client, "no expected event set on cmapEvent instance")
}
Expand Down
35 changes: 30 additions & 5 deletions internal/integration/unified/testrunner_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func (lp *loopArgs) iterationsStored() bool {
return lp.IterationsEntityID != ""
}

func executeTestRunnerOperation(ctx context.Context, operation *operation, loopDone <-chan struct{}) error {
args := operation.Arguments
func executeTestRunnerOperation(ctx context.Context, op *operation, loopDone <-chan struct{}) error {
args := op.Arguments

switch operation.Name {
switch op.Name {
case "failPoint":
clientID := lookupString(args, "client")
client, err := entities(ctx).client(clientID)
Expand Down Expand Up @@ -187,9 +187,34 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD
}
}
return nil
case "runOnThread":
operationRaw, err := args.LookupErr("operation")
if err != nil {
return fmt.Errorf("'operation' argument not found in runOnThread operation")
}
threadOp := new(operation)
if err := operationRaw.Unmarshal(threadOp); err != nil {
return fmt.Errorf("error unmarshaling 'operation' argument: %v", err)
}
thread := lookupString(args, "thread")
routine, ok := entities(ctx).routinesMap.Load(thread)
if !ok {
return fmt.Errorf("run on unknown thread: %s", thread)
}
routine.(*backgroundRoutine).addTask(threadOp.Name, func() error {
return threadOp.execute(ctx, loopDone)
})
return nil
case "waitForThread":
thread := lookupString(args, "thread")
routine, ok := entities(ctx).routinesMap.Load(thread)
if !ok {
return fmt.Errorf("wait for unknown thread: %s", thread)
}
return routine.(*backgroundRoutine).stop()
case "waitForEvent":
var wfeArgs waitForEventArguments
if err := bson.Unmarshal(operation.Arguments, &wfeArgs); err != nil {
if err := bson.Unmarshal(op.Arguments, &wfeArgs); err != nil {
return fmt.Errorf("error unmarshalling event to waitForEventArguments: %v", err)
}

Expand All @@ -198,7 +223,7 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD

return waitForEvent(wfeCtx, wfeArgs)
default:
return fmt.Errorf("unrecognized testRunner operation %q", operation.Name)
return fmt.Errorf("unrecognized testRunner operation %q", op.Name)
}
}

Expand Down
5 changes: 5 additions & 0 deletions internal/integration/unified/unified_spec_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ var (
// TODO(GODRIVER-2843): Fix and unskip these test cases.
"Find operation with snapshot": "Test fails frequently. See GODRIVER-2843",
"Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. See GODRIVER-2843",
// The current logic, which was implemented with GODRIVER-2577, only clears pools and cancels in-progress ops if
// the heartbeat fails twice. Therefore, we skip the following spec tests, which require canceling ops immediately.
"Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
"Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout",
"Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",
}

logMessageValidatorTimeout = 10 * time.Millisecond
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"version": 1,
"style": "unit",
"description": "Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)",
"poolOptions": {
"backgroundThreadIntervalMS": 10000
},
"operations": [
{
"name": "ready"
},
{
"name": "checkOut"
},
{
"name": "checkOut",
"label": "conn"
},
{
"name": "clear",
"interruptInUseConnections": true
},
{
"name": "waitForEvent",
"event": "ConnectionPoolCleared",
"count": 1,
"timeout": 1000
},
{
"name": "waitForEvent",
"event": "ConnectionClosed",
"count": 2,
"timeout": 1000
},
{
"name": "close"
}
],
"events": [
{
"type": "ConnectionCheckedOut",
"connectionId": 1,
"address": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 2,
"address": 42
},
{
"type": "ConnectionPoolCleared",
"interruptInUseConnections": true
},
{
"type": "ConnectionClosed",
"reason": "stale",
"address": 42
},
{
"type": "ConnectionClosed",
"reason": "stale",
"address": 42
},
{
"type": "ConnectionPoolClosed",
"address": 42
}
],
"ignore": [
"ConnectionCreated",
"ConnectionPoolReady",
"ConnectionReady",
"ConnectionCheckOutStarted",
"ConnectionPoolCreated",
"ConnectionCheckedIn"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
version: 1
style: unit
description: Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)
poolOptions:
# use a long interval to ensure the background thread is not involved in this test
backgroundThreadIntervalMS: 10000
operations:
- name: ready
- name: checkOut
- name: checkOut
label: conn
- name: clear
interruptInUseConnections: true
- name: waitForEvent
event: ConnectionPoolCleared
count: 1
timeout: 1000
- name: waitForEvent
event: ConnectionClosed
count: 2
timeout: 1000
- name: close
events:
- type: ConnectionCheckedOut
connectionId: 1
address: 42
- type: ConnectionCheckedOut
connectionId: 2
address: 42
- type: ConnectionPoolCleared
interruptInUseConnections: true
- type: ConnectionClosed
reason: stale
address: 42
- type: ConnectionClosed
reason: stale
address: 42
- type: ConnectionPoolClosed
address: 42
ignore:
- ConnectionCreated
- ConnectionPoolReady
- ConnectionReady
- ConnectionCheckOutStarted
- ConnectionPoolCreated
- ConnectionCheckedIn
Loading
Loading