Skip to content

Commit

Permalink
apply patch 13655: snapshot keyspace fix for pitr (#122)
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <pbibra@slack-corp.com>
  • Loading branch information
pbibra committed Sep 6, 2023
1 parent 0a2ef83 commit e87c86a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 53 deletions.
8 changes: 6 additions & 2 deletions go/test/endtoend/recovery/recovery_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,22 @@ func VerifyQueriesUsingVtgate(t *testing.T, session *vtgateconn.VTGateSession, q
}

// RestoreTablet performs a PITR restore.
func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet, restoreKSName string, shardName string, keyspaceName string, commonTabletArg []string) {
func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet, restoreKSName string, shardName string, keyspaceName string, commonTabletArg []string, restoreTime time.Time) {
tablet.ValidateTabletRestart(t)
replicaTabletArgs := commonTabletArg

if restoreTime.IsZero() {
restoreTime = time.Now().UTC()
}

_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("GetKeyspace", restoreKSName)

if err != nil {
tm := time.Now().UTC()
tm.Format(time.RFC3339)
_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("CreateKeyspace", "--",
"--keyspace_type=SNAPSHOT", "--base_keyspace="+keyspaceName,
"--snapshot_time", tm.Format(time.RFC3339), restoreKSName)
"--snapshot_time", restoreTime.Format(time.RFC3339), restoreKSName)
require.Nil(t, err)
}

Expand Down
115 changes: 65 additions & 50 deletions go/test/endtoend/recovery/unshardedrecovery/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os/exec"
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -173,44 +174,46 @@ SET GLOBAL old_alter_table = ON;

}

// TestRecoveryImpl does following
// - create a shard with primary and replica1 only
// - run InitShardPrimary
// - insert some data
// - take a backup
// - insert more data on the primary
// - take another backup
// - create a recovery keyspace after first backup
// - bring up tablet_replica2 in the new keyspace
// - check that new tablet does not have data created after backup1
// - create second recovery keyspace after second backup
// - bring up tablet_replica3 in second keyspace
// - check that new tablet has data created after backup1 but not data created after backup2
// - check that vtgate queries work correctly
// 1. create a shard with primary and replica1 only
// - run InitShardPrimary
// - insert some data
//
// 2. take a backup
// 3.create a recovery keyspace after first backup
// - bring up tablet_replica2 in the new keyspace
// - check that new tablet has data from backup1
//
// 4. insert more data on the primary
//
// 5. take another backup
// 6. create a recovery keyspace after second backup
// - bring up tablet_replica3 in the new keyspace
// - check that new tablet has data from backup2
//
// 7. check that vtgate queries work correctly
func TestRecoveryImpl(t *testing.T) {
defer cluster.PanicHandler(t)
defer tabletsTeardown()
verifyInitialReplication(t)

// take first backup of value = test1
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
assert.NoError(t, err)

backups := listBackups(t)
require.Equal(t, len(backups), 1)
assert.Contains(t, backups[0], replica1.Alias)

_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
assert.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2)

err = localCluster.VtctlclientProcess.ApplyVSchema(keyspaceName, vSchema)
assert.NoError(t, err)

output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", keyspaceName)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

recovery.RestoreTablet(t, localCluster, replica2, recoveryKS1, "0", keyspaceName, commonTabletArg)
// restore with latest backup
restoreTime := time.Now().UTC()
recovery.RestoreTablet(t, localCluster, replica2, recoveryKS1, "0", keyspaceName, commonTabletArg, restoreTime)

output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvVSchema", cell)
assert.NoError(t, err)
Expand All @@ -219,57 +222,69 @@ func TestRecoveryImpl(t *testing.T) {

err = localCluster.VtctlclientProcess.ExecuteCommand("GetSrvKeyspace", cell, keyspaceName)
assert.NoError(t, err)

output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS1)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 1)
// verify that restored replica has value = test1
qr, err := replica2.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "test1", qr.Rows[0][0].ToString())

cluster.VerifyLocalMetadata(t, replica2, recoveryKS1, shardName, cell)
// insert new row on primary
_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
assert.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2)

// update the original row in primary
_, err = primary.VttabletProcess.QueryTablet("update vt_insert_test set msg = 'msgx1' where id = 1", keyspaceName, true)
assert.NoError(t, err)

//verify that primary has new value
qr, err := primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
qr, err = primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "msgx1", qr.Rows[0][0].ToString())

