Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry VStream errors with exponential backoff and switch tablets for repeated errors #16536

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vtadmin/cluster/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtadmin/cluster/discovery"
"vitess.io/vitess/go/vt/vtadmin/debug"
"vitess.io/vitess/go/vt/vtadmin/internal/backoff"
"vitess.io/vitess/go/vt/vterrors/backoff"
)

const logPrefix = "[vtadmin.cluster.resolver]"
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems nice. I had never even noticed it. I feel like it should probably be in go/vt/grpcbackoff instead though (with grpcbackoff as the package name too), as it isn't really error-related and it is gRPC-related — it is really wrapping/extending google.golang.org/grpc/backoff.

File renamed without changes.
102 changes: 57 additions & 45 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"golang.org/x/exp/maps"
grpcbackoff "google.golang.org/grpc/backoff"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
Expand All @@ -36,6 +37,7 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vterrors/backoff"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -46,9 +48,11 @@ import (

// vstreamManager manages vstream requests.
type vstreamManager struct {
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string
maxTimeInError time.Duration
baseRetryDelay time.Duration

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
Expand All @@ -61,6 +65,9 @@ const maxSkewTimeoutSeconds = 10 * 60
// for a vstream
const tabletPickerContextTimeout = 90 * time.Second

// defaultMaxTimeInError is the default maximum time a tablet that keeps
// returning the same error should be retried before retrying another tablet.
const defaultMaxTimeInError = 5 * time.Minute

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
Expand Down Expand Up @@ -122,6 +129,9 @@ type vstream struct {
ts *topo.Server

tabletPickerOptions discovery.TabletPickerOptions

lastError *vterrors.LastError
backoffStrategy backoff.Strategy
}

type journalEvent struct {
Expand All @@ -134,9 +144,11 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str
exporter := servenv.NewExporter(cell, "VStreamManager")

return &vstreamManager{
resolver: resolver,
toposerv: serv,
cell: cell,
resolver: resolver,
toposerv: serv,
cell: cell,
maxTimeInError: defaultMaxTimeInError,
baseRetryDelay: grpcbackoff.DefaultConfig.BaseDelay,
vstreamsCreated: exporter.NewCountersWithMultiLabels(
"VStreamsCreated",
"Number of vstreams created",
Expand Down Expand Up @@ -183,10 +195,26 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
CellPreference: flags.GetCellPreference(),
TabletOrder: flags.GetTabletOrder(),
},
lastError: initLastError(vsm.maxTimeInError),
backoffStrategy: initBackOffStrategy(vsm.baseRetryDelay),
}
return vs.stream(ctx)
}

// initLastError builds a fresh vterrors.LastError for a vstream by calling
// vterrors.NewLastError with the fixed string "VStreamManager" and the given
// maxTimeInError.
//
// NOTE(review): maxTimeInError is presumably the window during which the same
// error keeps being retried on one tablet — confirm against vterrors.LastError.
func initLastError(maxTimeInError time.Duration) *vterrors.LastError {
	const name = "VStreamManager"
	return vterrors.NewLastError(name, maxTimeInError)
}

// initBackOffStrategy returns the "exponential" backoff.Strategy configured
// with retryDelay as its base delay. The multiplier, jitter and max delay are
// taken unchanged from grpcbackoff.DefaultConfig.
func initBackOffStrategy(retryDelay time.Duration) backoff.Strategy {
	// Start from a copy of the gRPC defaults and override only the base delay.
	cfg := grpcbackoff.DefaultConfig
	cfg.BaseDelay = retryDelay
	return backoff.Get("exponential", cfg)
}

// resolveParams provides defaults for the inputs if they're not specified.
func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid,
filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (*binlogdatapb.VGtid, *binlogdatapb.Filter, *vtgatepb.VStreamFlags, error) {
Expand Down Expand Up @@ -483,8 +511,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// It will be closed when all journal events converge.
var journalDone chan struct{}
ignoreTablets := make([]*topodatapb.TabletAlias, 0)
var prevErr error

errCount := 0
backoffIndex := 0
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -524,11 +553,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err != nil {
return tabletPickerErr(err)
}
if len(tp.GetMatchingTablets(ctx)) == 0 {
tperr := vterrors.Wrapf(prevErr, "zero matching tablets for %s tablet for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
log.Errorf("%v", tperr)
return tperr
}
// Create a child context with a stricter timeout when picking a tablet.
// This will prevent hanging in the case no tablets are found.
tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout)
defer tpCancel()
tablet, err := tp.PickForStreaming(tpCtx)

if err != nil {
return tabletPickerErr(err)
}
Expand Down Expand Up @@ -585,8 +621,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
var vstreamCreatedOnce sync.Once
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
// We received a valid event. Reset backoff index.
backoffIndex = 0

labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()}

Expand Down Expand Up @@ -708,49 +744,25 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
}

retry, ignoreTablet := vs.shouldRetry(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
}
if ignoreTablet {
vs.lastError.Record(err)
prevErr = err

if vs.lastError.ShouldRetry() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done in the vstream.shouldRetry function so that the last-error consideration is taken into account in addition to the other considerations. I think this would likely also address the test failures.

log.Infof("Retrying tablet, count: %d, alias: %v, hostname: %s", backoffIndex, tablet.GetAlias(), tablet.GetHostname())
retryDelay := vs.backoffStrategy.Backoff(backoffIndex)
backoffIndex++
time.Sleep(retryDelay)
} else {
log.Infof("Adding tablet to ignore list, alias: %v, hostname: %s", tablet.GetAlias(), tablet.GetHostname())
ignoreTablets = append(ignoreTablets, tablet.GetAlias())
vs.lastError = initLastError(vs.vsm.maxTimeInError)
backoffIndex = 0
}

