Skip to content

Commit

Permalink
Add VTGate drain flag
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
  • Loading branch information
frouioui committed May 15, 2024
1 parent 951f273 commit 25e476e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Flags:
--mysql_ldap_auth_config_string string JSON representation of LDAP server config.
--mysql_ldap_auth_method string client-side authentication method to use. Supported values: mysql_clear_password, dialog. (default "mysql_clear_password")
--mysql_server_bind_address string Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance.
--mysql_server_drain_onterm If set, the server waits for --onterm_timeout for connected clients to drain
--mysql_server_flush_delay duration Delay after which buffered response will be flushed to the client. (default 100ms)
--mysql_server_port int If set, also listen for MySQL binary protocol connections on this port. (default -1)
--mysql_server_query_timeout duration mysql query timeout
Expand Down
2 changes: 1 addition & 1 deletion go/vt/servenv/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func Run(bindAddress string, port int) {
signal.Notify(ExitChan, syscall.SIGTERM, syscall.SIGINT)
// Wait for signal
<-ExitChan
l.Close()

startTime := time.Now()
log.Infof("Entering lameduck mode for at least %v", timeouts.LameduckPeriod)
Expand All @@ -71,6 +70,7 @@ func Run(bindAddress string, port int) {
log.Infof("Sleeping an extra %v after OnTermSync to finish lameduck period", remain)
time.Sleep(remain)
}
l.Close()

log.Info("Shutting down gracefully")
fireOnCloseHooks(timeouts.OnCloseTimeout)
Expand Down
22 changes: 22 additions & 0 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var (

mysqlDefaultWorkloadName = "OLTP"
mysqlDefaultWorkload int32
mysqlDrainOnTerm bool

mysqlServerFlushDelay = 100 * time.Millisecond
)
Expand Down Expand Up @@ -102,6 +103,7 @@ func registerPluginFlags(fs *pflag.FlagSet) {
fs.DurationVar(&mysqlKeepAlivePeriod, "mysql-server-keepalive-period", mysqlKeepAlivePeriod, "TCP period between keep-alives")
fs.DurationVar(&mysqlServerFlushDelay, "mysql_server_flush_delay", mysqlServerFlushDelay, "Delay after which buffered response will be flushed to the client.")
fs.StringVar(&mysqlDefaultWorkloadName, "mysql_default_workload", mysqlDefaultWorkloadName, "Default session workload (OLTP, OLAP, DBA)")
fs.BoolVar(&mysqlDrainOnTerm, "mysql_server_drain_onterm", mysqlDrainOnTerm, "If set, the server waits for --onterm_timeout for connected clients to drain")
}

// vtgateHandler implements the Listener interface.
Expand Down Expand Up @@ -633,6 +635,26 @@ func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() {
signal.Stop(srv.sigChan)
}

if mysqlDrainOnTerm {
// We wait for connected clients to drain, instead of just
// active (in transaction) clients
if srv.vtgateHandle == nil {
// server never started properly?
return
}

log.Infof("Starting drain loop, waiting for all clients to disconnect")
reported := time.Now()
for srv.vtgateHandle.numConnections() > 0 {
if time.Since(reported) > 5*time.Second {
log.Infof("Still waiting for client connections to drain (%d connected)...", srv.vtgateHandle.numConnections())
reported = time.Now()
}
time.Sleep(1000 * time.Millisecond)
}
return
}

if busy := srv.vtgateHandle.busyConnections.Load(); busy > 0 {
log.Infof("Waiting for all client connections to be idle (%d active)...", busy)
start := time.Now()
Expand Down

0 comments on commit 25e476e

Please sign in to comment.