diff --git a/mongo/integration/unified/client_entity.go b/mongo/integration/unified/client_entity.go
index ff7d9d5fc3..2c730106ce 100644
--- a/mongo/integration/unified/client_entity.go
+++ b/mongo/integration/unified/client_entity.go
@@ -568,6 +568,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
 		switch strings.ToLower(key) {
 		case "appname":
 			clientOpts.SetAppName(value.(string))
+		case "connecttimeoutms":
+			clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Millisecond)
 		case "heartbeatfrequencyms":
 			clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond)
 		case "loadbalanced":
diff --git a/mongo/integration/unified/entity.go b/mongo/integration/unified/entity.go
index 06de9b6e78..66e4a6f2de 100644
--- a/mongo/integration/unified/entity.go
+++ b/mongo/integration/unified/entity.go
@@ -283,6 +283,9 @@ func (em *EntityMap) addEntity(ctx context.Context, entityType string, entityOpt
 		err = em.addCollectionEntity(entityOptions)
 	case "session":
 		err = em.addSessionEntity(entityOptions)
+	case "thread":
+		// TODO
+		break
 	case "bucket":
 		err = em.addGridFSBucketEntity(entityOptions)
 	case "clientEncryption":
diff --git a/mongo/integration/unified/testrunner_operation.go b/mongo/integration/unified/testrunner_operation.go
index 474c01c88a..9a2cf1b557 100644
--- a/mongo/integration/unified/testrunner_operation.go
+++ b/mongo/integration/unified/testrunner_operation.go
@@ -182,6 +182,12 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD
 			}
 		}
 		return nil
+	case "runOnThread":
+		// TODO
+		return nil
+	case "waitForThread":
+		// TODO
+		return nil
 	case "waitForEvent":
 		var wfeArgs waitForEventArguments
 		if err := bson.Unmarshal(operation.Arguments, &wfeArgs); err != nil {
diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go
index a8531afc78..212dc2010e 100644
--- a/x/mongo/driver/topology/pool.go
+++ b/x/mongo/driver/topology/pool.go
@@ -384,6 +384,13 @@ func (p *pool) close(ctx context.Context) {
 	// Empty the idle connections stack and try to deliver ErrPoolClosed to any waiting wantConns
 	// from idleConnWait while holding the idleMu lock.
 	p.idleMu.Lock()
+	for _, conn := range p.idleConns {
+		_ = p.removeConnection(conn, reason{
+			loggerConn: logger.ReasonConnClosedPoolClosed,
+			event:      event.ReasonPoolClosed,
+		}, nil)
+		_ = p.closeConnection(conn) // We don't care about errors while closing the connection.
+	}
 	p.idleConns = p.idleConns[:0]
 	for {
 		w := p.idleConnWait.popFront()
@@ -411,16 +418,6 @@ func (p *pool) close(ctx context.Context) {
 	}
 	p.createConnectionsCond.L.Unlock()
 
-	// Now that we're not holding any locks, remove all of the connections we collected from the
-	// pool.
-	for _, conn := range conns {
-		_ = p.removeConnection(conn, reason{
-			loggerConn: logger.ReasonConnClosedPoolClosed,
-			event:      event.ReasonPoolClosed,
-		}, nil)
-		_ = p.closeConnection(conn) // We don't care about errors while closing the connection.
-	}
-
 	if mustLogPoolMessage(p) {
 		logPoolMessage(p, logger.ConnectionPoolClosed)
 	}
@@ -431,6 +428,16 @@ func (p *pool) close(ctx context.Context) {
 			Address: p.address.String(),
 		})
 	}
+
+	// Now that we're not holding any locks, remove all of the connections we collected from the
+	// pool.
+	for _, conn := range conns {
+		_ = p.removeConnection(conn, reason{
+			loggerConn: logger.ReasonConnClosedPoolClosed,
+			event:      event.ReasonPoolClosed,
+		}, nil)
+		_ = p.closeConnection(conn) // We don't care about errors while closing the connection.
+	}
 }
 
 func (p *pool) pinConnectionToCursor() {
@@ -861,7 +868,7 @@ func (p *pool) interruptInUseConnections() {
 			conn.wait() // Make sure that the connection has finished connecting.
 		}
 
-		conn.closeWithErr(poolClearedError{
+		_ = conn.closeWithErr(poolClearedError{
 			err:     fmt.Errorf("interrupted"),
 			address: p.address,
 		})
@@ -949,6 +956,8 @@ func (p *pool) clear(err error, serviceID *primitive.ObjectID, cleaningupFns ...
 			fn()
 		}
 	}
+
+	p.removePerishedConns()
 }
 
 // getOrQueueForIdleConn attempts to deliver an idle connection to the given wantConn. If there is
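The "thread" entity and the runOnThread/waitForThread operations are still TODO placeholders in this diff. As a rough illustration only, here is a minimal sketch of what a worker-backed thread entity could look like, assuming one goroutine per entity that drains a channel of queued operations; every name in it (threadEntity, newThreadEntity, run, wait, stop) is hypothetical and not taken from the driver.

```go
package unified

import "sync"

// threadEntity is a hypothetical sketch: it executes queued operations
// sequentially on a single goroutine, mirroring the unified test format's
// notion of a worker thread.
type threadEntity struct {
	ops chan func()
	wg  sync.WaitGroup
}

func newThreadEntity() *threadEntity {
	t := &threadEntity{ops: make(chan func(), 10)}
	go func() {
		for op := range t.ops {
			op()
			t.wg.Done()
		}
	}()
	return t
}

// run queues an operation; a "runOnThread" handler could call this with the
// operation to execute.
func (t *threadEntity) run(op func()) {
	t.wg.Add(1)
	t.ops <- op
}

// wait blocks until every queued operation has finished; a "waitForThread"
// handler could call this.
func (t *threadEntity) wait() {
	t.wg.Wait()
}

// stop ends the worker goroutine once the entity is no longer needed.
func (t *threadEntity) stop() {
	close(t.ops)
}
```

Using a WaitGroup per queued operation keeps the wait path simple: waitForThread only has to block until the counter drains, regardless of how many operations runOnThread queued.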