Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Relax restrictions on Cancel and ReverseTraffic when writes not involved #17128

Merged
merged 8 commits into from
Nov 6, 2024
12 changes: 12 additions & 0 deletions go/test/endtoend/vreplication/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,18 @@ func TestMultiTenantSimple(t *testing.T) {
require.Zero(t, rowCount)
})

t.Run("cancel after switching reads", func(t *testing.T) {
// First let's test canceling the workflow after only switching reads
// to ensure that it properly cleans up all of the state.
createFunc()
mt.SwitchReads()
confirmOnlyReadsSwitched(t)
mt.Cancel()
confirmNoRoutingRules(t)
rowCount := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1"))
require.Zero(t, rowCount)
})

// Create again and run it to completion.
createFunc()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,22 @@ func testMoveTablesFlags3(t *testing.T, sourceKeyspace, targetKeyspace string, t
// Confirm that the source tables were renamed.
require.True(t, checkTablesExist(t, "zone1-100", []string{"_customer2_old"}))
require.False(t, checkTablesExist(t, "zone1-100", []string{"customer2"}))

// Confirm that we can cancel a workflow after ONLY switching read traffic.
mt = createMoveTables(t, sourceKeyspace, targetKeyspace, workflowName, "customer", createFlags, nil, nil)
mt.Start() // Need to start because we set stop-after-copy to true.
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
for _, tab := range targetTabs {
catchup(t, tab, workflowName, "MoveTables")
}
mt.SwitchReads()
wf := mt.(iWorkflow)
validateReadsRouteToTarget(t, "replica")
validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateTableRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
mt.Cancel()
confirmNoRoutingRules(t)
}

// Create two workflows in order to confirm that listing all workflows works.
Expand Down Expand Up @@ -450,6 +466,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards
"--all-cells", "--format=json",
"--config-overrides", mapToCSV(overrides),
}

rs := newReshard(vc, &reshardWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
Expand All @@ -460,7 +477,6 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards
targetShards: targetShards,
createFlags: createFlags,
}, workflowFlavorVtctld)

ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName)
wf := rs.(iWorkflow)
rs.Create()
Expand Down Expand Up @@ -769,8 +785,10 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules {
}

func confirmNoRoutingRules(t *testing.T) {
routingRulesResponse := getRoutingRules(t)
require.Zero(t, len(routingRulesResponse.Rules))
rrRes := getRoutingRules(t)
require.Zero(t, len(rrRes.Rules))
krrRes := getKeyspaceRoutingRules(t, vc)
require.Zero(t, len(krrRes.Rules))
}

func confirmRoutingRulesExist(t *testing.T) {
Expand Down
76 changes: 46 additions & 30 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ var (
// ErrMultipleTargetKeyspaces occurs when a workflow somehow has multiple
// target keyspaces across different shard primaries. This should be
// impossible.
ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow")
ErrWorkflowNotFullySwitched = errors.New("cannot complete workflow because you have not yet switched all read and write traffic")
ErrWorkflowPartiallySwitched = errors.New("cannot cancel workflow because you have already switched some or all read and write traffic")
ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow")
ErrWorkflowCompleteNotFullySwitched = errors.New("cannot complete workflow because you have not yet switched all read and write traffic")
ErrWorkflowDeleteWritesSwitched = errors.New("cannot delete workflow because you have already switched write traffic")
)

// Server provides an API to work with Vitess workflows, like vreplication
Expand Down Expand Up @@ -1736,7 +1736,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
}

if !state.WritesSwitched || len(state.ReplicaCellsNotSwitched) > 0 || len(state.RdonlyCellsNotSwitched) > 0 {
return nil, ErrWorkflowNotFullySwitched
return nil, ErrWorkflowCompleteNotFullySwitched
}
var renameTable TableRemovalType
if req.RenameTables {
Expand Down Expand Up @@ -2111,10 +2111,12 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
}

if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
// Return an error if the workflow traffic is partially switched.
if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 {
return nil, ErrWorkflowPartiallySwitched
// Return an error if the write workflow traffic is switched.
if state.WritesSwitched {
return nil, ErrWorkflowDeleteWritesSwitched
}
// If only reads have been switched, then we can delete the
// workflow and its related artifacts.
}