errCount++
// Retry, at most, 3 times if the error can be retried.
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
}
log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err)
}
}

// shouldRetry classifies a vstream error. It returns two booleans:
//
//   - the first reports whether the error can be retried (i.e. it is expected
//     to be transient) rather than ending the vstream immediately;
//   - the second reports whether the tablet on which the error occurred should
//     be left out of the candidate tablets on the retry, because another
//     tablet is unlikely to produce the same error.
func (vs *vstream) shouldRetry(err error) (bool, bool) {
	switch vterrors.Code(err) {
	case vtrpcpb.Code_FAILED_PRECONDITION, vtrpcpb.Code_UNAVAILABLE:
		// Retry, keeping the same tablet eligible.
		return true, false
	case vtrpcpb.Code_INVALID_ARGUMENT:
		// A GTIDSet Mismatch on the tablet: retry, but omit this tablet from
		// the TabletPicker candidate list.
		if strings.Contains(err.Error(), "GTIDSet Mismatch") {
			return true, true
		}
	}

	// Anything else is not retried.
	return false, false
}

// sendAll sends a group of events together while holding the lock.
func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error {
vs.mu.Lock()
Expand Down
98 changes: 52 additions & 46 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,41 +390,42 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {

func TestVStreamRetriableErrors(t *testing.T) {
type testCase struct {
name string
code vtrpcpb.Code
msg string
shouldRetry bool
ignoreTablet bool
name string
code vtrpcpb.Code
msg string
shouldSwitchTablets bool
}

tcases := []testCase{
{
name: "failed precondition",
code: vtrpcpb.Code_FAILED_PRECONDITION,
msg: "",
shouldRetry: true,
ignoreTablet: false,
name: "failed precondition",
code: vtrpcpb.Code_FAILED_PRECONDITION,
msg: "",
shouldSwitchTablets: false,
},
{
name: "gtid mismatch",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "GTIDSet Mismatch aa",
shouldRetry: true,
ignoreTablet: true,
name: "gtid mismatch",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "GTIDSet Mismatch aa",
shouldSwitchTablets: true,
},
{
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
msg: "",
shouldRetry: true,
ignoreTablet: false,
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
msg: "",
shouldSwitchTablets: false,
},
{
name: "should not retry",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "final error",
shouldRetry: false,
ignoreTablet: false,
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
msg: "a different unavailable error that persists",
shouldSwitchTablets: true,
},
{
name: "should not retry",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "final error",
shouldSwitchTablets: false,
},
}

Expand Down Expand Up @@ -456,13 +457,23 @@ func TestVStreamRetriableErrors(t *testing.T) {

vsm := newTestVStreamManager(ctx, hc, st, cells[0])

// Always have the local cell tablet error so it's ignored on retry and we pick the other one
// if the error requires ignoring the tablet on retry.
sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg))
if tcase.shouldSwitchTablets {
// Retry just once before trying another tablet.
vsm.maxTimeInError = 1 * time.Nanosecond
vsm.baseRetryDelay = 1 * time.Nanosecond
} else {
// Retry at least once on the same tablet.
vsm.maxTimeInError = 1 * time.Second
vsm.baseRetryDelay = 1 * time.Nanosecond
}

if tcase.ignoreTablet {
// Always have the local cell tablet error on its first vstream
sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg))
if tcase.shouldSwitchTablets {
// Add desired events to the new tablet we should switch to.
sbc1.AddVStreamEvents(commit, nil)
} else {
// Add desired events to the original tablet. We must retry on the same tablet to obtain them.
sbc0.AddVStreamEvents(commit, nil)
}

Expand All @@ -483,10 +494,6 @@ func TestVStreamRetriableErrors(t *testing.T) {
})
wantErr := "context canceled"

if !tcase.shouldRetry {
wantErr = tcase.msg
}

if err == nil || !strings.Contains(err.Error(), wantErr) {
t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr)
}
Expand All @@ -495,20 +502,15 @@ func TestVStreamRetriableErrors(t *testing.T) {

Loop:
for {
if tcase.shouldRetry {
select {
case event := <-ch:
got := event.CloneVT()
if !proto.Equal(got, want) {
t.Errorf("got different vstream event than expected")
}
cancel()
case <-done:
// The goroutine has completed, so break out of the loop
break Loop
select {
case event := <-ch:
got := event.CloneVT()
if !proto.Equal(got, want) {
t.Errorf("got different vstream event than expected")
}
} else {
<-done
cancel()
case <-done:
// The goroutine has completed, so break out of the loop
break Loop
}
}
Expand Down Expand Up @@ -935,6 +937,8 @@ func TestVStreamJournalPartialMatch(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
vsm := newTestVStreamManager(ctx, hc, st, "aa")
vsm.maxTimeInError = 1 * time.Nanosecond
vsm.baseRetryDelay = 1 * time.Nanosecond
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, ctx, st, ks, "-10", sbc1.Tablet())
sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -1582,6 +1586,8 @@ func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{shard})
vsm := newTestVStreamManager(ctx, hc, st, cell)
vsm.maxTimeInError = 1 * time.Nanosecond
vsm.baseRetryDelay = 1 * time.Nanosecond
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Expand Down
Loading