Skip to content

Commit

Permalink
adding new mysql shell backup engine
Browse files Browse the repository at this point in the history
Signed-off-by: Renan Rangel <rrangel@slack-corp.com>
  • Loading branch information
rvrangel committed Jun 27, 2024
1 parent 1799e5b commit 476af04
Show file tree
Hide file tree
Showing 3 changed files with 319 additions and 17 deletions.
20 changes: 20 additions & 0 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ limitations under the License.
package mysqlctl

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
Expand All @@ -29,6 +31,7 @@ import (

"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -500,3 +503,20 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error)
params.Logger.Infof("Restore: complete")
return manifest, nil
}

// scanLinesToLogger scans full lines from the given Reader and sends them to
// the given Logger until EOF.
func scanLinesToLogger(prefix string, reader io.Reader, logger logutil.Logger, doneFunc func()) {
defer doneFunc()

scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
logger.Infof("%s: %s", prefix, line)
}
if err := scanner.Err(); err != nil {
// This is usually run in a background goroutine, so there's no point
// returning an error. Just log it.
logger.Warningf("error scanning lines from %s: %v", prefix, err)
}
}
299 changes: 299 additions & 0 deletions go/vt/mysqlctl/mysqlshellbackupengine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
package mysqlctl

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"os/exec"
"path"
"strings"
"sync"
"time"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/vterrors"
)

var (
// location to store the logical backup
mysqlShellLocation = flag.String("mysql_shell_location", "", "location where the backup will be stored")
mysqlShellFlags = flag.String("mysql_shell_flags", "--defaults-file=/dev/null --js -h localhost", "execution flags to pass to mysqlsh binary")
// flags to pass through to backup phase
mysqlShellDumpFlags = flag.String("mysql_shell_dump_flags",
"{\"threads\": 2}",
"flags to pass to mysql shell dump utility. This should be a JSON string and will be saved in the MANIFEST")
// flags to pass through to extract phase of restore
mysqlShellLoadFlags = flag.String("mysql_shell_load_flags",
"{\"threads\": 2, \"updateGtidSet\": \"replace\", \"skipBinlog\": true, \"progressFile\": \"\"}",
"flags to pass to mysql shell load utility. This should be a JSON string")
)

// MySQLShellBackupManifest represents a backup.
type MySQLShellBackupManifest struct {
// BackupManifest is an anonymous embedding of the base manifest struct.
// Note that the manifest itself doesn't fill the Position field, as we have
// no way of fetching that information from mysqlsh at the moment.
BackupManifest

// Location of the backup directory
BackupLocation string
// Params are the parameters that backup was created with
Params string
}

// LogicalBackupEngine encapsulates the logic to implement the restoration
// of a mysql-shell based backup.
type MySQLShellBackupEngine struct {
}

const (
mysqlShellBackupBinaryName = "mysqlsh"
mysqlShellBackupEngineName = "mysqlshell"
)

func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (result BackupResult, finalErr error) {
params.Logger.Infof("Starting ExecuteBackup in %s", params.TabletAlias)

err := be.backupPreCheck()
if err != nil {
return BackupUnusable, vterrors.Wrap(err, "failed backup precheck")
}

start := time.Now().UTC()
location := path.Join(*mysqlShellLocation, params.Keyspace, params.Shard, start.Format("2006-01-02_15-04-05"))

args := []string{}

if *mysqlShellFlags != "" {
args = append(args, strings.Fields(*mysqlShellFlags)...)
}

args = append(args, "-e", fmt.Sprintf("util.dumpSchemas([\"vt_%s\"], %q, %s)",
params.Keyspace,
location,
*mysqlShellDumpFlags,
))

cmd := exec.CommandContext(ctx, mysqlShellBackupBinaryName, args...)

params.Logger.Infof("running %s", cmd.String())

cmdOut, err := cmd.StdoutPipe()
if err != nil {
return BackupUnusable, vterrors.Wrap(err, "cannot create stdout pipe")
}
cmdErr, err := cmd.StderrPipe()
if err != nil {
return BackupUnusable, vterrors.Wrap(err, "cannot create stderr pipe")
}
if err := cmd.Start(); err != nil {
return BackupUnusable, vterrors.Wrap(err, "can't start xbstream")
}

cmdWg := &sync.WaitGroup{}
cmdWg.Add(2)
go scanLinesToLogger("logical stdout", cmdOut, params.Logger, cmdWg.Done)
go scanLinesToLogger("logical stderr", cmdErr, params.Logger, cmdWg.Done)
cmdWg.Wait()

// Get exit status.
if err := cmd.Wait(); err != nil {
return BackupUnusable, vterrors.Wrap(err, "logical failed")
}

// open the MANIFEST
params.Logger.Infof("Writing backup MANIFEST")
mwc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown)
if err != nil {
return BackupUnusable, vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName)
}
defer closeFile(mwc, backupManifestFileName, params.Logger, &finalErr)