// Lock the workflow for deletion.
Expand Down Expand Up @@ -2158,22 +2160,21 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
ts.workflowType)
}
// We need to delete the rows that the target tables would have for the tenant.
// We don't cleanup other related artifacts since they are not tied to the tenant.
if !req.GetKeepData() {
if err := s.deleteTenantData(ctx, ts, req.DeleteBatchSize); err != nil {
return nil, vterrors.Wrapf(err, "failed to fully delete all migrated data for tenant %s, please retry the operation",
ts.options.TenantId)
}
}
} else {
// Cleanup related data and artifacts. There are none for a LookupVindex workflow.
if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
if _, err := s.dropTargets(ctx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace())
}
return nil, err
}

// Cleanup related data and artifacts. There are none for a LookupVindex workflow.
if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
if _, err := s.dropTargets(ctx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace())
}
return nil, err
}
}

Expand Down Expand Up @@ -2697,8 +2698,10 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData,
if !keepData {
switch ts.MigrationType() {
case binlogdatapb.MigrationType_TABLES:
if err := sw.removeTargetTables(ctx); err != nil {
return nil, err
if !ts.IsMultiTenantMigration() {
if err := sw.removeTargetTables(ctx); err != nil {
return nil, err
}
}
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -3239,12 +3242,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic")
}

if direction == DirectionBackward && ts.IsMultiTenantMigration() {
// In a multi-tenant migration, multiple migrations would be writing to the same
// table, so we can't stop writes like we do with MoveTables, using denied tables,
// since it would block all other migrations as well as traffic for tenants which
// have already been migrated.
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations")
if ts.IsMultiTenantMigration() {
// Multi-tenant migrations use keyspace routing rules, so we need to update the state
// using them.
err = updateKeyspaceRoutingState(ctx, ts.TopoServer(), ts.sourceKeyspace, ts.targetKeyspace, startState)
if err != nil {
return nil, vterrors.Wrap(err, "failed to update multi-tenant workflow state using keyspace routing rules")
}
}

// We need this to know when there isn't a (non-FROZEN) reverse workflow to use.
Expand All @@ -3255,6 +3259,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
(direction == DirectionBackward && !startState.WritesSwitched)

if direction == DirectionBackward && !onlySwitchingReads {
if ts.IsMultiTenantMigration() {
// In a multi-tenant migration, multiple migrations would be writing to the same
// table, so we can't stop writes like we do with MoveTables, using denied tables,
// since it would block all other migrations as well as traffic for tenants which
// have already been migrated.
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse write traffic for multi-tenant migrations")
}
// This means that the main workflow is FROZEN and the reverse workflow
// exists. So we update the starting state so that we're using the reverse
// workflow and we can move forward with a normal traffic switch forward
Expand Down Expand Up @@ -3336,6 +3347,15 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
resp.StartState = startState.String()
s.Logger().Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState)
_, currentState, err := s.getWorkflowState(ctx, ts.targetKeyspace, ts.workflow)
if ts.IsMultiTenantMigration() {
// Multi-tenant migrations use keyspace routing rules, so we need to update the state
// using them.
sourceKs, targetKs := ts.sourceKeyspace, ts.targetKeyspace
if TrafficSwitchDirection(req.Direction) == DirectionBackward {
sourceKs, targetKs = targetKs, sourceKs
}
err = updateKeyspaceRoutingState(ctx, ts.TopoServer(), sourceKs, targetKs, currentState)
}
if err != nil {
resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err)
} else {
Expand Down Expand Up @@ -3387,11 +3407,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
// shard level traffic switching is all or nothing
trafficSwitchingIsAllOrNothing = true
case ts.MigrationType() == binlogdatapb.MigrationType_TABLES && ts.IsMultiTenantMigration():
if direction == DirectionBackward {
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"requesting reversal of read traffic for multi-tenant migrations is not supported"))
}
// For multi-tenant migrations, we only support switching traffic to all cells at once
// For multi-tenant migrations, we only support switching traffic to all cells at once.
allCells, err := ts.TopoServer().GetCellInfoNames(ctx)
if err != nil {
return nil, err
Expand All @@ -3415,7 +3431,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
}
if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 {
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
"requesting reversal of read traffic for RDONLYs but RDONLY reads have not been switched"))
}
}

Expand Down
Loading
Loading