From 24fb7d00ccc1741b6e8af1a04d288ce35abc17a3 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 23 Oct 2023 10:49:43 +0300 Subject: [PATCH] Incremental backup: fix race condition in reading 'mysqlbinlog' output (#14330) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/builtinbackupengine.go | 1 + go/vt/mysqlctl/mysqld.go | 140 ++++++++++++++++---------- go/vt/mysqlctl/mysqld_test.go | 10 +- 3 files changed, 88 insertions(+), 63 deletions(-) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 33d4ce688fd..e46932bcd51 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -348,6 +348,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par if resp.FirstTimestampBinlog == "" || resp.LastTimestampBinlog == "" { return false, vterrors.Errorf(vtrpc.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp) } + log.Infof("ReadBinlogFilesTimestampsResponse: %+v", resp) incrDetails := &IncrementalBackupDetails{ FirstTimestamp: FormatRFC3339(protoutil.TimeFromProto(resp.FirstTimestamp).UTC()), FirstTimestampBinlog: filepath.Base(resp.FirstTimestampBinlog), diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index ba4ccf755b3..ee872c214f4 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -1294,36 +1294,46 @@ func (mysqld *Mysqld) ApplyBinlogFile(ctx context.Context, req *mysqlctlpb.Apply } // parseBinlogEntryTimestamp attempts to extract a timestamp from a binlog entry. -func parseBinlogEntryTimestamp(logEntry string) (found bool, t time.Time, err error) { +func parseBinlogEntryTimestamp(logEntry string) (t time.Time, err error) { if len(logEntry) == 0 { - return false, t, nil + return t, nil } if logEntry[0] != '#' { - return false, t, nil + return t, nil } if submatch := binlogEntryCommittedTimestampRegex.FindStringSubmatch(logEntry); submatch != nil { // MySQL 8.0 binlogEntryCommittedTimestamp := submatch[1] unixMicros, err := strconv.ParseInt(binlogEntryCommittedTimestamp, 10, 64) if err != nil { - return false, t, err + return t, err } - return true, time.UnixMicro(unixMicros), nil + return time.UnixMicro(unixMicros), nil } if submatch := binlogEntryTimestampGTIDRegexp.FindStringSubmatch(logEntry); submatch != nil { // MySQL 5.7 t, err = ParseBinlogTimestamp(submatch[1]) if err != nil { - return false, t, err + return t, err } - return true, t, nil + return t, nil } - return false, t, nil + return t, nil } // scanBinlogTimestamp invokes a `mysqlbinlog` binary to look for a timestamp in the given binary. The function -// either looks for the first such timestamp or the last. -func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv []string, mysqlbinlogName string, binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) { +// looks for the first and last timestamps. +func (mysqld *Mysqld) scanBinlogTimestamp( + mysqlbinlogDir string, + mysqlbinlogEnv []string, + mysqlbinlogName string, + binlogFile string, + stopAtFirst bool, // unused at this moment, to be used as an optimization hint +) ( + firstMatchedTime time.Time, + lastMatchedTime time.Time, + err error, +) { args := []string{binlogFile} mysqlbinlogCmd := exec.Command(mysqlbinlogName, args...) mysqlbinlogCmd.Dir = mysqlbinlogDir @@ -1331,48 +1341,39 @@ func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd) pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql if err != nil { - return matchedTime, false, err - } - scanComplete := make(chan error) - intentionalKill := false - scan := func() { - defer close(scanComplete) - defer func() { - intentionalKill = true - mysqlbinlogCmd.Process.Kill() // ensures the binlog file is released - }() + return firstMatchedTime, lastMatchedTime, err + } + scan := func() error { // Read line by line and process it scanner := bufio.NewScanner(pipe) for scanner.Scan() { logEntry := scanner.Text() - found, t, err := parseBinlogEntryTimestamp(logEntry) + t, err := parseBinlogEntryTimestamp(logEntry) if err != nil { - scanComplete <- err - return + return err } - if found { - matchedTime = t - matchFound = true + if t.IsZero() { + continue } - if found && stopAtFirst { - // Found the first timestamp and it's all we need. We won't scan any further and so we should also - // kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe). - return + if firstMatchedTime.IsZero() { + firstMatchedTime = t } + lastMatchedTime = t } + return nil } - if err := mysqlbinlogCmd.Start(); err != nil { - return matchedTime, false, err + if err := mysqlbinlogCmd.Start(); err != nil { // Start() is nonblockig + return firstMatchedTime, lastMatchedTime, err } - go scan() - if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill { - return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps") + defer mysqlbinlogCmd.Process.Kill() + if err := scan(); err != nil { // We must first exhaust reading the command's output, before calling cmd.Wait() + return firstMatchedTime, lastMatchedTime, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps") } - if err := <-scanComplete; err != nil { - return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ") + if err := mysqlbinlogCmd.Wait(); err != nil { + return firstMatchedTime, lastMatchedTime, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps") } - return matchedTime, matchFound, nil + return firstMatchedTime, lastMatchedTime, nil } // ReadBinlogFilesTimestamps reads all given binlog files via `mysqlbinlog` command and returns the first and last found transaction timestamps @@ -1402,31 +1403,60 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc return nil, err } + lastMatchedTimeMap := map[string]time.Time{} // a simple cache to avoid rescanning same files. Key=binlog file name + resp := &mysqlctlpb.ReadBinlogFilesTimestampsResponse{} // Find first timestamp - for _, binlogFile := range req.BinlogFileNames { - t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true) - if err != nil { - return nil, err - } - if found { - resp.FirstTimestamp = protoutil.TimeToProto(t) + err = func() error { + for _, binlogFile := range req.BinlogFileNames { + firstMatchedTime, lastMatchedTime, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true) + if err != nil { + return vterrors.Wrapf(err, "while scanning for first binlog timestamp in %v", binlogFile) + } + if !lastMatchedTime.IsZero() { + // cache result + lastMatchedTimeMap[binlogFile] = lastMatchedTime + } + if firstMatchedTime.IsZero() { + // Timestamp not found in this file. + continue + } + resp.FirstTimestamp = protoutil.TimeToProto(firstMatchedTime) resp.FirstTimestampBinlog = binlogFile - break + return nil // early break } + return nil + }() + if err != nil { + return resp, err } // Find last timestamp - for i := len(req.BinlogFileNames) - 1; i >= 0; i-- { - binlogFile := req.BinlogFileNames[i] - t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false) - if err != nil { - return nil, err - } - if found { - resp.LastTimestamp = protoutil.TimeToProto(t) + err = func() error { + for i := len(req.BinlogFileNames) - 1; i >= 0; i-- { + binlogFile := req.BinlogFileNames[i] + + // See if we have a cached value for this file. This is certainly be the situation if there's a single binary log file in req.BinlogFileNames, + // which means the first file and last file are the same, and so we have already parsed the file while searching for the first timestamp. + lastMatchedTime, ok := lastMatchedTimeMap[binlogFile] + if !ok { + var err error + _, lastMatchedTime, err = mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false) + if err != nil { + return vterrors.Wrapf(err, "while scanning for last binlog timestamp in %v", binlogFile) + } + } + if lastMatchedTime.IsZero() { + // Timestamp not found in this file. + continue + } + resp.LastTimestamp = protoutil.TimeToProto(lastMatchedTime) resp.LastTimestampBinlog = binlogFile - break + return nil // early break } + return nil + }() + if err != nil { + return resp, err } return resp, nil } diff --git a/go/vt/mysqlctl/mysqld_test.go b/go/vt/mysqlctl/mysqld_test.go index 2053c0f0cc9..435090008f2 100644 --- a/go/vt/mysqlctl/mysqld_test.go +++ b/go/vt/mysqlctl/mysqld_test.go @@ -139,7 +139,6 @@ func TestParseBinlogEntryTimestamp(t *testing.T) { tcases := []struct { name string entry string - found bool tm time.Time }{ { @@ -157,24 +156,19 @@ func TestParseBinlogEntryTimestamp(t *testing.T) { { name: "mysql80", entry: "#230605 16:06:34 server id 22233 end_log_pos 1037 CRC32 0xa4707c5b GTID last_committed=4 sequence_number=5 rbr_only=no original_committed_timestamp=1685970394031366 immediate_commit_timestamp=1685970394032458 transaction_length=186", - found: true, tm: time.UnixMicro(1685970394031366), }, { name: "mysql57", entry: "#230608 13:14:31 server id 484362839 end_log_pos 259 CRC32 0xc07510d0 GTID last_committed=0 sequence_number=1 rbr_only=yes", - found: true, tm: time.Date(2023, time.June, 8, 13, 14, 31, 0, time.UTC), }, } for _, tcase := range tcases { t.Run(tcase.name, func(t *testing.T) { - found, tm, err := parseBinlogEntryTimestamp(tcase.entry) + tm, err := parseBinlogEntryTimestamp(tcase.entry) assert.NoError(t, err) - assert.Equal(t, tcase.found, found) - if tcase.found { - assert.Equal(t, tcase.tm, tm) - } + assert.Equal(t, tcase.tm, tm) }) } }