From 3eb76ebc7c596f150055506279d158a441877da1 Mon Sep 17 00:00:00 2001 From: Renan Rangel Date: Mon, 1 Jul 2024 04:51:34 -0700 Subject: [PATCH] fixing issue with xtrabackup and long gtids Signed-off-by: Renan Rangel Signed-off-by: 'Renan Rangel' --- go/vt/mysqlctl/xtrabackupengine.go | 44 ++++++++++++++++--------- go/vt/mysqlctl/xtrabackupengine_test.go | 35 +++++++++++++++++--- 2 files changed, 60 insertions(+), 19 deletions(-) diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 3f8491fdfb6..d08706bc337 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -292,6 +292,17 @@ func (be *XtrabackupEngine) backupFiles( numStripes int, flavor string, ) (replicationPosition replication.Position, finalErr error) { + // we create a temporary directory so that xtrabackup can create a copy of xtrabackup_info we can read from later. + tempDir, err := os.MkdirTemp("", "xbstream_vitess_") + if err != nil { + return replicationPosition, vterrors.Wrap(err, "unable to create temporary directory") + } + defer func() { + // if we didn't hit any errors, we simply delete the temporary directory, otherwise we leave it for investigation. + if finalErr == nil { + os.RemoveAll(tempDir) + } + }() backupProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName) flagsToExec := []string{"--defaults-file=" + params.Cnf.Path, @@ -300,6 +311,7 @@ func (be *XtrabackupEngine) backupFiles( "--slave-info", "--user=" + xtrabackupUser, "--target-dir=" + params.Cnf.TmpDir, + "--extra-lsndir=" + tempDir, } if xtrabackupStreamMode != "" { flagsToExec = append(flagsToExec, "--stream="+xtrabackupStreamMode) @@ -398,27 +410,14 @@ func (be *XtrabackupEngine) backupFiles( // the replication position. Note that if we don't read stderr as we go, the // xtrabackup process gets blocked when the write buffer fills up. stderrBuilder := &strings.Builder{} - posBuilder := &strings.Builder{} stderrDone := make(chan struct{}) go func() { defer close(stderrDone) scanner := bufio.NewScanner(backupErr) - capture := false for scanner.Scan() { line := scanner.Text() params.Logger.Infof("xtrabackup stderr: %s", line) - - // Wait until we see the first line of the binlog position. - // Then capture all subsequent lines. We need multiple lines since - // the value we're looking for has newlines in it. - if !capture { - if !strings.Contains(line, "MySQL binlog position") { - continue - } - capture = true - } - fmt.Fprintln(posBuilder, line) } if err := scanner.Err(); err != nil { params.Logger.Errorf("error reading from xtrabackup stderr: %v", err) @@ -462,8 +461,7 @@ func (be *XtrabackupEngine) backupFiles( return replicationPosition, vterrors.Wrap(err, fmt.Sprintf("xtrabackup failed with error. Output=%s", sterrOutput)) } - posOutput := posBuilder.String() - replicationPosition, rerr := findReplicationPosition(posOutput, flavor, params.Logger) + replicationPosition, rerr := findReplicationPositionFromXtrabackupInfo(tempDir, flavor, params.Logger) if rerr != nil { return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position") } @@ -751,6 +749,22 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log return nil } +func findReplicationPositionFromXtrabackupInfo(directory, flavor string, logger logutil.Logger) (replication.Position, error) { + f, err := os.Open(path.Join(directory, "xtrabackup_info")) + if err != nil { + return replication.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, + "couldn't open %q to read GTID position", path.Join(directory, "xtrabackup_info")) + } + defer f.Close() + + contents, err := io.ReadAll(f) + if err != nil { + return replication.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "couldn't read GTID position from %q", f.Name()) + } + + return findReplicationPosition(string(contents), flavor, logger) +} + var xtrabackupReplicationPositionRegexp = regexp.MustCompile(`GTID of the last change '([^']*)'`) func findReplicationPosition(input, flavor string, logger logutil.Logger) (replication.Position, error) { diff --git a/go/vt/mysqlctl/xtrabackupengine_test.go b/go/vt/mysqlctl/xtrabackupengine_test.go index f560833d278..d47bdc5ba6f 100644 --- a/go/vt/mysqlctl/xtrabackupengine_test.go +++ b/go/vt/mysqlctl/xtrabackupengine_test.go @@ -20,6 +20,8 @@ import ( "bytes" "crypto/rand" "io" + "os" + "path" "testing" "github.com/stretchr/testify/assert" @@ -49,22 +51,47 @@ func TestFindReplicationPosition(t *testing.T) { assert.NoError(t, err) assert.Equal(t, want, pos.String()) } +func TestFindReplicationPositionFromXtrabackupInfo(t *testing.T) { + input := `tool_version = 8.0.35-30 + binlog_pos = filename 'vt-0476396352-bin.000005', position '310088991', GTID of the last change '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3, + 1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3, + 47b59de1-b368-11e9-b48b-624401d35560:1-152981, + 557def0a-b368-11e9-84ed-f6fffd91cc57:1-3, + 599ef589-ae55-11e9-9688-ca1f44501925:1-14857169, + b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262' + format = xbstream + ` + want := "145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262" + + tmp, err := os.MkdirTemp(t.TempDir(), "test") + assert.NoError(t, err) + + f, err := os.Create(path.Join(tmp, "xtrabackup_info")) + assert.NoError(t, err) + _, err = f.WriteString(input) + assert.NoError(t, err) + assert.NoError(t, f.Close()) + + pos, err := findReplicationPositionFromXtrabackupInfo(tmp, "MySQL56", logutil.NewConsoleLogger()) + assert.NoError(t, err) + assert.Equal(t, want, pos.String()) +} -func TestFindReplicationPositionNoMatch(t *testing.T) { +func TestFindReplicationPositionNoMatchFromXtrabackupInfo(t *testing.T) { // Make sure failure to find a match triggers an error. input := `nothing` - _, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) + _, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger()) assert.Error(t, err) } -func TestFindReplicationPositionEmptyMatch(t *testing.T) { +func TestFindReplicationPositionEmptyMatchFromXtrabackupInfo(t *testing.T) { // Make sure failure to find a match triggers an error. input := `GTID of the last change ' '` - _, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) + _, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger()) assert.Error(t, err) }