Skip to content

Commit

Permalink
slack-15.0: backport vitessio#15772 (#384)
Browse files Browse the repository at this point in the history
* [release-17.0] VReplication: Take replication lag into account in VStreamManager healthcheck result processing (vitessio#15761) (vitessio#15772)

Signed-off-by: Matt Lord <mattalord@gmail.com>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>

* fix signature

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

---------

Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
3 people authored May 30, 2024
1 parent b2e4e0c commit c801fd2
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 23 deletions.
23 changes: 14 additions & 9 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -538,18 +539,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
go func() {
_ = tabletConn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
var err error
if ctx.Err() != nil {
switch {
case ctx.Err() != nil:
err = fmt.Errorf("context has ended")
} else if shr == nil || shr.RealtimeStats == nil || shr.Target == nil {
err = fmt.Errorf("health check failed")
} else if vs.tabletType != shr.Target.TabletType {
err = fmt.Errorf("tablet type has changed from %s to %s, restarting vstream",
vs.tabletType, shr.Target.TabletType)
} else if shr.RealtimeStats.HealthError != "" {
case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias))
case vs.tabletType != shr.Target.TabletType:
err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType)
case shr.RealtimeStats.HealthError != "":
err = fmt.Errorf("tablet %s is no longer healthy: %s, restarting vstream",
tablet.Alias, shr.RealtimeStats.HealthError)
topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.HealthError)
case shr.RealtimeStats.ReplicationLagSeconds > uint32(discovery.GetLowReplicationLag().Seconds()):
err = fmt.Errorf("tablet %s has a replication lag of %d seconds which is beyond the value provided in --discovery_low_replication_lag of %s so the tablet is no longer considered healthy, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.ReplicationLagSeconds, discovery.GetLowReplicationLag())
}
if err != nil {
log.Warningf("Tablet state changed: %s, attempting to restart", err)
errCh <- err
return err
}
Expand Down Expand Up @@ -586,7 +592,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
case <-ctx.Done():
return ctx.Err()
case streamErr := <-errCh:
log.Warningf("Tablet state changed: %s, attempting to restart", streamErr)
return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error())
case <-journalDone:
// Unreachable.
Expand Down
143 changes: 131 additions & 12 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,27 @@ import (
"testing"
"time"

"vitess.io/vitess/go/sync2"

"vitess.io/vitess/go/vt/topo"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/vttablet/sandboxconn"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/sandboxconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/srvtopo"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var mu sync.Mutex
Expand Down Expand Up @@ -1333,6 +1334,124 @@ func TestVstreamCopy(t *testing.T) {
require.Equal(t, "target: TestVStreamCopy.-20.primary: final error", err.Error())
}

// TestVStreamManagerHealthCheckResponseHandling tests the handling of healthcheck responses by
// the vstream manager to confirm that we are correctly restarting the vstream when we should.
func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) {
// Capture the vstream warning log. Otherwise we need to re-implement the vstream error
// handling in SandboxConn's implementation and then we're not actually testing the
// production code.
logger := logutil.NewMemoryLogger()
log.Warningf = logger.Warningf

cell := "aa"
ks := "TestVStream"
shard := "0"
tabletType := topodatapb.TabletType_REPLICA
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{shard})
vsm := newTestVStreamManager(hc, st, cell, true)
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
}},
}
source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil)
tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias)
addTabletToSandboxTopo(t, st, ks, shard, source.Tablet())
target := &querypb.Target{
Cell: cell,
Keyspace: ks,
Shard: shard,
TabletType: tabletType,
}
highLag := uint32(discovery.GetLowReplicationLag().Seconds()) + 1

type testcase struct {
name string
hcRes *querypb.StreamHealthResponse
wantErr string
}
testcases := []testcase{
{
name: "all healthy", // Will hit the context timeout
},
{
name: "failure",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: nil, // This is seen as a healthcheck stream failure
},
wantErr: fmt.Sprintf("health check failed on %s", tabletAlias),
},
{
name: "tablet type changed",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: &querypb.Target{
Cell: cell,
Keyspace: ks,
Shard: shard,
TabletType: topodatapb.TabletType_PRIMARY,
},
RealtimeStats: &querypb.RealtimeStats{},
},
wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s",
tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()),
},
{
name: "unhealthy",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: target,
RealtimeStats: &querypb.RealtimeStats{
HealthError: "unhealthy",
},
},
wantErr: fmt.Sprintf("tablet %s is no longer healthy", tabletAlias),
},
{
name: "replication lag too high",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: target,
RealtimeStats: &querypb.RealtimeStats{
ReplicationLagSeconds: highLag,
},
},
wantErr: fmt.Sprintf("%s has a replication lag of %d seconds which is beyond the value provided",
tabletAlias, highLag),
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
done := make(chan struct{})
go func() {
sctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
defer close(done)
// SandboxConn's VStream implementation always waits for the context to timeout.
err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error {
require.Fail(t, "unexpected event", "Received unexpected events: %v", events)
return nil
})
if tc.wantErr != "" { // Otherwise we simply expect the context to timeout
if !strings.Contains(logger.String(), tc.wantErr) {
require.Fail(t, "unexpected vstream error", "vstream ended with error: %v, which did not contain: %s", err, tc.wantErr)
}
}
}()
if tc.wantErr != "" {
source.SetStreamHealthResponse(tc.hcRes)
}
<-done
logger.Clear()
})
}
}

func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager {
gw := NewTabletGateway(context.Background(), hc, serv, cell)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
Expand Down
19 changes: 17 additions & 2 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type SandboxConn struct {
EphemeralShardErr error

NotServing bool

streamHealthResponse *querypb.StreamHealthResponse
}

var _ queryservice.QueryService = (*SandboxConn)(nil) // compile-time interface check
Expand Down Expand Up @@ -406,9 +408,22 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target,
// SandboxSQRowCount is the default number of fake splits returned.
var SandboxSQRowCount = int64(10)

// StreamHealth is not implemented.
// SetStreamHealthResponse sets the StreamHealthResponse to be returned in StreamHealth.
func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthResponse) {
sbc.mapMu.Lock()
defer sbc.mapMu.Unlock()
sbc.streamHealthResponse = res
}

// StreamHealth always mocks a "healthy" result by default. If you want to override this behavior you
// can call SetStreamHealthResponse.
func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
return fmt.Errorf("not implemented in test")
sbc.mapMu.Lock()
defer sbc.mapMu.Unlock()
if sbc.streamHealthResponse != nil {
return callback(sbc.streamHealthResponse)
}
return nil
}

// ExpectVStreamStartPos makes the conn verify that that the next vstream request has the right startPos.
Expand Down

0 comments on commit c801fd2

Please sign in to comment.