From 84dfb303f0704fd3ef41275690bb7c6762597760 Mon Sep 17 00:00:00 2001 From: Noble Mittal <62551163+beingnoble03@users.noreply.github.com> Date: Mon, 8 Apr 2024 14:41:19 +0530 Subject: [PATCH] test: Add e2e tests for `replication` (#15671) Signed-off-by: Noble Mittal Signed-off-by: Manan Gupta Co-authored-by: Manan Gupta --- go/test/endtoend/utils/mysql.go | 16 +- go/test/endtoend/utils/mysql_test.go | 348 ++++++++++++++++++++++++++- 2 files changed, 355 insertions(+), 9 deletions(-) diff --git a/go/test/endtoend/utils/mysql.go b/go/test/endtoend/utils/mysql.go index 790e1fc4ba1..a522af2472e 100644 --- a/go/test/endtoend/utils/mysql.go +++ b/go/test/endtoend/utils/mysql.go @@ -54,7 +54,7 @@ func NewMySQL(cluster *cluster.LocalProcessCluster, dbName string, schemaSQL ... } sqls = append(sqls, split...) } - mysqlParam, _, closer, error := NewMySQLWithMysqld(cluster.GetAndReservePort(), cluster.Hostname, dbName, sqls...) + mysqlParam, _, _, closer, error := NewMySQLWithMysqld(cluster.GetAndReservePort(), cluster.Hostname, dbName, sqls...) return mysqlParam, closer, error } @@ -75,24 +75,24 @@ func CreateMysqldAndMycnf(tabletUID uint32, mysqlSocket string, mysqlPort int) ( return mysqlctl.NewMysqld(&cfg), mycnf, nil } -func NewMySQLWithMysqld(port int, hostname, dbName string, schemaSQL ...string) (mysql.ConnParams, *mysqlctl.Mysqld, func(), error) { +func NewMySQLWithMysqld(port int, hostname, dbName string, schemaSQL ...string) (mysql.ConnParams, *mysqlctl.Mysqld, *mysqlctl.Mycnf, func(), error) { mysqlDir, err := createMySQLDir() if err != nil { - return mysql.ConnParams{}, nil, nil, err + return mysql.ConnParams{}, nil, nil, nil, err } initMySQLFile, err := createInitSQLFile(mysqlDir, dbName) if err != nil { - return mysql.ConnParams{}, nil, nil, err + return mysql.ConnParams{}, nil, nil, nil, err } mysqlPort := port mysqld, mycnf, err := CreateMysqldAndMycnf(0, "", mysqlPort) if err != nil { - return mysql.ConnParams{}, nil, nil, err + return mysql.ConnParams{}, nil, nil, nil, err } err = initMysqld(mysqld, mycnf, initMySQLFile) if err != nil { - return mysql.ConnParams{}, nil, nil, err + return mysql.ConnParams{}, nil, nil, nil, err } params := mysql.ConnParams{ @@ -104,10 +104,10 @@ func NewMySQLWithMysqld(port int, hostname, dbName string, schemaSQL ...string) for _, sql := range schemaSQL { err = prepareMySQLWithSchema(params, sql) if err != nil { - return mysql.ConnParams{}, nil, nil, err + return mysql.ConnParams{}, nil, nil, nil, err } } - return params, mysqld, func() { + return params, mysqld, mycnf, func() { ctx := context.Background() _ = mysqld.Teardown(ctx, mycnf, true, mysqlShutdownTimeout) }, nil diff --git a/go/test/endtoend/utils/mysql_test.go b/go/test/endtoend/utils/mysql_test.go index de9db23dab1..ae550e34864 100644 --- a/go/test/endtoend/utils/mysql_test.go +++ b/go/test/endtoend/utils/mysql_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/mysqlctl" ) @@ -34,6 +35,7 @@ var ( clusterInstance *cluster.LocalProcessCluster mysqlParams mysql.ConnParams mysqld *mysqlctl.Mysqld + mycnf *mysqlctl.Mycnf keyspaceName = "ks" cell = "test" schemaSQL = `create table t1( @@ -53,7 +55,7 @@ func TestMain(m *testing.M) { var closer func() var err error - mysqlParams, mysqld, closer, err = NewMySQLWithMysqld(clusterInstance.GetAndReservePort(), clusterInstance.Hostname, keyspaceName, schemaSQL) + mysqlParams, mysqld, mycnf, closer, err = NewMySQLWithMysqld(clusterInstance.GetAndReservePort(), clusterInstance.Hostname, keyspaceName, schemaSQL) if err != nil { fmt.Println(err) return 1 @@ -123,3 +125,347 @@ func TestSetSuperReadOnlyMySQL(t *testing.T) { isReadOnly, _ = mysqld.IsReadOnly() assert.True(t, isReadOnly, "read_only should be set to True") } + +func TestGetMysqlPort(t *testing.T) { + require.NotNil(t, mysqld) + + port, err := mysqld.GetMysqlPort() + + // Expected port should be one less than the port returned by GetAndReservePort + // As we are calling this second time to get port + want := clusterInstance.GetAndReservePort() - 1 + assert.Equal(t, want, int(port)) + assert.NoError(t, err) +} + +func TestGetServerID(t *testing.T) { + require.NotNil(t, mysqld) + + sid, err := mysqld.GetServerID(context.Background()) + assert.NoError(t, err) + assert.Equal(t, mycnf.ServerID, sid) + + suuid, err := mysqld.GetServerUUID(context.Background()) + assert.NoError(t, err) + assert.NotEmpty(t, suuid) +} + +func TestReplicationStatus(t *testing.T) { + require.NotNil(t, mysqld) + + // Initially we should expect an error for no replication status + _, err := mysqld.ReplicationStatus() + assert.ErrorContains(t, err, "no replication status") + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &mysqlParams) + require.NoError(t, err) + + port, err := mysqld.GetMysqlPort() + require.NoError(t, err) + host := "localhost" + + q := conn.SetReplicationSourceCommand(&mysqlParams, host, port, int(port)) + res := Exec(t, conn, q) + require.NotNil(t, res) + + r, err := mysqld.ReplicationStatus() + assert.NoError(t, err) + assert.Equal(t, port, r.SourcePort) + assert.Equal(t, host, r.SourceHost) +} + +func TestPrimaryStatus(t *testing.T) { + require.NotNil(t, mysqld) + + res, err := mysqld.PrimaryStatus(context.Background()) + assert.NoError(t, err) + + r, err := mysqld.ReplicationStatus() + assert.NoError(t, err) + + assert.True(t, res.Position.Equal(r.Position), "primary replication status should be same as replication status here") +} + +func TestGTID(t *testing.T) { + require.NotNil(t, mysqld) + + res, err := mysqld.GetGTIDPurged(context.Background()) + assert.Empty(t, res.String()) + assert.NoError(t, err) + + primaryPosition, err := mysqld.PrimaryPosition() + assert.NotNil(t, primaryPosition) + assert.NoError(t, err) + + // Now we set gtid_purged for testing + conn, err := mysql.Connect(context.Background(), &mysqlParams) + require.NoError(t, err) + + gtid := "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8" + r := Exec(t, conn, fmt.Sprintf("SET GLOBAL gtid_purged='%s'", gtid)) + require.NotNil(t, r) + + res, err = mysqld.GetGTIDPurged(context.Background()) + assert.NoError(t, err) + assert.Equal(t, gtid, res.String()) + + primaryPosition, err = mysqld.PrimaryPosition() + assert.NoError(t, err) + assert.Contains(t, primaryPosition.String(), gtid) +} + +func TestSetReplicationPosition(t *testing.T) { + require.NotNil(t, mysqld) + + pos := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}} + sid := replication.SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + pos.GTIDSet = pos.GTIDSet.AddGTID(replication.Mysql56GTID{Server: sid, Sequence: 1}) + + err := mysqld.SetReplicationPosition(context.Background(), pos) + assert.NoError(t, err) + + want := "00010203-0405-0607-0809-0a0b0c0d0e0f:1" + res, err := mysqld.GetGTIDPurged(context.Background()) + assert.NoError(t, err) + assert.Contains(t, res.String(), want) +} + +func TestSetAndResetReplication(t *testing.T) { + require.NotNil(t, mysqld) + + port, err := mysqld.GetMysqlPort() + require.NoError(t, err) + host := "localhost" + + err = mysqld.SetReplicationSource(context.Background(), host, port, true, true) + assert.NoError(t, err) + + r, err := mysqld.ReplicationStatus() + assert.NoError(t, err) + assert.Equal(t, port, r.SourcePort) + assert.Equal(t, host, r.SourceHost) + + err = mysqld.ResetReplication(context.Background()) + assert.NoError(t, err) + + r, err = mysqld.ReplicationStatus() + assert.ErrorContains(t, err, "no replication status") + assert.Equal(t, "", r.SourceHost) + assert.Equal(t, int32(0), r.SourcePort) + + err = mysqld.SetReplicationSource(context.Background(), host, port, true, true) + assert.NoError(t, err) + + r, err = mysqld.ReplicationStatus() + assert.NoError(t, err) + assert.Equal(t, port, r.SourcePort) + assert.Equal(t, host, r.SourceHost) + + err = mysqld.ResetReplication(context.Background()) + assert.NoError(t, err) + + r, err = mysqld.ReplicationStatus() + assert.ErrorContains(t, err, "no replication status") + assert.Equal(t, "", r.SourceHost) + assert.Equal(t, int32(0), r.SourcePort) +} + +func TestGetBinlogInformation(t *testing.T) { + require.NotNil(t, mysqld) + + // Default values + binlogFormat, logBin, logReplicaUpdates, binlogRowImage, err := mysqld.GetBinlogInformation(context.Background()) + assert.NoError(t, err) + assert.Equal(t, "ROW", binlogFormat) + assert.True(t, logBin) + assert.True(t, logReplicaUpdates) + assert.Equal(t, "FULL", binlogRowImage) + + conn, err := mysql.Connect(context.Background(), &mysqlParams) + require.NoError(t, err) + + res := Exec(t, conn, "SET GLOBAL binlog_format = 'STATEMENT'") + require.NotNil(t, res) + + res = Exec(t, conn, "SET GLOBAL binlog_row_image = 'MINIMAL'") + require.NotNil(t, res) + + binlogFormat, logBin, logReplicaUpdates, binlogRowImage, err = mysqld.GetBinlogInformation(context.Background()) + assert.NoError(t, err) + assert.Equal(t, "STATEMENT", binlogFormat) + assert.True(t, logBin) + assert.True(t, logReplicaUpdates) + assert.Equal(t, "MINIMAL", binlogRowImage) + + // Set to default + res = Exec(t, conn, "SET GLOBAL binlog_format = 'ROW'") + require.NotNil(t, res) + + res = Exec(t, conn, "SET GLOBAL binlog_row_image = 'FULL'") + require.NotNil(t, res) +} + +func TestGetGTIDMode(t *testing.T) { + require.NotNil(t, mysqld) + + // Default value + ctx := context.Background() + res, err := mysqld.GetGTIDMode(ctx) + assert.NoError(t, err) + assert.Equal(t, "ON", res) + + conn, err := mysql.Connect(context.Background(), &mysqlParams) + require.NoError(t, err) + + // Change value for the purpose of testing + r := Exec(t, conn, "SET GLOBAL gtid_mode = 'ON_PERMISSIVE'") + require.NotNil(t, r) + + res, err = mysqld.GetGTIDMode(ctx) + assert.NoError(t, err) + assert.Equal(t, "ON_PERMISSIVE", res) + + // Back to default + r = Exec(t, conn, "SET GLOBAL gtid_mode = 'ON'") + require.NotNil(t, r) +} + +func TestBinaryLogs(t *testing.T) { + require.NotNil(t, mysqld) + + res, err := mysqld.GetBinaryLogs(context.Background()) + assert.NoError(t, err) + oldNumLogs := len(res) + + err = mysqld.FlushBinaryLogs(context.Background()) + assert.NoError(t, err) + + res, err = mysqld.GetBinaryLogs(context.Background()) + assert.NoError(t, err) + newNumLogs := len(res) + assert.Equal(t, 1, newNumLogs-oldNumLogs, "binary logs should have been flushed once") +} + +func TestGetPreviousGTIDs(t *testing.T) { + require.NotNil(t, mysqld) + + res, err := mysqld.GetBinaryLogs(context.Background()) + require.NoError(t, err) + require.NotEmpty(t, res) + + ctx := context.Background() + r, err := mysqld.GetPreviousGTIDs(ctx, res[0]) + assert.NoError(t, err) + assert.Empty(t, r) + + _, err = mysqld.GetPreviousGTIDs(ctx, "invalid_binlog_file") + assert.ErrorContains(t, err, "Could not find target log") +} + +func TestSemiSyncEnabled(t *testing.T) { + require.NotNil(t, mysqld) + + err := mysqld.SetSemiSyncEnabled(true, false) + assert.NoError(t, err) + + p, r := mysqld.SemiSyncEnabled() + assert.True(t, p) + assert.False(t, r) + + err = mysqld.SetSemiSyncEnabled(false, true) + assert.NoError(t, err) + + p, r = mysqld.SemiSyncEnabled() + assert.False(t, p) + assert.True(t, r) +} + +func TestWaitForReplicationStart(t *testing.T) { + require.NotNil(t, mysqld) + + err := mysqlctl.WaitForReplicationStart(mysqld, 1) + assert.ErrorContains(t, err, "no replication status") + + port, err := mysqld.GetMysqlPort() + require.NoError(t, err) + host := "localhost" + + err = mysqld.SetReplicationSource(context.Background(), host, port, true, true) + assert.NoError(t, err) + + err = mysqlctl.WaitForReplicationStart(mysqld, 1) + assert.NoError(t, err) + + err = mysqld.ResetReplication(context.Background()) + require.NoError(t, err) +} + +func TestStartReplication(t *testing.T) { + require.NotNil(t, mysqld) + + err := mysqld.StartReplication(map[string]string{}) + assert.ErrorContains(t, err, "The server is not configured as replica") + + port, err := mysqld.GetMysqlPort() + require.NoError(t, err) + host := "localhost" + + // Set startReplicationAfter to false as we want to test StartReplication here + err = mysqld.SetReplicationSource(context.Background(), host, port, true, false) + assert.NoError(t, err) + + err = mysqld.StartReplication(map[string]string{}) + assert.NoError(t, err) + + err = mysqld.ResetReplication(context.Background()) + require.NoError(t, err) +} + +func TestStopReplication(t *testing.T) { + require.NotNil(t, mysqld) + + port, err := mysqld.GetMysqlPort() + require.NoError(t, err) + host := "localhost" + + err = mysqld.SetReplicationSource(context.Background(), host, port, true, true) + assert.NoError(t, err) + + r, err := mysqld.ReplicationStatus() + assert.NoError(t, err) + assert.Equal(t, host, r.SourceHost) + assert.Equal(t, port, r.SourcePort) + assert.Equal(t, replication.ReplicationStateRunning, r.SQLState) + + err = mysqld.StopReplication(map[string]string{}) + assert.NoError(t, err) + + r, err = mysqld.ReplicationStatus() + assert.NoError(t, err) + assert.Equal(t, replication.ReplicationStateStopped, r.SQLState) +} + +func TestStopSQLThread(t *testing.T) { + require.NotNil(t, mysqld) + + port, err := mysqld.GetMysqlPort() + require.NoError(t, err) + host := "localhost" + + err = mysqld.SetReplicationSource(context.Background(), host, port, true, true) + assert.NoError(t, err) + + r, err := mysqld.ReplicationStatus() + assert.NoError(t, err) + assert.Equal(t, host, r.SourceHost) + assert.Equal(t, port, r.SourcePort) + assert.Equal(t, replication.ReplicationStateRunning, r.SQLState) + + err = mysqld.StopSQLThread(context.Background()) + assert.NoError(t, err) + + r, err = mysqld.ReplicationStatus() + assert.NoError(t, err) + assert.Equal(t, replication.ReplicationStateStopped, r.SQLState) +}