Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the race condition during vttablet startup #15731

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions go/test/endtoend/utils/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -129,7 +130,9 @@ func TestSetSuperReadOnlyMySQL(t *testing.T) {
func TestGetMysqlPort(t *testing.T) {
require.NotNil(t, mysqld)

port, err := mysqld.GetMysqlPort()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
port, err := mysqld.GetMysqlPort(ctx)

// Expected port should be one less than the port returned by GetAndReservePort
// As we are calling this second time to get port
Expand Down Expand Up @@ -161,7 +164,9 @@ func TestReplicationStatus(t *testing.T) {
conn, err := mysql.Connect(ctx, &mysqlParams)
require.NoError(t, err)

port, err := mysqld.GetMysqlPort()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
port, err := mysqld.GetMysqlPort(ctx)
require.NoError(t, err)
host := "localhost"

Expand Down Expand Up @@ -234,7 +239,9 @@ func TestSetReplicationPosition(t *testing.T) {
func TestSetAndResetReplication(t *testing.T) {
require.NotNil(t, mysqld)

port, err := mysqld.GetMysqlPort()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
port, err := mysqld.GetMysqlPort(ctx)
require.NoError(t, err)
host := "localhost"

Expand Down Expand Up @@ -387,7 +394,9 @@ func TestWaitForReplicationStart(t *testing.T) {
err := mysqlctl.WaitForReplicationStart(mysqld, 1)
assert.ErrorContains(t, err, "no replication status")

port, err := mysqld.GetMysqlPort()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
port, err := mysqld.GetMysqlPort(ctx)
require.NoError(t, err)
host := "localhost"

Expand All @@ -407,7 +416,9 @@ func TestStartReplication(t *testing.T) {
err := mysqld.StartReplication(map[string]string{})
assert.ErrorContains(t, err, "The server is not configured as replica")

port, err := mysqld.GetMysqlPort()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
port, err := mysqld.GetMysqlPort(ctx)
require.NoError(t, err)
host := "localhost"

Expand All @@ -425,7 +436,9 @@ func TestStartReplication(t *testing.T) {
func TestStopReplication(t *testing.T) {
require.NotNil(t, mysqld)

port, err := mysqld.GetMysqlPort()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
port, err := mysqld.GetMysqlPort(ctx)
require.NoError(t, err)
host := "localhost"

Expand All @@ -449,7 +462,9 @@ func TestStopReplication(t *testing.T) {
func TestStopSQLThread(t *testing.T) {
require.NotNil(t, mysqld)

port, err := mysqld.GetMysqlPort()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
port, err := mysqld.GetMysqlPort(ctx)
require.NoError(t, err)
host := "localhost"

Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (fmd *FakeMysqlDaemon) WaitForDBAGrants(ctx context.Context, waitTime time.
}

// GetMysqlPort is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetMysqlPort() (int32, error) {
func (fmd *FakeMysqlDaemon) GetMysqlPort(ctx context.Context) (int32, error) {
if fmd.MysqlPort.Load() == -1 {
return 0, fmt.Errorf("FakeMysqlDaemon.GetMysqlPort returns an error")
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type MysqlDaemon interface {
WaitForDBAGrants(ctx context.Context, waitTime time.Duration) (err error)

// GetMysqlPort returns the current port mysql is listening on.
GetMysqlPort() (int32, error)
GetMysqlPort(ctx context.Context) (int32, error)

// GetServerID returns the servers ID.
GetServerID(ctx context.Context) (uint32, error)
Expand Down
6 changes: 5 additions & 1 deletion go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,11 +521,15 @@ func (mysqld *Mysqld) WaitForDBAGrants(ctx context.Context, waitTime time.Durati
if waitTime == 0 {
return nil
}
params, err := mysqld.dbcfgs.DbaConnector().MysqlParams()
if err != nil {
return err
}
timer := time.NewTimer(waitTime)
ctx, cancel := context.WithTimeout(ctx, waitTime)
defer cancel()
for {
conn, connErr := dbconnpool.NewDBConnection(ctx, mysqld.dbcfgs.DbaConnector())
conn, connErr := mysql.Connect(ctx, params)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted to also refactor this case slightly so that it avoids the pool entirely, which makes it more obvious to future readers that no pooling is actually used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good. When I was looking at this yesterday, I had to navigate through the code to see that NewDBConnection returns a non-pooled connection.

if connErr == nil {
res, fetchErr := conn.ExecuteFetch("SHOW GRANTS", 1000, false)
conn.Close()
Expand Down
18 changes: 16 additions & 2 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"strings"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/hook"
Expand Down Expand Up @@ -174,8 +175,21 @@ func (mysqld *Mysqld) RestartReplication(hookExtraEnv map[string]string) error {
}

// GetMysqlPort returns mysql port
func (mysqld *Mysqld) GetMysqlPort() (int32, error) {
qr, err := mysqld.FetchSuperQuery(context.TODO(), "SHOW VARIABLES LIKE 'port'")
func (mysqld *Mysqld) GetMysqlPort(ctx context.Context) (int32, error) {
// We can not use the connection pool here. This check runs very early
// during MySQL startup when we still might be loading things like grants.
// This means we need to use an isolated connection to avoid poisoning the
// DBA connection pool for further queries.
params, err := mysqld.dbcfgs.DbaConnector().MysqlParams()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing it here to use a connection without pooling, which avoids the poisoning. While I was in here, I also fixed the context.TODO(), making sure the context is explicitly passed in.

if err != nil {
return 0, err
}
conn, err := mysql.Connect(ctx, params)
if err != nil {
return 0, err
}
defer conn.Close()
qr, err := conn.ExecuteFetch("SHOW VARIABLES LIKE 'port'", 1, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is not super important here, but should the context be passed down to the actual query? We've seen various cases where MySQL ends up being "stuck" for whatever reason and stops replying to incoming queries, and this would cause the GetMysqlPort function to hang indefinitely (instead of returning an error once ctx expires).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arthurschreiber Do you mean as a general feature / refactor? At the moment, it's not possible to pass a context into the MySQL connection handling that Vitess does.

That might be useful as a separate feature / change, but I think that's independent of what we're doing here?

if err != nil {
return 0, err
}
Expand Down
7 changes: 5 additions & 2 deletions go/vt/mysqlctl/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -133,12 +134,14 @@ func TestGetMysqlPort(t *testing.T) {
testMysqld := NewMysqld(dbc)
defer testMysqld.Close()

res, err := testMysqld.GetMysqlPort()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
res, err := testMysqld.GetMysqlPort(ctx)
assert.Equal(t, int32(12), res)
assert.NoError(t, err)

db.AddQuery("SHOW VARIABLES LIKE 'port'", &sqltypes.Result{})
res, err = testMysqld.GetMysqlPort()
res, err = testMysqld.GetMysqlPort(ctx)
assert.ErrorContains(t, err, "no port variable in mysql")
assert.Equal(t, int32(0), res)
}
Expand Down
16 changes: 12 additions & 4 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl
if err := tm.checkPrimaryShip(ctx, si); err != nil {
return err
}
if err := tm.checkMysql(); err != nil {
if err := tm.checkMysql(ctx); err != nil {
return err
}
if err := tm.initTablet(ctx); err != nil {
Expand Down Expand Up @@ -702,7 +702,7 @@ func (tm *TabletManager) checkPrimaryShip(ctx context.Context, si *topo.ShardInf
return nil
}

func (tm *TabletManager) checkMysql() error {
func (tm *TabletManager) checkMysql(ctx context.Context) error {
appConfig, err := tm.DBConfigs.AppWithDB().MysqlParams()
if err != nil {
return err
Expand All @@ -717,7 +717,7 @@ func (tm *TabletManager) checkMysql() error {
tm.tmState.UpdateTablet(func(tablet *topodatapb.Tablet) {
tablet.MysqlHostname = tablet.Hostname
})
mysqlPort, err := tm.MysqlDaemon.GetMysqlPort()
mysqlPort, err := tm.MysqlDaemon.GetMysqlPort(ctx)
if err != nil {
log.Warningf("Cannot get current mysql port, will keep retrying every %v: %v", mysqlPortRetryInterval, err)
go tm.findMysqlPort(mysqlPortRetryInterval)
Expand All @@ -730,10 +730,18 @@ func (tm *TabletManager) checkMysql() error {
return nil
}

const portCheckTimeout = 5 * time.Second

func (tm *TabletManager) getMysqlPort() (int32, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems overkill to define a function that is used exactly once. Why can't all this be folded into findMysqlPort? It will grow from 10 lines to 15 lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deepthi Because it sets up a context, and you can't properly defer the cancel inside a for loop. Hence the separate function, so that there is no need to cancel the context manually, which is more error-prone.

ctx, cancel := context.WithTimeout(context.Background(), portCheckTimeout)
defer cancel()
return tm.MysqlDaemon.GetMysqlPort(ctx)
}

func (tm *TabletManager) findMysqlPort(retryInterval time.Duration) {
for {
time.Sleep(retryInterval)
mport, err := tm.MysqlDaemon.GetMysqlPort()
mport, err := tm.getMysqlPort()
if err != nil || mport == 0 {
continue
}
Expand Down
Loading