// JSON-encode and write the MANIFEST
bm := &MySQLShellBackupManifest{
// Common base fields
BackupManifest: BackupManifest{
BackupMethod: mysqlShellBackupEngineName,
Position: replication.Position{},
BackupTime: start.Format(time.RFC3339),
FinishedTime: time.Now().UTC().Format(time.RFC3339),
},

// logical backup specific fields
BackupLocation: location,
Params: *mysqlShellLoadFlags,
}

data, err := json.MarshalIndent(bm, "", " ")
if err != nil {
return BackupUnusable, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName)
}
if _, err := mwc.Write([]byte(data)); err != nil {
return BackupUnusable, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName)
}

params.Logger.Infof("Backup completed")
return BackupUsable, nil
}

func (be *MySQLShellBackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error) {
params.Logger.Infof("Calling ExecuteRestore for %s (DeleteBeforeRestore: %v)", params.DbName, params.DeleteBeforeRestore)

err := be.restorePreCheck()
if err != nil {
return nil, vterrors.Wrap(err, "failed restore precheck")
}

var bm MySQLShellBackupManifest
if err := getBackupManifestInto(ctx, bh, &bm); err != nil {
return nil, err
}

// mark restore as in progress
if err := createStateFile(params.Cnf); err != nil {
return nil, err
}

// make sure semi-sync is disabled, otherwise we will wait forever for acknowledgements
err = params.Mysqld.SetSemiSyncEnabled(ctx, false, false)
if err != nil {
return nil, vterrors.Wrap(err, "disable semi-sync failed")
}

// if we received a RestoreFromBackup API call instead of it being a command line argument,
// we need to first clean the host before we start the restore.
if params.DeleteBeforeRestore {
params.Logger.Infof("restoring on an existing tablet, so dropping database %q", params.DbName)

err = params.Mysqld.ExecuteSuperQueryList(ctx,
[]string{fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", params.DbName)},
)
if err != nil {
return nil, vterrors.Wrap(err, fmt.Sprintf("dropping database %q failed", params.DbName))
}

}

// we need to get rid of all the current replication information on the host.
err = params.Mysqld.ResetReplication(ctx)
if err != nil {
return nil, vterrors.Wrap(err, "unable to reset replication")
}

// this is required so we can load the backup generated by MySQL Shell. we will disable it afterwards.
err = params.Mysqld.ExecuteSuperQueryList(ctx, []string{"SET GLOBAL LOCAL_INFILE=1"})
if err != nil {
return nil, vterrors.Wrap(err, "unable to set local_infile=1")
}

args := []string{}

if *mysqlShellFlags != "" {
args = append(args, strings.Fields(*mysqlShellFlags)...)
}

args = append(args, "-e", fmt.Sprintf("util.loadDump(%q, %s)",
bm.BackupLocation,
*mysqlShellLoadFlags,
))

cmd := exec.CommandContext(ctx, "mysqlsh", args...)

params.Logger.Infof("running %s", cmd.String())

cmdOut, err := cmd.StdoutPipe()
if err != nil {
return nil, vterrors.Wrap(err, "cannot create stdout pipe")
}
cmdErr, err := cmd.StderrPipe()
if err != nil {
return nil, vterrors.Wrap(err, "cannot create stderr pipe")
}
if err := cmd.Start(); err != nil {
return nil, vterrors.Wrap(err, "can't start xbstream")
}

cmdWg := &sync.WaitGroup{}
cmdWg.Add(2)
go scanLinesToLogger("logical stdout", cmdOut, params.Logger, cmdWg.Done)
go scanLinesToLogger("logical stderr", cmdErr, params.Logger, cmdWg.Done)
cmdWg.Wait()

// Get the exit status.
if err := cmd.Wait(); err != nil {
return nil, vterrors.Wrap(err, "logical failed")
}
params.Logger.Infof("%s completed successfully", mysqlShellBackupBinaryName)

// disable local_infile now that the restore is done.
err = params.Mysqld.ExecuteSuperQueryList(ctx, []string{
"SET GLOBAL LOCAL_INFILE=0",
})
if err != nil {
return nil, vterrors.Wrap(err, "unable to set local_infile=0")
}
params.Logger.Infof("set local_infile=0")

// since MySQL Shell backups do not store the Executed GTID position in the manifest, we need to
// fetch it and override in the manifest we are returning so Vitess can set it again. alternatively
// we could have a flag in the future where the backup engine controls if it needs to be set or not.
pos, err := params.Mysqld.PrimaryPosition(ctx)
if err != nil {
return nil, vterrors.Wrap(err, "failure getting restored position")
}

params.Logger.Infof("retrieved primary position after restore")
bm.BackupManifest.Position = pos

params.Logger.Infof("Restore completed")

return &bm.BackupManifest, nil
}

// ShouldDrainForBackup satisfies the BackupEngine interface
// MySQL Shell backups can run while tablet is serving, hence false
func (be *MySQLShellBackupEngine) ShouldDrainForBackup(req *tabletmanagerdatapb.BackupRequest) bool {
return false
}

func (be *MySQLShellBackupEngine) backupPreCheck() error {
if *mysqlShellLocation == "" {
return errors.New("no backup location set via --mysql_shell_location")
}

if *mysqlShellFlags == "" {
return errors.New("at least the --js flag is required")
}

return nil
}

func (be *MySQLShellBackupEngine) restorePreCheck() error {
if *mysqlShellFlags == "" {
return errors.New("at least the --js flag is required")
}

loadFlags := map[string]interface{}{}
err := json.Unmarshal([]byte(*mysqlShellLoadFlags), &loadFlags)
if err != nil {
return errors.New("unable to parse JSON of load flags")
}

if val, ok := loadFlags["updateGtidSet"]; !ok || val != "replace" {
return errors.New("mysql-shell needs to restore with updateGtidSet set to \"replace\" to work with Vitess")
}

if val, ok := loadFlags["progressFile"]; !ok || val != "" {
return errors.New("\"progressFile\" needs to be empty as vitess always starts a restore from scratch")
}

return nil
}

func init() {
BackupRestoreEngineMap[mysqlShellBackupEngineName] = &MySQLShellBackupEngine{}
}
17 changes: 0 additions & 17 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,23 +776,6 @@ func findReplicationPosition(input, flavor string, logger logutil.Logger) (repli
return replicationPosition, nil
}

// scanLinesToLogger scans full lines from the given Reader and sends them to
// the given Logger until EOF.
func scanLinesToLogger(prefix string, reader io.Reader, logger logutil.Logger, doneFunc func()) {
defer doneFunc()

scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
logger.Infof("%s: %s", prefix, line)
}
if err := scanner.Err(); err != nil {
// This is usually run in a background goroutine, so there's no point
// returning an error. Just log it.
logger.Warningf("error scanning lines from %s: %v", prefix, err)
}
}

func stripeFileName(baseFileName string, index int) string {
return fmt.Sprintf("%s-%03d", baseFileName, index)
}
Expand Down

0 comments on commit 476af04

Please sign in to comment.