From 25e476e5321ef1432ad0a3458312cba1cc364e8b Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Wed, 15 May 2024 11:34:03 -0600 Subject: [PATCH] Add VTGate drain flag Signed-off-by: Florent Poinsard --- go/flags/endtoend/vtgate.txt | 1 + go/vt/servenv/run.go | 2 +- go/vt/vtgate/plugin_mysql_server.go | 22 ++++++++++++++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 7d0b3272cc8..f3082589590 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -141,6 +141,7 @@ Flags: --mysql_ldap_auth_config_string string JSON representation of LDAP server config. --mysql_ldap_auth_method string client-side authentication method to use. Supported values: mysql_clear_password, dialog. (default "mysql_clear_password") --mysql_server_bind_address string Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance. + --mysql_server_drain_onterm If set, the server waits for --onterm_timeout for connected clients to drain --mysql_server_flush_delay duration Delay after which buffered response will be flushed to the client. (default 100ms) --mysql_server_port int If set, also listen for MySQL binary protocol connections on this port. (default -1) --mysql_server_query_timeout duration mysql query timeout diff --git a/go/vt/servenv/run.go b/go/vt/servenv/run.go index 29b15a40008..cef81e87a99 100644 --- a/go/vt/servenv/run.go +++ b/go/vt/servenv/run.go @@ -59,7 +59,6 @@ func Run(bindAddress string, port int) { signal.Notify(ExitChan, syscall.SIGTERM, syscall.SIGINT) // Wait for signal <-ExitChan - l.Close() startTime := time.Now() log.Infof("Entering lameduck mode for at least %v", timeouts.LameduckPeriod) @@ -71,6 +70,7 @@ func Run(bindAddress string, port int) { log.Infof("Sleeping an extra %v after OnTermSync to finish lameduck period", remain) time.Sleep(remain) } + l.Close() log.Info("Shutting down gracefully") fireOnCloseHooks(timeouts.OnCloseTimeout) diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index 175f4b2cc8f..5d9cbfee260 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -75,6 +75,7 @@ var ( mysqlDefaultWorkloadName = "OLTP" mysqlDefaultWorkload int32 + mysqlDrainOnTerm bool mysqlServerFlushDelay = 100 * time.Millisecond ) @@ -102,6 +103,7 @@ func registerPluginFlags(fs *pflag.FlagSet) { fs.DurationVar(&mysqlKeepAlivePeriod, "mysql-server-keepalive-period", mysqlKeepAlivePeriod, "TCP period between keep-alives") fs.DurationVar(&mysqlServerFlushDelay, "mysql_server_flush_delay", mysqlServerFlushDelay, "Delay after which buffered response will be flushed to the client.") fs.StringVar(&mysqlDefaultWorkloadName, "mysql_default_workload", mysqlDefaultWorkloadName, "Default session workload (OLTP, OLAP, DBA)") + fs.BoolVar(&mysqlDrainOnTerm, "mysql_server_drain_onterm", mysqlDrainOnTerm, "If set, the server waits for --onterm_timeout for connected clients to drain") } // vtgateHandler implements the Listener interface. @@ -633,6 +635,26 @@ func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() { signal.Stop(srv.sigChan) } + if mysqlDrainOnTerm { + // We wait for connected clients to drain, instead of just + // active (in transaction) clients + if srv.vtgateHandle == nil { + // server never started properly? + return + } + + log.Infof("Starting drain loop, waiting for all clients to disconnect") + reported := time.Now() + for srv.vtgateHandle.numConnections() > 0 { + if time.Since(reported) > 5*time.Second { + log.Infof("Still waiting for client connections to drain (%d connected)...", srv.vtgateHandle.numConnections()) + reported = time.Now() + } + time.Sleep(1000 * time.Millisecond) + } + return + } + if busy := srv.vtgateHandle.busyConnections.Load(); busy > 0 { log.Infof("Waiting for all client connections to be idle (%d active)...", busy) start := time.Now()