//verify that restored replica has old value
qr, err = replica2.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "test1", qr.Rows[0][0].ToString())
// check that replica1, used for the backup, has the new value
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
assert.NoError(t, err)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true)
assert.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 3)
for {
qr, err = replica1.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
if qr.Rows[0][0].ToString() == "msgx1" {
break
}

recovery.RestoreTablet(t, localCluster, replica3, recoveryKS2, "0", keyspaceName, commonTabletArg)
select {
case <-ctx.Done():
t.Error("timeout waiting for new value to be replicated on replica 1")
break
case <-ticker.C:
}
}

output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS2)
// take second backup of value = msgx1
err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

cluster.VerifyRowsInTablet(t, replica3, keyspaceName, 2)
// restore to first backup
recovery.RestoreTablet(t, localCluster, replica3, recoveryKS2, "0", keyspaceName, commonTabletArg, restoreTime)

// update the original row in primary
_, err = primary.VttabletProcess.QueryTablet("update vt_insert_test set msg = 'msgx2' where id = 1", keyspaceName, true)
output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS2)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

//verify that primary has new value
qr, err = primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "msgx2", qr.Rows[0][0].ToString())
// only one row from first backup
cluster.VerifyRowsInTablet(t, replica3, keyspaceName, 1)

//verify that restored replica has old value
// verify that restored replica has value = test1
qr, err = replica3.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "msgx1", qr.Rows[0][0].ToString())
assert.Equal(t, "test1", qr.Rows[0][0].ToString())

vtgateInstance := localCluster.NewVtgateInstance()
vtgateInstance.TabletTypesToWait = "REPLICA"
Expand All @@ -294,26 +309,26 @@ func TestRecoveryImpl(t *testing.T) {
session := vtgateConn.Session("@replica", nil)

//check that vtgate doesn't route queries to new tablet
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(3)")
recovery.VerifyQueriesUsingVtgate(t, session, "select msg from vt_insert_test where id = 1", `VARCHAR("msgx2")`)
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)")
recovery.VerifyQueriesUsingVtgate(t, session, "select msg from vt_insert_test where id = 1", `VARCHAR("msgx1")`)
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS1), "INT64(1)")
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS1), `VARCHAR("test1")`)
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS2), "INT64(2)")
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS2), `VARCHAR("msgx1")`)
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS2), "INT64(1)")
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS2), `VARCHAR("test1")`)

// check that new keyspace is accessible with 'use ks'
cluster.ExecuteQueriesUsingVtgate(t, session, "use "+recoveryKS1+"@replica")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)")

cluster.ExecuteQueriesUsingVtgate(t, session, "use "+recoveryKS2+"@replica")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)")

// check that new tablet is accessible with use `ks:shard`
cluster.ExecuteQueriesUsingVtgate(t, session, "use `"+recoveryKS1+":0@replica`")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)")

cluster.ExecuteQueriesUsingVtgate(t, session, "use `"+recoveryKS2+":0@replica`")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)")
}

// verifyInitialReplication will create schema in primary, insert some data to primary and verify the same data in replica.
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
log.Infof("Using base_keyspace %v to restore keyspace %v using a backup time of %v", keyspace, tablet.Keyspace, backupTime)
}

if backupTime.IsZero() {
backupTime = logutil.ProtoToTime(keyspaceInfo.SnapshotTime)
}

params := mysqlctl.RestoreParams{
Cnf: tm.Cnf,
Mysqld: tm.MysqlDaemon,
Expand Down Expand Up @@ -305,7 +309,7 @@ func (tm *TabletManager) restoreToTimeFromBinlog(ctx context.Context, pos mysql.
// getGTIDFromTimestamp computes 2 GTIDs based on restoreTime
// afterPos is the GTID of the first event at or after restoreTime.
// beforePos is the GTID of the last event before restoreTime. This is the GTID upto which replication will be applied
// afterPos can be used directly in the query `START SLAVE UNTIL SQL_BEFORE_GTIDS = ''`
// afterPos can be used directly in the query `START SLAVE UNTIL SQL_BEFORE_GTIDS = `
// beforePos will be used to check if replication was able to catch up from the binlog server
func (tm *TabletManager) getGTIDFromTimestamp(ctx context.Context, pos mysql.Position, restoreTime int64) (afterPos string, beforePos string, err error) {
connParams := &mysql.ConnParams{
Expand Down

0 comments on commit e87c86a

Please sign in to comment.