Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new mysql connection drain #16298

Merged
merged 10 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ Flags:
--mysql_default_workload string Default session workload (OLTP, OLAP, DBA) (default "OLTP")
--mysql_port int mysql port (default 3306)
--mysql_server_bind_address string Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance.
--mysql_server_drain_onterm If set, the server waits for --onterm_timeout for connected clients to drain
--mysql_server_flush_delay duration Delay after which buffered response will be flushed to the client. (default 100ms)
--mysql_server_port int If set, also listen for MySQL binary protocol connections on this port. (default -1)
--mysql_server_query_timeout duration mysql query timeout
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Flags:
--mysql_ldap_auth_config_string string JSON representation of LDAP server config.
--mysql_ldap_auth_method string client-side authentication method to use. Supported values: mysql_clear_password, dialog. (default "mysql_clear_password")
--mysql_server_bind_address string Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance.
--mysql_server_drain_onterm If set, the server waits for --onterm_timeout for connected clients to drain
--mysql_server_flush_delay duration Delay after which buffered response will be flushed to the client. (default 100ms)
--mysql_server_port int If set, also listen for MySQL binary protocol connections on this port. (default -1)
--mysql_server_query_timeout duration mysql query timeout
Expand Down
13 changes: 13 additions & 0 deletions go/test/endtoend/cluster/vtgate_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,19 @@ func (vtgate *VtgateProcess) WaitForStatusOfTabletInShard(name string, endPoints
return fmt.Errorf("wait for %s failed", name)
}

// IsShutdown checks if the vtgate process is shutdown
func (vtgate *VtgateProcess) IsShutdown() bool {
return !vtgate.WaitForStatus()
}

// Terminate sends a SIGTERM to vtgate
func (vtgate *VtgateProcess) Terminate() error {
if vtgate.proc == nil {
return nil
}
return vtgate.proc.Process.Signal(syscall.SIGTERM)
}

// TearDown shuts down the running vtgate service
func (vtgate *VtgateProcess) TearDown() error {
if vtgate.proc == nil || vtgate.exit == nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/servenv/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func Run(bindAddress string, port int) {
signal.Notify(ExitChan, syscall.SIGTERM, syscall.SIGINT)
// Wait for signal
<-ExitChan
l.Close()

startTime := time.Now()
log.Infof("Entering lameduck mode for at least %v", timeouts.LameduckPeriod)
Expand All @@ -71,6 +70,7 @@ func Run(bindAddress string, port int) {
log.Infof("Sleeping an extra %v after OnTermSync to finish lameduck period", remain)
time.Sleep(remain)
}
l.Close()

log.Info("Shutting down gracefully")
fireOnCloseHooks(timeouts.OnCloseTimeout)
Expand Down
199 changes: 199 additions & 0 deletions go/vt/vtgate/endtoend/connectiondrain/main_test.go
frouioui marked this conversation as resolved.
Show resolved Hide resolved
frouioui marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package connectiondrain

import (
"context"
_ "embed"
"flag"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
)

var (
keyspaceName = "ks"
cell = "zone-1"

//go:embed schema.sql
schemaSQL string
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
os.Exit(m.Run())
}

func setupCluster(t *testing.T) (*cluster.LocalProcessCluster, mysql.ConnParams) {
clusterInstance := cluster.NewCluster(cell, "localhost")

// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: schemaSQL,
}
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false)
require.NoError(t, err)

// Start vtgate
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--mysql_server_drain_onterm", "--onterm_timeout", "30s")
err = clusterInstance.StartVtgate()
require.NoError(t, err)

vtParams := clusterInstance.GetVTParams(keyspaceName)
return clusterInstance, vtParams
}

func start(t *testing.T, vtParams mysql.ConnParams) (*mysql.Conn, func()) {
vtConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)

deleteAll := func() {
_, _ = utils.ExecAllowError(t, vtConn, "set workload = oltp")

tables := []string{"t1"}
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete from "+table)
}
}

deleteAll()

return vtConn, func() {
deleteAll()
vtConn.Close()
cluster.PanicHandler(t)
}
}

func TestConnectionDrainCloseConnections(t *testing.T) {
clusterInstance, vtParams := setupCluster(t)
defer clusterInstance.Teardown()

vtConn, closer := start(t, vtParams)
defer closer()

// Create a second connection, this connection will be used to create a transaction.
vtConn2, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)

// Start the transaction with the second connection
_, err = vtConn2.ExecuteFetch("BEGIN", 1, false)
require.NoError(t, err)
_, err = vtConn2.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)

_, err = vtConn.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)

// Tearing down vtgate here, from there on vtConn should still be able to conclude in-flight transaction and
// execute queries with idle connections. However, no new connections are allowed.
err = clusterInstance.VtgateProcess.Terminate()
require.NoError(t, err)

// Give enough time to vtgate to receive and start processing the SIGTERM signal
time.Sleep(2 * time.Second)

// Create a third connection, this connection should not be allowed.
// Set a connection timeout to 1s otherwise the connection will take forever
// and eventually vtgate will reach the --onterm_timeout.
vtParams.ConnectTimeoutMs = 1000
defer func() {
vtParams.ConnectTimeoutMs = 0
}()
_, err = mysql.Connect(context.Background(), &vtParams)
require.Error(t, err)

// Idle connections should be allowed to execute queries until they are drained
_, err = vtConn.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)

