Skip to content

Commit

Permalink
fixing issue with xtrabackup and long gtids
Browse files Browse the repository at this point in the history
Signed-off-by: Renan Rangel <rrangel@slack-corp.com>
Signed-off-by: 'Renan Rangel' <rrangel@slack-corp.com>
  • Loading branch information
rvrangel committed Jul 1, 2024
1 parent 1799e5b commit 3eb76eb
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 19 deletions.
44 changes: 29 additions & 15 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,17 @@ func (be *XtrabackupEngine) backupFiles(
numStripes int,
flavor string,
) (replicationPosition replication.Position, finalErr error) {
// we create a temporary directory so that xtrabackup can create a copy of xtrabackup_info we can read from later.
tempDir, err := os.MkdirTemp("", "xbstream_vitess_")
if err != nil {
return replicationPosition, vterrors.Wrap(err, "unable to create temporary directory")
}
defer func() {
// if we didn't hit any errors, we simply delete the temporary directory, otherwise we leave it for investigation.
if finalErr == nil {
os.RemoveAll(tempDir)
}
}()

backupProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName)
flagsToExec := []string{"--defaults-file=" + params.Cnf.Path,
Expand All @@ -300,6 +311,7 @@ func (be *XtrabackupEngine) backupFiles(
"--slave-info",
"--user=" + xtrabackupUser,
"--target-dir=" + params.Cnf.TmpDir,
"--extra-lsndir=" + tempDir,
}
if xtrabackupStreamMode != "" {
flagsToExec = append(flagsToExec, "--stream="+xtrabackupStreamMode)
Expand Down Expand Up @@ -398,27 +410,14 @@ func (be *XtrabackupEngine) backupFiles(
// the replication position. Note that if we don't read stderr as we go, the
// xtrabackup process gets blocked when the write buffer fills up.
stderrBuilder := &strings.Builder{}
posBuilder := &strings.Builder{}
stderrDone := make(chan struct{})
go func() {
defer close(stderrDone)

scanner := bufio.NewScanner(backupErr)
capture := false
for scanner.Scan() {
line := scanner.Text()
params.Logger.Infof("xtrabackup stderr: %s", line)

// Wait until we see the first line of the binlog position.
// Then capture all subsequent lines. We need multiple lines since
// the value we're looking for has newlines in it.
if !capture {
if !strings.Contains(line, "MySQL binlog position") {
continue
}
capture = true
}
fmt.Fprintln(posBuilder, line)
}
if err := scanner.Err(); err != nil {
params.Logger.Errorf("error reading from xtrabackup stderr: %v", err)
Expand Down Expand Up @@ -462,8 +461,7 @@ func (be *XtrabackupEngine) backupFiles(
return replicationPosition, vterrors.Wrap(err, fmt.Sprintf("xtrabackup failed with error. Output=%s", sterrOutput))
}

posOutput := posBuilder.String()
replicationPosition, rerr := findReplicationPosition(posOutput, flavor, params.Logger)
replicationPosition, rerr := findReplicationPositionFromXtrabackupInfo(tempDir, flavor, params.Logger)
if rerr != nil {
return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position")
}
Expand Down Expand Up @@ -751,6 +749,22 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
return nil
}

func findReplicationPositionFromXtrabackupInfo(directory, flavor string, logger logutil.Logger) (replication.Position, error) {
f, err := os.Open(path.Join(directory, "xtrabackup_info"))
if err != nil {
return replication.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT,
"couldn't open %q to read GTID position", path.Join(directory, "xtrabackup_info"))
}
defer f.Close()

contents, err := io.ReadAll(f)
if err != nil {
return replication.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "couldn't read GTID position from %q", f.Name())
}

return findReplicationPosition(string(contents), flavor, logger)
}

var xtrabackupReplicationPositionRegexp = regexp.MustCompile(`GTID of the last change '([^']*)'`)

func findReplicationPosition(input, flavor string, logger logutil.Logger) (replication.Position, error) {
Expand Down
35 changes: 31 additions & 4 deletions go/vt/mysqlctl/xtrabackupengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"bytes"
"crypto/rand"
"io"
"os"
"path"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -49,22 +51,47 @@ func TestFindReplicationPosition(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, want, pos.String())
}
func TestFindReplicationPositionFromXtrabackupInfo(t *testing.T) {
input := `tool_version = 8.0.35-30
binlog_pos = filename 'vt-0476396352-bin.000005', position '310088991', GTID of the last change '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,
1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,
47b59de1-b368-11e9-b48b-624401d35560:1-152981,
557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,
599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,
b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262'
format = xbstream
`
want := "145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262"

tmp, err := os.MkdirTemp(t.TempDir(), "test")
assert.NoError(t, err)

f, err := os.Create(path.Join(tmp, "xtrabackup_info"))
assert.NoError(t, err)
_, err = f.WriteString(input)
assert.NoError(t, err)
assert.NoError(t, f.Close())

pos, err := findReplicationPositionFromXtrabackupInfo(tmp, "MySQL56", logutil.NewConsoleLogger())
assert.NoError(t, err)
assert.Equal(t, want, pos.String())
}

func TestFindReplicationPositionNoMatch(t *testing.T) {
func TestFindReplicationPositionNoMatchFromXtrabackupInfo(t *testing.T) {
// Make sure failure to find a match triggers an error.
input := `nothing`

_, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger())
_, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger())
assert.Error(t, err)
}

func TestFindReplicationPositionEmptyMatch(t *testing.T) {
func TestFindReplicationPositionEmptyMatchFromXtrabackupInfo(t *testing.T) {
// Make sure failure to find a match triggers an error.
input := `GTID of the last change '
'`

_, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger())
_, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger())
assert.Error(t, err)
}

Expand Down

0 comments on commit 3eb76eb

Please sign in to comment.