Try #6171:
spacemesh-bors[bot] authored Jul 25, 2024
2 parents 02bcbe9 + 41ef6e8 commit 6764738
Showing 7 changed files with 99 additions and 44 deletions.
1 change: 0 additions & 1 deletion .github/workflows/systest.yml
@@ -130,7 +130,6 @@ jobs:
           go-version-file: "go.mod"

       - name: Run tests
-        timeout-minutes: 60
         env:
           test_id: systest-${{ steps.vars.outputs.sha_short }}
           storage: premium-rwo=10Gi
8 changes: 5 additions & 3 deletions systest/Makefile
@@ -36,13 +36,15 @@ ifeq ($(configname),$(test_job_name))
 run_deps = config
 endif

-command := tests -test.v -test.count=$(count) -test.timeout=0 -test.run=$(test_name) -clusters=$(clusters) \
-    -level=$(level) -configname=$(configname)
+command := tests -test.v -test.count=$(count) -test.timeout=60m -test.run=$(test_name) -test.parallel=$(clusters) \
+    -clusters=$(clusters) -level=$(level) -configname=$(configname)

 ifeq ($(failfast),true)
 command := $(command) -test.failfast
 endif

+
+
 .PHONY: docker
 docker:
     @DOCKER_BUILDKIT=1 docker build \
@@ -88,7 +90,7 @@ gomplate:
 # where /bin/bash is an old bash
 .PHONY: run
 run: gomplate $(run_deps)
-    @echo "launching test job with name=$(test_job_name) and testid=$(test_id)"
+    @echo "launching test job with name=$(test_job_name) and testid=$(test_id), command=$(command)"
     @ns=$$(kubectl config view --minify -o jsonpath='{..namespace}' 2>/dev/null); \
     export ns="$${ns:-default}"; \
     if [ -z "$${norbac}" ]; then \
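Note on the two changes above: the hard timeout moves from the CI job (the deleted timeout-minutes: 60 in systest.yml) into the test binary itself via -test.timeout=60m, and -test.parallel is pinned to the cluster count. A plausible motivation is that an in-binary timeout makes the Go test runner panic and print every goroutine's stack, whereas an external kill by the CI leaves no diagnostics. A minimal sketch of how these flags behave, assuming standard go test semantics (the test names are hypothetical):

package tests

import (
    "testing"
    "time"
)

// With -test.parallel=$(clusters), at most $(clusters) tests that call
// t.Parallel() run concurrently; with -test.timeout=60m the test binary
// panics with a full goroutine dump if the whole run exceeds an hour.
func TestClusterA(t *testing.T) {
    t.Parallel() // counts against the -test.parallel limit
    time.Sleep(time.Second)
}

func TestClusterB(t *testing.T) {
    t.Parallel() // may run at the same time as TestClusterA
    time.Sleep(time.Second)
}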
3 changes: 3 additions & 0 deletions systest/cluster/cluster.go
@@ -150,6 +150,9 @@ func ReuseWait(cctx *testcontext.Context, opts ...Opt) (*Cluster, error) {
     if err := cl.WaitAllTimeout(cctx.BootstrapDuration); err != nil {
         return nil, err
     }
+    if err = cctx.CheckFail(); err != nil {
+        return nil, err
+    }
     return cl, nil
 }

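The added CheckFail call means a reused cluster stops admitting new tests once any earlier test has failed (see the testcontext change below). A hedged usage sketch; the test name and body are illustrative, not part of this commit:

package tests

import (
    "testing"

    "github.com/spacemeshos/go-spacemesh/systest/cluster"
    "github.com/spacemeshos/go-spacemesh/systest/testcontext"
)

func TestUsesSharedCluster(t *testing.T) {
    cctx := testcontext.New(t)
    // If another test in the suite already failed, ReuseWait now returns
    // "test suite failed. aborting test execution" right after bootstrap,
    // so this test aborts instead of running against the cluster.
    cl, err := cluster.ReuseWait(cctx)
    if err != nil {
        t.Fatal(err)
    }
    _ = cl
}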
2 changes: 1 addition & 1 deletion systest/cluster/nodes.go
@@ -186,7 +186,7 @@ func (n *NodeClient) ensurePubConn(ctx context.Context) (*grpc.ClientConn, error)
     if err != nil {
         return nil, err
     }
-    if err := n.waitForConnectionReady(context.Background(), conn); err != nil {
+    if err := n.waitForConnectionReady(ctx, conn); err != nil {
         return nil, err
     }
     n.pubConn = conn
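Passing the caller's ctx instead of context.Background() makes the readiness wait respect the caller's deadline and cancellation, where before it could block indefinitely on an unreachable node. A sketch of the underlying pattern, assuming grpc-go's standard connectivity API (this shows the shape of the fix, not the repository's exact helper):

package example

import (
    "context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/connectivity"
)

// waitReady blocks until conn is Ready or ctx is done. Because it takes
// the caller's ctx, an upstream timeout or cancellation unblocks it.
func waitReady(ctx context.Context, conn *grpc.ClientConn) error {
    for {
        state := conn.GetState()
        if state == connectivity.Ready {
            return nil
        }
        // WaitForStateChange returns false once ctx is done.
        if !conn.WaitForStateChange(ctx, state) {
            return ctx.Err()
        }
    }
}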
19 changes: 19 additions & 0 deletions systest/testcontext/context.go
@@ -2,6 +2,7 @@ package testcontext

 import (
     "context"
+    goerr "errors"
     "flag"
     "fmt"
     "math/rand/v2"
@@ -53,6 +54,9 @@

     tokens     chan struct{}
     initTokens sync.Once
+
+    failed   = make(chan struct{})
+    failOnce sync.Once
 )

var (
@@ -288,6 +292,12 @@ func New(t *testing.T, opts ...Opt) *Context {
         tokens <- struct{}{}
         t.Cleanup(func() { <-tokens })
     }
+
+    t.Cleanup(func() {
+        if t.Failed() {
+            failOnce.Do(func() { close(failed) })
+        }
+    })
     config, err := rest.InClusterConfig()

     // The default rate limiter is too slow 5qps and 10 burst, This will prevent the client from being throttled
@@ -365,3 +375,12 @@ func New(t *testing.T, opts ...Opt) *Context {
     cctx.Log.Infow("using", "namespace", cctx.Namespace)
     return cctx
 }
+
+func (c *Context) CheckFail() error {
+    select {
+    case <-failed:
+        return goerr.New("test suite failed. aborting test execution")
+    default:
+    }
+    return nil
+}
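The additions above are a close-once broadcast: the first failing test's cleanup closes a package-level channel, guarded by sync.Once, and CheckFail polls it without blocking. A self-contained sketch of the same pattern outside the test harness (names are illustrative):

package main

import (
    "errors"
    "fmt"
    "sync"
)

var (
    failed   = make(chan struct{})
    failOnce sync.Once
)

// markFailed may be called by any number of goroutines; the channel is
// closed exactly once, and every later checkFail sees the signal.
func markFailed() {
    failOnce.Do(func() { close(failed) })
}

func checkFail() error {
    select {
    case <-failed:
        return errors.New("suite already failed")
    default:
        return nil
    }
}

func main() {
    fmt.Println(checkFail()) // <nil>
    markFailed()
    markFailed()             // safe: sync.Once guards the close
    fmt.Println(checkFail()) // suite already failed
}

Because the channel and the Once are package-level, the signal is process-wide: every test that consults CheckFail afterwards aborts, which is the fail-fast behavior this commit is after.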
62 changes: 45 additions & 17 deletions systest/tests/common.go
@@ -28,6 +28,8 @@ const (
     attempts = 3
 )

+var retryBackoff = 10 * time.Second
+
 func sendTransactions(
     ctx context.Context,
     eg *errgroup.Group,
@@ -117,24 +119,14 @@ func submitTransaction(ctx context.Context, tx []byte, node *cluster.NodeClient)
     return response.Txstate.Id.Id, nil
 }

-func watchStateHashes(
-    ctx context.Context,
-    eg *errgroup.Group,
-    node *cluster.NodeClient,
-    logger *zap.Logger,
-    collector func(*pb.GlobalStateStreamResponse) (bool, error),
-) {
-    eg.Go(func() error {
-        return stateHashStream(ctx, node, logger, collector)
-    })
-}
-
 func stateHashStream(
     ctx context.Context,
     node *cluster.NodeClient,
     logger *zap.Logger,
     collector func(*pb.GlobalStateStreamResponse) (bool, error),
 ) error {
+    retries := 0
+BACKOFF:
     stateapi := pb.NewGlobalStateServiceClient(node.PubConn())
     states, err := stateapi.GlobalStateStream(ctx,
         &pb.GlobalStateStreamRequest{
@@ -153,7 +145,12 @@ func stateHashStream(
             zap.Any("status", s),
         )
         if s.Code() == codes.Unavailable {
-            return nil
+            if retries == attempts {
+                return errors.New("state stream unavailable")
+            }
+            retries++
+            time.Sleep(retryBackoff)
+            goto BACKOFF
         }
     }
     if err != nil {
@@ -185,6 +182,8 @@ func layersStream(
     logger *zap.Logger,
     collector layerCollector,
 ) error {
+    retries := 0
+BACKOFF:
     meshapi := pb.NewMeshServiceClient(node.PubConn())
     layers, err := meshapi.LayerStream(ctx, &pb.LayerStreamRequest{})
     if err != nil {
@@ -196,7 +195,12 @@
         if ok && s.Code() != codes.OK {
             logger.Warn("layers stream error", zap.String("client", node.Name), zap.Error(err), zap.Any("status", s))
             if s.Code() == codes.Unavailable {
-                return nil
+                if retries == attempts {
+                    return errors.New("layer stream unavailable")
+                }
+                retries++
+                time.Sleep(retryBackoff)
+                goto BACKOFF
             }
         }
         if err != nil {
@@ -214,6 +218,9 @@ func malfeasanceStream(
     logger *zap.Logger,
     collector func(*pb.MalfeasanceStreamResponse) (bool, error),
 ) error {
+    retries := 0
+BACKOFF:
+
     meshapi := pb.NewMeshServiceClient(node.PubConn())
     layers, err := meshapi.MalfeasanceStream(ctx, &pb.MalfeasanceStreamRequest{IncludeProof: true})
     if err != nil {
@@ -229,7 +236,13 @@
             zap.Any("status", s),
         )
         if s.Code() == codes.Unavailable {
-            return nil
+            if retries == attempts {
+                return errors.New("malfeasance stream unavailable")
+            }
+            retries++
+            time.Sleep(retryBackoff)
+            goto BACKOFF
+
         }
     }
     if err != nil {
@@ -309,6 +322,9 @@ func watchTransactionResults(ctx context.Context,
     collector func(*pb.TransactionResult) (bool, error),
 ) {
     eg.Go(func() error {
+        retries := 0
+    BACKOFF:
+
         api := pb.NewTransactionServiceClient(client.PubConn())
         rsts, err := api.StreamResults(ctx, &pb.TransactionResultsRequest{Watch: true})
         if err != nil {
@@ -324,7 +340,12 @@
                 zap.Any("status", s),
             )
             if s.Code() == codes.Unavailable {
-                return nil
+                if retries == attempts {
+                    return errors.New("transaction results unavailable")
+                }
+                retries++
+                time.Sleep(retryBackoff)
+                goto BACKOFF
             }
         }
         if err != nil {
@@ -345,6 +366,8 @@ func watchProposals(
     collector func(*pb.Proposal) (bool, error),
 ) {
     eg.Go(func() error {
+        retries := 0
+    BACKOFF:
         dbg := pb.NewDebugServiceClient(client.PrivConn())
         proposals, err := dbg.ProposalsStream(ctx, &emptypb.Empty{})
         if err != nil {
if err != nil {
Expand All @@ -360,7 +383,12 @@ func watchProposals(
zap.Any("status", s),
)
if s.Code() == codes.Unavailable {
return nil
if retries == attempts {
return errors.New("watch proposals unavailable")
}
retries++
time.Sleep(retryBackoff)
goto BACKOFF
}
}
if err != nil {
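All five streams now retry codes.Unavailable up to attempts times, sleeping retryBackoff between passes, instead of silently returning nil. The same logic could be factored into a single helper without goto; a hedged alternative sketch (withRetry and streamOnce are hypothetical, not part of this commit, while attempts and retryBackoff are the package values defined above):

import (
    "context"
    "fmt"
    "time"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// withRetry re-runs one subscribe-and-consume pass while the stream
// reports Unavailable, giving up after the configured attempts.
func withRetry(ctx context.Context, what string, streamOnce func() error) error {
    for retries := 0; ; retries++ {
        err := streamOnce()
        s, ok := status.FromError(err)
        if !ok || s.Code() != codes.Unavailable {
            return err // nil, a non-status error, or a non-Unavailable code
        }
        if retries == attempts {
            return fmt.Errorf("%s unavailable", what)
        }
        select {
        case <-time.After(retryBackoff):
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

Waiting in a select against ctx.Done() also sidesteps one drawback of the committed version: time.Sleep cannot be interrupted when the test context is canceled mid-backoff.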
48 changes: 26 additions & 22 deletions systest/tests/partition_test.go
@@ -80,28 +80,32 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster,
     stateCh := make(chan *stateUpdate, uint32(cl.Total())*numLayers*10)
     tctx.Log.Debug("listening to state hashes...")
     for i := range cl.Total() {
-        client := cl.Client(i)
-        watchStateHashes(ctx, eg, client, tctx.Log.Desugar(), func(state *pb.GlobalStateStreamResponse) (bool, error) {
-            data := state.Datum.Datum
-            require.IsType(t, &pb.GlobalStateData_GlobalState{}, data)
-
-            resp := data.(*pb.GlobalStateData_GlobalState)
-            layer := resp.GlobalState.Layer.Number
-            if layer > stop {
-                return false, nil
-            }
-
-            stateHash := types.BytesToHash(resp.GlobalState.RootHash)
-            tctx.Log.Debugw("state hash collected",
-                "client", client.Name,
-                "layer", layer,
-                "state", stateHash.ShortString())
-            stateCh <- &stateUpdate{
-                layer:  layer,
-                hash:   stateHash,
-                client: client.Name,
-            }
-            return true, nil
+        node := cl.Client(i)
+
+        eg.Go(func() error {
+            return stateHashStream(ctx, node, tctx.Log.Desugar(),
+                func(state *pb.GlobalStateStreamResponse) (bool, error) {
+                    data := state.Datum.Datum
+                    require.IsType(t, &pb.GlobalStateData_GlobalState{}, data)
+
+                    resp := data.(*pb.GlobalStateData_GlobalState)
+                    layer := resp.GlobalState.Layer.Number
+                    if layer > stop {
+                        return false, nil
+                    }
+
+                    stateHash := types.BytesToHash(resp.GlobalState.RootHash)
+                    tctx.Log.Debugw("state hash collected",
+                        "client", node.Name,
+                        "layer", layer,
+                        "state", stateHash.ShortString())
+                    stateCh <- &stateUpdate{
+                        layer:  layer,
+                        hash:   stateHash,
+                        client: node.Name,
+                    }
+                    return true, nil
+                })
         })
     }

