diff --git a/.github/workflows/systest.yml b/.github/workflows/systest.yml
index dfd15b59e5..144c365946 100644
--- a/.github/workflows/systest.yml
+++ b/.github/workflows/systest.yml
@@ -130,7 +130,6 @@ jobs:
           go-version-file: "go.mod"
 
       - name: Run tests
-        timeout-minutes: 60
        env:
           test_id: systest-${{ steps.vars.outputs.sha_short }}
           storage: premium-rwo=10Gi
diff --git a/systest/Makefile b/systest/Makefile
index 9a2cb71dac..08829590b3 100644
--- a/systest/Makefile
+++ b/systest/Makefile
@@ -36,12 +36,10 @@ ifeq ($(configname),$(test_job_name))
 run_deps = config
 endif
 
-command := tests -test.v -test.count=$(count) -test.timeout=0 -test.run=$(test_name) -clusters=$(clusters) \
--level=$(level) -configname=$(configname)
+command := tests -test.v -test.count=$(count) -test.timeout=60m -test.run=$(test_name) -test.parallel=$(clusters) \
+	-test.failfast=$(failfast) -clusters=$(clusters) -level=$(level) -configname=$(configname)
+
 
-ifeq ($(failfast),true)
-	command := $(command) -test.failfast
-endif
 
 .PHONY: docker
 docker:
@@ -88,7 +86,7 @@ gomplate:
 # where /bin/bash is an old bash
 .PHONY: run
 run: gomplate $(run_deps)
-	@echo "launching test job with name=$(test_job_name) and testid=$(test_id)"
+	@echo "launching test job with name=$(test_job_name) and testid=$(test_id), command=$(command)"
 	@ns=$$(kubectl config view --minify -o jsonpath='{..namespace}' 2>/dev/null); \
 	export ns="$${ns:-default}"; \
 	if [ -z "$${norbac}" ]; then \
diff --git a/systest/cluster/cluster.go b/systest/cluster/cluster.go
index 54f54f58c6..36e495d77f 100644
--- a/systest/cluster/cluster.go
+++ b/systest/cluster/cluster.go
@@ -150,6 +150,9 @@ func ReuseWait(cctx *testcontext.Context, opts ...Opt) (*Cluster, error) {
 	if err := cl.WaitAllTimeout(cctx.BootstrapDuration); err != nil {
 		return nil, err
 	}
+	if err = cctx.CheckFail(); err != nil {
+		return nil, err
+	}
 	return cl, nil
 }
 
