Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - test(systest): improve systest resilience #6171

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/systest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ jobs:
go-version-file: "go.mod"

- name: Run tests
timeout-minutes: 60
env:
test_id: systest-${{ steps.vars.outputs.sha_short }}
storage: premium-rwo=10Gi
Expand Down
8 changes: 5 additions & 3 deletions systest/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ ifeq ($(configname),$(test_job_name))
run_deps = config
endif

command := tests -test.v -test.count=$(count) -test.timeout=0 -test.run=$(test_name) -clusters=$(clusters) \
-level=$(level) -configname=$(configname)
command := tests -test.v -test.count=$(count) -test.timeout=60m -test.run=$(test_name) -test.parallel=$(clusters) \
-clusters=$(clusters) -level=$(level) -configname=$(configname)

ifeq ($(failfast),true)
command := $(command) -test.failfast
endif
acud marked this conversation as resolved.
Show resolved Hide resolved



.PHONY: docker
docker:
@DOCKER_BUILDKIT=1 docker build \
Expand Down Expand Up @@ -88,7 +90,7 @@ gomplate:
# where /bin/bash is an old bash
.PHONY: run
run: gomplate $(run_deps)
@echo "launching test job with name=$(test_job_name) and testid=$(test_id)"
@echo "launching test job with name=$(test_job_name) and testid=$(test_id), command=$(command)"
@ns=$$(kubectl config view --minify -o jsonpath='{..namespace}' 2>/dev/null); \
export ns="$${ns:-default}"; \
if [ -z "$${norbac}" ]; then \
Expand Down
3 changes: 3 additions & 0 deletions systest/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ func ReuseWait(cctx *testcontext.Context, opts ...Opt) (*Cluster, error) {
if err := cl.WaitAllTimeout(cctx.BootstrapDuration); err != nil {
return nil, err
}
if err = cctx.CheckFail(); err != nil {
return nil, err
}
fasmat marked this conversation as resolved.
Show resolved Hide resolved
return cl, nil
}

