Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Nov 6, 2023
1 parent 9619648 commit 5118fa1
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 11 deletions.
2 changes: 2 additions & 0 deletions mongo/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
switch strings.ToLower(key) {
case "appname":
clientOpts.SetAppName(value.(string))
case "connecttimeoutms":
clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Microsecond)
case "heartbeatfrequencyms":
clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond)
case "loadbalanced":
Expand Down
3 changes: 3 additions & 0 deletions mongo/integration/unified/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ func (em *EntityMap) addEntity(ctx context.Context, entityType string, entityOpt
err = em.addCollectionEntity(entityOptions)
case "session":
err = em.addSessionEntity(entityOptions)
case "thread":
// TODO
break
case "bucket":
err = em.addGridFSBucketEntity(entityOptions)
case "clientEncryption":
Expand Down
6 changes: 6 additions & 0 deletions mongo/integration/unified/testrunner_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD
}
}
return nil
case "runOnThread":
// TODO
return nil
case "waitForThread":
// TODO
return nil
case "waitForEvent":
var wfeArgs waitForEventArguments
if err := bson.Unmarshal(operation.Arguments, &wfeArgs); err != nil {
Expand Down
31 changes: 20 additions & 11 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,13 @@ func (p *pool) close(ctx context.Context) {
// Empty the idle connections stack and try to deliver ErrPoolClosed to any waiting wantConns
// from idleConnWait while holding the idleMu lock.
p.idleMu.Lock()
for _, conn := range p.idleConns {
_ = p.removeConnection(conn, reason{
loggerConn: logger.ReasonConnClosedPoolClosed,
event: event.ReasonPoolClosed,
}, nil)
_ = p.closeConnection(conn) // We don't care about errors while closing the connection.
}
p.idleConns = p.idleConns[:0]
for {
w := p.idleConnWait.popFront()
Expand Down Expand Up @@ -411,16 +418,6 @@ func (p *pool) close(ctx context.Context) {
}
p.createConnectionsCond.L.Unlock()

// Now that we're not holding any locks, remove all of the connections we collected from the
// pool.
for _, conn := range conns {
_ = p.removeConnection(conn, reason{
loggerConn: logger.ReasonConnClosedPoolClosed,
event: event.ReasonPoolClosed,
}, nil)
_ = p.closeConnection(conn) // We don't care about errors while closing the connection.
}

if mustLogPoolMessage(p) {
logPoolMessage(p, logger.ConnectionPoolClosed)
}
Expand All @@ -431,6 +428,16 @@ func (p *pool) close(ctx context.Context) {
Address: p.address.String(),
})
}

// Now that we're not holding any locks, remove all of the connections we collected from the
// pool.
for _, conn := range conns {
_ = p.removeConnection(conn, reason{
loggerConn: logger.ReasonConnClosedPoolClosed,
event: event.ReasonPoolClosed,
}, nil)
_ = p.closeConnection(conn) // We don't care about errors while closing the connection.
}
}

func (p *pool) pinConnectionToCursor() {
Expand Down Expand Up @@ -861,7 +868,7 @@ func (p *pool) interruptInUseConnections() {
conn.wait() // Make sure that the connection has finished connecting.
}

conn.closeWithErr(poolClearedError{
_ = conn.closeWithErr(poolClearedError{
err: fmt.Errorf("interrupted"),
address: p.address,
})
Expand Down Expand Up @@ -949,6 +956,8 @@ func (p *pool) clear(err error, serviceID *primitive.ObjectID, cleaningupFns ...
fn()
}
}

p.removePerishedConns()
}

// getOrQueueForIdleConn attempts to deliver an idle connection to the given wantConn. If there is
Expand Down

0 comments on commit 5118fa1

Please sign in to comment.