test(systest): improve systest resilience (#6171)
## Motivation

Improve systest resilience by retrying some of the checks when their gRPC streams fail with an `Unavailable` error.
acud committed Aug 2, 2024
1 parent 9327750 commit 142bb72
Showing 7 changed files with 100 additions and 49 deletions.
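The retry logic added to `systest/tests/common.go` below repeats the same shape in every stream watcher. As a condensed, hedged sketch (not the commit's exact code; `run` stands in for opening and consuming a particular gRPC stream), the pattern is:

```go
package tests

import (
	"context"
	"errors"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const attempts = 3

var retryBackoff = 10 * time.Second

// retryOnUnavailable reruns the given stream watcher when it fails with the
// gRPC Unavailable code, sleeping between attempts, and gives up after the
// configured number of retries. Any other error is returned immediately.
func retryOnUnavailable(ctx context.Context, run func(context.Context) error) error {
	retries := 0
	for {
		err := run(ctx)
		if err == nil {
			return nil
		}
		if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
			if retries == attempts {
				return errors.New("stream unavailable after retries")
			}
			retries++
			time.Sleep(retryBackoff)
			continue
		}
		return err
	}
}
```

The commit expresses the same idea with a `retries` counter and a `goto BACKOFF` label in each watcher, so a retry reopens a fresh stream against the node instead of giving up on the first transient `Unavailable`.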
1 change: 0 additions & 1 deletion .github/workflows/systest.yml
@@ -130,7 +130,6 @@ jobs:
go-version-file: "go.mod"

- name: Run tests
timeout-minutes: 60
env:
test_id: systest-${{ steps.vars.outputs.sha_short }}
storage: premium-rwo=10Gi
10 changes: 4 additions & 6 deletions systest/Makefile
@@ -36,12 +36,10 @@ ifeq ($(configname),$(test_job_name))
run_deps = config
endif

command := tests -test.v -test.count=$(count) -test.timeout=0 -test.run=$(test_name) -clusters=$(clusters) \
-level=$(level) -configname=$(configname)
command := tests -test.v -test.count=$(count) -test.timeout=60m -test.run=$(test_name) -test.parallel=$(clusters) \
-test.failfast=$(failfast) -clusters=$(clusters) -level=$(level) -configname=$(configname)


ifeq ($(failfast),true)
command := $(command) -test.failfast
endif

.PHONY: docker
docker:
@@ -88,7 +86,7 @@ gomplate:
# where /bin/bash is an old bash
.PHONY: run
run: gomplate $(run_deps)
@echo "launching test job with name=$(test_job_name) and testid=$(test_id)"
@echo "launching test job with name=$(test_job_name) and testid=$(test_id), command=$(command)"
@ns=$$(kubectl config view --minify -o jsonpath='{..namespace}' 2>/dev/null); \
export ns="$${ns:-default}"; \
if [ -z "$${norbac}" ]; then \
3 changes: 3 additions & 0 deletions systest/cluster/cluster.go
@@ -150,6 +150,9 @@ func ReuseWait(cctx *testcontext.Context, opts ...Opt) (*Cluster, error) {
if err := cl.WaitAllTimeout(cctx.BootstrapDuration); err != nil {
return nil, err
}
if err = cctx.CheckFail(); err != nil {
return nil, err
}
return cl, nil
}

2 changes: 1 addition & 1 deletion systest/cluster/nodes.go
@@ -186,7 +186,7 @@ func (n *NodeClient) ensurePubConn(ctx context.Context) (*grpc.ClientConn, error
if err != nil {
return nil, err
}
if err := n.waitForConnectionReady(context.Background(), conn); err != nil {
if err := n.waitForConnectionReady(ctx, conn); err != nil {
return nil, err
}
n.pubConn = conn
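The one-line change above matters because the connection-readiness wait previously ignored the caller's deadline. A minimal sketch of the idea, assuming a helper roughly like this one (illustrative, not the repository's `waitForConnectionReady`):

```go
package cluster

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// waitReady blocks until conn is Ready or ctx is done. With context.Background()
// this loop could hang indefinitely; with the caller's ctx it honors the test's
// timeout and cancellation, which is the point of the change above.
func waitReady(ctx context.Context, conn *grpc.ClientConn) error {
	for {
		state := conn.GetState()
		if state == connectivity.Ready {
			return nil
		}
		// WaitForStateChange returns false when ctx is done before the state changes.
		if !conn.WaitForStateChange(ctx, state) {
			return ctx.Err()
		}
	}
}
```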
23 changes: 21 additions & 2 deletions systest/testcontext/context.go
@@ -2,6 +2,7 @@ package testcontext

import (
"context"
"errors"
"flag"
"fmt"
"math/rand/v2"
@@ -18,7 +19,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"k8s.io/apimachinery/pkg/api/errors"
k8serr "k8s.io/apimachinery/pkg/api/errors"
apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
corev1 "k8s.io/client-go/applyconfigurations/core/v1"
@@ -53,6 +54,9 @@ var (

tokens chan struct{}
initTokens sync.Once

failed = make(chan struct{})
failOnce sync.Once
)

var (
@@ -227,7 +231,7 @@ func updateContext(ctx *Context) error {
ns, err := ctx.Client.CoreV1().Namespaces().Get(ctx, ctx.Namespace,
apimetav1.GetOptions{})
if err != nil || ns == nil {
if errors.IsNotFound(err) {
if k8serr.IsNotFound(err) {
return nil
}
return err
@@ -288,6 +292,12 @@ func New(t *testing.T, opts ...Opt) *Context {
tokens <- struct{}{}
t.Cleanup(func() { <-tokens })
}

t.Cleanup(func() {
if t.Failed() {
failOnce.Do(func() { close(failed) })
}
})
config, err := rest.InClusterConfig()

// The default rate limiter is too slow 5qps and 10 burst, This will prevent the client from being throttled
@@ -369,3 +379,12 @@ func New(t *testing.T, opts ...Opt) *Context {
cctx.Log.Infow("using", "namespace", cctx.Namespace)
return cctx
}

func (c *Context) CheckFail() error {
select {
case <-failed:
return errors.New("test suite failed. aborting test execution")
default:
}
return nil
}
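The `failed` channel, `failOnce`, and `CheckFail` added above form a small fail-fast latch shared by the whole suite. A self-contained sketch of the same pattern (package and function names here are illustrative, not from this commit):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	failed   = make(chan struct{}) // closed exactly once when any test fails
	failOnce sync.Once
)

// markFailed records a suite-wide failure; closing the channel is the signal,
// and sync.Once makes repeated calls safe.
func markFailed() { failOnce.Do(func() { close(failed) }) }

// checkFail reports an error once the suite has failed, without ever blocking.
func checkFail() error {
	select {
	case <-failed:
		return errors.New("test suite failed. aborting test execution")
	default:
		return nil
	}
}

func main() {
	fmt.Println(checkFail()) // <nil>
	markFailed()
	markFailed()             // harmless: the channel is closed only once
	fmt.Println(checkFail()) // test suite failed. aborting test execution
}
```

This is also why `cluster.ReuseWait` above calls `cctx.CheckFail()` after waiting: a setup that would otherwise block for the full bootstrap duration can bail out as soon as an earlier test has already failed the run.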
62 changes: 45 additions & 17 deletions systest/tests/common.go
@@ -28,6 +28,8 @@ const (
attempts = 3
)

var retryBackoff = 10 * time.Second

func sendTransactions(
ctx context.Context,
eg *errgroup.Group,
@@ -117,24 +119,14 @@ func submitTransaction(ctx context.Context, tx []byte, node *cluster.NodeClient)
return response.Txstate.Id.Id, nil
}

func watchStateHashes(
ctx context.Context,
eg *errgroup.Group,
node *cluster.NodeClient,
logger *zap.Logger,
collector func(*pb.GlobalStateStreamResponse) (bool, error),
) {
eg.Go(func() error {
return stateHashStream(ctx, node, logger, collector)
})
}

func stateHashStream(
ctx context.Context,
node *cluster.NodeClient,
logger *zap.Logger,
collector func(*pb.GlobalStateStreamResponse) (bool, error),
) error {
retries := 0
BACKOFF:
stateapi := pb.NewGlobalStateServiceClient(node.PubConn())
states, err := stateapi.GlobalStateStream(ctx,
&pb.GlobalStateStreamRequest{
@@ -153,7 +145,12 @@ func stateHashStream(
zap.Any("status", s),
)
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("state stream unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF
}
}
if err != nil {
@@ -185,6 +182,8 @@ func layersStream(
logger *zap.Logger,
collector layerCollector,
) error {
retries := 0
BACKOFF:
meshapi := pb.NewMeshServiceClient(node.PubConn())
layers, err := meshapi.LayerStream(ctx, &pb.LayerStreamRequest{})
if err != nil {
@@ -196,7 +195,12 @@
if ok && s.Code() != codes.OK {
logger.Warn("layers stream error", zap.String("client", node.Name), zap.Error(err), zap.Any("status", s))
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("layer stream unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF
}
}
if err != nil {
@@ -214,6 +218,9 @@ func malfeasanceStream(
logger *zap.Logger,
collector func(*pb.MalfeasanceStreamResponse) (bool, error),
) error {
retries := 0
BACKOFF:

meshapi := pb.NewMeshServiceClient(node.PubConn())
layers, err := meshapi.MalfeasanceStream(ctx, &pb.MalfeasanceStreamRequest{IncludeProof: true})
if err != nil {
@@ -229,7 +236,13 @@
zap.Any("status", s),
)
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("layer stream unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF

}
}
if err != nil {
@@ -309,6 +322,9 @@ func watchTransactionResults(ctx context.Context,
collector func(*pb.TransactionResult) (bool, error),
) {
eg.Go(func() error {
retries := 0
BACKOFF:

api := pb.NewTransactionServiceClient(client.PubConn())
rsts, err := api.StreamResults(ctx, &pb.TransactionResultsRequest{Watch: true})
if err != nil {
@@ -324,7 +340,12 @@
zap.Any("status", s),
)
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("transaction results unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF
}
}
if err != nil {
@@ -345,6 +366,8 @@ func watchProposals(
collector func(*pb.Proposal) (bool, error),
) {
eg.Go(func() error {
retries := 0
BACKOFF:
dbg := pb.NewDebugServiceClient(client.PrivConn())
proposals, err := dbg.ProposalsStream(ctx, &emptypb.Empty{})
if err != nil {
@@ -360,7 +383,12 @@
zap.Any("status", s),
)
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("watch proposals unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF
}
}
if err != nil {
48 changes: 26 additions & 22 deletions systest/tests/partition_test.go
@@ -80,28 +80,32 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster,
stateCh := make(chan *stateUpdate, uint32(cl.Total())*numLayers*10)
tctx.Log.Debug("listening to state hashes...")
for i := range cl.Total() {
client := cl.Client(i)
watchStateHashes(ctx, eg, client, tctx.Log.Desugar(), func(state *pb.GlobalStateStreamResponse) (bool, error) {
data := state.Datum.Datum
require.IsType(t, &pb.GlobalStateData_GlobalState{}, data)

resp := data.(*pb.GlobalStateData_GlobalState)
layer := resp.GlobalState.Layer.Number
if layer > stop {
return false, nil
}

stateHash := types.BytesToHash(resp.GlobalState.RootHash)
tctx.Log.Debugw("state hash collected",
"client", client.Name,
"layer", layer,
"state", stateHash.ShortString())
stateCh <- &stateUpdate{
layer: layer,
hash: stateHash,
client: client.Name,
}
return true, nil
node := cl.Client(i)

eg.Go(func() error {
return stateHashStream(ctx, node, tctx.Log.Desugar(),
func(state *pb.GlobalStateStreamResponse) (bool, error) {
data := state.Datum.Datum
require.IsType(t, &pb.GlobalStateData_GlobalState{}, data)

resp := data.(*pb.GlobalStateData_GlobalState)
layer := resp.GlobalState.Layer.Number
if layer > stop {
return false, nil
}

stateHash := types.BytesToHash(resp.GlobalState.RootHash)
tctx.Log.Debugw("state hash collected",
"client", node.Name,
"layer", layer,
"state", stateHash.ShortString())
stateCh <- &stateUpdate{
layer: layer,
hash: stateHash,
client: node.Name,
}
return true, nil
})
})
}
