diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index fa9b3d6907e..b404d28f875 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -231,6 +231,7 @@ Flags: --mysql_default_workload string Default session workload (OLTP, OLAP, DBA) (default "OLTP") --mysql_port int mysql port (default 3306) --mysql_server_bind_address string Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance. + --mysql_server_drain_onterm If set, the server waits for --onterm_timeout for connected clients to drain --mysql_server_flush_delay duration Delay after which buffered response will be flushed to the client. (default 100ms) --mysql_server_port int If set, also listen for MySQL binary protocol connections on this port. (default -1) --mysql_server_query_timeout duration mysql query timeout diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 16f261194ca..68899cecd56 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -141,6 +141,7 @@ Flags: --mysql_ldap_auth_config_string string JSON representation of LDAP server config. --mysql_ldap_auth_method string client-side authentication method to use. Supported values: mysql_clear_password, dialog. (default "mysql_clear_password") --mysql_server_bind_address string Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance. + --mysql_server_drain_onterm If set, the server waits for --onterm_timeout for connected clients to drain --mysql_server_flush_delay duration Delay after which buffered response will be flushed to the client. (default 100ms) --mysql_server_port int If set, also listen for MySQL binary protocol connections on this port. (default -1) --mysql_server_query_timeout duration mysql query timeout diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index d1877fb89bb..5143e9ad8a4 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -237,6 +237,19 @@ func (vtgate *VtgateProcess) WaitForStatusOfTabletInShard(name string, endPoints return fmt.Errorf("wait for %s failed", name) } +// IsShutdown checks if the vtgate process is shutdown +func (vtgate *VtgateProcess) IsShutdown() bool { + return !vtgate.WaitForStatus() +} + +// Terminate sends a SIGTERM to vtgate +func (vtgate *VtgateProcess) Terminate() error { + if vtgate.proc == nil { + return nil + } + return vtgate.proc.Process.Signal(syscall.SIGTERM) +} + // TearDown shuts down the running vtgate service func (vtgate *VtgateProcess) TearDown() error { if vtgate.proc == nil || vtgate.exit == nil { diff --git a/go/test/endtoend/vtgate/connectiondrain/main_test.go b/go/test/endtoend/vtgate/connectiondrain/main_test.go new file mode 100644 index 00000000000..32807b22bde --- /dev/null +++ b/go/test/endtoend/vtgate/connectiondrain/main_test.go @@ -0,0 +1,187 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectiondrain + +import ( + "context" + _ "embed" + "flag" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" +) + +var ( + keyspaceName = "ks" + cell = "zone-1" + + //go:embed schema.sql + schemaSQL string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + os.Exit(m.Run()) +} + +func setupCluster(t *testing.T) (*cluster.LocalProcessCluster, mysql.ConnParams) { + clusterInstance := cluster.NewCluster(cell, "localhost") + + // Start topo server + err := clusterInstance.StartTopo() + require.NoError(t, err) + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: schemaSQL, + } + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false) + require.NoError(t, err) + + // Start vtgate + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--mysql_server_drain_onterm", "--onterm_timeout", "30s") + err = clusterInstance.StartVtgate() + require.NoError(t, err) + + vtParams := clusterInstance.GetVTParams(keyspaceName) + return clusterInstance, vtParams +} + +func start(t *testing.T, vtParams mysql.ConnParams) (*mysql.Conn, func()) { + vtConn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + + deleteAll := func() { + _, _ = utils.ExecAllowError(t, vtConn, "set workload = oltp") + + tables := []string{"t1"} + for _, table := range tables { + _, _ = utils.ExecAllowError(t, vtConn, "delete from "+table) + } + } + + deleteAll() + + return vtConn, func() { + deleteAll() + vtConn.Close() + cluster.PanicHandler(t) + } +} + +func TestConnectionDrainCloseConnections(t *testing.T) { + clusterInstance, vtParams := setupCluster(t) + defer clusterInstance.Teardown() + + vtConn, closer := start(t, vtParams) + defer closer() + + // Create a second connection, this connection will be used to create a transaction. + vtConn2, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + + // Start the transaction with the second connection + _, err = vtConn2.ExecuteFetch("BEGIN", 1, false) + require.NoError(t, err) + _, err = vtConn2.ExecuteFetch("select id1 from t1", 1, false) + require.NoError(t, err) + + _, err = vtConn.ExecuteFetch("select id1 from t1", 1, false) + require.NoError(t, err) + + // Tearing down vtgate here, from there on vtConn should still be able to conclude in-flight transaction and + // execute queries with idle connections. However, no new connections are allowed. + err = clusterInstance.VtgateProcess.Terminate() + require.NoError(t, err) + + // Give enough time to vtgate to receive and start processing the SIGTERM signal + time.Sleep(2 * time.Second) + + // Create a third connection, this connection should not be allowed. + // Set a connection timeout to 1s otherwise the connection will take forever + // and eventually vtgate will reach the --onterm_timeout. + vtParams.ConnectTimeoutMs = 1000 + defer func() { + vtParams.ConnectTimeoutMs = 0 + }() + _, err = mysql.Connect(context.Background(), &vtParams) + require.Error(t, err) + + // Idle connections should be allowed to execute queries until they are drained + _, err = vtConn.ExecuteFetch("select id1 from t1", 1, false) + require.NoError(t, err) + + // Finish the transaction + _, err = vtConn2.ExecuteFetch("select id1 from t1", 1, false) + require.NoError(t, err) + _, err = vtConn2.ExecuteFetch("COMMIT", 1, false) + require.NoError(t, err) + vtConn2.Close() + + // vtgate should still be running + require.False(t, clusterInstance.VtgateProcess.IsShutdown()) + + // This connection should still be allowed + _, err = vtConn.ExecuteFetch("select id1 from t1", 1, false) + require.NoError(t, err) + vtConn.Close() + + // Give enough time for vtgate to finish all the onterm hooks without reaching the 30s of --onterm_timeout + time.Sleep(10 * time.Second) + + // By now the vtgate should have shutdown on its own and without reaching --onterm_timeout + require.True(t, clusterInstance.VtgateProcess.IsShutdown()) +} + +func TestConnectionDrainOnTermTimeout(t *testing.T) { + clusterInstance, vtParams := setupCluster(t) + defer clusterInstance.Teardown() + + // Connect to vtgate again, this should work + vtConn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + vtConn2, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + + defer func() { + vtConn.Close() + vtConn2.Close() + }() + + // Tearing down vtgate here, we want to reach the onterm_timeout of 30s + err = clusterInstance.VtgateProcess.Terminate() + require.NoError(t, err) + + // Run a busy query that returns only after the onterm_timeout is reached, this should fail when we reach the timeout + _, err = vtConn.ExecuteFetch("select sleep(40)", 1, false) + require.Error(t, err) + + // Running a query after we have reached the onterm_timeout should fail + _, err = vtConn2.ExecuteFetch("select id from t1", 1, false) + require.Error(t, err) + + // By now vtgate will be shutdown becaused it reached its onterm_timeout, despite idle connections still being opened + require.True(t, clusterInstance.VtgateProcess.IsShutdown()) +} diff --git a/go/test/endtoend/vtgate/connectiondrain/schema.sql b/go/test/endtoend/vtgate/connectiondrain/schema.sql new file mode 100644 index 00000000000..45db74d062f --- /dev/null +++ b/go/test/endtoend/vtgate/connectiondrain/schema.sql @@ -0,0 +1,5 @@ +create table t1( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; diff --git a/go/vt/servenv/run.go b/go/vt/servenv/run.go index 29b15a40008..cef81e87a99 100644 --- a/go/vt/servenv/run.go +++ b/go/vt/servenv/run.go @@ -59,7 +59,6 @@ func Run(bindAddress string, port int) { signal.Notify(ExitChan, syscall.SIGTERM, syscall.SIGINT) // Wait for signal <-ExitChan - l.Close() startTime := time.Now() log.Infof("Entering lameduck mode for at least %v", timeouts.LameduckPeriod) @@ -71,6 +70,7 @@ func Run(bindAddress string, port int) { log.Infof("Sleeping an extra %v after OnTermSync to finish lameduck period", remain) time.Sleep(remain) } + l.Close() log.Info("Shutting down gracefully") fireOnCloseHooks(timeouts.OnCloseTimeout) diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index 175f4b2cc8f..55954013862 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -75,6 +75,7 @@ var ( mysqlDefaultWorkloadName = "OLTP" mysqlDefaultWorkload int32 + mysqlDrainOnTerm bool mysqlServerFlushDelay = 100 * time.Millisecond ) @@ -102,6 +103,7 @@ func registerPluginFlags(fs *pflag.FlagSet) { fs.DurationVar(&mysqlKeepAlivePeriod, "mysql-server-keepalive-period", mysqlKeepAlivePeriod, "TCP period between keep-alives") fs.DurationVar(&mysqlServerFlushDelay, "mysql_server_flush_delay", mysqlServerFlushDelay, "Delay after which buffered response will be flushed to the client.") fs.StringVar(&mysqlDefaultWorkloadName, "mysql_default_workload", mysqlDefaultWorkloadName, "Default session workload (OLTP, OLAP, DBA)") + fs.BoolVar(&mysqlDrainOnTerm, "mysql_server_drain_onterm", mysqlDrainOnTerm, "If set, the server waits for --onterm_timeout for connected clients to drain") } // vtgateHandler implements the Listener interface. @@ -621,18 +623,34 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys } func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() { - if srv.tcpListener != nil { - srv.tcpListener.Shutdown() - srv.tcpListener = nil + if srv.sigChan != nil { + signal.Stop(srv.sigChan) } - if srv.unixListener != nil { - srv.unixListener.Shutdown() + setListenerToNil := func() { + srv.tcpListener = nil srv.unixListener = nil } - if srv.sigChan != nil { - signal.Stop(srv.sigChan) + + if mysqlDrainOnTerm { + stopListener(srv.unixListener, false) + stopListener(srv.tcpListener, false) + setListenerToNil() + // We wait for connected clients to drain by themselves or to run into the onterm timeout + log.Infof("Starting drain loop, waiting for all clients to disconnect") + reported := time.Now() + for srv.vtgateHandle.numConnections() > 0 { + if time.Since(reported) > 2*time.Second { + log.Infof("Still waiting for client connections to drain (%d connected)...", srv.vtgateHandle.numConnections()) + reported = time.Now() + } + time.Sleep(1000 * time.Millisecond) + } + return } + stopListener(srv.unixListener, true) + stopListener(srv.tcpListener, true) + setListenerToNil() if busy := srv.vtgateHandle.busyConnections.Load(); busy > 0 { log.Infof("Waiting for all client connections to be idle (%d active)...", busy) start := time.Now() @@ -649,6 +667,18 @@ func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() { } } +// stopListener Close or Shutdown a mysql listener depending on the shutdown argument. +func stopListener(listener *mysql.Listener, shutdown bool) { + if listener == nil { + return + } + if shutdown { + listener.Shutdown() + } else { + listener.Close() + } +} + func (srv *mysqlServer) rollbackAtShutdown() { defer log.Flush() if srv.vtgateHandle == nil { diff --git a/test/config.json b/test/config.json index 713faf97024..8b48ccc3ec0 100644 --- a/test/config.json +++ b/test/config.json @@ -500,6 +500,15 @@ "RetryMax": 2, "Tags": [] }, + "vtgate_connectiondrain": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/connectiondrain"], + "Command": [], + "Manual": false, + "Shard": "vtgate_general_heavy", + "RetryMax": 2, + "Tags": [] + }, "vtgate_queries_derived": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/queries/derived"],