Expand Down
2 changes: 1 addition & 1 deletion systest/cluster/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (n *NodeClient) ensurePubConn(ctx context.Context) (*grpc.ClientConn, error
if err != nil {
return nil, err
}
if err := n.waitForConnectionReady(context.Background(), conn); err != nil {
if err := n.waitForConnectionReady(ctx, conn); err != nil {
return nil, err
}
n.pubConn = conn
Expand Down
19 changes: 19 additions & 0 deletions systest/testcontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package testcontext

import (
"context"
goerr "errors"
fasmat marked this conversation as resolved.
Show resolved Hide resolved
"flag"
"fmt"
"math/rand/v2"
Expand Down Expand Up @@ -53,6 +54,9 @@ var (

tokens chan struct{}
initTokens sync.Once

failed = make(chan struct{})
failOnce sync.Once
)

var (
Expand Down Expand Up @@ -288,6 +292,12 @@ func New(t *testing.T, opts ...Opt) *Context {
tokens <- struct{}{}
t.Cleanup(func() { <-tokens })
}

t.Cleanup(func() {
if t.Failed() {
failOnce.Do(func() { close(failed) })
}
})
config, err := rest.InClusterConfig()

// The default rate limiter is too slow 5qps and 10 burst, This will prevent the client from being throttled
Expand Down Expand Up @@ -365,3 +375,12 @@ func New(t *testing.T, opts ...Opt) *Context {
cctx.Log.Infow("using", "namespace", cctx.Namespace)
return cctx
}

func (c *Context) CheckFail() error {
select {
case <-failed:
return goerr.New("test suite failed. aborting test execution")
default:
}
return nil
}
62 changes: 45 additions & 17 deletions systest/tests/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
attempts = 3
)

var retryBackoff = 10 * time.Second

func sendTransactions(
ctx context.Context,
eg *errgroup.Group,
Expand Down Expand Up @@ -117,24 +119,14 @@ func submitTransaction(ctx context.Context, tx []byte, node *cluster.NodeClient)
return response.Txstate.Id.Id, nil
}

func watchStateHashes(
ctx context.Context,
eg *errgroup.Group,
node *cluster.NodeClient,
logger *zap.Logger,
collector func(*pb.GlobalStateStreamResponse) (bool, error),
) {
eg.Go(func() error {
return stateHashStream(ctx, node, logger, collector)
})
}

func stateHashStream(
ctx context.Context,
node *cluster.NodeClient,
logger *zap.Logger,
collector func(*pb.GlobalStateStreamResponse) (bool, error),
) error {
retries := 0
BACKOFF:
stateapi := pb.NewGlobalStateServiceClient(node.PubConn())
states, err := stateapi.GlobalStateStream(ctx,
&pb.GlobalStateStreamRequest{
Expand All @@ -153,7 +145,12 @@ func stateHashStream(
zap.Any("status", s),
)
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("state stream unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF
}
}
if err != nil {
Expand Down Expand Up @@ -185,6 +182,8 @@ func layersStream(
logger *zap.Logger,
collector layerCollector,
) error {
retries := 0
BACKOFF:
meshapi := pb.NewMeshServiceClient(node.PubConn())
layers, err := meshapi.LayerStream(ctx, &pb.LayerStreamRequest{})
if err != nil {
Expand All @@ -196,7 +195,12 @@ func layersStream(
if ok && s.Code() != codes.OK {
logger.Warn("layers stream error", zap.String("client", node.Name), zap.Error(err), zap.Any("status", s))
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("layer stream unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF
fasmat marked this conversation as resolved.
Show resolved Hide resolved
}
}
if err != nil {
Expand All @@ -214,6 +218,9 @@ func malfeasanceStream(
logger *zap.Logger,
collector func(*pb.MalfeasanceStreamResponse) (bool, error),
) error {
retries := 0
BACKOFF:

meshapi := pb.NewMeshServiceClient(node.PubConn())
layers, err := meshapi.MalfeasanceStream(ctx, &pb.MalfeasanceStreamRequest{IncludeProof: true})
if err != nil {
Expand All @@ -229,7 +236,13 @@ func malfeasanceStream(
zap.Any("status", s),
)
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("layer stream unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF

fasmat marked this conversation as resolved.
Show resolved Hide resolved
}
}
if err != nil {
Expand Down Expand Up @@ -309,6 +322,9 @@ func watchTransactionResults(ctx context.Context,
collector func(*pb.TransactionResult) (bool, error),
) {
eg.Go(func() error {
retries := 0
BACKOFF:

api := pb.NewTransactionServiceClient(client.PubConn())
rsts, err := api.StreamResults(ctx, &pb.TransactionResultsRequest{Watch: true})
if err != nil {
Expand All @@ -324,7 +340,12 @@ func watchTransactionResults(ctx context.Context,
zap.Any("status", s),
)
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("transaction results unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF
fasmat marked this conversation as resolved.
Show resolved Hide resolved
}
}
if err != nil {
Expand All @@ -345,6 +366,8 @@ func watchProposals(
collector func(*pb.Proposal) (bool, error),
) {
eg.Go(func() error {
retries := 0
BACKOFF:
dbg := pb.NewDebugServiceClient(client.PrivConn())
proposals, err := dbg.ProposalsStream(ctx, &emptypb.Empty{})
if err != nil {
Expand All @@ -360,7 +383,12 @@ func watchProposals(
zap.Any("status", s),
)
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("watch proposals unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF
fasmat marked this conversation as resolved.
Show resolved Hide resolved
}
}
if err != nil {
Expand Down
48 changes: 26 additions & 22 deletions systest/tests/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,28 +80,32 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster,
stateCh := make(chan *stateUpdate, uint32(cl.Total())*numLayers*10)
tctx.Log.Debug("listening to state hashes...")
for i := range cl.Total() {
client := cl.Client(i)
watchStateHashes(ctx, eg, client, tctx.Log.Desugar(), func(state *pb.GlobalStateStreamResponse) (bool, error) {
data := state.Datum.Datum
require.IsType(t, &pb.GlobalStateData_GlobalState{}, data)

resp := data.(*pb.GlobalStateData_GlobalState)
layer := resp.GlobalState.Layer.Number
if layer > stop {
return false, nil
}

stateHash := types.BytesToHash(resp.GlobalState.RootHash)
tctx.Log.Debugw("state hash collected",
"client", client.Name,
"layer", layer,
"state", stateHash.ShortString())
stateCh <- &stateUpdate{
layer: layer,
hash: stateHash,
client: client.Name,
}
return true, nil
node := cl.Client(i)

eg.Go(func() error {
return stateHashStream(ctx, node, tctx.Log.Desugar(),
func(state *pb.GlobalStateStreamResponse) (bool, error) {
data := state.Datum.Datum
require.IsType(t, &pb.GlobalStateData_GlobalState{}, data)

resp := data.(*pb.GlobalStateData_GlobalState)
layer := resp.GlobalState.Layer.Number
if layer > stop {
return false, nil
}

stateHash := types.BytesToHash(resp.GlobalState.RootHash)
tctx.Log.Debugw("state hash collected",
"client", node.Name,
"layer", layer,
"state", stateHash.ShortString())
stateCh <- &stateUpdate{
layer: layer,
hash: stateHash,
client: node.Name,
}
return true, nil
})
})
}

Expand Down
Loading