diff --git a/go/vt/vtadmin/cluster/resolver/resolver.go b/go/vt/vtadmin/cluster/resolver/resolver.go index 203554c35ac..f2ec75d2bdc 100644 --- a/go/vt/vtadmin/cluster/resolver/resolver.go +++ b/go/vt/vtadmin/cluster/resolver/resolver.go @@ -42,7 +42,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vtadmin/cluster/discovery" "vitess.io/vitess/go/vt/vtadmin/debug" - "vitess.io/vitess/go/vt/vtadmin/internal/backoff" + "vitess.io/vitess/go/vt/vterrors/backoff" ) const logPrefix = "[vtadmin.cluster.resolver]" diff --git a/go/vt/vtadmin/internal/backoff/backoff.go b/go/vt/vterrors/backoff/backoff.go similarity index 100% rename from go/vt/vtadmin/internal/backoff/backoff.go rename to go/vt/vterrors/backoff/backoff.go diff --git a/go/vt/vtadmin/internal/backoff/backoff_test.go b/go/vt/vterrors/backoff/backoff_test.go similarity index 100% rename from go/vt/vtadmin/internal/backoff/backoff_test.go rename to go/vt/vterrors/backoff/backoff_test.go diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index e0d195853cf..9cc6538eec1 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -26,6 +26,7 @@ import ( "time" "golang.org/x/exp/maps" + grpcbackoff "google.golang.org/grpc/backoff" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" @@ -36,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vterrors/backoff" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" @@ -46,9 +48,11 @@ import ( // vstreamManager manages vstream requests. type vstreamManager struct { - resolver *srvtopo.Resolver - toposerv srvtopo.Server - cell string + resolver *srvtopo.Resolver + toposerv srvtopo.Server + cell string + maxTimeInError time.Duration + baseRetryDelay time.Duration vstreamsCreated *stats.CountersWithMultiLabels vstreamsLag *stats.GaugesWithMultiLabels @@ -61,6 +65,9 @@ const maxSkewTimeoutSeconds = 10 * 60 // for a vstream const tabletPickerContextTimeout = 90 * time.Second +// Default max time a tablet with the same error should be retried before retrying another. +const defaultMaxTimeInError = 5 * time.Minute + // vstream contains the metadata for one VStream request. type vstream struct { // mu protects parts of vgtid, the semantics of a send, and journaler. @@ -122,6 +129,9 @@ type vstream struct { ts *topo.Server tabletPickerOptions discovery.TabletPickerOptions + + lastError *vterrors.LastError + backoffStrategy backoff.Strategy } type journalEvent struct { @@ -134,9 +144,11 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str exporter := servenv.NewExporter(cell, "VStreamManager") return &vstreamManager{ - resolver: resolver, - toposerv: serv, - cell: cell, + resolver: resolver, + toposerv: serv, + cell: cell, + maxTimeInError: defaultMaxTimeInError, + baseRetryDelay: grpcbackoff.DefaultConfig.BaseDelay, vstreamsCreated: exporter.NewCountersWithMultiLabels( "VStreamsCreated", "Number of vstreams created", @@ -183,10 +195,26 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta CellPreference: flags.GetCellPreference(), TabletOrder: flags.GetTabletOrder(), }, + lastError: initLastError(vsm.maxTimeInError), + backoffStrategy: initBackOffStrategy(vsm.baseRetryDelay), } return vs.stream(ctx) } +func initLastError(maxTimeInError time.Duration) *vterrors.LastError { + return vterrors.NewLastError("VStreamManager", maxTimeInError) +} + +func initBackOffStrategy(retryDelay time.Duration) backoff.Strategy { + config := grpcbackoff.Config{ + BaseDelay: retryDelay, + Multiplier: grpcbackoff.DefaultConfig.Multiplier, + Jitter: grpcbackoff.DefaultConfig.Jitter, + MaxDelay: grpcbackoff.DefaultConfig.MaxDelay, + } + return backoff.Get("exponential", config) +} + // resolveParams provides defaults for the inputs if they're not specified. func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (*binlogdatapb.VGtid, *binlogdatapb.Filter, *vtgatepb.VStreamFlags, error) { @@ -483,8 +511,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // It will be closed when all journal events converge. var journalDone chan struct{} ignoreTablets := make([]*topodatapb.TabletAlias, 0) + var prevErr error - errCount := 0 + backoffIndex := 0 for { select { case <-ctx.Done(): @@ -524,11 +553,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if err != nil { return tabletPickerErr(err) } + if len(tp.GetMatchingTablets(ctx)) == 0 { + tperr := vterrors.Wrapf(prevErr, "zero matching tablets for %s tablet for VStream in %s/%s within the %s cell(s)", + vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) + log.Errorf("%v", tperr) + return tperr + } // Create a child context with a stricter timeout when picking a tablet. // This will prevent hanging in the case no tablets are found. tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout) defer tpCancel() tablet, err := tp.PickForStreaming(tpCtx) + if err != nil { return tabletPickerErr(err) } @@ -585,8 +621,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } var vstreamCreatedOnce sync.Once err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error { - // We received a valid event. Reset error count. - errCount = 0 + // We received a valid event. Reset backoff index. + backoffIndex = 0 labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()} @@ -708,49 +744,25 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") } - retry, ignoreTablet := vs.shouldRetry(err) - if !retry { - log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) - return err - } - if ignoreTablet { + vs.lastError.Record(err) + prevErr = err + + if vs.lastError.ShouldRetry() { + log.Infof("Retrying tablet, count: %d, alias: %v, hostname: %s", backoffIndex, tablet.GetAlias(), tablet.GetHostname()) + retryDelay := vs.backoffStrategy.Backoff(backoffIndex) + backoffIndex++ + time.Sleep(retryDelay) + } else { + log.Infof("Adding tablet to ignore list, alias: %v, hostname: %s", tablet.GetAlias(), tablet.GetHostname()) ignoreTablets = append(ignoreTablets, tablet.GetAlias()) + vs.lastError = initLastError(vs.vsm.maxTimeInError) + backoffIndex = 0 } - errCount++ - // Retry, at most, 3 times if the error can be retried. - if errCount >= 3 { - log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) - return err - } log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err) } } -// shouldRetry determines whether we should exit immediately or retry the vstream. -// The first return value determines if the error can be retried, while the second -// indicates whether the tablet with which the error occurred should be omitted -// from the candidate list of tablets to choose from on the retry. -// -// An error should be retried if it is expected to be transient. -// A tablet should be ignored upon retry if it's likely another tablet will not -// produce the same error. -func (vs *vstream) shouldRetry(err error) (bool, bool) { - errCode := vterrors.Code(err) - - if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE { - return true, false - } - - // If there is a GTIDSet Mismatch on the tablet, omit it from the candidate - // list in the TabletPicker on retry. - if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { - return true, true - } - - return false, false -} - // sendAll sends a group of events together while holding the lock. func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index e51bd2785dd..af8eec9e462 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -390,41 +390,42 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { func TestVStreamRetriableErrors(t *testing.T) { type testCase struct { - name string - code vtrpcpb.Code - msg string - shouldRetry bool - ignoreTablet bool + name string + code vtrpcpb.Code + msg string + shouldSwitchTablets bool } tcases := []testCase{ { - name: "failed precondition", - code: vtrpcpb.Code_FAILED_PRECONDITION, - msg: "", - shouldRetry: true, - ignoreTablet: false, + name: "failed precondition", + code: vtrpcpb.Code_FAILED_PRECONDITION, + msg: "", + shouldSwitchTablets: false, }, { - name: "gtid mismatch", - code: vtrpcpb.Code_INVALID_ARGUMENT, - msg: "GTIDSet Mismatch aa", - shouldRetry: true, - ignoreTablet: true, + name: "gtid mismatch", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "GTIDSet Mismatch aa", + shouldSwitchTablets: true, }, { - name: "unavailable", - code: vtrpcpb.Code_UNAVAILABLE, - msg: "", - shouldRetry: true, - ignoreTablet: false, + name: "unavailable", + code: vtrpcpb.Code_UNAVAILABLE, + msg: "", + shouldSwitchTablets: false, }, { - name: "should not retry", - code: vtrpcpb.Code_INVALID_ARGUMENT, - msg: "final error", - shouldRetry: false, - ignoreTablet: false, + name: "unavailable", + code: vtrpcpb.Code_UNAVAILABLE, + msg: "a different unavailable error that persists", + shouldSwitchTablets: true, + }, + { + name: "should not retry", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "final error", + shouldSwitchTablets: false, }, } @@ -456,13 +457,23 @@ func TestVStreamRetriableErrors(t *testing.T) { vsm := newTestVStreamManager(ctx, hc, st, cells[0]) - // Always have the local cell tablet error so it's ignored on retry and we pick the other one - // if the error requires ignoring the tablet on retry. - sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg)) + if tcase.shouldSwitchTablets { + // Retry just once before trying another tablet. + vsm.maxTimeInError = 1 * time.Nanosecond + vsm.baseRetryDelay = 1 * time.Nanosecond + } else { + // Retry at least once on the same tablet. + vsm.maxTimeInError = 1 * time.Second + vsm.baseRetryDelay = 1 * time.Nanosecond + } - if tcase.ignoreTablet { + // Always have the local cell tablet error on its first vstream + sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg)) + if tcase.shouldSwitchTablets { + // Add desired events to the new tablet we should switch to. sbc1.AddVStreamEvents(commit, nil) } else { + // Add desired events to the original tablet. We must retry on the same tablet to obtain them. sbc0.AddVStreamEvents(commit, nil) } @@ -483,10 +494,6 @@ func TestVStreamRetriableErrors(t *testing.T) { }) wantErr := "context canceled" - if !tcase.shouldRetry { - wantErr = tcase.msg - } - if err == nil || !strings.Contains(err.Error(), wantErr) { t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) } @@ -495,20 +502,15 @@ func TestVStreamRetriableErrors(t *testing.T) { Loop: for { - if tcase.shouldRetry { - select { - case event := <-ch: - got := event.CloneVT() - if !proto.Equal(got, want) { - t.Errorf("got different vstream event than expected") - } - cancel() - case <-done: - // The goroutine has completed, so break out of the loop - break Loop + select { + case event := <-ch: + got := event.CloneVT() + if !proto.Equal(got, want) { + t.Errorf("got different vstream event than expected") } - } else { - <-done + cancel() + case <-done: + // The goroutine has completed, so break out of the loop break Loop } } @@ -935,6 +937,8 @@ func TestVStreamJournalPartialMatch(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) vsm := newTestVStreamManager(ctx, hc, st, "aa") + vsm.maxTimeInError = 1 * time.Nanosecond + vsm.baseRetryDelay = 1 * time.Nanosecond sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-10", sbc1.Tablet()) sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -1582,6 +1586,8 @@ func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{shard}) vsm := newTestVStreamManager(ctx, hc, st, cell) + vsm.maxTimeInError = 1 * time.Nanosecond + vsm.baseRetryDelay = 1 * time.Nanosecond vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ Keyspace: ks,