From 7bec60e8bd1374792c71bfb3faf4c8d0bc58291d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Thu, 11 Jul 2024 09:45:49 +0000 Subject: [PATCH] stabilize e2e tests - take 2 (#6126) ## Motivation Closes #6119 --- activation/e2e/activation_test.go | 3 ++- activation/e2e/atx_merge_test.go | 3 ++- activation/e2e/builds_atx_v2_test.go | 3 ++- activation/e2e/certifier_client_test.go | 4 ++- activation/e2e/checkpoint_merged_test.go | 2 +- activation/e2e/checkpoint_test.go | 2 +- activation/e2e/nipost_test.go | 6 ++--- activation/e2e/validation_test.go | 2 +- api/grpcserver/post_client.go | 13 +++++----- api/grpcserver/post_service.go | 25 +++++++++++++++---- .../distributed_post_verification_test.go | 5 +++- 11 files changed, 46 insertions(+), 22 deletions(-) diff --git a/activation/e2e/activation_test.go b/activation/e2e/activation_test.go index 7b80618211..c8a447aef4 100644 --- a/activation/e2e/activation_test.go +++ b/activation/e2e/activation_test.go @@ -64,8 +64,9 @@ func Test_BuilderWithMultipleClients(t *testing.T) { db := sql.InMemory() localDB := localsql.InMemory() - svc := grpcserver.NewPostService(logger) + svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) + grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) var eg errgroup.Group diff --git a/activation/e2e/atx_merge_test.go b/activation/e2e/atx_merge_test.go index 343351fd99..9818edee6f 100644 --- a/activation/e2e/atx_merge_test.go +++ b/activation/e2e/atx_merge_test.go @@ -209,8 +209,9 @@ func Test_MarryAndMerge(t *testing.T) { cdb := datastore.NewCachedDB(db, logger) localDB := localsql.InMemory() - svc := grpcserver.NewPostService(logger) + svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) + grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/activation/e2e/builds_atx_v2_test.go b/activation/e2e/builds_atx_v2_test.go index f4d8060af0..2a37a1bafb 100644 --- a/activation/e2e/builds_atx_v2_test.go +++ b/activation/e2e/builds_atx_v2_test.go @@ -56,8 +56,9 @@ func TestBuilder_SwitchesToBuildV2(t *testing.T) { cdb := datastore.NewCachedDB(db, logger) opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(logger) + svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) + grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/activation/e2e/certifier_client_test.go b/activation/e2e/certifier_client_test.go index 809ce2cd94..9ff1a35887 100644 --- a/activation/e2e/certifier_client_test.go +++ b/activation/e2e/certifier_client_test.go @@ -38,8 +38,10 @@ func TestCertification(t *testing.T) { localDb := localsql.InMemory() opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(zaptest.NewLogger(t)) + logger := zaptest.NewLogger(t) + svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) + grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/activation/e2e/checkpoint_merged_test.go b/activation/e2e/checkpoint_merged_test.go index 7e818610f6..545082f268 100644 --- a/activation/e2e/checkpoint_merged_test.go +++ b/activation/e2e/checkpoint_merged_test.go @@ -47,7 +47,7 @@ func Test_CheckpointAfterMerge(t *testing.T) { cdb := datastore.NewCachedDB(db, logger) localDB := localsql.InMemory() - svc := grpcserver.NewPostService(logger) + svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/activation/e2e/checkpoint_test.go b/activation/e2e/checkpoint_test.go index 7232f41ab9..048469b2ff 100644 --- a/activation/e2e/checkpoint_test.go +++ b/activation/e2e/checkpoint_test.go @@ -50,7 +50,7 @@ func TestCheckpoint_PublishingSoloATXs(t *testing.T) { cdb := datastore.NewCachedDB(db, logger) opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(logger) + svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/activation/e2e/nipost_test.go b/activation/e2e/nipost_test.go index c158f86f6d..2a135e3256 100644 --- a/activation/e2e/nipost_test.go +++ b/activation/e2e/nipost_test.go @@ -29,7 +29,7 @@ import ( ) const ( - layersPerEpoch = 5 + layersPerEpoch = 10 layerDuration = time.Second postGenesisEpoch types.EpochID = 2 ) @@ -161,7 +161,7 @@ func TestNIPostBuilderWithClients(t *testing.T) { localDb := localsql.InMemory() opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(logger) + svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) @@ -246,7 +246,7 @@ func Test_NIPostBuilderWithMultipleClients(t *testing.T) { db := sql.InMemory() opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(logger) + svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/activation/e2e/validation_test.go b/activation/e2e/validation_test.go index 170696e2ce..2ea7521777 100644 --- a/activation/e2e/validation_test.go +++ b/activation/e2e/validation_test.go @@ -34,7 +34,7 @@ func TestValidator_Validate(t *testing.T) { validator := activation.NewMocknipostValidator(gomock.NewController(t)) opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(zaptest.NewLogger(t)) + svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/api/grpcserver/post_client.go b/api/grpcserver/post_client.go index 97447aa6c4..4810ad8c92 100644 --- a/api/grpcserver/post_client.go +++ b/api/grpcserver/post_client.go @@ -19,15 +19,17 @@ import ( // Additionally if instructed it will start the post service and connect it to // the node. type postClient struct { - con chan<- postCommand + con chan<- postCommand + queryInterval time.Duration closed chan struct{} } -func newPostClient(con chan<- postCommand) *postClient { +func newPostClient(con chan<- postCommand, queryInterval time.Duration) *postClient { return &postClient{ - con: con, - closed: make(chan struct{}), + con: con, + queryInterval: queryInterval, + closed: make(chan struct{}), } } @@ -102,8 +104,7 @@ func (pc *postClient) Proof(ctx context.Context, challenge []byte) (*types.Post, select { case <-ctx.Done(): return nil, nil, ctx.Err() - case <-time.After(2 * time.Second): - // TODO(mafa): make polling interval configurable + case <-time.After(pc.queryInterval): continue } } diff --git a/api/grpcserver/post_service.go b/api/grpcserver/post_service.go index 3f0ffe8607..41a55825fe 100644 --- a/api/grpcserver/post_service.go +++ b/api/grpcserver/post_service.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" @@ -26,6 +27,7 @@ type PostService struct { clientMtx sync.Mutex allowConnections bool client map[types.NodeID]*postClient + queryInterval time.Duration } type postCommand struct { @@ -47,12 +49,25 @@ func (s *PostService) String() string { return "PostService" } +type PostServiceOpt func(*PostService) + +func PostServiceQueryInterval(interval time.Duration) PostServiceOpt { + return func(s *PostService) { + s.queryInterval = interval + } +} + // NewPostService creates a new instance of the post grpc service. -func NewPostService(log *zap.Logger) *PostService { - return &PostService{ - log: log, - client: make(map[types.NodeID]*postClient), +func NewPostService(log *zap.Logger, opts ...PostServiceOpt) *PostService { + s := &PostService{ + log: log, + client: make(map[types.NodeID]*postClient), + queryInterval: 2 * time.Second, + } + for _, opt := range opts { + opt(s) } + return s } // AllowConnections sets if the grpc service accepts new incoming connections from post services. @@ -132,7 +147,7 @@ func (s *PostService) setConnection(nodeId types.NodeID, con chan postCommand) e if _, ok := s.client[nodeId]; ok { return errors.New("post service already registered") } - s.client[nodeId] = newPostClient(con) + s.client[nodeId] = newPostClient(con, s.queryInterval) s.log.Info("post service registered", zap.Stringer("node_id", nodeId)) return nil } diff --git a/systest/tests/distributed_post_verification_test.go b/systest/tests/distributed_post_verification_test.go index 887e1e4ef0..a719fc6145 100644 --- a/systest/tests/distributed_post_verification_test.go +++ b/systest/tests/distributed_post_verification_test.go @@ -140,7 +140,10 @@ func TestPostMalfeasanceProof(t *testing.T) { require.NoError(t, err) t.Cleanup(clock.Close) - grpcPostService := grpcserver.NewPostService(logger.Named("grpc-post-service")) + grpcPostService := grpcserver.NewPostService( + logger.Named("grpc-post-service"), + grpcserver.PostServiceQueryInterval(500*time.Millisecond), + ) grpcPostService.AllowConnections(true) grpcPrivateServer, err := grpcserver.NewWithServices(