Backport upstream: do not demote a new primary after backup completion (#12856) #423

Closed
wants to merge 3 commits
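
In short, this backport moves the post-backup replication-source reset out of vtctld's backupTablet and into the tablet manager's Backup RPC, and skips the reset entirely when the tablet was promoted to PRIMARY while the backup was running. A minimal, self-contained sketch of that decision rule follows; shouldResetReplication and its types are illustrative names, not code from the diff.

package main

import "fmt"

type tabletType string

const (
    primaryType tabletType = "PRIMARY"
    replicaType tabletType = "REPLICA"
)

// shouldResetReplication captures the guard this PR adds: after a backup
// finishes, replication is only re-pointed at the shard primary when active
// reparenting is enabled and the tablet itself did not become the primary.
func shouldResetReplication(disableActiveReparents bool, currentType tabletType) bool {
    return !disableActiveReparents && currentType != primaryType
}

func main() {
    fmt.Println(shouldResetReplication(false, replicaType)) // true: safe to re-point at the shard primary
    fmt.Println(shouldResetReplication(false, primaryType)) // false: never demote a newly promoted primary
    fmt.Println(shouldResetReplication(true, replicaType))  // false: active reparenting is disabled
}

The full diff below additionally restores the original tablet type in a defer and recomputes the semi-sync setting against the current shard primary before re-pointing replication.
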
64 changes: 64 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -24,6 +24,7 @@ import (
"os/exec"
"path"
"strings"
"sync"
"syscall"
"testing"
"time"
@@ -36,6 +37,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/test/endtoend/cluster"
)

@@ -329,6 +331,10 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int, cDe
name: "TestTerminatedRestore",
method: terminatedRestore,
}, //
{
name: "DoNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup",
method: doNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup,
}, //
}

defer cluster.PanicHandler(t)
@@ -344,6 +350,10 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int, cDe
if len(runSpecific) > 0 && !isRegistered(test.name, runSpecific) {
continue
}
// don't run this one unless specified
if len(runSpecific) == 0 && test.name == "DoNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup" {
continue
}
if retVal := t.Run(test.name, test.method); !retVal {
return vterrors.Errorf(vtrpc.Code_UNKNOWN, "test failure: %s", test.name)
}
@@ -749,6 +759,60 @@ func terminatedRestore(t *testing.T) {
stopAllTablets()
}

func checkTabletType(t *testing.T, alias string, tabletType topodata.TabletType) {
// poll for up to 15 seconds, waiting for the tablet to report the expected type
for i := 0; i < 15; i++ {
output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", alias)
require.Nil(t, err)
var tabletPB topodata.Tablet
err = json2.Unmarshal([]byte(output), &tabletPB)
require.NoError(t, err)
if tabletType == tabletPB.Type {
return
}
time.Sleep(1 * time.Second)
}
require.Failf(t, "checkTabletType failed.", "Tablet type is not correct. Expected: %v", tabletType)
}

func doNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)

// Start the backup on a replica
go func() {
defer wg.Done()
// ensure this is a primary first
checkTabletType(t, primary.Alias, topodata.TabletType_PRIMARY)

// now backup
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
require.Nil(t, err)
}()

// Perform a graceful reparent operation
go func() {
defer wg.Done()
// ensure this is a primary first
checkTabletType(t, primary.Alias, topodata.TabletType_PRIMARY)

// now reparent
_, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput(
"PlannedReparentShard", "--",
"--keyspace_shard", fmt.Sprintf("%s/%s", keyspaceName, shardName),
"--new_primary", replica1.Alias)
require.Nil(t, err)

// check that we reparented
checkTabletType(t, replica1.Alias, topodata.TabletType_PRIMARY)
}()

wg.Wait()

// check that replica1 is still the primary: completing the backup must not demote it
checkTabletType(t, replica1.Alias, topodata.TabletType_PRIMARY)
}

// test_backup will:
// - create a shard with primary and replica1 only
// - run InitShardPrimary
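
Because the runSpecific guard near the top of this file skips DoNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup by default, a suite opts in by naming the scenario explicitly. A hedged sketch of such a caller, essentially what the xtrabackup suite in the next file adds:

package xtrabackup

import (
    "testing"

    backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"
)

func TestDoNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup(t *testing.T) {
    // Run only the reparent-during-backup scenario; nil CompressionDetails keeps the default engine.
    backup.TestBackup(t, backup.XtraBackup, "xbstream", 0, nil,
        []string{"DoNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup"})
}
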
28 changes: 28 additions & 0 deletions go/test/endtoend/backup/xtrabackup/xtrabackup_test.go
@@ -41,6 +41,34 @@ func TestXtrabackWithZstdCompression(t *testing.T) {
backup.TestBackup(t, backup.XtraBackup, "tar", 0, cDetails, []string{"TestReplicaBackup"})
}

func TestXtrabackupWithExternalZstdCompression(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &backup.CompressionDetails{
CompressorEngineName: "external",
ExternalCompressorCmd: "zstd",
ExternalCompressorExt: ".zst",
ExternalDecompressorCmd: "zstd -d",
}

backup.TestBackup(t, backup.XtraBackup, "tar", 0, cDetails, []string{"TestReplicaBackup"})
}

func TestXtrabackupWithExternalZstdCompressionAndManifestedDecompressor(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &backup.CompressionDetails{
CompressorEngineName: "external",
ExternalCompressorCmd: "zstd",
ExternalCompressorExt: ".zst",
ExternalDecompressorCmd: "zstd -d",
}

backup.TestBackup(t, backup.XtraBackup, "tar", 0, cDetails, []string{"TestReplicaBackup"})
}

func TestDoNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup(t *testing.T) {
backup.TestBackup(t, backup.XtraBackup, "xbstream", 0, nil, []string{"DoNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup"})
}

func setDefaultCompressionFlag() {
mysqlctl.CompressionEngineName = "pgzip"
mysqlctl.ExternalCompressorCmd = ""
15 changes: 1 addition & 14 deletions go/vt/vtctl/grpcvtctldserver/server.go
@@ -478,20 +478,7 @@ func (s *VtctldServer) backupTablet(ctx context.Context, tablet *topodatapb.Tabl
logger.Errorf("failed to send stream response %+v: %v", resp, err)
}
case io.EOF:
// Do not do anything for primary tablets and when active reparenting is disabled
if mysqlctl.DisableActiveReparents || tablet.Type == topodatapb.TabletType_PRIMARY {
return nil
}

// Otherwise we find the correct primary tablet and set the replication source,
// since the primary could have changed while we executed the backup which can
// also affect whether we want to send semi sync acks or not.
tabletInfo, err := s.ts.GetTablet(ctx, tablet.Alias)
if err != nil {
return err
}

return reparentutil.SetReplicationSource(ctx, s.ts, s.tmc, tabletInfo.Tablet)
return nil
default:
return err
}
99 changes: 68 additions & 31 deletions go/vt/vttablet/tabletmanager/rpc_backup.go
@@ -22,6 +22,9 @@ import (

"context"

"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/reparentutil"

"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/topo/topoproto"
@@ -73,6 +76,9 @@ func (tm *TabletManager) Backup(ctx context.Context, logger logutil.Logger, req
}
defer tm.endBackup(backupMode)

// create the loggers: tee to console and source
l := logutil.NewTeeLogger(logutil.NewConsoleLogger(), logger)

var originalType topodatapb.TabletType
if engine.ShouldDrainForBackup() {
if err := tm.lock(ctx); err != nil {
@@ -89,6 +95,7 @@
if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_BACKUP, DBActionNone, SemiSyncActionUnset); err != nil {
return err
}

// Tell Orchestrator we're stopped on purpose for some Vitess task.
// Do this in the background, as it's best-effort.
go func() {
@@ -99,9 +106,68 @@
logger.Warningf("Orchestrator BeginMaintenance failed: %v", err)
}
}()

// Defer restoring the original tablet type so it is reset even if the backup fails.
defer func() {
bgCtx := context.Background()
// Change our type back to the original value.
// Original type could be primary so pass in a real value for PrimaryTermStartTime
if err := tm.changeTypeLocked(bgCtx, originalType, DBActionNone, SemiSyncActionNone); err != nil {
l.Errorf("Failed to change tablet type from %v to %v, error: %v", topodatapb.TabletType_BACKUP, originalType, err)
return
} else {
// Tell Orchestrator we're no longer stopped on purpose.
// Do this in the background, as it's best-effort.
go func() {
if tm.orc == nil {
return
}
if err := tm.orc.EndMaintenance(tm.Tablet()); err != nil {
logger.Warningf("Orchestrator EndMaintenance failed: %v", err)
}
}()
}

// Find the correct primary tablet and set the replication source,
// since the primary could have changed while we executed the backup which can
// also affect whether we want to send semi sync acks or not.
tabletInfo, err := tm.TopoServer.GetTablet(bgCtx, tablet.Alias)
if err != nil {
l.Errorf("Failed to fetch updated tablet info, error: %v", err)
return
}

// Do not do anything for primary tablets or when active reparenting is disabled
if mysqlctl.DisableActiveReparents || tabletInfo.Type == topodatapb.TabletType_PRIMARY {
return
}

shardPrimary, err := topotools.GetShardPrimaryForTablet(bgCtx, tm.TopoServer, tablet.Tablet)
if err != nil {
return
}

durabilityName, err := tm.TopoServer.GetKeyspaceDurability(bgCtx, tablet.Keyspace)
if err != nil {
l.Errorf("Failed to get durability policy, error: %v", err)
return
}
durability, err := reparentutil.GetDurabilityPolicy(durabilityName)
if err != nil {
l.Errorf("Failed to get durability with name %v, error: %v", durabilityName, err)
return
}

isSemiSync := reparentutil.IsReplicaSemiSync(durability, shardPrimary.Tablet, tabletInfo.Tablet)
semiSyncAction, err := tm.convertBoolToSemiSyncAction(isSemiSync)
if err != nil {
l.Errorf("Failed to convert bool to semisync action, error: %v", err)
return
}
if err := tm.setReplicationSourceLocked(bgCtx, shardPrimary.Alias, 0, "", false, semiSyncAction); err != nil {
l.Errorf("Failed to set replication source, error: %v", err)
}
}()
}
// create the loggers: tee to console and source
l := logutil.NewTeeLogger(logutil.NewConsoleLogger(), logger)

// now we can run the backup
backupParams := mysqlctl.BackupParams{
Expand All @@ -119,35 +185,6 @@ func (tm *TabletManager) Backup(ctx context.Context, logger logutil.Logger, req

returnErr := mysqlctl.Backup(ctx, backupParams)

if engine.ShouldDrainForBackup() {
bgCtx := context.Background()
// Starting from here we won't be able to recover if we get stopped by a cancelled
// context. It is also possible that the context already timed out during the
// above call to Backup. Thus we use the background context to get through to the finish.

// Change our type back to the original value.
// Original type could be primary so pass in a real value for PrimaryTermStartTime
if err := tm.changeTypeLocked(bgCtx, originalType, DBActionNone, SemiSyncActionNone); err != nil {
// failure in changing the topology type is probably worse,
// so returning that (we logged the snapshot error anyway)
if returnErr != nil {
l.Errorf("mysql backup command returned error: %v", returnErr)
}
returnErr = err
} else {
// Tell Orchestrator we're no longer stopped on purpose.
// Do this in the background, as it's best-effort.
go func() {
if tm.orc == nil {
return
}
if err := tm.orc.EndMaintenance(tm.Tablet()); err != nil {
logger.Warningf("Orchestrator EndMaintenance failed: %v", err)
}
}()
}
}

return returnErr
}
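
One detail of the new defer in rpc_backup.go worth noting: the cleanup runs on a fresh context.Background(), because the request context may already be cancelled or timed out by the time the backup finishes. A minimal, self-contained sketch of that pattern, under illustrative names (runBackup and restoreType are not functions from the diff):

package main

import (
    "context"
    "fmt"
    "time"
)

// restoreType stands in for tm.changeTypeLocked: it must keep working even if
// the caller's context was cancelled while the backup ran, so the defer hands
// it a context of its own rather than the request context.
func restoreType(ctx context.Context, to string) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(10 * time.Millisecond):
        fmt.Println("tablet type restored to", to)
        return nil
    }
}

// runBackup mirrors the shape of TabletManager.Backup after this change: the
// type restore is registered in a defer before the backup starts and uses a
// background context.
func runBackup(ctx context.Context) error {
    originalType := "REPLICA"
    defer func() {
        bgCtx := context.Background()
        if err := restoreType(bgCtx, originalType); err != nil {
            fmt.Println("failed to restore tablet type:", err)
        }
    }()

    // Stand-in for the long-running mysqlctl.Backup call.
    <-ctx.Done()
    return ctx.Err()
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    _ = runBackup(ctx) // the deferred restore still succeeds after ctx expires
}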