// Finish the transaction
_, err = vtConn2.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)
_, err = vtConn2.ExecuteFetch("COMMIT", 1, false)
require.NoError(t, err)
vtConn2.Close()

// vtgate should still be running
require.False(t, clusterInstance.VtgateProcess.IsShutdown())

// This connection should still be allowed
_, err = vtConn.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)
vtConn.Close()

// Give enough time for vtgate to finish all the onterm hooks without reaching the 30s of --onterm_timeout
time.Sleep(10 * time.Second)

// By now the vtgate should have shutdown on its own and without reaching --onterm_timeout
require.True(t, clusterInstance.VtgateProcess.IsShutdown())
}

func TestConnectionDrainOnTermTimeout(t *testing.T) {
clusterInstance, vtParams := setupCluster(t)
defer clusterInstance.Teardown()

// Connect to vtgate again, this should work
vtConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
vtConn2, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)

defer func() {
vtConn.Close()
vtConn2.Close()
}()

// Tearing down vtgate here, we want to reach the onterm_timeout of 30s
err = clusterInstance.VtgateProcess.Terminate()
require.NoError(t, err)

// Run a busy query that returns only after the onterm_timeout is reached, this should fail when we reach the timeout
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
var err error
_, err = vtConn.ExecuteFetch("select sleep(40)", 1, false)
require.Error(t, err)
}()

// Sleeping 40 seconds here is already plenty of time, and we will for sure reach the onterm_timeout of 30s
time.Sleep(40 * time.Second)

// Running a query after we have reached the onterm_timeout should fail
_, err = vtConn2.ExecuteFetch("select id from t1", 1, false)
require.Error(t, err)

wg.Wait()
frouioui marked this conversation as resolved.
Show resolved Hide resolved

// By now vtgate will be shutdown becaused it reached its onterm_timeout, despite idle connections still being opened
require.True(t, clusterInstance.VtgateProcess.IsShutdown())
}
5 changes: 5 additions & 0 deletions go/vt/vtgate/endtoend/connectiondrain/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
create table t1(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
41 changes: 33 additions & 8 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var (

mysqlDefaultWorkloadName = "OLTP"
mysqlDefaultWorkload int32
mysqlDrainOnTerm bool

mysqlServerFlushDelay = 100 * time.Millisecond
)
Expand Down Expand Up @@ -102,6 +103,7 @@ func registerPluginFlags(fs *pflag.FlagSet) {
fs.DurationVar(&mysqlKeepAlivePeriod, "mysql-server-keepalive-period", mysqlKeepAlivePeriod, "TCP period between keep-alives")
fs.DurationVar(&mysqlServerFlushDelay, "mysql_server_flush_delay", mysqlServerFlushDelay, "Delay after which buffered response will be flushed to the client.")
fs.StringVar(&mysqlDefaultWorkloadName, "mysql_default_workload", mysqlDefaultWorkloadName, "Default session workload (OLTP, OLAP, DBA)")
fs.BoolVar(&mysqlDrainOnTerm, "mysql_server_drain_onterm", mysqlDrainOnTerm, "If set, the server waits for --onterm_timeout for connected clients to drain")
}

// vtgateHandler implements the Listener interface.
Expand Down Expand Up @@ -621,18 +623,28 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys
}

func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() {
if srv.tcpListener != nil {
srv.tcpListener.Shutdown()
srv.tcpListener = nil
}
if srv.unixListener != nil {
srv.unixListener.Shutdown()
srv.unixListener = nil
}
if srv.sigChan != nil {
signal.Stop(srv.sigChan)
}

if mysqlDrainOnTerm {
stopListener(srv.unixListener, false)
stopListener(srv.tcpListener, false)
frouioui marked this conversation as resolved.
Show resolved Hide resolved
// We wait for connected clients to drain by themselves or to run into the onterm timeout
log.Infof("Starting drain loop, waiting for all clients to disconnect")
reported := time.Now()
for srv.vtgateHandle.numConnections() > 0 {
if time.Since(reported) > 2*time.Second {
log.Infof("Still waiting for client connections to drain (%d connected)...", srv.vtgateHandle.numConnections())
reported = time.Now()
}
time.Sleep(1000 * time.Millisecond)
}
return
}

stopListener(srv.unixListener, true)
stopListener(srv.tcpListener, true)
if busy := srv.vtgateHandle.busyConnections.Load(); busy > 0 {
log.Infof("Waiting for all client connections to be idle (%d active)...", busy)
start := time.Now()
Expand All @@ -649,6 +661,19 @@ func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() {
}
}

// stopListener Close or Shutdown a mysql listener depending on the shutdown argument.
func stopListener(listener *mysql.Listener, shutdown bool) {
if listener == nil {
return
}
if shutdown {
listener.Shutdown()
} else {
listener.Close()
}
Comment on lines +675 to +679
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listener.Shutdown() calls listener. Close(). Any reason for doing this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listener.Shutdown() also set the .shutdown field to true and then close. We only want to close the listener here to stop accepting new connection but let the idle/current connection active. Shutting down will fail any idle connection on the next query.

listener = nil
frouioui marked this conversation as resolved.
Show resolved Hide resolved
}

func (srv *mysqlServer) rollbackAtShutdown() {
defer log.Flush()
if srv.vtgateHandle == nil {
Expand Down
Loading