Skip to content

Commit

Permalink
GODRIVER-2966Remove unnecessary server heartbeat events.
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Sep 6, 2023
1 parent 8ca1f20 commit 21f9d2d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 28 deletions.
9 changes: 0 additions & 9 deletions x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,19 +801,10 @@ func (s *Server) check() (description.Server, error) {
if s.conn == nil || s.conn.closed() || s.checkWasCancelled() {
// Create a new connection and add it's handshake RTT as a sample.
err = s.setupHeartbeatConnection()
duration = time.Since(start)
if err == nil {
// Use the description from the connection handshake as the value for this check.
s.rttMonitor.addSample(s.conn.helloRTT)
descPtr = &s.conn.desc
if s.conn != nil {
s.publishServerHeartbeatSucceededEvent(s.conn.ID(), duration, s.conn.desc, false)
}
} else {
err = unwrapConnectionError(err)
if s.conn != nil {
s.publishServerHeartbeatFailedEvent(s.conn.ID(), duration, err, false)
}
}
} else {
// An existing connection is being used. Use the server description properties to execute the right heartbeat.
Expand Down
63 changes: 44 additions & 19 deletions x/mongo/driver/topology/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ type errorQueue struct {
mutex sync.Mutex
}

func (eq *errorQueue) head() error {
func (eq *errorQueue) head() (error, int) {
eq.mutex.Lock()
defer eq.mutex.Unlock()
if len(eq.errors) > 0 {
return eq.errors[0]
if l := len(eq.errors); l > 0 {
return eq.errors[0], l
}
return nil
return nil, 0
}

func (eq *errorQueue) dequeue() bool {
Expand All @@ -78,18 +78,27 @@ func (eq *errorQueue) dequeue() bool {
type timeoutConn struct {
net.Conn
errors *errorQueue
ch chan int
}

func (c *timeoutConn) Read(b []byte) (int, error) {
n, err := 0, c.errors.head()
var n int
err, l := c.errors.head()
defer func(l int) {
c.ch <- l
}(l)
if err == nil {
n, err = c.Conn.Read(b)
}
return n, err
}

func (c *timeoutConn) Write(b []byte) (int, error) {
n, err := 0, c.errors.head()
var n int
err, l := c.errors.head()
defer func(l int) {
c.ch <- l
}(l)
if err == nil {
n, err = c.Conn.Write(b)
}
Expand All @@ -99,6 +108,7 @@ func (c *timeoutConn) Write(b []byte) (int, error) {
type timeoutDialer struct {
Dialer
errors *errorQueue
ch chan int
}

func (d *timeoutDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
Expand All @@ -121,7 +131,7 @@ func (d *timeoutDialer) DialContext(ctx context.Context, network, address string
}
c = tls.Client(c, config)
}
return &timeoutConn{c, d.errors}, e
return &timeoutConn{c, d.errors, d.ch}, e
}

// TestServerHeartbeatTimeout tests timeout retry for GODRIVER-2577.
Expand All @@ -137,16 +147,19 @@ func TestServerHeartbeatTimeout(t *testing.T) {
testCases := []struct {
desc string
ioErrors []error
len int
expectPoolCleared bool
}{
{
desc: "one single timeout should not clear the pool",
ioErrors: []error{nil, networkTimeoutError, nil, networkTimeoutError, nil},
len: 0,
expectPoolCleared: false,
},
{
desc: "continuous timeouts should clear the pool",
ioErrors: []error{nil, networkTimeoutError, networkTimeoutError, nil},
ioErrors: []error{nil, networkTimeoutError, networkTimeoutError},
len: 1,
expectPoolCleared: true,
},
}
Expand All @@ -155,8 +168,9 @@ func TestServerHeartbeatTimeout(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()

var wg sync.WaitGroup
wg.Add(1)
const heartbeatInterval = 200 * time.Millisecond

c := make(chan int)

errors := &errorQueue{errors: tc.ioErrors}
tpm := eventtest.NewTestPoolMonitor()
Expand All @@ -170,29 +184,40 @@ func TestServerHeartbeatTimeout(t *testing.T) {
return append(opts,
WithDialer(func(d Dialer) Dialer {
var dialer net.Dialer
return &timeoutDialer{&dialer, errors}
return &timeoutDialer{&dialer, errors, c}
}))
}),
WithServerMonitor(func(*event.ServerMonitor) *event.ServerMonitor {
return &event.ServerMonitor{
ServerHeartbeatSucceeded: func(e *event.ServerHeartbeatSucceededEvent) {
if !errors.dequeue() {
wg.Done()
}
errors.dequeue()
},
ServerHeartbeatFailed: func(e *event.ServerHeartbeatFailedEvent) {
if !errors.dequeue() {
wg.Done()
}
errors.dequeue()
},
}
}),
WithHeartbeatInterval(func(time.Duration) time.Duration {
return 200 * time.Millisecond
return heartbeatInterval
}),
)
require.NoError(t, server.Connect(nil))
wg.Wait()

timeout := time.After(10 * heartbeatInterval)
var l int
loop:
for {
select {
case l = <-c:
if l == 0 || tpm.IsPoolCleared() {
break loop
}
case <-timeout:
assert.Fail(t, "timeout")
break loop
}
}
assert.Equal(t, tc.len, l, "pool has been cleared unexpectedly")
assert.Equal(t, tc.expectPoolCleared, tpm.IsPoolCleared(), "expected pool cleared to be %v but was %v", tc.expectPoolCleared, tpm.IsPoolCleared())
})
}
Expand Down

0 comments on commit 21f9d2d

Please sign in to comment.