Skip to content

Commit

Permalink
Finish up new unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 6, 2024
1 parent c80f8a3 commit 4cf58c6
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 38 deletions.
56 changes: 37 additions & 19 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2023 The Vitess Authors.
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@ import (
"regexp"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -64,6 +65,11 @@ type queryResult struct {
result *querypb.QueryResult
}

func TestMain(m *testing.M) {
_flag.ParseFlagsForTest()
os.Exit(m.Run())
}

type testEnv struct {
ws *Server
ts *topo.Server
Expand All @@ -74,14 +80,6 @@ type testEnv struct {
cell string
}

//----------------------------------------------
// testEnv

func TestMain(m *testing.M) {
_flag.ParseFlagsForTest()
os.Exit(m.Run())
}

func newTestEnv(t *testing.T, ctx context.Context, cell string, sourceKeyspace, targetKeyspace *testKeyspace) *testEnv {
t.Helper()
env := &testEnv{
Expand Down Expand Up @@ -160,9 +158,6 @@ func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) {
delete(env.tablets[tablet.Keyspace], int(tablet.Alias.Uid))
}

//----------------------------------------------
// testTMClient

type testTMClient struct {
tmclient.TabletManagerClient
schema map[string]*tabletmanagerdatapb.SchemaDefinition
Expand All @@ -176,6 +171,8 @@ type testTMClient struct {
workflowDeleteCalls int

env *testEnv

reverse atomic.Bool // Are we reversing traffic?
}

func newTestTMClient(env *testEnv) *testTMClient {
Expand Down Expand Up @@ -211,7 +208,7 @@ func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *t
res := &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{
Workflow: request.Workflow,
WorkflowType: workflowType,
Streams: make([]*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream, len(tmc.env.sourceKeyspace.ShardNames)),
Streams: make([]*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream, 0, 2),
}
rules := make([]*binlogdatapb.Rule, len(tmc.schema))
for i, table := range maps.Keys(tmc.schema) {
Expand All @@ -220,18 +217,23 @@ func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *t
Filter: fmt.Sprintf("select * from %s", table),
}
}
for i, shard := range tmc.env.sourceKeyspace.ShardNames {
res.Streams[i] = &tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{
blsKs := tmc.env.sourceKeyspace
if tmc.reverse.Load() && tablet.Keyspace == tmc.env.sourceKeyspace.KeyspaceName {
blsKs = tmc.env.targetKeyspace
}
for i, shard := range blsKs.ShardNames {
stream := &tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{
Id: int32(i + 1),
Bls: &binlogdatapb.BinlogSource{
Keyspace: tmc.env.sourceKeyspace.KeyspaceName,
Keyspace: blsKs.KeyspaceName,
Shard: shard,
Tables: maps.Keys(tmc.schema),
Filter: &binlogdatapb.Filter{
Rules: rules,
},
},
}
res.Streams = append(res.Streams, stream)
}

return res, nil
Expand Down Expand Up @@ -308,7 +310,7 @@ func (tmc *testTMClient) verifyQueries(t *testing.T) {
for _, qr := range qrs {
list = append(list, qr.query)
}
t.Errorf("tablet %v: found queries that were expected but never got executed by the test: %v", tabletID, list)
require.Failf(t, "missing query", "tablet %v: found queries that were expected but never got executed by the test: %v", tabletID, list)
}
}
}
Expand Down Expand Up @@ -385,6 +387,10 @@ func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *
workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex
}
}
ks := tmc.env.sourceKeyspace
if tmc.reverse.Load() {
ks = tmc.env.targetKeyspace
}
return &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{
Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{
{
Expand All @@ -395,8 +401,8 @@ func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *
Id: 1,
State: binlogdatapb.VReplicationWorkflowState_Running,
Bls: &binlogdatapb.BinlogSource{
Keyspace: tmc.env.sourceKeyspace.KeyspaceName,
Shard: tmc.env.sourceKeyspace.ShardNames[0],
Keyspace: ks.KeyspaceName,
Shard: ks.ShardNames[0],
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Expand Down Expand Up @@ -425,3 +431,15 @@ func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet
},
}, nil
}

func (tmc *testTMClient) PrimaryPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) {
return position, nil
}

func (tmc *testTMClient) WaitForPosition(ctx context.Context, tablet *topodatapb.Tablet, pos string) error {
return nil
}

func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *topodatapb.Tablet, id int32, pos string) error {
return nil
}
6 changes: 4 additions & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3044,6 +3044,8 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
if err != nil {
return nil, err
}
log.Errorf("req: %+v", req)
log.Errorf("ts: %+v", ts)

if startState.WorkflowType == TypeMigrate {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic")
Expand Down Expand Up @@ -3115,7 +3117,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v", cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822))
resp.DryRunResults = dryRunResults
} else {
log.Infof("SwitchTraffic done for workflow %s.%s", req.Keyspace, req.Workflow)
log.Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow)
resp.Summary = fmt.Sprintf("%s was successful for workflow %s.%s", cmd, req.Keyspace, req.Workflow)
// Reload the state after the SwitchTraffic operation
// and return that as a string.
Expand All @@ -3133,7 +3135,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
} else {
resp.CurrentState = currentState.String()
}
log.Infof("SwitchTraffic done for workflow %s.%s, returning response %v", req.Keyspace, req.Workflow, resp)
log.Infof("%s done for workflow %s.%s, returning response %v", cmd, req.Keyspace, req.Workflow, resp)
}
return resp, nil
}
Expand Down
Loading

0 comments on commit 4cf58c6

Please sign in to comment.