connpool: Allow time out during shutdown #15979

Merged on May 23, 2024 (8 commits)
Changes from 5 commits
91 changes: 71 additions & 20 deletions go/pools/smartconnpool/pool.go
@@ -36,6 +36,12 @@ var (

// ErrCtxTimeout is returned if a ctx is already expired by the time the connection pool is used
ErrCtxTimeout = vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "resource pool context already expired")

+ // ErrConnPoolClosed is returned when trying to get a connection from a closed conn pool
+ ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "connection pool is closed")
+
+ // PoolCloseTimeout is how long to wait for all connections to be returned to the pool during close
+ PoolCloseTimeout = 10 * time.Second
)

type Metrics struct {
@@ -119,8 +125,9 @@ type ConnPool[C Connection] struct {
capacity atomic.Int64

// workers is a waitgroup for all the currently running worker goroutines
- workers sync.WaitGroup
- close chan struct{}
+ workers    sync.WaitGroup
+ close      chan struct{}
+ capacityMu sync.Mutex

config struct {
// connect is the callback to create a new connection for the pool
@@ -142,6 +149,7 @@ type ConnPool[C Connection] struct {
}

Metrics Metrics
+ Name string
}

// NewPool creates a new connection pool with the given Config.
@@ -236,29 +244,54 @@ func (pool *ConnPool[C]) Open(connect Connector[C], refresh RefreshCheck) *ConnP

// Close shuts down the pool. No connections will be returned from ConnPool.Get after calling this,
// but calling ConnPool.Put is still allowed. This function will not return until all of the pool's
- // connections have been returned.
+ // connections have been returned or the default PoolCloseTimeout has elapsed
func (pool *ConnPool[C]) Close() {
- if pool.close == nil {
+ ctx, cancel := context.WithTimeout(context.Background(), PoolCloseTimeout)
+ defer cancel()
+
+ if err := pool.CloseWithContext(ctx); err != nil {
+ log.Errorf("failed to close pool %q: %v", pool.Name, err)
+ }
+ }
+
+ // CloseWithContext behaves like Close but allows passing in a Context to time out the
+ // pool closing operation
+ func (pool *ConnPool[C]) CloseWithContext(ctx context.Context) error {
+ pool.capacityMu.Lock()
+ defer pool.capacityMu.Unlock()
+
+ if pool.close == nil || pool.capacity.Load() == 0 {
// already closed
- return
+ return nil
}

- pool.SetCapacity(0)
+ err := pool.setCapacity(ctx, 0)

close(pool.close)
pool.workers.Wait()
pool.close = nil
+ return err
}

func (pool *ConnPool[C]) reopen() {
+ pool.capacityMu.Lock()
+ defer pool.capacityMu.Unlock()
+
capacity := pool.capacity.Load()
if capacity == 0 {
return
}

- pool.Close()
- pool.open()
- pool.SetCapacity(capacity)
+ ctx, cancel := context.WithTimeout(context.Background(), PoolCloseTimeout)
+ defer cancel()
+
+ if err := pool.setCapacity(ctx, 0); err != nil {
+ log.Errorf("failed to reopen pool %q: %v", pool.Name, err)
+ }
+
+ // the second call to setCapacity cannot fail because it's only increasing the number
+ // of connections and doesn't need to shut down any
+ _ = pool.setCapacity(ctx, capacity)
}

// IsOpen returns whether the pool is open
@@ -322,7 +355,7 @@ func (pool *ConnPool[C]) Get(ctx context.Context, setting *Setting) (*Pooled[C],
return nil, ErrCtxTimeout
}
if pool.capacity.Load() == 0 {
- return nil, ErrTimeout
+ return nil, ErrConnPoolClosed
}
if setting == nil {
return pool.get(ctx)
@@ -571,39 +604,55 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
// If the capacity is smaller than the number of connections that there are
// currently open, we'll close enough connections before returning, even if
// that means waiting for clients to return connections to the pool.
- func (pool *ConnPool[C]) SetCapacity(newcap int64) {
+ // If the given context times out before we've managed to close enough connections
+ // an error will be returned.
+ func (pool *ConnPool[C]) SetCapacity(ctx context.Context, newcap int64) error {
+ pool.capacityMu.Lock()
+ defer pool.capacityMu.Unlock()
+ return pool.setCapacity(ctx, newcap)
+ }
+
+ // setCapacity is the internal implementation for SetCapacity; it must be called
+ // with pool.capacityMu being held
+ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
if newcap < 0 {
panic("negative capacity")
}

oldcap := pool.capacity.Swap(newcap)
if oldcap == newcap {
- return
+ return nil
}

- backoff := 1 * time.Millisecond
+ const delay = 10 * time.Millisecond

// close connections until we're under capacity
for pool.active.Load() > newcap {
+ if err := ctx.Err(); err != nil {
+ return vterrors.Errorf(vtrpcpb.Code_ABORTED,
+ "timed out while waiting for connections to be returned to the pool (capacity=%d, active=%d, borrowed=%d)",
+ pool.capacity.Load(), pool.active.Load(), pool.borrowed.Load())
+ }
// if we're closing down the pool, make sure there's no clients waiting
// for connections because they won't be returned in the future
if newcap == 0 {
pool.wait.expire(true)
}

// try closing from connections which are currently idle in the stacks
conn := pool.getFromSettingsStack(nil)
if conn == nil {
conn, _ = pool.clean.Pop()
}
if conn == nil {
- time.Sleep(backoff)
- backoff += 1 * time.Millisecond
+ time.Sleep(delay)
continue
}
conn.Close()
pool.closedConn()
}

+ // if we're closing down the pool, wake up any blocked waiters because no connections
+ // are going to be returned in the future
+ if newcap == 0 {
+ pool.wait.expire(true)
+ }
+ return nil
}

func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
@@ -659,6 +708,8 @@ func (pool *ConnPool[C]) RegisterStats(stats *servenv.Exporter, name string) {
return
}

+ pool.Name = name
+
stats.NewGaugeFunc(name+"Capacity", "Tablet server conn pool capacity", func() int64 {
return pool.Capacity()
})
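Note (not part of the diff): the new shutdown surface is Close, which wraps CloseWithContext with the default PoolCloseTimeout, plus the context-aware SetCapacity. Below is a minimal usage sketch, assuming a caller that already holds an open ConnPool and wants its own, shorter deadline; the shutdownPool helper and its log message are illustrative, not from this PR.

```go
package example

import (
	"context"
	"log"
	"time"

	"vitess.io/vitess/go/pools/smartconnpool"
)

// shutdownPool drains a pool with a three-second deadline instead of the
// ten-second default that Close applies. Even if the drain times out, the
// capacity has already been swapped to zero, so subsequent Gets fail fast.
func shutdownPool[C smartconnpool.Connection](pool *smartconnpool.ConnPool[C]) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	if err := pool.CloseWithContext(ctx); err != nil {
		// Some borrowed connections were not returned before the deadline.
		log.Printf("pool did not drain in time: %v", err)
	}
}
```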
68 changes: 60 additions & 8 deletions go/pools/smartconnpool/pool_test.go
@@ -208,13 +208,15 @@ func TestOpen(t *testing.T) {
assert.EqualValues(t, 6, state.lastID.Load())

// SetCapacity
- p.SetCapacity(3)
+ err = p.SetCapacity(ctx, 3)
+ require.NoError(t, err)
assert.EqualValues(t, 3, state.open.Load())
assert.EqualValues(t, 6, state.lastID.Load())
assert.EqualValues(t, 3, p.Capacity())
assert.EqualValues(t, 3, p.Available())

- p.SetCapacity(6)
+ err = p.SetCapacity(ctx, 6)
+ require.NoError(t, err)
assert.EqualValues(t, 6, p.Capacity())
assert.EqualValues(t, 6, p.Available())

@@ -265,7 +267,9 @@ func TestShrinking(t *testing.T) {
}
done := make(chan bool)
go func() {
- p.SetCapacity(3)
+ err := p.SetCapacity(ctx, 3)
+ require.NoError(t, err)
+
done <- true
}()
expected := map[string]any{
@@ -335,7 +339,8 @@ func TestShrinking(t *testing.T) {

// This will also wait
go func() {
- p.SetCapacity(2)
+ err := p.SetCapacity(ctx, 2)
+ require.NoError(t, err)
done <- true
}()
time.Sleep(10 * time.Millisecond)
@@ -353,7 +358,8 @@ func TestShrinking(t *testing.T) {
assert.EqualValues(t, 2, state.open.Load())

// Test race condition of SetCapacity with itself
- p.SetCapacity(3)
+ err = p.SetCapacity(ctx, 3)
+ require.NoError(t, err)
for i := 0; i < 3; i++ {
var r *Pooled[*TestConn]
var err error
@@ -375,9 +381,15 @@ func TestShrinking(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// This will wait till we Put
- go p.SetCapacity(2)
+ go func() {
+ err := p.SetCapacity(ctx, 2)
+ require.NoError(t, err)
+ }()
time.Sleep(10 * time.Millisecond)
- go p.SetCapacity(4)
+ go func() {
+ err := p.SetCapacity(ctx, 4)
+ require.NoError(t, err)
+ }()
time.Sleep(10 * time.Millisecond)

// This should not hang
@@ -387,7 +399,7 @@ func TestShrinking(t *testing.T) {
<-done

assert.Panics(t, func() {
- p.SetCapacity(-1)
+ _ = p.SetCapacity(ctx, -1)
})

assert.EqualValues(t, 4, p.Capacity())
@@ -530,6 +542,46 @@ func TestReopen(t *testing.T) {
assert.EqualValues(t, 0, state.open.Load())
}

+ func TestUserClosing(t *testing.T) {
+ var state TestState
+
+ ctx := context.Background()
+ p := NewPool(&Config[*TestConn]{
+ Capacity: 5,
+ IdleTimeout: time.Second,
+ LogWait: state.LogWait,
+ }).Open(newConnector(&state), nil)
+
+ var resources [5]*Pooled[*TestConn]
+ for i := 0; i < 5; i++ {
+ var err error
+ resources[i], err = p.Get(ctx, nil)
+ require.NoError(t, err)
+ }
+
+ for _, r := range resources[:4] {
+ r.Recycle()
+ }
+
+ ch := make(chan error)
+ go func() {
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+
+ err := p.CloseWithContext(ctx)
+ ch <- err
+ close(ch)
+ }()
+
+ select {
+ case <-time.After(5 * time.Second):
+ t.Fatalf("Pool did not shutdown after 5s")
+ case err := <-ch:
+ require.Error(t, err)
+ t.Logf("Shutdown error: %v", err)
+ }
+ }

func TestIdleTimeout(t *testing.T) {
testTimeout := func(t *testing.T, setting *Setting) {
var state TestState
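TestUserClosing above covers the timeout path: one connection is never returned, so CloseWithContext reports an error. The complementary fail-fast path is sketched below (illustrative, not from the PR): once capacity is zero, Get returns the new ErrConnPoolClosed sentinel instead of the old ErrTimeout, so callers can stop retrying. getOrFail is a hypothetical helper.

```go
package example

import (
	"context"
	"errors"

	"vitess.io/vitess/go/pools/smartconnpool"
)

// getOrFail treats a closed pool as terminal: once the pool's capacity has
// dropped to zero, retrying Get cannot succeed, so the error is surfaced.
func getOrFail[C smartconnpool.Connection](ctx context.Context, pool *smartconnpool.ConnPool[C]) (*smartconnpool.Pooled[C], error) {
	conn, err := pool.Get(ctx, nil)
	if errors.Is(err, smartconnpool.ErrConnPoolClosed) {
		return nil, err // the pool shut down underneath us; do not retry
	}
	return conn, err
}
```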
6 changes: 0 additions & 6 deletions go/vt/vttablet/tabletserver/connpool/pool.go
@@ -31,15 +31,9 @@ import (
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

- vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

- // ErrConnPoolClosed is returned when the connection pool is closed.
- var ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "internal error: unexpected: conn pool is closed")

const (
getWithoutS = "GetWithoutSettings"
getWithS = "GetWithSettings"
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/connpool/pool_test.go
@@ -126,9 +126,9 @@ func TestConnPoolSetCapacity(t *testing.T) {
defer connPool.Close()

assert.Panics(t, func() {
- connPool.SetCapacity(-10)
+ _ = connPool.SetCapacity(context.Background(), -10)
})
- connPool.SetCapacity(10)
+ _ = connPool.SetCapacity(context.Background(), 10)
if connPool.Capacity() != 10 {
t.Fatalf("capacity should be 10")
}
20 changes: 2 additions & 18 deletions go/vt/vttablet/tabletserver/query_executor.go
@@ -780,15 +780,7 @@ func (qre *QueryExecutor) getConn() (*connpool.PooledConn, error) {
defer func(start time.Time) {
qre.logStats.WaitingForConnection += time.Since(start)
}(time.Now())
- conn, err := qre.tsv.qe.conns.Get(ctx, qre.setting)
-
- switch err {
- case nil:
- return conn, nil
- case connpool.ErrConnPoolClosed:
- return nil, err
- }
- return nil, err
+ return qre.tsv.qe.conns.Get(ctx, qre.setting)
}

func (qre *QueryExecutor) getStreamConn() (*connpool.PooledConn, error) {
@@ -798,15 +790,7 @@ func (qre *QueryExecutor) getStreamConn() (*connpool.PooledConn, error) {
defer func(start time.Time) {
qre.logStats.WaitingForConnection += time.Since(start)
}(time.Now())
- conn, err := qre.tsv.qe.streamConns.Get(ctx, qre.setting)
-
- switch err {
- case nil:
- return conn, nil
- case connpool.ErrConnPoolClosed:
- return nil, err
- }
- return nil, err
+ return qre.tsv.qe.streamConns.Get(ctx, qre.setting)
}

// txFetch fetches from a TxConnection.
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver.go
@@ -2000,7 +2000,7 @@ func (tsv *TabletServer) SetPoolSize(val int) {
if val <= 0 {
return
}
- tsv.qe.conns.SetCapacity(int64(val))
+ _ = tsv.qe.conns.SetCapacity(context.Background(), int64(val))
}

// PoolSize returns the pool size.
@@ -2010,7 +2010,7 @@ func (tsv *TabletServer) PoolSize() int {

// SetStreamPoolSize changes the pool size to the specified value.
func (tsv *TabletServer) SetStreamPoolSize(val int) {
- tsv.qe.streamConns.SetCapacity(int64(val))
+ _ = tsv.qe.streamConns.SetCapacity(context.Background(), int64(val))
}

// SetStreamConsolidationBlocking sets whether the stream consolidator should wait for slow clients
@@ -2025,7 +2025,7 @@ func (tsv *TabletServer) StreamPoolSize() int {

// SetTxPoolSize changes the tx pool size to the specified value.
func (tsv *TabletServer) SetTxPoolSize(val int) {
- tsv.te.txPool.scp.conns.SetCapacity(int64(val))
+ _ = tsv.te.txPool.scp.conns.SetCapacity(context.Background(), int64(val))
}

// TxPoolSize returns the tx pool size.
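These setters pass context.Background(), which never expires, so a capacity shrink keeps its old behavior of waiting indefinitely for borrowed connections, and the returned error is discarded. A hedged sketch of how a call site could bound that wait instead; resizeWithDeadline and its timeout parameter are hypothetical, not part of this PR.

```go
package example

import (
	"context"
	"time"

	"vitess.io/vitess/go/pools/smartconnpool"
)

// resizeWithDeadline grows or shrinks a pool, but gives up on a shrink after
// the supplied timeout rather than blocking until clients return connections.
func resizeWithDeadline[C smartconnpool.Connection](pool *smartconnpool.ConnPool[C], size int64, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return pool.SetCapacity(ctx, size)
}
```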
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tx_pool_test.go
@@ -216,7 +216,7 @@ func primeTxPoolWithConnection(t *testing.T, ctx context.Context) (*fakesqldb.DB
db := fakesqldb.New(t)
txPool, _ := newTxPool()
// Set the capacity to 1 to ensure that the db connection is reused.
- txPool.scp.conns.SetCapacity(1)
+ _ = txPool.scp.conns.SetCapacity(context.Background(), 1)
params := dbconfigs.New(db.ConnParams())
txPool.Open(params, params, params)
