Skip to content

Commit

Permalink
feat(pool): add check for badConnection
Browse files Browse the repository at this point in the history
* fix: badConn check(#2053)

* fix: internalpool test

* fix: sentinel test

* fix: conncheck ut

* fix: remove maxBadConnRetries

* fix: add connCheck.deadline check

Signed-off-by: monkey92t <golang@88.com>
  • Loading branch information
naiqianz authored Mar 20, 2022
1 parent f5fbb36 commit a8a7665
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 8 deletions.
49 changes: 49 additions & 0 deletions internal/pool/conn_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos

package pool

import (
"errors"
"io"
"net"
"syscall"
"time"
)

var errUnexpectedRead = errors.New("unexpected read from socket")

func connCheck(conn net.Conn) error {
// Reset previous timeout.
_ = conn.SetDeadline(time.Time{})

sysConn, ok := conn.(syscall.Conn)
if !ok {
return nil
}
rawConn, err := sysConn.SyscallConn()
if err != nil {
return err
}

var sysErr error
err = rawConn.Read(func(fd uintptr) bool {
var buf [1]byte
n, err := syscall.Read(int(fd), buf[:])
switch {
case n == 0 && err == nil:
sysErr = io.EOF
case n > 0:
sysErr = errUnexpectedRead
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
sysErr = nil
default:
sysErr = err
}
return true
})
if err != nil {
return err
}

return sysErr
}
9 changes: 9 additions & 0 deletions internal/pool/conn_check_dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos

package pool

import "net"

func connCheck(conn net.Conn) error {
return nil
}
48 changes: 48 additions & 0 deletions internal/pool/conn_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//go:build linux || darwin || dragonfly || freebsd || netbsd || openbsd || solaris || illumos
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos

package pool

import (
"net"
"net/http/httptest"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("tests conn_check with real conns", func() {
var ts *httptest.Server
var conn net.Conn
var err error

BeforeEach(func() {
ts = httptest.NewServer(nil)
conn, err = net.DialTimeout(ts.Listener.Addr().Network(), ts.Listener.Addr().String(), time.Second)
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
ts.Close()
})

It("good conn check", func() {
Expect(connCheck(conn)).NotTo(HaveOccurred())

Expect(conn.Close()).NotTo(HaveOccurred())
Expect(connCheck(conn)).To(HaveOccurred())
})

It("bad conn check", func() {
Expect(conn.Close()).NotTo(HaveOccurred())
Expect(connCheck(conn)).To(HaveOccurred())
})

It("check conn deadline", func() {
Expect(conn.SetDeadline(time.Now())).NotTo(HaveOccurred())
time.Sleep(time.Millisecond * 10)
Expect(connCheck(conn)).NotTo(HaveOccurred())
Expect(conn.Close()).NotTo(HaveOccurred())
})
})
5 changes: 5 additions & 0 deletions internal/pool/export_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package pool

import (
"net"
"time"
)

func (cn *Conn) SetCreatedAt(tm time.Time) {
cn.createdAt = tm
}

func (cn *Conn) NetConn() net.Conn {
return cn.netConn
}
87 changes: 86 additions & 1 deletion internal/pool/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package pool_test

import (
"context"
"fmt"
"net"
"sync"
"syscall"
"testing"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -32,5 +35,87 @@ func perform(n int, cbs ...func(int)) {
}

func dummyDialer(context.Context) (net.Conn, error) {
return &net.TCPConn{}, nil
// return &net.TCPConn{}, nil
return newDummyConn(), nil
}

func newDummyConn() net.Conn {
return &dummyConn{
rawConn: &dummyRawConn{},
}
}

var _ net.Conn = (*dummyConn)(nil)
var _ syscall.Conn = (*dummyConn)(nil)

type dummyConn struct {
rawConn *dummyRawConn
}

func (d *dummyConn) SyscallConn() (syscall.RawConn, error) {
return d.rawConn, nil
}

var errDummy = fmt.Errorf("dummyConn err")

func (d *dummyConn) Read(b []byte) (n int, err error) {
return 0, errDummy
}

func (d *dummyConn) Write(b []byte) (n int, err error) {
return 0, errDummy
}

func (d *dummyConn) Close() error {
d.rawConn.Close()
return nil
}

func (d *dummyConn) LocalAddr() net.Addr {
return &net.TCPAddr{}
}

func (d *dummyConn) RemoteAddr() net.Addr {
return &net.TCPAddr{}
}

func (d *dummyConn) SetDeadline(t time.Time) error {
return nil
}

func (d *dummyConn) SetReadDeadline(t time.Time) error {
return nil
}

func (d *dummyConn) SetWriteDeadline(t time.Time) error {
return nil
}

var _ syscall.RawConn = (*dummyRawConn)(nil)

type dummyRawConn struct {
closed bool
mux sync.Mutex
}

func (d *dummyRawConn) Control(f func(fd uintptr)) error {
return nil
}

func (d *dummyRawConn) Read(f func(fd uintptr) (done bool)) error {
d.mux.Lock()
defer d.mux.Unlock()
if d.closed {
return fmt.Errorf("dummyRawConn closed")
}
return nil
}

func (d *dummyRawConn) Write(f func(fd uintptr) (done bool)) error {
return nil
}
func (d *dummyRawConn) Close() {
d.mux.Lock()
d.closed = true
d.mux.Unlock()
}
4 changes: 2 additions & 2 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (p *ConnPool) reapStaleConn() *Conn {

func (p *ConnPool) isStaleConn(cn *Conn) bool {
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
return false
return connCheck(cn.netConn) != nil
}

now := time.Now()
Expand All @@ -553,5 +553,5 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool {
return true
}

return false
return connCheck(cn.netConn) != nil
}
3 changes: 3 additions & 0 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ var _ = Describe("conns reaper", func() {
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
case "aged":
cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
case "connCheck":
_ = cn.Close()
}
conns = append(conns, cn)
staleConns = append(staleConns, cn)
Expand Down Expand Up @@ -409,6 +411,7 @@ var _ = Describe("conns reaper", func() {

assert("idle")
assert("aged")
assert("connCheck")
})

var _ = Describe("race", func() {
Expand Down
2 changes: 1 addition & 1 deletion pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var _ = Describe("pool", func() {
client.Pool().Put(ctx, cn)

err = client.Ping(ctx).Err()
Expect(err).To(MatchError("bad connection"))
Expect(err).NotTo(HaveOccurred())

val, err := client.Ping(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion sentinel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
err = master.Shutdown(ctx).Err()
Expect(err).NotTo(HaveOccurred())
Eventually(func() error {
return sentinelMaster.Ping(ctx).Err()
return master.Ping(ctx).Err()
}, "15s", "100ms").Should(HaveOccurred())

// Check that client picked up new master.
Expand Down
3 changes: 0 additions & 3 deletions tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ var _ = Describe("Tx", func() {
return err
}

err = do()
Expect(err).To(MatchError("bad connection"))

err = do()
Expect(err).NotTo(HaveOccurred())
})
Expand Down

0 comments on commit a8a7665

Please sign in to comment.