Commit

updates
qingyang-hu committed Nov 30, 2023
1 parent b4352fc commit d938fed
Showing 4 changed files with 43 additions and 40 deletions.
2 changes: 1 addition & 1 deletion mongo/integration/unified/client_entity.go
@@ -569,7 +569,7 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
 	case "appname":
 		clientOpts.SetAppName(value.(string))
 	case "connecttimeoutms":
-		clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Microsecond)
+		clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Millisecond)
 	case "heartbeatfrequencyms":
 		clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond)
 	case "loadbalanced":
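The one-line fix above corrects a unit bug: "connecttimeoutms", like every URI option ending in "MS", carries a millisecond count, but the value was being scaled by time.Microsecond, producing timeouts 1,000x shorter than requested. A standalone sketch of the difference, using a hypothetical 250ms timeout value (not part of the driver's test harness):

package main

import (
	"fmt"
	"time"
)

func main() {
	// URI options ending in "MS" carry millisecond counts.
	var connectTimeoutMS int32 = 250

	wrong := time.Duration(connectTimeoutMS) * time.Microsecond // 250µs: 1,000x too short
	right := time.Duration(connectTimeoutMS) * time.Millisecond // 250ms, as the URI intends

	fmt.Println(wrong, right) // prints "250µs 250ms"
}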
6 changes: 3 additions & 3 deletions mongo/integration/unified/testrunner_operation.go
@@ -194,7 +194,7 @@ func executeTestRunnerOperation(ctx context.Context, op *operation, loopDone <-c
 		}
 		threadOp := new(operation)
 		if err := operationRaw.Unmarshal(threadOp); err != nil {
-			return fmt.Errorf("error unmarshalling 'operation' argument: %v", err)
+			return fmt.Errorf("error unmarshaling 'operation' argument: %v", err)
 		}
 		ch := entities(ctx).waitChans[lookupString(args, "thread")]
 		go func(op *operation) {
@@ -205,8 +205,8 @@ func executeTestRunnerOperation(ctx context.Context, op *operation, loopDone <-c
 	case "waitForThread":
 		if ch, ok := entities(ctx).waitChans[lookupString(args, "thread")]; ok {
 			select {
-			case <-ch:
-				return nil
+			case err := <-ch:
+				return err
 			case <-time.After(10 * time.Second):
 				return fmt.Errorf("timed out after 10 seconds")
 			}
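Beyond the unmarshalling-message spelling fix, the behavioral change is in the "waitForThread" case: the runner now receives the value sent on the thread's wait channel instead of discarding it, so an error returned by the spawned operation surfaces to the caller rather than being masked as success. A minimal sketch of the pattern, assuming the channel carries an error (the waitChans element type is not shown in this diff):

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForThread blocks until the worker reports a result or the deadline passes.
func waitForThread(ch <-chan error) error {
	select {
	case err := <-ch: // nil on success, the worker's error on failure
		return err
	case <-time.After(10 * time.Second):
		return fmt.Errorf("timed out after 10 seconds")
	}
}

func main() {
	ch := make(chan error, 1)
	go func() { ch <- errors.New("operation failed") }()
	fmt.Println(waitForThread(ch)) // prints "operation failed" instead of hiding it
}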
3 changes: 2 additions & 1 deletion mongo/integration/unified/unified_spec_runner.go
@@ -32,7 +32,8 @@ var (
 	"Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. See GODRIVER-2843",
 	// TODO(GODRIVER-2943): Fix and unskip this test case.
 	"Topology lifecycle": "Test times out. See GODRIVER-2943",
-
+	// The current logic, which was implemented with GODRIVER-2577, only clears pools and cancels in-progress ops if
+	// the heartbeat fails twice. Therefore, we skip the following spec tests, which requires canceling ops immediately.
 	"Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
 	"Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout",
 	"Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",
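The new comment documents why the three interruptInUseConnections tests are skipped: since GODRIVER-2577, the Go driver clears the pool and cancels in-progress operations only after the heartbeat fails twice in a row, while these spec tests expect cancellation after a single monitor timeout. A hypothetical sketch of that double-failure threshold (names invented; the real logic lives in the driver's server monitoring code, not in this file):

package main

import "fmt"

// monitor tracks consecutive heartbeat failures against one server.
type monitor struct{ consecutiveFailures int }

// onHeartbeat resets the counter on success and clears the pool only after
// two consecutive failures.
func (m *monitor) onHeartbeat(err error) {
	if err == nil {
		m.consecutiveFailures = 0
		return
	}
	m.consecutiveFailures++
	if m.consecutiveFailures >= 2 {
		fmt.Println("clearing pool; interrupting in-use connections")
	}
}

func main() {
	m := &monitor{}
	m.onHeartbeat(fmt.Errorf("heartbeat timeout")) // first failure: no clear yet
	m.onHeartbeat(fmt.Errorf("heartbeat timeout")) // second failure: pool cleared
}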
72 changes: 37 additions & 35 deletions x/mongo/driver/topology/pool.go
@@ -863,42 +863,44 @@ func (p *pool) checkInWithCallback(conn *connection, cb func() (reason, bool)) e
 
 // clearAll does same as the "clear" method and interrupts all in-use connections as well.
 func (p *pool) clearAll(err error, serviceID *primitive.ObjectID) {
-	p.clearImpl(err, serviceID, func() {
-		for _, conn := range p.conns {
-			if conn.inUse && p.stale(conn) {
-				_ = conn.closeWithErr(poolClearedError{
-					err:     fmt.Errorf("interrupted"),
-					address: p.address,
-				})
-				_ = p.checkInWithCallback(conn, func() (reason, bool) {
-					if mustLogPoolMessage(p) {
-						keysAndValues := logger.KeyValues{
-							logger.KeyDriverConnectionID, conn.driverConnectionID,
-						}
+	p.clearImpl(err, serviceID, true)
+}
 
-						logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
-					}
+func (p *pool) interruptInUseConnections() {
+	for _, conn := range p.conns {
+		if conn.inUse && p.stale(conn) {
+			_ = conn.closeWithErr(poolClearedError{
+				err:     fmt.Errorf("interrupted"),
+				address: p.address,
+			})
+			_ = p.checkInWithCallback(conn, func() (reason, bool) {
+				if mustLogPoolMessage(p) {
+					keysAndValues := logger.KeyValues{
+						logger.KeyDriverConnectionID, conn.driverConnectionID,
+					}
 
-					if p.monitor != nil {
-						p.monitor.Event(&event.PoolEvent{
-							Type:         event.ConnectionCheckedIn,
-							ConnectionID: conn.driverConnectionID,
-							Address:      conn.addr.String(),
-						})
-					}
+					logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
+				}
 
-					r, ok := connectionPerished(conn)
-					if ok {
-						r = reason{
-							loggerConn: logger.ReasonConnClosedStale,
-							event:      event.ReasonStale,
-						}
-					}
-					return r, ok
-				})
-			}
-		}
-	})
+				if p.monitor != nil {
+					p.monitor.Event(&event.PoolEvent{
+						Type:         event.ConnectionCheckedIn,
+						ConnectionID: conn.driverConnectionID,
+						Address:      conn.addr.String(),
+					})
+				}
+
+				r, ok := connectionPerished(conn)
+				if ok {
+					r = reason{
+						loggerConn: logger.ReasonConnClosedStale,
+						event:      event.ReasonStale,
+					}
+				}
+				return r, ok
+			})
+		}
+	}
 }

@@ -907,10 +909,10 @@ func (p *pool) clearAll(err error, serviceID *primitive.ObjectID) {
 // clear marks only connections associated with the given serviceID stale (for use in load balancer
 // mode).
 func (p *pool) clear(err error, serviceID *primitive.ObjectID) {
-	p.clearImpl(err, serviceID, nil)
+	p.clearImpl(err, serviceID, false)
 }
 
-func (p *pool) clearImpl(err error, serviceID *primitive.ObjectID, interruptionCallback func()) {
+func (p *pool) clearImpl(err error, serviceID *primitive.ObjectID, interruptInUseConnections bool) {
 	if p.getState() == poolClosed {
 		return
 	}
@@ -951,15 +953,15 @@ func (p *pool) clearImpl(err error, serviceID *primitive.ObjectID, interruptionC
 			ServiceID: serviceID,
 			Error:     err,
 		}
-		if interruptionCallback != nil {
+		if interruptInUseConnections {
 			event.Interruption = true
 		}
 		p.monitor.Event(event)
 	}
 
 	p.removePerishedConns()
-	if interruptionCallback != nil {
-		interruptionCallback()
+	if interruptInUseConnections {
+		p.interruptInUseConnections()
 	}
 
 	if serviceID == nil {
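Taken together, the pool.go changes swap clearImpl's injected interruption closure for a boolean flag plus a named interruptInUseConnections method: clear and clearAll now differ only in that flag, and the PoolCleared event's Interruption field mirrors it directly. A compressed sketch of the resulting shape (fields and event plumbing stubbed out; not the driver's full implementation):

package main

import "fmt"

type pool struct{}

// interruptInUseConnections closes and checks in stale in-use connections;
// stubbed here, but the real method walks p.conns as shown in the diff above.
func (p *pool) interruptInUseConnections() {
	fmt.Println("interrupting in-use connections")
}

func (p *pool) clearImpl(interruptInUseConnections bool) {
	// ...increment the generation, emit a PoolCleared event whose Interruption
	// field is set from interruptInUseConnections, remove perished conns...
	if interruptInUseConnections {
		p.interruptInUseConnections()
	}
}

// clear marks connections stale without touching in-use ones.
func (p *pool) clear() { p.clearImpl(false) }

// clearAll additionally interrupts connections that are currently in use.
func (p *pool) clearAll() { p.clearImpl(true) }

func main() {
	(&pool{}).clearAll()
}

Passing a bool instead of a closure makes both call sites self-describing and lets the event emission consult the same flag, at the cost of clearImpl knowing about the interruption method.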
