diff --git a/x/mongo/driver/topology/rtt_monitor.go b/x/mongo/driver/topology/rtt_monitor.go index 54b37de048..21eafd18f2 100644 --- a/x/mongo/driver/topology/rtt_monitor.go +++ b/x/mongo/driver/topology/rtt_monitor.go @@ -54,7 +54,6 @@ type rttMonitor struct { cfg *rttConfig ctx context.Context cancelFn context.CancelFunc - started bool } var _ driver.RTTMonitor = &rttMonitor{} @@ -78,7 +77,6 @@ func (r *rttMonitor) connect() { r.connMu.Lock() defer r.connMu.Unlock() - r.started = true r.closeWg.Add(1) go func() { @@ -92,10 +90,6 @@ func (r *rttMonitor) disconnect() { r.connMu.Lock() defer r.connMu.Unlock() - if !r.started { - return - } - r.cancelFn() // Wait for the existing connection to complete. diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go index 653dee2d9b..bfb0c3ca76 100644 --- a/x/mongo/driver/topology/server.go +++ b/x/mongo/driver/topology/server.go @@ -125,6 +125,7 @@ type Server struct { processErrorLock sync.Mutex rttMonitor *rttMonitor + monitorOnce sync.Once } // updateTopologyCallback is a callback used to create a server that should be called when the parent Topology instance @@ -285,10 +286,10 @@ func (s *Server) Disconnect(ctx context.Context) error { close(s.done) s.cancelCheck() - s.rttMonitor.disconnect() s.pool.close(ctx) s.closewg.Wait() + s.rttMonitor.disconnect() atomic.StoreInt64(&s.state, serverDisconnected) return nil @@ -666,8 +667,8 @@ func (s *Server) update() { transitionedFromNetworkError := desc.LastError != nil && unwrapConnectionError(desc.LastError) != nil && previousDescription.Kind != description.Unknown - if isStreamingEnabled(s) && isStreamable(s) && !s.rttMonitor.started { - s.rttMonitor.connect() + if isStreamingEnabled(s) && isStreamable(s) { + s.monitorOnce.Do(s.rttMonitor.connect) } if isStreamable(s) && (serverSupportsStreaming || connectionIsStreaming) || transitionedFromNetworkError {