diff --git a/event/monitoring.go b/event/monitoring.go index 53d1caf2e3..cc2c7a4e6c 100644 --- a/event/monitoring.go +++ b/event/monitoring.go @@ -120,8 +120,9 @@ type PoolEvent struct { Reason string `json:"reason"` // ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field // can be used to distinguish between individual servers in a load balanced deployment. - ServiceID *primitive.ObjectID `json:"serviceId"` - Error error `json:"error"` + ServiceID *primitive.ObjectID `json:"serviceId"` + Interruption bool `json:"interruptInUseConnections"` + Error error `json:"error"` } // PoolMonitor is a function that allows the user to gain access to events occurring in the pool diff --git a/internal/eventtest/eventtest.go b/internal/eventtest/eventtest.go index c06037e850..1158d69933 100644 --- a/internal/eventtest/eventtest.go +++ b/internal/eventtest/eventtest.go @@ -76,3 +76,11 @@ func (tpm *TestPoolMonitor) IsPoolCleared() bool { }) return len(poolClearedEvents) > 0 } + +// Interruptions returns the number of interruptions in the events recorded by the testPoolMonitor. +func (tpm *TestPoolMonitor) Interruptions() int { + interruptions := tpm.Events(func(evt *event.PoolEvent) bool { + return evt.Interruption + }) + return len(interruptions) +} diff --git a/mongo/integration/unified/client_entity.go b/mongo/integration/unified/client_entity.go index aefe5079f1..b5ea8e7bbe 100644 --- a/mongo/integration/unified/client_entity.go +++ b/mongo/integration/unified/client_entity.go @@ -571,6 +571,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b switch strings.ToLower(key) { case "appname": clientOpts.SetAppName(value.(string)) + case "connecttimeoutms": + clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Millisecond) case "heartbeatfrequencyms": clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond) case "loadbalanced": diff --git a/mongo/integration/unified/entity.go b/mongo/integration/unified/entity.go index 0ae9fc006f..8c31addf33 100644 --- a/mongo/integration/unified/entity.go +++ b/mongo/integration/unified/entity.go @@ -112,6 +112,70 @@ func newCollectionEntityOptions(id string, databaseID string, collectionName str return options } +type task struct { + name string + execute func() error +} + +type backgroundRoutine struct { + tasks chan *task + wg sync.WaitGroup + err error +} + +func (b *backgroundRoutine) start() { + b.wg.Add(1) + + go func() { + defer b.wg.Done() + + for t := range b.tasks { + if b.err != nil { + continue + } + + ch := make(chan error) + go func(task *task) { + ch <- task.execute() + }(t) + select { + case err := <-ch: + if err != nil { + b.err = fmt.Errorf("error running operation %s: %v", t.name, err) + } + case <-time.After(10 * time.Second): + b.err = fmt.Errorf("timed out after 10 seconds") + } + } + }() +} + +func (b *backgroundRoutine) stop() error { + close(b.tasks) + b.wg.Wait() + return b.err +} + +func (b *backgroundRoutine) addTask(name string, execute func() error) bool { + select { + case b.tasks <- &task{ + name: name, + execute: execute, + }: + return true + default: + return false + } +} + +func newBackgroundRoutine() *backgroundRoutine { + routine := &backgroundRoutine{ + tasks: make(chan *task, 10), + } + + return routine +} + type clientEncryptionOpts struct { KeyVaultClient string `bson:"keyVaultClient"` KeyVaultNamespace string `bson:"keyVaultNamespace"` @@ -136,6 +200,7 @@ type EntityMap struct { successValues 
map[string]int32 iterationValues map[string]int32 clientEncryptionEntities map[string]*mongo.ClientEncryption + routinesMap sync.Map // maps thread name to *backgroundRoutine evtLock sync.Mutex closed atomic.Value // keyVaultClientIDs tracks IDs of clients used as a keyVaultClient in ClientEncryption objects. @@ -283,6 +348,10 @@ func (em *EntityMap) addEntity(ctx context.Context, entityType string, entityOpt err = em.addCollectionEntity(entityOptions) case "session": err = em.addSessionEntity(entityOptions) + case "thread": + routine := newBackgroundRoutine() + em.routinesMap.Store(entityOptions.ID, routine) + routine.start() case "bucket": err = em.addGridFSBucketEntity(entityOptions) case "clientEncryption": diff --git a/mongo/integration/unified/event_verification.go b/mongo/integration/unified/event_verification.go index 6516000416..6a621b78bc 100644 --- a/mongo/integration/unified/event_verification.go +++ b/mongo/integration/unified/event_verification.go @@ -61,7 +61,8 @@ type cmapEvent struct { ConnectionCheckedInEvent *struct{} `bson:"connectionCheckedInEvent"` PoolClearedEvent *struct { - HasServiceID *bool `bson:"hasServiceId"` + HasServiceID *bool `bson:"hasServiceId"` + InterruptInUseConnections *bool `bson:"interruptInUseConnections"` } `bson:"poolClearedEvent"` } @@ -361,6 +362,10 @@ func verifyCMAPEvents(client *clientEntity, expectedEvents *expectedEvents) erro return newEventVerificationError(idx, client, "error verifying serviceID: %v", err) } } + if expectInterruption := evt.PoolClearedEvent.InterruptInUseConnections; expectInterruption != nil && *expectInterruption != actual.Interruption { + return newEventVerificationError(idx, client, "expected interruptInUseConnections %v, got %v", + expectInterruption, actual.Interruption) + } default: return newEventVerificationError(idx, client, "no expected event set on cmapEvent instance") } diff --git a/mongo/integration/unified/testrunner_operation.go b/mongo/integration/unified/testrunner_operation.go index 411b312f6c..b1af95b5ba 100644 --- a/mongo/integration/unified/testrunner_operation.go +++ b/mongo/integration/unified/testrunner_operation.go @@ -50,10 +50,10 @@ func (lp *loopArgs) iterationsStored() bool { return lp.IterationsEntityID != "" } -func executeTestRunnerOperation(ctx context.Context, operation *operation, loopDone <-chan struct{}) error { - args := operation.Arguments +func executeTestRunnerOperation(ctx context.Context, op *operation, loopDone <-chan struct{}) error { + args := op.Arguments - switch operation.Name { + switch op.Name { case "failPoint": clientID := lookupString(args, "client") client, err := entities(ctx).client(clientID) @@ -187,9 +187,34 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD } } return nil + case "runOnThread": + operationRaw, err := args.LookupErr("operation") + if err != nil { + return fmt.Errorf("'operation' argument not found in runOnThread operation") + } + threadOp := new(operation) + if err := operationRaw.Unmarshal(threadOp); err != nil { + return fmt.Errorf("error unmarshaling 'operation' argument: %v", err) + } + thread := lookupString(args, "thread") + routine, ok := entities(ctx).routinesMap.Load(thread) + if !ok { + return fmt.Errorf("run on unknown thread: %s", thread) + } + routine.(*backgroundRoutine).addTask(threadOp.Name, func() error { + return threadOp.execute(ctx, loopDone) + }) + return nil + case "waitForThread": + thread := lookupString(args, "thread") + routine, ok := entities(ctx).routinesMap.Load(thread) + if !ok { + 
return fmt.Errorf("wait for unknown thread: %s", thread) + } + return routine.(*backgroundRoutine).stop() case "waitForEvent": var wfeArgs waitForEventArguments - if err := bson.Unmarshal(operation.Arguments, &wfeArgs); err != nil { + if err := bson.Unmarshal(op.Arguments, &wfeArgs); err != nil { return fmt.Errorf("error unmarshalling event to waitForEventArguments: %v", err) } @@ -198,7 +223,7 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD return waitForEvent(wfeCtx, wfeArgs) default: - return fmt.Errorf("unrecognized testRunner operation %q", operation.Name) + return fmt.Errorf("unrecognized testRunner operation %q", op.Name) } } diff --git a/mongo/integration/unified/unified_spec_runner.go b/mongo/integration/unified/unified_spec_runner.go index b3ac2d7ac9..b7744844e0 100644 --- a/mongo/integration/unified/unified_spec_runner.go +++ b/mongo/integration/unified/unified_spec_runner.go @@ -28,6 +28,12 @@ var ( // the "find" and one for the "getMore", but we send three for both. "A successful find event with a getmore and the server kills the cursor (<= 4.4)": "See GODRIVER-1773", + // GODRIVER-2577: The following spec tests require canceling ops immediately, but the current logic clears pools + // and cancels in-progress ops after two the heartbeat failures. + "Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout", + "Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout", + "Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout", + // TODO(GODRIVER-2843): Fix and unskip these test cases. "Find operation with snapshot": "Test fails frequently. See GODRIVER-2843", "Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. 
See GODRIVER-2843", diff --git a/testdata/connection-monitoring-and-pooling/pool-clear-interrupt-immediately.json b/testdata/connection-monitoring-and-pooling/pool-clear-interrupt-immediately.json new file mode 100644 index 0000000000..54e2566ede --- /dev/null +++ b/testdata/connection-monitoring-and-pooling/pool-clear-interrupt-immediately.json @@ -0,0 +1,77 @@ +{ + "version": 1, + "style": "unit", + "description": "Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)", + "poolOptions": { + "backgroundThreadIntervalMS": 10000 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "checkOut" + }, + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "clear", + "interruptInUseConnections": true + }, + { + "name": "waitForEvent", + "event": "ConnectionPoolCleared", + "count": 1, + "timeout": 1000 + }, + { + "name": "waitForEvent", + "event": "ConnectionClosed", + "count": 2, + "timeout": 1000 + }, + { + "name": "close" + } + ], + "events": [ + { + "type": "ConnectionCheckedOut", + "connectionId": 1, + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 2, + "address": 42 + }, + { + "type": "ConnectionPoolCleared", + "interruptInUseConnections": true + }, + { + "type": "ConnectionClosed", + "reason": "stale", + "address": 42 + }, + { + "type": "ConnectionClosed", + "reason": "stale", + "address": 42 + }, + { + "type": "ConnectionPoolClosed", + "address": 42 + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionPoolReady", + "ConnectionReady", + "ConnectionCheckOutStarted", + "ConnectionPoolCreated", + "ConnectionCheckedIn" + ] +} diff --git a/testdata/connection-monitoring-and-pooling/pool-clear-interrupt-immediately.yml b/testdata/connection-monitoring-and-pooling/pool-clear-interrupt-immediately.yml new file mode 100644 index 0000000000..dd88a15d3d --- /dev/null +++ b/testdata/connection-monitoring-and-pooling/pool-clear-interrupt-immediately.yml @@ -0,0 +1,46 @@ +version: 1 +style: unit +description: Connections MUST be interrupted as soon as possible (interruptInUseConnections=true) +poolOptions: + # ensure it's not involved by default + backgroundThreadIntervalMS: 10000 +operations: + - name: ready + - name: checkOut + - name: checkOut + label: conn + - name: clear + interruptInUseConnections: true + - name: waitForEvent + event: ConnectionPoolCleared + count: 1 + timeout: 1000 + - name: waitForEvent + event: ConnectionClosed + count: 2 + timeout: 1000 + - name: close +events: + - type: ConnectionCheckedOut + connectionId: 1 + address: 42 + - type: ConnectionCheckedOut + connectionId: 2 + address: 42 + - type: ConnectionPoolCleared + interruptInUseConnections: true + - type: ConnectionClosed + reason: stale + address: 42 + - type: ConnectionClosed + reason: stale + address: 42 + - type: ConnectionPoolClosed + address: 42 +ignore: + - ConnectionCreated + - ConnectionPoolReady + - ConnectionReady + - ConnectionCheckOutStarted + - ConnectionPoolCreated + - ConnectionCheckedIn \ No newline at end of file diff --git a/testdata/connection-monitoring-and-pooling/pool-clear-interrupting-pending-connections.json b/testdata/connection-monitoring-and-pooling/pool-clear-interrupting-pending-connections.json new file mode 100644 index 0000000000..ceae07a1c7 --- /dev/null +++ b/testdata/connection-monitoring-and-pooling/pool-clear-interrupting-pending-connections.json @@ -0,0 +1,77 @@ +{ + "version": 1, + "style": "integration", + "description": "clear with interruptInUseConnections = true closes pending connections", + 
"runOn": [ + { + "minServerVersion": "4.9.0" + } + ], + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 1000 + } + }, + "poolOptions": { + "minPoolSize": 0 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "waitForEvent", + "event": "ConnectionCreated", + "count": 1 + }, + { + "name": "clear", + "interruptInUseConnections": true + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutFailed", + "count": 1 + } + ], + "events": [ + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated" + }, + { + "type": "ConnectionPoolCleared", + "interruptInUseConnections": true + }, + { + "type": "ConnectionClosed" + }, + { + "type": "ConnectionCheckOutFailed" + } + ], + "ignore": [ + "ConnectionCheckedIn", + "ConnectionCheckedOut", + "ConnectionPoolCreated", + "ConnectionPoolReady" + ] +} diff --git a/testdata/connection-monitoring-and-pooling/pool-clear-interrupting-pending-connections.yml b/testdata/connection-monitoring-and-pooling/pool-clear-interrupting-pending-connections.yml new file mode 100644 index 0000000000..d13257c3c4 --- /dev/null +++ b/testdata/connection-monitoring-and-pooling/pool-clear-interrupting-pending-connections.yml @@ -0,0 +1,42 @@ +version: 1 +style: integration +description: clear with interruptInUseConnections = true closes pending connections +runOn: + - + minServerVersion: "4.9.0" +failPoint: + configureFailPoint: failCommand + mode: "alwaysOn" + data: + failCommands: ["isMaster","hello"] + closeConnection: false + blockConnection: true + blockTimeMS: 1000 +poolOptions: + minPoolSize: 0 +operations: + - name: ready + - name: start + target: thread1 + - name: checkOut + thread: thread1 + - name: waitForEvent + event: ConnectionCreated + count: 1 + - name: clear + interruptInUseConnections: true + - name: waitForEvent + event: ConnectionCheckOutFailed + count: 1 +events: + - type: ConnectionCheckOutStarted + - type: ConnectionCreated + - type: ConnectionPoolCleared + interruptInUseConnections: true + - type: ConnectionClosed + - type: ConnectionCheckOutFailed +ignore: + - ConnectionCheckedIn + - ConnectionCheckedOut + - ConnectionPoolCreated + - ConnectionPoolReady \ No newline at end of file diff --git a/testdata/connection-monitoring-and-pooling/pool-clear-schedule-run-interruptInUseConnections-false.json b/testdata/connection-monitoring-and-pooling/pool-clear-schedule-run-interruptInUseConnections-false.json new file mode 100644 index 0000000000..3d7536951d --- /dev/null +++ b/testdata/connection-monitoring-and-pooling/pool-clear-schedule-run-interruptInUseConnections-false.json @@ -0,0 +1,81 @@ +{ + "version": 1, + "style": "unit", + "description": "Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections = false)", + "poolOptions": { + "backgroundThreadIntervalMS": 10000 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "checkOut" + }, + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "checkIn", + "connection": "conn" + }, + { + "name": "clear", + "interruptInUseConnections": false + }, + { + "name": "waitForEvent", + "event": "ConnectionPoolCleared", + "count": 1, + "timeout": 1000 + }, + { + "name": "waitForEvent", + "event": "ConnectionClosed", + "count": 1, + "timeout": 1000 + }, + { + "name": 
"close" + } + ], + "events": [ + { + "type": "ConnectionCheckedOut", + "connectionId": 1, + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 2, + "address": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 2, + "address": 42 + }, + { + "type": "ConnectionPoolCleared", + "interruptInUseConnections": false + }, + { + "type": "ConnectionClosed", + "connectionId": 2, + "reason": "stale", + "address": 42 + }, + { + "type": "ConnectionPoolClosed", + "address": 42 + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionPoolReady", + "ConnectionReady", + "ConnectionCheckOutStarted", + "ConnectionPoolCreated" + ] +} diff --git a/testdata/connection-monitoring-and-pooling/pool-clear-schedule-run-interruptInUseConnections-false.yml b/testdata/connection-monitoring-and-pooling/pool-clear-schedule-run-interruptInUseConnections-false.yml new file mode 100644 index 0000000000..e156b691c8 --- /dev/null +++ b/testdata/connection-monitoring-and-pooling/pool-clear-schedule-run-interruptInUseConnections-false.yml @@ -0,0 +1,48 @@ +version: 1 +style: unit +description: Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections = false) +poolOptions: + # ensure it's not involved by default + backgroundThreadIntervalMS: 10000 +operations: + - name: ready + - name: checkOut + - name: checkOut + label: conn + - name: checkIn + connection: conn + - name: clear + interruptInUseConnections: false + - name: waitForEvent + event: ConnectionPoolCleared + count: 1 + timeout: 1000 + - name: waitForEvent + event: ConnectionClosed + count: 1 + timeout: 1000 + - name: close +events: + - type: ConnectionCheckedOut + connectionId: 1 + address: 42 + - type: ConnectionCheckedOut + connectionId: 2 + address: 42 + - type: ConnectionCheckedIn + connectionId: 2 + address: 42 + - type: ConnectionPoolCleared + interruptInUseConnections: false + - type: ConnectionClosed + connectionId: 2 + reason: stale + address: 42 + - type: ConnectionPoolClosed + address: 42 +ignore: + - ConnectionCreated + - ConnectionPoolReady + - ConnectionReady + - ConnectionCheckOutStarted + - ConnectionPoolCreated \ No newline at end of file diff --git a/testdata/server-discovery-and-monitoring/unified/interruptInUse-pool-clear.json b/testdata/server-discovery-and-monitoring/unified/interruptInUse-pool-clear.json new file mode 100644 index 0000000000..be01143ca0 --- /dev/null +++ b/testdata/server-discovery-and-monitoring/unified/interruptInUse-pool-clear.json @@ -0,0 +1,591 @@ +{ + "description": "interruptInUse", + "schemaVersion": "1.11", + "runOnRequirements": [ + { + "minServerVersion": "4.9", + "serverless": "forbid", + "topologies": [ + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [] + } + ], + "tests": [ + { + "description": "Connection pool clear uses interruptInUseConnections=true after monitor timeout", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "poolClearedEvent", + "connectionClosedEvent", + "commandStartedEvent", + "commandSucceededEvent", + "commandFailedEvent", + "connectionCheckedOutEvent", + "connectionCheckedInEvent" + ], + "uriOptions": { + "connectTimeoutMS": 500, + "heartbeatFrequencyMS": 500, + 
"appname": "interruptInUse", + "retryReads": false, + "minPoolSize": 0 + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "interruptInUse" + } + }, + { + "thread": { + "id": "thread1" + } + } + ] + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 1 + } + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "thread": "thread1", + "operation": { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "$where": "sleep(2000) || true" + } + }, + "expectError": { + "isError": true + } + } + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 4 + }, + "data": { + "failCommands": [ + "hello", + "isMaster" + ], + "blockConnection": true, + "blockTimeMS": 1500, + "appName": "interruptInUse" + } + } + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread1" + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "insert" + } + }, + { + "commandStartedEvent": { + "commandName": "find" + } + }, + { + "commandFailedEvent": { + "commandName": "find" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "poolClearedEvent": { + "interruptInUseConnections": true + } + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionClosedEvent": {} + } + ] + } + ], + "outcome": [ + { + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + } + ] + } + ] + }, + { + "description": "Error returned from connection pool clear with interruptInUseConnections=true is retryable", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "poolClearedEvent", + "connectionClosedEvent", + "commandStartedEvent", + "commandFailedEvent", + "commandSucceededEvent", + "connectionCheckedOutEvent", + "connectionCheckedInEvent" + ], + "uriOptions": { + "connectTimeoutMS": 500, + "heartbeatFrequencyMS": 500, + "appname": "interruptInUseRetryable", + "retryReads": true, + "minPoolSize": 0 + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "interruptInUse" + } + }, + { + "thread": { + "id": "thread1" + } + } + ] + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 1 + } + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "thread": "thread1", + "operation": { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "$where": "sleep(2000) || true" + } + } + } + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 4 + }, + "data": { + 
"failCommands": [ + "hello", + "isMaster" + ], + "blockConnection": true, + "blockTimeMS": 1500, + "appName": "interruptInUseRetryable" + } + } + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread1" + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "insert" + } + }, + { + "commandStartedEvent": { + "commandName": "find" + } + }, + { + "commandFailedEvent": { + "commandName": "find" + } + }, + { + "commandStartedEvent": { + "commandName": "find" + } + }, + { + "commandSucceededEvent": { + "commandName": "find" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "poolClearedEvent": { + "interruptInUseConnections": true + } + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionClosedEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ], + "outcome": [ + { + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + } + ] + } + ] + }, + { + "description": "Error returned from connection pool clear with interruptInUseConnections=true is retryable for write", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "poolClearedEvent", + "connectionClosedEvent", + "commandStartedEvent", + "commandFailedEvent", + "commandSucceededEvent", + "connectionCheckedOutEvent", + "connectionCheckedInEvent" + ], + "uriOptions": { + "connectTimeoutMS": 500, + "heartbeatFrequencyMS": 500, + "appname": "interruptInUseRetryableWrite", + "retryWrites": true, + "minPoolSize": 0 + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "interruptInUse" + } + }, + { + "thread": { + "id": "thread1" + } + } + ] + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 1 + } + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "thread": "thread1", + "operation": { + "name": "updateOne", + "object": "collection", + "arguments": { + "filter": { + "$where": "sleep(2000) || true" + }, + "update": { + "$set": { + "a": "bar" + } + } + } + } + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 4 + }, + "data": { + "failCommands": [ + "hello", + "isMaster" + ], + "blockConnection": true, + "blockTimeMS": 1500, + "appName": "interruptInUseRetryableWrite" + } + } + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread1" + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "insert" + } + }, + { + "commandStartedEvent": { + "commandName": "update" + } + }, + { + "commandFailedEvent": { + "commandName": "update" + } + }, + { + "commandStartedEvent": { + 
"commandName": "update" + } + }, + { + "commandSucceededEvent": { + "commandName": "update" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "poolClearedEvent": { + "interruptInUseConnections": true + } + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionClosedEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ], + "outcome": [ + { + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1, + "a": "bar" + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/testdata/server-discovery-and-monitoring/unified/interruptInUse-pool-clear.yml b/testdata/server-discovery-and-monitoring/unified/interruptInUse-pool-clear.yml new file mode 100644 index 0000000000..5795bc72d6 --- /dev/null +++ b/testdata/server-discovery-and-monitoring/unified/interruptInUse-pool-clear.yml @@ -0,0 +1,340 @@ +--- +description: interruptInUse + +schemaVersion: "1.11" + +runOnRequirements: + # failCommand appName requirements + - minServerVersion: "4.9" + serverless: forbid + topologies: [ replicaset, sharded ] + +createEntities: + - client: + id: &setupClient setupClient + useMultipleMongoses: false + +initialData: &initialData + - collectionName: &collectionName interruptInUse + databaseName: &databaseName sdam-tests + documents: [] + +tests: + - description: Connection pool clear uses interruptInUseConnections=true after monitor timeout + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + useMultipleMongoses: false + observeEvents: + - poolClearedEvent + - connectionClosedEvent + - commandStartedEvent + - commandSucceededEvent + - commandFailedEvent + - connectionCheckedOutEvent + - connectionCheckedInEvent + uriOptions: + connectTimeoutMS: 500 + heartbeatFrequencyMS: 500 + appname: interruptInUse + retryReads: false + minPoolSize: 0 + - database: + id: &database database + client: *client + databaseName: *databaseName + - collection: + id: &collection collection + database: *database + collectionName: *collectionName + - thread: + id: &thread1 thread1 + - name: insertOne + object: *collection + arguments: + document: { _id: 1 } + # simulate a long-running query + - name: runOnThread + object: testRunner + arguments: + thread: *thread1 + operation: + name: find + object: *collection + arguments: + filter: + $where : sleep(2000) || true + expectError: + isError: true + # Configure the monitor check to fail with a timeout. + # Use "times: 4" to increase the probability that the Monitor check triggers + # the failpoint, since the RTT hello may trigger this failpoint one or many + # times as well. 
+ - name: failPoint + object: testRunner + arguments: + client: *setupClient + failPoint: + configureFailPoint: failCommand + mode: + times: 4 + data: + failCommands: + - hello + - isMaster + blockConnection: true + blockTimeMS: 1500 + appName: interruptInUse + - name: waitForThread + object: testRunner + arguments: + thread: *thread1 + + expectEvents: + - client: *client + eventType: command + events: + - commandStartedEvent: + commandName: insert + - commandSucceededEvent: + commandName: insert + - commandStartedEvent: + commandName: find + - commandFailedEvent: + commandName: find + - client: *client + eventType: cmap + events: + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} + - connectionCheckedOutEvent: {} + - poolClearedEvent: + interruptInUseConnections: true + - connectionCheckedInEvent: {} + - connectionClosedEvent: {} + + outcome: + - collectionName: *collectionName + databaseName: *databaseName + documents: + - _id: 1 + + - description: Error returned from connection pool clear with interruptInUseConnections=true is retryable + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + useMultipleMongoses: false + observeEvents: + - poolClearedEvent + - connectionClosedEvent + - commandStartedEvent + - commandFailedEvent + - commandSucceededEvent + - connectionCheckedOutEvent + - connectionCheckedInEvent + uriOptions: + connectTimeoutMS: 500 + heartbeatFrequencyMS: 500 + appname: interruptInUseRetryable + retryReads: true + minPoolSize: 0 + - database: + id: &database database + client: *client + databaseName: *databaseName + - collection: + id: &collection collection + database: *database + collectionName: *collectionName + - thread: + id: &thread1 thread1 + - name: insertOne + object: *collection + arguments: + document: { _id: 1 } + # simulate a long-running query + - name: runOnThread + object: testRunner + arguments: + thread: *thread1 + operation: + name: find + object: *collection + arguments: + filter: + $where : sleep(2000) || true + # Configure the monitor check to fail with a timeout. + # Use "times: 4" to increase the probability that the Monitor check triggers + # the failpoint, since the RTT hello may trigger this failpoint one or many + # times as well. 
+ - name: failPoint + object: testRunner + arguments: + client: *setupClient + failPoint: + configureFailPoint: failCommand + mode: + times: 4 + data: + failCommands: + - hello + - isMaster + blockConnection: true + blockTimeMS: 1500 + appName: interruptInUseRetryable + - name: waitForThread + object: testRunner + arguments: + thread: *thread1 + + expectEvents: + - client: *client + eventType: command + events: + - commandStartedEvent: + commandName: insert + - commandSucceededEvent: + commandName: insert + - commandStartedEvent: + commandName: find + - commandFailedEvent: + commandName: find + - commandStartedEvent: + commandName: find + - commandSucceededEvent: + commandName: find + - client: *client + eventType: cmap + events: + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} + - connectionCheckedOutEvent: {} + - poolClearedEvent: + interruptInUseConnections: true + - connectionCheckedInEvent: {} + - connectionClosedEvent: {} + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} + + outcome: + - collectionName: *collectionName + databaseName: *databaseName + documents: + - _id: 1 + - description: Error returned from connection pool clear with interruptInUseConnections=true is retryable for write + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + useMultipleMongoses: false + observeEvents: + - poolClearedEvent + - connectionClosedEvent + - commandStartedEvent + - commandFailedEvent + - commandSucceededEvent + - connectionCheckedOutEvent + - connectionCheckedInEvent + uriOptions: + connectTimeoutMS: 500 + heartbeatFrequencyMS: 500 + appname: interruptInUseRetryableWrite + retryWrites: true + minPoolSize: 0 + - database: + id: &database database + client: *client + databaseName: *databaseName + - collection: + id: &collection collection + database: *database + collectionName: *collectionName + - thread: + id: &thread1 thread1 + # ensure the primary is discovered + - name: insertOne + object: *collection + arguments: + document: { _id: 1 } + # simulate a long-running query + - name: runOnThread + object: testRunner + arguments: + thread: *thread1 + operation: + name: updateOne + object: *collection + arguments: + filter: + $where: sleep(2000) || true + update: + "$set": { "a": "bar" } + # Configure the monitor check to fail with a timeout. + # Use "times: 4" to increase the probability that the Monitor check triggers + # the failpoint, since the RTT hello may trigger this failpoint one or many + # times as well. 
+ - name: failPoint + object: testRunner + arguments: + client: *setupClient + failPoint: + configureFailPoint: failCommand + mode: + times: 4 + data: + failCommands: + - hello + - isMaster + blockConnection: true + blockTimeMS: 1500 + appName: interruptInUseRetryableWrite + - name: waitForThread + object: testRunner + arguments: + thread: *thread1 + + expectEvents: + - client: *client + eventType: command + events: + - commandStartedEvent: + commandName: insert + - commandSucceededEvent: + commandName: insert + - commandStartedEvent: + commandName: update + - commandFailedEvent: + commandName: update + - commandStartedEvent: + commandName: update + - commandSucceededEvent: + commandName: update + - client: *client + eventType: cmap + events: + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} + - connectionCheckedOutEvent: {} + - poolClearedEvent: + interruptInUseConnections: true + - connectionCheckedInEvent: {} + - connectionClosedEvent: {} + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} + + outcome: + - collectionName: *collectionName + databaseName: *databaseName + documents: + - { _id: 1, a : bar } diff --git a/x/mongo/driver/topology/CMAP_spec_test.go b/x/mongo/driver/topology/CMAP_spec_test.go index d752af71e2..92cbb7cf31 100644 --- a/x/mongo/driver/topology/CMAP_spec_test.go +++ b/x/mongo/driver/topology/CMAP_spec_test.go @@ -9,6 +9,7 @@ package topology import ( "context" "encoding/json" + "fmt" "io/ioutil" "net" "path" @@ -522,7 +523,12 @@ func runOperationInThread(t *testing.T, operation map[string]interface{}, testIn } return c.Close() case "clear": - s.pool.clear(nil, nil) + needInterruption, ok := operation["interruptInUseConnections"].(bool) + if ok && needInterruption { + s.pool.clearAll(fmt.Errorf("spec test clear"), nil) + } else { + s.pool.clear(fmt.Errorf("spec test clear"), nil) + } case "close": s.pool.close(context.Background()) case "ready": diff --git a/x/mongo/driver/topology/connection.go b/x/mongo/driver/topology/connection.go index 88bfc03cdd..13035abc0f 100644 --- a/x/mongo/driver/topology/connection.go +++ b/x/mongo/driver/topology/connection.go @@ -337,7 +337,10 @@ func (c *connection) cancellationListenerCallback() { func (c *connection) writeWireMessage(ctx context.Context, wm []byte) error { var err error if atomic.LoadInt64(&c.state) != connConnected { - return ConnectionError{ConnectionID: c.id, message: "connection is closed"} + return ConnectionError{ + ConnectionID: c.id, + message: "connection is closed", + } } var deadline time.Time @@ -388,7 +391,10 @@ func (c *connection) write(ctx context.Context, wm []byte) (err error) { // readWireMessage reads a wiremessage from the connection. The dst parameter will be overwritten. func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) { if atomic.LoadInt64(&c.state) != connConnected { - return nil, ConnectionError{ConnectionID: c.id, message: "connection is closed"} + return nil, ConnectionError{ + ConnectionID: c.id, + message: "connection is closed", + } } var deadline time.Time diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 6e150344db..6ca23c071b 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -375,6 +375,13 @@ func (p *pool) close(ctx context.Context) { // Empty the idle connections stack and try to deliver ErrPoolClosed to any waiting wantConns // from idleConnWait while holding the idleMu lock. 
p.idleMu.Lock() + for _, conn := range p.idleConns { + _ = p.removeConnection(conn, reason{ + loggerConn: logger.ReasonConnClosedPoolClosed, + event: event.ReasonPoolClosed, + }, nil) + _ = p.closeConnection(conn) // We don't care about errors while closing the connection. + } p.idleConns = p.idleConns[:0] for { w := p.idleConnWait.popFront() @@ -402,16 +409,6 @@ func (p *pool) close(ctx context.Context) { } p.createConnectionsCond.L.Unlock() - // Now that we're not holding any locks, remove all of the connections we collected from the - // pool. - for _, conn := range conns { - _ = p.removeConnection(conn, reason{ - loggerConn: logger.ReasonConnClosedPoolClosed, - event: event.ReasonPoolClosed, - }, nil) - _ = p.closeConnection(conn) // We don't care about errors while closing the connection. - } - if mustLogPoolMessage(p) { logPoolMessage(p, logger.ConnectionPoolClosed) } @@ -422,6 +419,16 @@ func (p *pool) close(ctx context.Context) { Address: p.address.String(), }) } + + // Now that we're not holding any locks, remove all of the connections we collected from the + // pool. + for _, conn := range conns { + _ = p.removeConnection(conn, reason{ + loggerConn: logger.ReasonConnClosedPoolClosed, + event: event.ReasonPoolClosed, + }, nil) + _ = p.closeConnection(conn) // We don't care about errors while closing the connection. + } } func (p *pool) pinConnectionToCursor() { @@ -782,20 +789,16 @@ func (p *pool) checkInNoEvent(conn *connection) error { // connection should never be perished due to max idle time. conn.bumpIdleDeadline() - if reason, perished := connectionPerished(conn); perished { - _ = p.removeConnection(conn, reason, nil) - go func() { - _ = p.closeConnection(conn) - }() - return nil - } - - if conn.pool.getState() == poolClosed { - _ = p.removeConnection(conn, reason{ + r, perished := connectionPerished(conn) + if !perished && conn.pool.getState() == poolClosed { + perished = true + r = reason{ loggerConn: logger.ReasonConnClosedPoolClosed, event: event.ReasonPoolClosed, - }, nil) - + } + } + if perished { + _ = p.removeConnection(conn, r, nil) go func() { _ = p.closeConnection(conn) }() @@ -825,12 +828,37 @@ func (p *pool) checkInNoEvent(conn *connection) error { return nil } +// clear calls clearImpl internally with a false interruptAllConnections value. +func (p *pool) clear(err error, serviceID *primitive.ObjectID) { + p.clearImpl(err, serviceID, false) +} + +// clearAll does same as the "clear" method but interrupts all connections. +func (p *pool) clearAll(err error, serviceID *primitive.ObjectID) { + p.clearImpl(err, serviceID, true) +} + +// interruptConnections interrupts the input connections. +func (p *pool) interruptConnections(conns []*connection) { + for _, conn := range conns { + _ = p.removeConnection(conn, reason{ + loggerConn: logger.ReasonConnClosedStale, + event: event.ReasonStale, + }, nil) + go func(c *connection) { + _ = p.closeConnection(c) + }(conn) + } +} + // clear marks all connections as stale by incrementing the generation number, stops all background // goroutines, removes all requests from idleConnWait and newConnWait, and sets the pool state to // "paused". If serviceID is nil, clear marks all connections as stale. If serviceID is not nil, // clear marks only connections associated with the given serviceID stale (for use in load balancer // mode). -func (p *pool) clear(err error, serviceID *primitive.ObjectID) { +// If interruptAllConnections is true, this function calls interruptConnections to interrupt all +// non-idle connections. 
+func (p *pool) clearImpl(err error, serviceID *primitive.ObjectID, interruptAllConnections bool) { if p.getState() == poolClosed { return } @@ -854,7 +882,51 @@ func (p *pool) clear(err error, serviceID *primitive.ObjectID) { } p.lastClearErr = err p.stateMu.Unlock() + } + + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyServiceID, serviceID, + } + + logPoolMessage(p, logger.ConnectionPoolCleared, keysAndValues...) + } + + if sendEvent && p.monitor != nil { + event := &event.PoolEvent{ + Type: event.PoolCleared, + Address: p.address.String(), + ServiceID: serviceID, + Interruption: interruptAllConnections, + Error: err, + } + p.monitor.Event(event) + } + + p.removePerishedConns() + if interruptAllConnections { + p.createConnectionsCond.L.Lock() + p.idleMu.Lock() + + idleConns := make(map[*connection]bool, len(p.idleConns)) + for _, idle := range p.idleConns { + idleConns[idle] = true + } + + conns := make([]*connection, 0, len(p.conns)) + for _, conn := range p.conns { + if _, ok := idleConns[conn]; !ok && p.stale(conn) { + conns = append(conns, conn) + } + } + + p.idleMu.Unlock() + p.createConnectionsCond.L.Unlock() + + p.interruptConnections(conns) + } + if serviceID == nil { pcErr := poolClearedError{err: err, address: p.address} // Clear the idle connections wait queue. @@ -881,23 +953,6 @@ func (p *pool) clear(err error, serviceID *primitive.ObjectID) { } p.createConnectionsCond.L.Unlock() } - - if mustLogPoolMessage(p) { - keysAndValues := logger.KeyValues{ - logger.KeyServiceID, serviceID, - } - - logPoolMessage(p, logger.ConnectionPoolCleared, keysAndValues...) - } - - if sendEvent && p.monitor != nil { - p.monitor.Event(&event.PoolEvent{ - Type: event.PoolCleared, - Address: p.address.String(), - ServiceID: serviceID, - Error: err, - }) - } } // getOrQueueForIdleConn attempts to deliver an idle connection to the given wantConn. 
If there is diff --git a/x/mongo/driver/topology/pool_generation_counter.go b/x/mongo/driver/topology/pool_generation_counter.go index 47fac2f618..dd10c0ce7a 100644 --- a/x/mongo/driver/topology/pool_generation_counter.go +++ b/x/mongo/driver/topology/pool_generation_counter.go @@ -112,25 +112,21 @@ func (p *poolGenerationMap) stale(serviceIDPtr *primitive.ObjectID, knownGenerat return true } - serviceID := getServiceID(serviceIDPtr) - p.Lock() - defer p.Unlock() - - if stats, ok := p.generationMap[serviceID]; ok { - return knownGeneration < stats.generation + if generation, ok := p.getGeneration(serviceIDPtr); ok { + return knownGeneration < generation } return false } -func (p *poolGenerationMap) getGeneration(serviceIDPtr *primitive.ObjectID) uint64 { +func (p *poolGenerationMap) getGeneration(serviceIDPtr *primitive.ObjectID) (uint64, bool) { serviceID := getServiceID(serviceIDPtr) p.Lock() defer p.Unlock() if stats, ok := p.generationMap[serviceID]; ok { - return stats.generation + return stats.generation, true } - return 0 + return 0, false } func (p *poolGenerationMap) getNumConns(serviceIDPtr *primitive.ObjectID) uint64 { diff --git a/x/mongo/driver/topology/sdam_spec_test.go b/x/mongo/driver/topology/sdam_spec_test.go index ee43a2bd7d..a24c7e3dae 100644 --- a/x/mongo/driver/topology/sdam_spec_test.go +++ b/x/mongo/driver/topology/sdam_spec_test.go @@ -316,7 +316,7 @@ func applyErrors(t *testing.T, topo *Topology, errors []applicationError) { versionRange := description.NewVersionRange(0, *appErr.MaxWireVersion) desc.WireVersion = &versionRange - generation := server.pool.generation.getGeneration(nil) + generation, _ := server.pool.generation.getGeneration(nil) if appErr.Generation != nil { generation = *appErr.Generation } @@ -548,7 +548,7 @@ func runTest(t *testing.T, directory string, filename string) { topo.serversLock.Lock() actualServer := topo.servers[address.Address(addr)] topo.serversLock.Unlock() - actualGeneration := actualServer.pool.generation.getGeneration(nil) + actualGeneration, _ := actualServer.pool.generation.getGeneration(nil) assert.Equal(t, server.Pool.Generation, actualGeneration, "expected server pool generation to be %v, got %v", server.Pool.Generation, actualGeneration) } diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go index 751d05de93..f4c6d744aa 100644 --- a/x/mongo/driver/topology/server.go +++ b/x/mongo/driver/topology/server.go @@ -334,7 +334,7 @@ func (s *Server) ProcessHandshakeError(err error, startingGenerationNumber uint6 return } // Ignore the error if the connection is stale. - if startingGenerationNumber < s.pool.generation.getGeneration(serviceID) { + if generation, _ := s.pool.generation.getGeneration(serviceID); startingGenerationNumber < generation { return } @@ -639,7 +639,11 @@ func (s *Server) update() { // Clear the pool once the description has been updated to Unknown. Pass in a nil service ID to clear // because the monitoring routine only runs for non-load balanced deployments in which servers don't return // IDs. - s.pool.clear(err, nil) + if timeoutCnt > 0 { + s.pool.clearAll(err, nil) + } else { + s.pool.clear(err, nil) + } } // We're either not handling a timeout error, or we just handled the 2nd consecutive // timeout error. 
In either case, reset the timeout count to 0 and return false to diff --git a/x/mongo/driver/topology/server_test.go b/x/mongo/driver/topology/server_test.go index a04f5ed7c0..e23c604156 100644 --- a/x/mongo/driver/topology/server_test.go +++ b/x/mongo/driver/topology/server_test.go @@ -126,11 +126,8 @@ func (d *timeoutDialer) DialContext(ctx context.Context, network, address string return &timeoutConn{c, d.errors}, e } -// TestServerHeartbeatTimeout tests timeout retry for GODRIVER-2577. +// TestServerHeartbeatTimeout tests timeout retry and preemptive canceling. func TestServerHeartbeatTimeout(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test in short mode") - } if os.Getenv("DOCKER_RUNNING") != "" { t.Skip("Skipping this test in docker.") } @@ -140,19 +137,19 @@ func TestServerHeartbeatTimeout(t *testing.T) { } testCases := []struct { - desc string - ioErrors []error - expectPoolCleared bool + desc string + ioErrors []error + expectInterruptions int }{ { - desc: "one single timeout should not clear the pool", - ioErrors: []error{nil, networkTimeoutError, nil, networkTimeoutError, nil}, - expectPoolCleared: false, + desc: "one single timeout should not clear the pool", + ioErrors: []error{nil, networkTimeoutError, nil, networkTimeoutError, nil}, + expectInterruptions: 0, }, { - desc: "continuous timeouts should clear the pool", - ioErrors: []error{nil, networkTimeoutError, networkTimeoutError, nil}, - expectPoolCleared: true, + desc: "continuous timeouts should clear the pool with interruption", + ioErrors: []error{nil, networkTimeoutError, networkTimeoutError, nil}, + expectInterruptions: 1, }, } for _, tc := range testCases { @@ -198,7 +195,8 @@ func TestServerHeartbeatTimeout(t *testing.T) { ) require.NoError(t, server.Connect(nil)) wg.Wait() - assert.Equal(t, tc.expectPoolCleared, tpm.IsPoolCleared(), "expected pool cleared to be %v but was %v", tc.expectPoolCleared, tpm.IsPoolCleared()) + interruptions := tpm.Interruptions() + assert.Equal(t, tc.expectInterruptions, interruptions, "expected %d interruption but got %d", tc.expectInterruptions, interruptions) }) } } @@ -443,7 +441,7 @@ func TestServer(t *testing.T) { require.NotNil(t, s.Description().LastError) } - generation := s.pool.generation.getGeneration(nil) + generation, _ := s.pool.generation.getGeneration(nil) if (tt.connectionError || tt.networkError) && generation != 1 { t.Errorf("Expected pool to be drained once on connection or network error. got %d; want %d", generation, 1) } @@ -454,20 +452,25 @@ func TestServer(t *testing.T) { assertGenerationStats := func(t *testing.T, server *Server, serviceID primitive.ObjectID, wantGeneration, wantNumConns uint64) { t.Helper() + getGeneration := func(serviceIDPtr *primitive.ObjectID) uint64 { + generation, _ := server.pool.generation.getGeneration(serviceIDPtr) + return generation + } + // On connection failure, the connection is removed and closed after delivering the // error to Connection(), so it may still count toward the generation connection count // briefly. Wait up to 100ms for the generation connection count to reach the target. 
assert.Eventuallyf(t, func() bool { - generation := server.pool.generation.getGeneration(&serviceID) + generation, _ := server.pool.generation.getGeneration(&serviceID) numConns := server.pool.generation.getNumConns(&serviceID) return generation == wantGeneration && numConns == wantNumConns }, 100*time.Millisecond, - 1*time.Millisecond, + 10*time.Millisecond, "expected generation number %v, got %v; expected connection count %v, got %v", wantGeneration, - server.pool.generation.getGeneration(&serviceID), + getGeneration(&serviceID), wantNumConns, server.pool.generation.getNumConns(&serviceID)) } @@ -1204,9 +1207,10 @@ func TestServer_ProcessError(t *testing.T) { desc, "expected and actual server descriptions are different") + generation, _ := server.pool.generation.getGeneration(nil) assert.Equal(t, tc.wantGeneration, - server.pool.generation.getGeneration(nil), + generation, "expected and actual pool generation are different") }) }
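
Note on usage (not part of the patch): the diff above adds an Interruption field to event.PoolEvent and a pool.clearAll path that is taken after a second consecutive heartbeat timeout (see server.update). Below is a minimal, hedged sketch of how an application could observe that field through the driver's existing PoolMonitor API; the connection URI and log wording are placeholders for illustration, and the sketch assumes a driver build that includes this patch (and therefore exposes PoolEvent.Interruption).

package main

import (
	"context"
	"log"

	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	// PoolMonitor receives CMAP events. With this patch, a PoolCleared event
	// carries Interruption=true when the clear also closed in-use connections
	// (the interruptInUseConnections=true case), not just idle ones.
	monitor := &event.PoolMonitor{
		Event: func(evt *event.PoolEvent) {
			if evt.Type == event.PoolCleared {
				log.Printf("pool cleared for %s: interruptInUseConnections=%v, err=%v",
					evt.Address, evt.Interruption, evt.Error)
			}
		},
	}

	// "mongodb://localhost:27017" is a placeholder URI for illustration only.
	opts := options.Client().
		ApplyURI("mongodb://localhost:27017").
		SetPoolMonitor(monitor)

	client, err := mongo.Connect(context.Background(), opts)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Disconnect(context.Background()) }()
}

Because server.update only calls clearAll after consecutive heartbeat timeouts, Interruption=true signals that in-progress operations on that server were being cancelled, whereas a plain clear (Interruption=false) only discards idle and checked-in connections via the stale-generation mechanism.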