From 999f1c1428be38c907f953ccde09d2de4c977a11 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 20 Feb 2024 14:02:20 -0500 Subject: [PATCH] VReplication: Make Target Sequence Initialization More Robust (#15289) Signed-off-by: Matt Lord --- go/sqltypes/testing.go | 2 +- go/vt/vtctl/workflow/server.go | 25 ++++++------ go/vt/vtctl/workflow/traffic_switcher.go | 17 ++++++--- go/vt/wrangler/traffic_switcher.go | 42 ++++++++++++--------- go/vt/wrangler/traffic_switcher_env_test.go | 4 +- go/vt/wrangler/traffic_switcher_test.go | 2 +- 6 files changed, 54 insertions(+), 38 deletions(-) diff --git a/go/sqltypes/testing.go b/go/sqltypes/testing.go index 63589ee9567..649462333c7 100644 --- a/go/sqltypes/testing.go +++ b/go/sqltypes/testing.go @@ -76,7 +76,7 @@ func MakeTestResult(fields []*querypb.Field, rows ...string) *Result { for i, row := range rows { result.Rows[i] = make([]Value, len(fields)) for j, col := range split(row) { - if col == "null" { + if strings.ToLower(col) == "null" { result.Rows[i][j] = NULL continue } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index e298d3f64f3..5e9f3fc9300 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3301,6 +3301,20 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit sw.cancelMigration(ctx, sm) return handleError("failed to create the reverse vreplication streams", err) } + + // Initialize any target sequences, if there are any, before allowing new writes. + if req.InitializeTargetSequences && len(sequenceMetadata) > 0 { + ts.Logger().Infof("Initializing target sequences") + // Writes are blocked so we can safely initialize the sequence tables but + // we also want to use a shorter timeout than the parent context. + // We use at most half of the overall timeout. + initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2) + defer cancel() + if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { + sw.cancelMigration(ctx, sm) + return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err) + } + } } else { if cancel { return handleError("invalid cancel", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "traffic switching has reached the point of no return, cannot cancel")) @@ -3317,17 +3331,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit if err := sw.createJournals(ctx, sourceWorkflows); err != nil { return handleError("failed to create the journal", err) } - // Initialize any target sequences, if there are any, before allowing new writes. - if req.InitializeTargetSequences && len(sequenceMetadata) > 0 { - // Writes are blocked so we can safely initialize the sequence tables but - // we also want to use a shorter timeout than the parent context. - // We use up at most half of the overall timeout. - initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2) - defer cancel() - if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { - return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err) - } - } if err := sw.allowTargetWrites(ctx); err != nil { return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err) } diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 2a593cfb1db..c8551c5ff73 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -1432,13 +1433,17 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen MaxRows: 1, }) if terr != nil || len(qr.Rows) != 1 { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s in order to initialize the backing sequence table: %v", - ts.targetKeyspace, sequenceMetadata.usingTableName, terr) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v", + ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr) } - maxID, terr := sqltypes.Proto3ToResult(qr).Rows[0][0].ToInt64() - if terr != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s in order to initialize the backing sequence table: %v", - ts.targetKeyspace, sequenceMetadata.usingTableName, terr) + rawVal := sqltypes.Proto3ToResult(qr).Rows[0][0] + maxID := int64(0) + if !rawVal.IsNull() { // If it's NULL then there are no rows and 0 remains the max + maxID, terr = rawVal.ToInt64() + if terr != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v", + ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr) + } } srMu.Lock() defer srMu.Unlock() diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 9e7e133cd1f..a6b3587c3a1 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -40,6 +40,7 @@ import ( "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vterrors" @@ -619,6 +620,20 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa sw.cancelMigration(ctx, sm) return handleError("failed to create the reverse vreplication streams", err) } + + // Initialize any target sequences, if there are any, before allowing new writes. + if initializeTargetSequences && len(sequenceMetadata) > 0 { + ts.Logger().Infof("Initializing target sequences") + // Writes are blocked so we can safely initialize the sequence tables but + // we also want to use a shorter timeout than the parent context. + // We use at most half of the overall timeout. + initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2) + defer cancel() + if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { + sw.cancelMigration(ctx, sm) + return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err) + } + } } else { if cancel { return handleError("invalid cancel", fmt.Errorf("traffic switching has reached the point of no return, cannot cancel")) @@ -635,17 +650,6 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa if err := sw.createJournals(ctx, sourceWorkflows); err != nil { return handleError("failed to create the journal", err) } - // Initialize any target sequences, if there are any, before allowing new writes. - if initializeTargetSequences && len(sequenceMetadata) > 0 { - // Writes are blocked so we can safely initialize the sequence tables but - // we also want to use a shorter timeout than the parent context. - // We use up at most half of the overall timeout. - initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2) - defer cancel() - if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { - return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err) - } - } if err := sw.allowTargetWrites(ctx); err != nil { return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err) } @@ -2197,13 +2201,17 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen ) qr, terr := ts.wr.ExecuteFetchAsApp(ictx, primary.GetAlias(), true, query.Query, 1) if terr != nil || len(qr.Rows) != 1 { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s in order to initialize the backing sequence table: %v", - ts.targetKeyspace, sequenceMetadata.usingTableName, terr) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v", + ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr) } - maxID, terr := sqltypes.Proto3ToResult(qr).Rows[0][0].ToInt64() - if terr != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s in order to initialize the backing sequence table: %v", - ts.targetKeyspace, sequenceMetadata.usingTableName, terr) + rawVal := sqltypes.Proto3ToResult(qr).Rows[0][0] + maxID := int64(0) + if !rawVal.IsNull() { // If it's NULL then there are no rows and 0 remains the max + maxID, terr = rawVal.ToInt64() + if terr != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v", + ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr) + } } srMu.Lock() defer srMu.Unlock() diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 9134bb8917e..3838ded0669 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -258,7 +258,7 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, "maxval", "int64", ), - "5", + "NULL", ), ) tme.tmeDB.AddQuery(fmt.Sprintf(maxValForSequence, "ks2", "t2"), @@ -274,7 +274,7 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, // Now tell the fakesqldb used by the global keyspace tablets to expect // the sequence management related queries against the target keyspace. gfdb.AddQuery( - sqlparser.BuildParsedQuery(sqlInitSequenceTable, sqlescape.EscapeID("vt_global"), sqlescape.EscapeID("t1_seq"), 6, 6, 6).Query, + sqlparser.BuildParsedQuery(sqlInitSequenceTable, sqlescape.EscapeID("vt_global"), sqlescape.EscapeID("t1_seq"), 1, 1, 1).Query, &sqltypes.Result{RowsAffected: 0}, ) gfdb.AddQuery( diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index df1eebc013a..e1ae1ce908f 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -1011,8 +1011,8 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) { "\tKeyspace ks1, Shard 0 at Position MariaDB/5-456-892", "Wait for VReplication on stopped streams to catchup for up to 1s", "Create reverse replication workflow test_reverse", - "Create journal entries on source databases", "The following sequence backing tables used by tables being moved will be initialized: t1_seq,t2_seq", + "Create journal entries on source databases", "Enable writes on keyspace ks2 tables [t1,t2]", "Switch routing from keyspace ks1 to keyspace ks2", "Routing rules for tables [t1,t2] will be updated",