diff --git a/systest/cluster/nodes.go b/systest/cluster/nodes.go
index aca5764a1e..cba322d326 100644
--- a/systest/cluster/nodes.go
+++ b/systest/cluster/nodes.go
@@ -186,7 +186,7 @@ func (n *NodeClient) ensurePubConn(ctx context.Context) (*grpc.ClientConn, error
 	if err != nil {
 		return nil, err
 	}
-	if err := n.waitForConnectionReady(context.Background(), conn); err != nil {
+	if err := n.waitForConnectionReady(ctx, conn); err != nil {
 		return nil, err
 	}
 	n.pubConn = conn
diff --git a/systest/testcontext/context.go b/systest/testcontext/context.go
index 26f8257d4d..34e674aebf 100644
--- a/systest/testcontext/context.go
+++ b/systest/testcontext/context.go
@@ -2,6 +2,7 @@ package testcontext
 
 import (
 	"context"
+	"errors"
 	"flag"
 	"fmt"
 	"math/rand/v2"
@@ -18,7 +19,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
-	"k8s.io/apimachinery/pkg/api/errors"
+	k8serr "k8s.io/apimachinery/pkg/api/errors"
 	apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	corev1 "k8s.io/client-go/applyconfigurations/core/v1"
@@ -53,6 +54,9 @@ var (
 
 	tokens     chan struct{}
 	initTokens sync.Once
+
+	failed   = make(chan struct{})
+	failOnce sync.Once
 )
 
 var (
@@ -227,7 +231,7 @@ func updateContext(ctx *Context) error {
 	ns, err := ctx.Client.CoreV1().Namespaces().Get(ctx, ctx.Namespace, apimetav1.GetOptions{})
 	if err != nil || ns == nil {
-		if errors.IsNotFound(err) {
+		if k8serr.IsNotFound(err) {
 			return nil
 		}
 		return err
@@ -288,6 +292,12 @@ func New(t *testing.T, opts ...Opt) *Context {
 		tokens <- struct{}{}
 		t.Cleanup(func() { <-tokens })
 	}
+
+	t.Cleanup(func() {
+		if t.Failed() {
+			failOnce.Do(func() { close(failed) })
+		}
+	})
 
 	config, err := rest.InClusterConfig()
 	// The default rate limiter is too slow 5qps and 10 burst, This will prevent the client from being throttled
@@ -369,3 +379,12 @@ func New(t *testing.T, opts ...Opt) *Context {
 	cctx.Log.Infow("using", "namespace", cctx.Namespace)
 	return cctx
 }
+
+func (c *Context) CheckFail() error {
+	select {
+	case <-failed:
+		return errors.New("test suite failed. aborting test execution")
+	default:
+	}
+	return nil
+}
diff --git a/systest/tests/common.go b/systest/tests/common.go
index d4df0db710..9b31754e47 100644
--- a/systest/tests/common.go
+++ b/systest/tests/common.go
@@ -28,6 +28,8 @@ const (
 	attempts = 3
 )
 
+var retryBackoff = 10 * time.Second
+
 func sendTransactions(
 	ctx context.Context,
 	eg *errgroup.Group,
@@ -117,24 +119,14 @@ func submitTransaction(ctx context.Context, tx []byte, node *cluster.NodeClient)
 	return response.Txstate.Id.Id, nil
 }
 
-func watchStateHashes(
-	ctx context.Context,
-	eg *errgroup.Group,
-	node *cluster.NodeClient,
-	logger *zap.Logger,
-	collector func(*pb.GlobalStateStreamResponse) (bool, error),
-) {
-	eg.Go(func() error {
-		return stateHashStream(ctx, node, logger, collector)
-	})
-}
-
 func stateHashStream(
 	ctx context.Context,
 	node *cluster.NodeClient,
 	logger *zap.Logger,
 	collector func(*pb.GlobalStateStreamResponse) (bool, error),
 ) error {
+	retries := 0
+BACKOFF:
 	stateapi := pb.NewGlobalStateServiceClient(node.PubConn())
 
 	states, err := stateapi.GlobalStateStream(ctx, &pb.GlobalStateStreamRequest{
@@ -153,7 +145,12 @@ func stateHashStream(
 				zap.Any("status", s),
 			)
 			if s.Code() == codes.Unavailable {
-				return nil
+				if retries == attempts {
+					return errors.New("state stream unavailable")
+				}
+				retries++
+				time.Sleep(retryBackoff)
+				goto BACKOFF
 			}
 		}
 		if err != nil {
@@ -185,6 +182,8 @@ func layersStream(
 	logger *zap.Logger,
 	collector layerCollector,
 ) error {
+	retries := 0
+BACKOFF:
 	meshapi := pb.NewMeshServiceClient(node.PubConn())
 	layers, err := meshapi.LayerStream(ctx, &pb.LayerStreamRequest{})
 	if err != nil {
@@ -196,7 +195,12 @@ func layersStream(
 		if ok && s.Code() != codes.OK {
 			logger.Warn("layers stream error", zap.String("client", node.Name), zap.Error(err), zap.Any("status", s))
 			if s.Code() == codes.Unavailable {
-				return nil
+				if retries == attempts {
+					return errors.New("layer stream unavailable")
+				}
+				retries++
+				time.Sleep(retryBackoff)
+				goto BACKOFF
 			}
 		}
 		if err != nil {
@@ -214,6 +218,9 @@ func malfeasanceStream(
 	logger *zap.Logger,
 	collector func(*pb.MalfeasanceStreamResponse) (bool, error),
 ) error {
+	retries := 0
+BACKOFF:
+
 	meshapi := pb.NewMeshServiceClient(node.PubConn())
 	layers, err := meshapi.MalfeasanceStream(ctx, &pb.MalfeasanceStreamRequest{IncludeProof: true})
 	if err != nil {
@@ -229,7 +236,13 @@ func malfeasanceStream(
 				zap.Any("status", s),
 			)
 			if s.Code() == codes.Unavailable {
-				return nil
+				if retries == attempts {
+					return errors.New("malfeasance stream unavailable")
+				}
+				retries++
+				time.Sleep(retryBackoff)
+				goto BACKOFF
+
 			}
 		}
 		if err != nil {
@@ -309,6 +322,9 @@ func watchTransactionResults(ctx context.Context,
 	collector func(*pb.TransactionResult) (bool, error),
 ) {
 	eg.Go(func() error {
+		retries := 0
+	BACKOFF:
+
 		api := pb.NewTransactionServiceClient(client.PubConn())
 		rsts, err := api.StreamResults(ctx, &pb.TransactionResultsRequest{Watch: true})
 		if err != nil {
@@ -324,7 +340,12 @@ func watchTransactionResults(ctx context.Context,
 					zap.Any("status", s),
 				)
 				if s.Code() == codes.Unavailable {
-					return nil
+					if retries == attempts {
+						return errors.New("transaction results unavailable")
+					}
+					retries++
+					time.Sleep(retryBackoff)
+					goto BACKOFF
 				}
 			}
 			if err != nil {
@@ -345,6 +366,8 @@ func watchProposals(
 	collector func(*pb.Proposal) (bool, error),
 ) {
 	eg.Go(func() error {
+		retries := 0
+	BACKOFF:
 		dbg := pb.NewDebugServiceClient(client.PrivConn())
 		proposals, err := dbg.ProposalsStream(ctx, &emptypb.Empty{})
 		if err != nil {
@@ -360,7 +383,12 @@ func watchProposals(
 					zap.Any("status", s),
 				)
 				if s.Code() == codes.Unavailable {
-					return nil
+					if retries == attempts {
+						return errors.New("watch proposals unavailable")
+					}
+					retries++
+					time.Sleep(retryBackoff)
+					goto BACKOFF
 				}
 			}
 			if err != nil {
diff --git a/systest/tests/partition_test.go b/systest/tests/partition_test.go
index 89431bfdfe..ba65b833cc 100644
--- a/systest/tests/partition_test.go
+++ b/systest/tests/partition_test.go
@@ -80,28 +80,32 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster,
 	stateCh := make(chan *stateUpdate, uint32(cl.Total())*numLayers*10)
 	tctx.Log.Debug("listening to state hashes...")
 	for i := range cl.Total() {
-		client := cl.Client(i)
-		watchStateHashes(ctx, eg, client, tctx.Log.Desugar(), func(state *pb.GlobalStateStreamResponse) (bool, error) {
-			data := state.Datum.Datum
-			require.IsType(t, &pb.GlobalStateData_GlobalState{}, data)
-
-			resp := data.(*pb.GlobalStateData_GlobalState)
-			layer := resp.GlobalState.Layer.Number
-			if layer > stop {
-				return false, nil
-			}
-
-			stateHash := types.BytesToHash(resp.GlobalState.RootHash)
-			tctx.Log.Debugw("state hash collected",
-				"client", client.Name,
-				"layer", layer,
-				"state", stateHash.ShortString())
-			stateCh <- &stateUpdate{
-				layer:  layer,
-				hash:   stateHash,
-				client: client.Name,
-			}
-			return true, nil
+		node := cl.Client(i)
+
+		eg.Go(func() error {
+			return stateHashStream(ctx, node, tctx.Log.Desugar(),
+				func(state *pb.GlobalStateStreamResponse) (bool, error) {
+					data := state.Datum.Datum
+					require.IsType(t, &pb.GlobalStateData_GlobalState{}, data)
+
+					resp := data.(*pb.GlobalStateData_GlobalState)
+					layer := resp.GlobalState.Layer.Number
+					if layer > stop {
+						return false, nil
+					}
+
+					stateHash := types.BytesToHash(resp.GlobalState.RootHash)
+					tctx.Log.Debugw("state hash collected",
+						"client", node.Name,
+						"layer", layer,
+						"state", stateHash.ShortString())
+					stateCh <- &stateUpdate{
+						layer:  layer,
+						hash:   stateHash,
+						client: node.Name,
+					}
+					return true, nil
+				})
 		})
 	}