diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 31f5087977c..73afc77d696 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -20,11 +20,11 @@ import ( "fmt" "strconv" "strings" - "sync/atomic" "time" "google.golang.org/protobuf/encoding/prototext" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/vterrors" @@ -66,7 +66,7 @@ type controller struct { done chan struct{} // The following fields are updated after start. So, they need synchronization. - sourceTablet atomic.Value + sourceTablet sync2.AtomicString lastWorkflowError *lastError } @@ -86,7 +86,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor done: make(chan struct{}), source: &binlogdatapb.BinlogSource{}, } - ct.sourceTablet.Store(&topodatapb.TabletAlias{}) + ct.sourceTablet.Set("") log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params) // id @@ -181,7 +181,7 @@ func (ct *controller) run(ctx context.Context) { func (ct *controller) runBlp(ctx context.Context) (err error) { defer func() { - ct.sourceTablet.Store(&topodatapb.TabletAlias{}) + ct.sourceTablet.Set("") if x := recover(); x != nil { log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4)) err = fmt.Errorf("panic: %v", x) @@ -299,9 +299,7 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye } log.Infof("Trying to find an eligible source tablet for vreplication stream id %d for workflow: %s", ct.id, ct.workflow) - tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries) - defer tpCancel() - tablet, err := ct.tabletPicker.PickForStreaming(tpCtx) + tablet, err := ct.tabletPicker.PickForStreaming(ctx) if err != nil { select { case <-ctx.Done(): @@ -314,7 +312,7 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) log.Infof("Found eligible source tablet %s for vreplication stream id %d for workflow %s", tablet.Alias.String(), ct.id, ct.workflow) - ct.sourceTablet.Store(tablet.Alias) + ct.sourceTablet.Set(tablet.Alias.String()) return tablet, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index b159310b09f..09bb3cd2097 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -19,12 +19,13 @@ package vreplication import ( "fmt" "sort" + "strings" "sync" + "time" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/servenv" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( @@ -140,10 +141,7 @@ func (st *vrStats) register() { defer st.mu.Unlock() result := make(map[string]string, len(st.controllers)) for _, ct := range st.controllers { - ta := ct.sourceTablet.Load() - if ta != nil { - result[fmt.Sprintf("%v", ct.id)] = ta.(*topodatapb.TabletAlias).String() - } + result[fmt.Sprintf("%v", ct.id)] = ct.sourceTablet.Get() } return result })) @@ -152,10 +150,12 @@ func (st *vrStats) register() { defer st.mu.Unlock() result := make(map[string]string, len(st.controllers)) for _, ct := range st.controllers { - ta := ct.sourceTablet.Load() - if ta != nil { - result[fmt.Sprintf("%v", ct.id)] = ta.(*topodatapb.TabletAlias).String() + var messages []string + for _, rec := range ct.blpStats.History.Records() { + hist := rec.(*binlogplayer.StatsHistoryRecord) + messages = append(messages, fmt.Sprintf("%s:%s", hist.Time.Format(time.RFC3339Nano), hist.Message)) } + result[fmt.Sprintf("%v", ct.id)] = strings.Join(messages, "; ") } return result })) @@ -394,7 +394,7 @@ func (st *vrStats) status() *EngineStatus { Counts: ct.blpStats.Timings.Counts(), Rates: ct.blpStats.Rates.Get(), State: ct.blpStats.State.Get(), - SourceTablet: ct.sourceTablet.Load().(*topodatapb.TabletAlias), + SourceTablet: ct.sourceTablet.Get(), Messages: ct.blpStats.MessageHistory(), QueryCounts: ct.blpStats.QueryCount.Counts(), PhaseTimings: ct.blpStats.PhaseTimings.Counts(), @@ -426,7 +426,7 @@ type ControllerStatus struct { Counts map[string]int64 Rates map[string][]float64 State string - SourceTablet *topodatapb.TabletAlias + SourceTablet string Messages []string QueryCounts map[string]int64 PhaseTimings map[string]int64 diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index b63583e57ee..2accc3cfa24 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -28,8 +28,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/proto/binlogdata" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var wantOut = ` @@ -109,14 +107,8 @@ func TestStatusHtml(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ - Cell: "zone1", - Uid: 01, - }) - testStats.controllers[2].sourceTablet.Store(&topodatapb.TabletAlias{ - Cell: "zone1", - Uid: 02, - }) + testStats.controllers[1].sourceTablet.Set("src1") + testStats.controllers[2].sourceTablet.Set("src2") close(testStats.controllers[2].done) tpl := template.Must(template.New("test").Parse(vreplicationTemplate)) @@ -143,10 +135,7 @@ func TestVReplicationStats(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ - Cell: "zone1", - Uid: 01, - }) + testStats.controllers[1].sourceTablet.Set("src1") sleepTime := 1 * time.Millisecond record := func(phase string) {