Skip to content

Commit

Permalink
Revert some parts
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Aug 12, 2024
1 parent 2c4f6cf commit f11099a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 33 deletions.
14 changes: 6 additions & 8 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"fmt"
"strconv"
"strings"
"sync/atomic"
"time"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/vterrors"

Expand Down Expand Up @@ -66,7 +66,7 @@ type controller struct {
done chan struct{}

// The following fields are updated after start. So, they need synchronization.
sourceTablet atomic.Value
sourceTablet sync2.AtomicString

lastWorkflowError *lastError
}
Expand All @@ -86,7 +86,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
done: make(chan struct{}),
source: &binlogdatapb.BinlogSource{},
}
ct.sourceTablet.Store(&topodatapb.TabletAlias{})
ct.sourceTablet.Set("")
log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)

// id
Expand Down Expand Up @@ -181,7 +181,7 @@ func (ct *controller) run(ctx context.Context) {

func (ct *controller) runBlp(ctx context.Context) (err error) {
defer func() {
ct.sourceTablet.Store(&topodatapb.TabletAlias{})
ct.sourceTablet.Set("")
if x := recover(); x != nil {
log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4))
err = fmt.Errorf("panic: %v", x)
Expand Down Expand Up @@ -299,9 +299,7 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye
}
log.Infof("Trying to find an eligible source tablet for vreplication stream id %d for workflow: %s",
ct.id, ct.workflow)
tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries)
defer tpCancel()
tablet, err := ct.tabletPicker.PickForStreaming(tpCtx)
tablet, err := ct.tabletPicker.PickForStreaming(ctx)
if err != nil {
select {
case <-ctx.Done():
Expand All @@ -314,7 +312,7 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye
ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String()))
log.Infof("Found eligible source tablet %s for vreplication stream id %d for workflow %s",
tablet.Alias.String(), ct.id, ct.workflow)
ct.sourceTablet.Store(tablet.Alias)
ct.sourceTablet.Set(tablet.Alias.String())
return tablet, err
}

Expand Down
22 changes: 11 additions & 11 deletions go/vt/vttablet/tabletmanager/vreplication/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package vreplication
import (
"fmt"
"sort"
"strings"
"sync"
"time"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/servenv"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var (
Expand Down Expand Up @@ -140,10 +141,7 @@ func (st *vrStats) register() {
defer st.mu.Unlock()
result := make(map[string]string, len(st.controllers))
for _, ct := range st.controllers {
ta := ct.sourceTablet.Load()
if ta != nil {
result[fmt.Sprintf("%v", ct.id)] = ta.(*topodatapb.TabletAlias).String()
}
result[fmt.Sprintf("%v", ct.id)] = ct.sourceTablet.Get()
}
return result
}))
Expand All @@ -152,10 +150,12 @@ func (st *vrStats) register() {
defer st.mu.Unlock()
result := make(map[string]string, len(st.controllers))
for _, ct := range st.controllers {
ta := ct.sourceTablet.Load()
if ta != nil {
result[fmt.Sprintf("%v", ct.id)] = ta.(*topodatapb.TabletAlias).String()
var messages []string
for _, rec := range ct.blpStats.History.Records() {
hist := rec.(*binlogplayer.StatsHistoryRecord)
messages = append(messages, fmt.Sprintf("%s:%s", hist.Time.Format(time.RFC3339Nano), hist.Message))
}
result[fmt.Sprintf("%v", ct.id)] = strings.Join(messages, "; ")
}
return result
}))
Expand Down Expand Up @@ -394,7 +394,7 @@ func (st *vrStats) status() *EngineStatus {
Counts: ct.blpStats.Timings.Counts(),
Rates: ct.blpStats.Rates.Get(),
State: ct.blpStats.State.Get(),
SourceTablet: ct.sourceTablet.Load().(*topodatapb.TabletAlias),
SourceTablet: ct.sourceTablet.Get(),
Messages: ct.blpStats.MessageHistory(),
QueryCounts: ct.blpStats.QueryCount.Counts(),
PhaseTimings: ct.blpStats.PhaseTimings.Counts(),
Expand Down Expand Up @@ -426,7 +426,7 @@ type ControllerStatus struct {
Counts map[string]int64
Rates map[string][]float64
State string
SourceTablet *topodatapb.TabletAlias
SourceTablet string
Messages []string
QueryCounts map[string]int64
PhaseTimings map[string]int64
Expand Down
17 changes: 3 additions & 14 deletions go/vt/vttablet/tabletmanager/vreplication/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/proto/binlogdata"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var wantOut = `
Expand Down Expand Up @@ -109,14 +107,8 @@ func TestStatusHtml(t *testing.T) {
done: make(chan struct{}),
},
}
testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{
Cell: "zone1",
Uid: 01,
})
testStats.controllers[2].sourceTablet.Store(&topodatapb.TabletAlias{
Cell: "zone1",
Uid: 02,
})
testStats.controllers[1].sourceTablet.Set("src1")
testStats.controllers[2].sourceTablet.Set("src2")
close(testStats.controllers[2].done)

tpl := template.Must(template.New("test").Parse(vreplicationTemplate))
Expand All @@ -143,10 +135,7 @@ func TestVReplicationStats(t *testing.T) {
done: make(chan struct{}),
},
}
testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{
Cell: "zone1",
Uid: 01,
})
testStats.controllers[1].sourceTablet.Set("src1")

sleepTime := 1 * time.Millisecond
record := func(phase string) {
Expand Down

0 comments on commit f11099a

Please sign in to comment.