From 85a5064b5842be0a7b644b082962740e4f9113a3 Mon Sep 17 00:00:00 2001 From: MariusC Date: Wed, 30 Oct 2024 14:48:14 +0200 Subject: [PATCH 1/8] FEAT: Add first version notifierBootstrapper.go --- .../notifier/notifierBootstrapper.go | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 cmd/sovereignnode/notifier/notifierBootstrapper.go diff --git a/cmd/sovereignnode/notifier/notifierBootstrapper.go b/cmd/sovereignnode/notifier/notifierBootstrapper.go new file mode 100644 index 00000000000..c6ae1da54cc --- /dev/null +++ b/cmd/sovereignnode/notifier/notifierBootstrapper.go @@ -0,0 +1,94 @@ +package notifier + +import ( + "context" + "time" + + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-go/process" + logger "github.com/multiversx/mx-chain-logger-go" + notifierProcess "github.com/multiversx/mx-chain-sovereign-notifier-go/process" +) + +var log = logger.GetOrCreate("notifier-bootstrap") + +type ArgsNotifierBootstrapper struct { + IncomingHeaderHandler process.IncomingHeaderSubscriber + SovereignNotifier notifierProcess.SovereignNotifier + ForkDetector process.ForkDetector + Bootstrapper process.Bootstrapper + RoundDuration uint64 +} + +type notifierBootstrapper struct { + incomingHeaderHandler process.IncomingHeaderSubscriber + sovereignNotifier notifierProcess.SovereignNotifier + forkDetector process.ForkDetector + + nodeSyncedChan chan bool + cancelFunc func() + roundDuration uint64 +} + +func NewNotifierBootstrapper(args ArgsNotifierBootstrapper) (*notifierBootstrapper, error) { + nb := ¬ifierBootstrapper{ + incomingHeaderHandler: args.IncomingHeaderHandler, + sovereignNotifier: args.SovereignNotifier, + forkDetector: args.ForkDetector, + nodeSyncedChan: make(chan bool, 1), + cancelFunc: nil, + roundDuration: args.RoundDuration, + } + + args.Bootstrapper.AddSyncStateListener(nb.receivedSyncState) + + return nb, nil +} + +func (nb *notifierBootstrapper) receivedSyncState(isNodeSynchronized bool) { + if isNodeSynchronized && nb.forkDetector.GetHighestFinalBlockNonce() != 0 { + select { + case nb.nodeSyncedChan <- true: + default: + } + } +} + +func (nb *notifierBootstrapper) Start() { + var ctx context.Context + ctx, nb.cancelFunc = context.WithCancel(context.Background()) + go nb.checkChannels(ctx) +} + +func (nb *notifierBootstrapper) checkChannels(ctx context.Context) { + timeToWaitReSync := (process.MaxRoundsWithoutNewBlockReceived + 1) * nb.roundDuration + ticker := time.NewTicker(time.Duration(timeToWaitReSync) * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + log.Debug("worker's go routine is stopping...") + return + case _ = <-nb.nodeSyncedChan: + err := nb.sovereignNotifier.RegisterHandler(nb.incomingHeaderHandler) + if err != nil { + log.Error("ERROR SYNCING", "err", err) + } + log.Error("SYYYYYYNNNCCCCCEEEEEEED") + return + case <-ticker.C: + log.Error("NOT STATE NOT SYNCED YET") + } + + } +} + +func (nb *notifierBootstrapper) Close() { + if nb.cancelFunc != nil { + nb.cancelFunc() + } + + nrReads := core.EmptyChannel(nb.nodeSyncedChan) + log.Debug("notifierBootstrapper: emptied channel", "nodeSyncedChan nrReads", nrReads) +} From 55e7569b3aa08ff75bdbebc4609f82beb9610988 Mon Sep 17 00:00:00 2001 From: MariusC Date: Wed, 30 Oct 2024 15:20:42 +0200 Subject: [PATCH 2/8] FEAT: Integrate notifierBootstrapper into sovereignNodeRunner.go --- cmd/sovereignnode/notifier/interface.go | 6 ++ .../notifier/notifierBootstrapper.go | 7 +- cmd/sovereignnode/sovereignNodeRunner.go | 74 ++++++++++++++----- 3 files changed, 67 insertions(+), 20 deletions(-) create mode 100644 cmd/sovereignnode/notifier/interface.go diff --git a/cmd/sovereignnode/notifier/interface.go b/cmd/sovereignnode/notifier/interface.go new file mode 100644 index 00000000000..7d4f70a5205 --- /dev/null +++ b/cmd/sovereignnode/notifier/interface.go @@ -0,0 +1,6 @@ +package notifier + +type SovereignNotifierBootstrapper interface { + Start() + Close() error +} diff --git a/cmd/sovereignnode/notifier/notifierBootstrapper.go b/cmd/sovereignnode/notifier/notifierBootstrapper.go index c6ae1da54cc..f3c5a3a921b 100644 --- a/cmd/sovereignnode/notifier/notifierBootstrapper.go +++ b/cmd/sovereignnode/notifier/notifierBootstrapper.go @@ -57,10 +57,10 @@ func (nb *notifierBootstrapper) receivedSyncState(isNodeSynchronized bool) { func (nb *notifierBootstrapper) Start() { var ctx context.Context ctx, nb.cancelFunc = context.WithCancel(context.Background()) - go nb.checkChannels(ctx) + go nb.checkNodeState(ctx) } -func (nb *notifierBootstrapper) checkChannels(ctx context.Context) { +func (nb *notifierBootstrapper) checkNodeState(ctx context.Context) { timeToWaitReSync := (process.MaxRoundsWithoutNewBlockReceived + 1) * nb.roundDuration ticker := time.NewTicker(time.Duration(timeToWaitReSync) * time.Millisecond) defer ticker.Stop() @@ -84,11 +84,12 @@ func (nb *notifierBootstrapper) checkChannels(ctx context.Context) { } } -func (nb *notifierBootstrapper) Close() { +func (nb *notifierBootstrapper) Close() error { if nb.cancelFunc != nil { nb.cancelFunc() } nrReads := core.EmptyChannel(nb.nodeSyncedChan) log.Debug("notifierBootstrapper: emptied channel", "nodeSyncedChan nrReads", nrReads) + return nil } diff --git a/cmd/sovereignnode/sovereignNodeRunner.go b/cmd/sovereignnode/sovereignNodeRunner.go index bce4e470621..c20fc77add4 100644 --- a/cmd/sovereignnode/sovereignNodeRunner.go +++ b/cmd/sovereignnode/sovereignNodeRunner.go @@ -24,6 +24,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/endProcess" outportCore "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-go/process/block/sovereign/incomingHeader" + "github.com/multiversx/mx-chain-go/sovereignnode/notifier" logger "github.com/multiversx/mx-chain-logger-go" "github.com/multiversx/mx-chain-sovereign-bridge-go/cert" factoryBridge "github.com/multiversx/mx-chain-sovereign-bridge-go/client" @@ -524,14 +525,29 @@ func (snr *sovereignNodeRunner) executeOneComponentCreationCycle( managedProcessComponents, managedStatusCoreComponents, ) + if err != nil { + return true, err + } + sovereignNotifier, err := createSovereignNotifier(&configs.SovereignExtraConfig.NotifierConfig) if err != nil { return true, err } sovereignWsReceiver, err := createSovereignWsReceiver( &configs.SovereignExtraConfig.NotifierConfig, + sovereignNotifier, + ) + if err != nil { + return true, err + } + + sovereignNotifierBootstrapper, err := startSovereignNotifierBootstrapper( incomingHeaderHandler, + sovereignNotifier, + managedCoreComponents.GenesisNodesSetup().GetRoundDuration(), + managedProcessComponents.ForkDetector(), + managedConsensusComponents.Bootstrapper(), ) if err != nil { return true, err @@ -547,6 +563,11 @@ func (snr *sovereignNodeRunner) executeOneComponentCreationCycle( n.AddClosableComponent(outGoingBridgeOpHandler) return nil } + extraOptionNotifierBootstrapper := func(n *node.Node) error { + n.AddClosableComponent(sovereignNotifierBootstrapper) + return nil + } + nodeHandler, err := node.CreateNode( configs.GeneralConfig, managedRunTypeComponents, @@ -566,6 +587,7 @@ func (snr *sovereignNodeRunner) executeOneComponentCreationCycle( node.NewSovereignNodeFactory(configs.GeneralConfig.SovereignConfig.GenesisConfig.NativeESDT), extraOptionNotifierReceiver, extraOptionOutGoingBridgeSender, + extraOptionNotifierBootstrapper, ) if err != nil { return true, err @@ -1884,24 +1906,8 @@ func createWhiteListerVerifiedTxs(generalConfig *config.Config) (process.WhiteLi func createSovereignWsReceiver( config *config.NotifierConfig, - incomingHeaderHandler process.IncomingHeaderSubscriber, + sovereignNotifier notifierProcess.SovereignNotifier, ) (notifierProcess.WSClient, error) { - argsNotifier := factory.ArgsCreateSovereignNotifier{ - MarshallerType: config.WebSocketConfig.MarshallerType, - SubscribedEvents: getNotifierSubscribedEvents(config.SubscribedEvents), - HasherType: config.WebSocketConfig.HasherType, - } - - sovereignNotifier, err := factory.CreateSovereignNotifier(argsNotifier) - if err != nil { - return nil, err - } - - err = sovereignNotifier.RegisterHandler(incomingHeaderHandler) - if err != nil { - return nil, err - } - argsWsReceiver := factory.ArgsWsClientReceiverNotifier{ WebSocketConfig: notifierCfg.WebSocketConfig{ Url: config.WebSocketConfig.Url, @@ -1919,6 +1925,40 @@ func createSovereignWsReceiver( return factory.CreateWsClientReceiverNotifier(argsWsReceiver) } +func createSovereignNotifier(config *config.NotifierConfig) (notifierProcess.SovereignNotifier, error) { + argsNotifier := factory.ArgsCreateSovereignNotifier{ + MarshallerType: config.WebSocketConfig.MarshallerType, + SubscribedEvents: getNotifierSubscribedEvents(config.SubscribedEvents), + HasherType: config.WebSocketConfig.HasherType, + } + + return factory.CreateSovereignNotifier(argsNotifier) +} + +func startSovereignNotifierBootstrapper( + incomingHeaderHandler process.IncomingHeaderSubscriber, + sovereignNotifier notifierProcess.SovereignNotifier, + roundDuration uint64, + forkDetector process.ForkDetector, + bootstrapper process.Bootstrapper, +) (notifier.SovereignNotifierBootstrapper, error) { + args := notifier.ArgsNotifierBootstrapper{ + IncomingHeaderHandler: incomingHeaderHandler, + SovereignNotifier: sovereignNotifier, + ForkDetector: forkDetector, + Bootstrapper: bootstrapper, + RoundDuration: roundDuration, + } + + notifierBootstrapper, err := notifier.NewNotifierBootstrapper(args) + if err != nil { + return nil, err + } + + notifierBootstrapper.Start() + return notifierBootstrapper, nil +} + func getNotifierSubscribedEvents(events []config.SubscribedEvent) []notifierCfg.SubscribedEvent { ret := make([]notifierCfg.SubscribedEvent, len(events)) From f7ab872615c0d30e202f0c0a53b7dd4774e052f2 Mon Sep 17 00:00:00 2001 From: MariusC Date: Wed, 30 Oct 2024 16:05:11 +0200 Subject: [PATCH 3/8] FIX: getLastCrossNotarizedHeaders --- process/block/sovereignCrossNotarizer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/process/block/sovereignCrossNotarizer.go b/process/block/sovereignCrossNotarizer.go index 8646e531fa3..5c3175582bd 100644 --- a/process/block/sovereignCrossNotarizer.go +++ b/process/block/sovereignCrossNotarizer.go @@ -15,6 +15,8 @@ func (scn *sovereignShardCrossNotarizer) getLastCrossNotarizedHeaders() []bootst return nil } + bootstrapHeaderInfo.ShardId = core.MainChainShardId + lastCrossNotarizedHeaders := make([]bootstrapStorage.BootstrapHeaderInfo, 0, 1) lastCrossNotarizedHeaders = append(lastCrossNotarizedHeaders, *bootstrapHeaderInfo) return trimSliceBootstrapHeaderInfo(lastCrossNotarizedHeaders) From d5f31146148754d60beba39be4f593f63e7487de Mon Sep 17 00:00:00 2001 From: MariusC Date: Thu, 31 Oct 2024 15:22:47 +0200 Subject: [PATCH 4/8] CLN: checkNodeState --- cmd/sovereignnode/notifier/notifierBootstrapper.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/sovereignnode/notifier/notifierBootstrapper.go b/cmd/sovereignnode/notifier/notifierBootstrapper.go index f3c5a3a921b..ef2437b32cd 100644 --- a/cmd/sovereignnode/notifier/notifierBootstrapper.go +++ b/cmd/sovereignnode/notifier/notifierBootstrapper.go @@ -70,15 +70,15 @@ func (nb *notifierBootstrapper) checkNodeState(ctx context.Context) { case <-ctx.Done(): log.Debug("worker's go routine is stopping...") return - case _ = <-nb.nodeSyncedChan: + case <-nb.nodeSyncedChan: err := nb.sovereignNotifier.RegisterHandler(nb.incomingHeaderHandler) if err != nil { - log.Error("ERROR SYNCING", "err", err) + log.Error("notifierBootstrapper: sovereignNotifier.RegisterHandler", "err", err) } - log.Error("SYYYYYYNNNCCCCCEEEEEEED") + log.Debug("notifierBootstrapper.checkNodeState", "is node synced", true) return case <-ticker.C: - log.Error("NOT STATE NOT SYNCED YET") + log.Debug("notifierBootstrapper.checkNodeState", "is node synced", false) } } From fa21b21fd244ea8b86b23799042c172c3b11b891 Mon Sep 17 00:00:00 2001 From: MariusC Date: Thu, 31 Oct 2024 17:31:37 +0200 Subject: [PATCH 5/8] FEAT: First unit tests notifierBootstrapper.go --- cmd/sovereignnode/notifier/errors.go | 5 + .../notifier/notifierBootstrapper.go | 42 +++++- .../notifier/notifierBootstrapper_test.go | 142 ++++++++++++++++++ 3 files changed, 187 insertions(+), 2 deletions(-) create mode 100644 cmd/sovereignnode/notifier/errors.go create mode 100644 cmd/sovereignnode/notifier/notifierBootstrapper_test.go diff --git a/cmd/sovereignnode/notifier/errors.go b/cmd/sovereignnode/notifier/errors.go new file mode 100644 index 00000000000..10397cdbd75 --- /dev/null +++ b/cmd/sovereignnode/notifier/errors.go @@ -0,0 +1,5 @@ +package notifier + +import "errors" + +var errNilSovereignNotifier = errors.New("nil sovereign notifier provided") diff --git a/cmd/sovereignnode/notifier/notifierBootstrapper.go b/cmd/sovereignnode/notifier/notifierBootstrapper.go index ef2437b32cd..39cbcaf1ad1 100644 --- a/cmd/sovereignnode/notifier/notifierBootstrapper.go +++ b/cmd/sovereignnode/notifier/notifierBootstrapper.go @@ -5,6 +5,8 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-go/errors" "github.com/multiversx/mx-chain-go/process" logger "github.com/multiversx/mx-chain-logger-go" notifierProcess "github.com/multiversx/mx-chain-sovereign-notifier-go/process" @@ -12,6 +14,7 @@ import ( var log = logger.GetOrCreate("notifier-bootstrap") +// ArgsNotifierBootstrapper defines args needed to create a new notifier bootstrapper type ArgsNotifierBootstrapper struct { IncomingHeaderHandler process.IncomingHeaderSubscriber SovereignNotifier notifierProcess.SovereignNotifier @@ -30,7 +33,12 @@ type notifierBootstrapper struct { roundDuration uint64 } +// NewNotifierBootstrapper creates a sovereign notifier ws receiver connection bootstrapper func NewNotifierBootstrapper(args ArgsNotifierBootstrapper) (*notifierBootstrapper, error) { + if err := checkArgs(args); err != nil { + return nil, err + } + nb := ¬ifierBootstrapper{ incomingHeaderHandler: args.IncomingHeaderHandler, sovereignNotifier: args.SovereignNotifier, @@ -45,6 +53,26 @@ func NewNotifierBootstrapper(args ArgsNotifierBootstrapper) (*notifierBootstrapp return nb, nil } +func checkArgs(args ArgsNotifierBootstrapper) error { + if check.IfNil(args.IncomingHeaderHandler) { + return errors.ErrNilIncomingHeaderSubscriber + } + if check.IfNil(args.SovereignNotifier) { + return errNilSovereignNotifier + } + if check.IfNil(args.ForkDetector) { + return errors.ErrNilForkDetector + } + if check.IfNil(args.Bootstrapper) { + return process.ErrNilBootstrapper + } + if args.RoundDuration == 0 { + return errors.ErrInvalidRoundDuration + } + + return nil +} + func (nb *notifierBootstrapper) receivedSyncState(isNodeSynchronized bool) { if isNodeSynchronized && nb.forkDetector.GetHighestFinalBlockNonce() != 0 { select { @@ -54,6 +82,9 @@ func (nb *notifierBootstrapper) receivedSyncState(isNodeSynchronized bool) { } } +// Start will start waiting on a go routine to be notified via nodeSyncedChan when the sovereign node is synced. +// Meanwhile, it will print the current node state in log. When node is fully synced, it will register the incoming header +// processor to the websocket listener and exit the waiting loop. func (nb *notifierBootstrapper) Start() { var ctx context.Context ctx, nb.cancelFunc = context.WithCancel(context.Background()) @@ -68,22 +99,24 @@ func (nb *notifierBootstrapper) checkNodeState(ctx context.Context) { for { select { case <-ctx.Done(): - log.Debug("worker's go routine is stopping...") + log.Debug("notifierBootstrapper.checkNodeState: worker's go routine is stopping...") return case <-nb.nodeSyncedChan: err := nb.sovereignNotifier.RegisterHandler(nb.incomingHeaderHandler) if err != nil { log.Error("notifierBootstrapper: sovereignNotifier.RegisterHandler", "err", err) + continue } + log.Debug("notifierBootstrapper.checkNodeState", "is node synced", true) return case <-ticker.C: log.Debug("notifierBootstrapper.checkNodeState", "is node synced", false) } - } } +// Close cancels current context and empties channel reads func (nb *notifierBootstrapper) Close() error { if nb.cancelFunc != nil { nb.cancelFunc() @@ -93,3 +126,8 @@ func (nb *notifierBootstrapper) Close() error { log.Debug("notifierBootstrapper: emptied channel", "nodeSyncedChan nrReads", nrReads) return nil } + +// IsInterfaceNil checks if the underlying pointer is nil +func (nb *notifierBootstrapper) IsInterfaceNil() bool { + return nb == nil +} diff --git a/cmd/sovereignnode/notifier/notifierBootstrapper_test.go b/cmd/sovereignnode/notifier/notifierBootstrapper_test.go new file mode 100644 index 00000000000..c238bf0223c --- /dev/null +++ b/cmd/sovereignnode/notifier/notifierBootstrapper_test.go @@ -0,0 +1,142 @@ +package notifier + +import ( + "reflect" + "runtime" + "testing" + "time" + + "github.com/multiversx/mx-chain-go/errors" + "github.com/multiversx/mx-chain-go/integrationTests/mock" + "github.com/multiversx/mx-chain-go/process" + processMocks "github.com/multiversx/mx-chain-go/process/mock" + "github.com/multiversx/mx-chain-go/testscommon/sovereign" + notifierProcess "github.com/multiversx/mx-chain-sovereign-notifier-go/process" + "github.com/multiversx/mx-chain-sovereign-notifier-go/testscommon" + "github.com/stretchr/testify/require" +) + +func createArgs() ArgsNotifierBootstrapper { + return ArgsNotifierBootstrapper{ + IncomingHeaderHandler: &sovereign.IncomingHeaderSubscriberStub{}, + SovereignNotifier: &testscommon.SovereignNotifierStub{}, + ForkDetector: &mock.ForkDetectorStub{}, + Bootstrapper: &processMocks.BootstrapperStub{}, + RoundDuration: 100, + } +} + +func getFunctionName(i interface{}) string { + return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() +} + +func TestNewNotifierBootstrapper(t *testing.T) { + t.Parallel() + + t.Run("nil incoming header processor", func(t *testing.T) { + args := createArgs() + args.IncomingHeaderHandler = nil + nb, err := NewNotifierBootstrapper(args) + require.Nil(t, nb) + require.Equal(t, errors.ErrNilIncomingHeaderSubscriber, err) + }) + t.Run("nil sovereign notifier", func(t *testing.T) { + args := createArgs() + args.SovereignNotifier = nil + nb, err := NewNotifierBootstrapper(args) + require.Nil(t, nb) + require.Equal(t, errNilSovereignNotifier, err) + }) + t.Run("nil fork detector", func(t *testing.T) { + args := createArgs() + args.ForkDetector = nil + nb, err := NewNotifierBootstrapper(args) + require.Nil(t, nb) + require.Equal(t, errors.ErrNilForkDetector, err) + }) + t.Run("nil bootstrapper", func(t *testing.T) { + args := createArgs() + args.Bootstrapper = nil + nb, err := NewNotifierBootstrapper(args) + require.Nil(t, nb) + require.Equal(t, process.ErrNilBootstrapper, err) + }) + t.Run("zero value round duration", func(t *testing.T) { + args := createArgs() + args.RoundDuration = 0 + nb, err := NewNotifierBootstrapper(args) + require.Nil(t, nb) + require.Equal(t, errors.ErrInvalidRoundDuration, err) + }) + t.Run("should work", func(t *testing.T) { + args := createArgs() + nb, err := NewNotifierBootstrapper(args) + require.Nil(t, err) + require.False(t, nb.IsInterfaceNil()) + }) +} + +func TestNotifierBootstrapper_Start(t *testing.T) { + t.Parallel() + + args := createArgs() + + wasRegisteredToStateSync := false + args.Bootstrapper = &processMocks.BootstrapperStub{ + AddSyncStateListenerCalled: func(f func(bool)) { + require.Contains(t, getFunctionName(f), "(*notifierBootstrapper).receivedSyncState") + wasRegisteredToStateSync = true + }, + } + + registerCalledCt := 0 + args.SovereignNotifier = &testscommon.SovereignNotifierStub{ + RegisterHandlerCalled: func(handler notifierProcess.IncomingHeaderSubscriber) error { + require.Equal(t, args.IncomingHeaderHandler, handler) + registerCalledCt++ + return nil + }, + } + + getHighestNonceCalledCt := 0 + args.ForkDetector = &mock.ForkDetectorStub{ + GetHighestFinalBlockNonceCalled: func() uint64 { + defer func() { + getHighestNonceCalledCt++ + }() + + return uint64(getHighestNonceCalledCt) + }, + } + + nb, _ := NewNotifierBootstrapper(args) + require.True(t, wasRegisteredToStateSync) + + nb.Start() + + defer func() { + err := nb.Close() + require.Nil(t, err) + }() + + time.Sleep(time.Millisecond * 50) + require.Zero(t, registerCalledCt) + require.Zero(t, getHighestNonceCalledCt) + + nb.receivedSyncState(true) + time.Sleep(time.Millisecond * 50) + require.Zero(t, registerCalledCt) + require.Equal(t, 1, getHighestNonceCalledCt) + + nb.receivedSyncState(true) + time.Sleep(time.Millisecond * 50) + require.Equal(t, 1, registerCalledCt) + require.Equal(t, 2, getHighestNonceCalledCt) + + for i := 3; i < 10; i++ { + nb.receivedSyncState(true) + time.Sleep(time.Millisecond * 50) + require.Equal(t, 1, registerCalledCt) + require.Equal(t, i, getHighestNonceCalledCt) + } +} From 0f972f52a5dbee1d48a4aaa6a57cf567b58f64d3 Mon Sep 17 00:00:00 2001 From: MariusC Date: Thu, 31 Oct 2024 17:54:54 +0200 Subject: [PATCH 6/8] FEAT: Extend unit tests notifier bootstrapper --- .../notifier/notifierBootstrapper_test.go | 96 ++++++++++++++++++- 1 file changed, 92 insertions(+), 4 deletions(-) diff --git a/cmd/sovereignnode/notifier/notifierBootstrapper_test.go b/cmd/sovereignnode/notifier/notifierBootstrapper_test.go index c238bf0223c..72091d4af35 100644 --- a/cmd/sovereignnode/notifier/notifierBootstrapper_test.go +++ b/cmd/sovereignnode/notifier/notifierBootstrapper_test.go @@ -1,12 +1,14 @@ package notifier import ( + "context" + "errors" "reflect" "runtime" "testing" "time" - "github.com/multiversx/mx-chain-go/errors" + errorsMx "github.com/multiversx/mx-chain-go/errors" "github.com/multiversx/mx-chain-go/integrationTests/mock" "github.com/multiversx/mx-chain-go/process" processMocks "github.com/multiversx/mx-chain-go/process/mock" @@ -38,7 +40,7 @@ func TestNewNotifierBootstrapper(t *testing.T) { args.IncomingHeaderHandler = nil nb, err := NewNotifierBootstrapper(args) require.Nil(t, nb) - require.Equal(t, errors.ErrNilIncomingHeaderSubscriber, err) + require.Equal(t, errorsMx.ErrNilIncomingHeaderSubscriber, err) }) t.Run("nil sovereign notifier", func(t *testing.T) { args := createArgs() @@ -52,7 +54,7 @@ func TestNewNotifierBootstrapper(t *testing.T) { args.ForkDetector = nil nb, err := NewNotifierBootstrapper(args) require.Nil(t, nb) - require.Equal(t, errors.ErrNilForkDetector, err) + require.Equal(t, errorsMx.ErrNilForkDetector, err) }) t.Run("nil bootstrapper", func(t *testing.T) { args := createArgs() @@ -66,7 +68,7 @@ func TestNewNotifierBootstrapper(t *testing.T) { args.RoundDuration = 0 nb, err := NewNotifierBootstrapper(args) require.Nil(t, nb) - require.Equal(t, errors.ErrInvalidRoundDuration, err) + require.Equal(t, errorsMx.ErrInvalidRoundDuration, err) }) t.Run("should work", func(t *testing.T) { args := createArgs() @@ -123,6 +125,11 @@ func TestNotifierBootstrapper_Start(t *testing.T) { require.Zero(t, registerCalledCt) require.Zero(t, getHighestNonceCalledCt) + nb.receivedSyncState(false) + time.Sleep(time.Millisecond * 50) + require.Zero(t, registerCalledCt) + require.Zero(t, registerCalledCt) + nb.receivedSyncState(true) time.Sleep(time.Millisecond * 50) require.Zero(t, registerCalledCt) @@ -140,3 +147,84 @@ func TestNotifierBootstrapper_Start(t *testing.T) { require.Equal(t, i, getHighestNonceCalledCt) } } + +func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) { + t.Parallel() + + args := createArgs() + args.RoundDuration = 10 + + registerCalledCt := 0 + args.SovereignNotifier = &testscommon.SovereignNotifierStub{ + RegisterHandlerCalled: func(handler notifierProcess.IncomingHeaderSubscriber) error { + require.Equal(t, args.IncomingHeaderHandler, handler) + + defer func() { + registerCalledCt++ + }() + + switch registerCalledCt { + case 0, 1: + return errors.New("local error") + } + + return nil + }, + } + + args.ForkDetector = &mock.ForkDetectorStub{ + GetHighestFinalBlockNonceCalled: func() uint64 { + return 1 + }, + } + + nb, _ := NewNotifierBootstrapper(args) + + nb.Start() + + defer func() { + err := nb.Close() + require.Nil(t, err) + }() + + time.Sleep(time.Millisecond * 200) + require.Zero(t, registerCalledCt) + + nb.receivedSyncState(true) + time.Sleep(time.Millisecond * 50) + require.Equal(t, 1, registerCalledCt) + + nb.receivedSyncState(true) + time.Sleep(time.Millisecond * 50) + require.Equal(t, 2, registerCalledCt) + + // Once registered, the waiting is done, no other register is called + for i := 3; i < 10; i++ { + nb.receivedSyncState(true) + time.Sleep(time.Millisecond * 50) + require.Equal(t, 3, registerCalledCt) + } +} + +func TestCheckNodeState_CtxDone(t *testing.T) { + t.Parallel() + + args := createArgs() + nb, _ := NewNotifierBootstrapper(args) + + ctx, cancel := context.WithCancel(context.Background()) + doneChan := make(chan struct{}) + + go func() { + nb.checkNodeState(ctx) + close(doneChan) + }() + + cancel() + + select { + case <-doneChan: + case <-time.After(time.Second): + require.Fail(t, "checkNodeState did not exit on ctx.Done() as expected") + } +} From d9d349afef4d81befd2e1b1e5299a6dba337d042 Mon Sep 17 00:00:00 2001 From: MariusC Date: Thu, 31 Oct 2024 18:09:41 +0200 Subject: [PATCH 7/8] FIX: Unit test + small cleanup --- cmd/sovereignnode/notifier/interface.go | 2 ++ cmd/sovereignnode/notifier/notifierBootstrapper.go | 2 +- process/block/sovereignCrossNotarizer_test.go | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/sovereignnode/notifier/interface.go b/cmd/sovereignnode/notifier/interface.go index 7d4f70a5205..345c9cdf2c6 100644 --- a/cmd/sovereignnode/notifier/interface.go +++ b/cmd/sovereignnode/notifier/interface.go @@ -1,6 +1,8 @@ package notifier +// SovereignNotifierBootstrapper defines a sovereign notifier bootstrapper type SovereignNotifierBootstrapper interface { Start() Close() error + IsInterfaceNil() bool } diff --git a/cmd/sovereignnode/notifier/notifierBootstrapper.go b/cmd/sovereignnode/notifier/notifierBootstrapper.go index 39cbcaf1ad1..52e83ae51c2 100644 --- a/cmd/sovereignnode/notifier/notifierBootstrapper.go +++ b/cmd/sovereignnode/notifier/notifierBootstrapper.go @@ -33,7 +33,7 @@ type notifierBootstrapper struct { roundDuration uint64 } -// NewNotifierBootstrapper creates a sovereign notifier ws receiver connection bootstrapper +// NewNotifierBootstrapper creates a ws receiver connection registration bootstrapper func NewNotifierBootstrapper(args ArgsNotifierBootstrapper) (*notifierBootstrapper, error) { if err := checkArgs(args); err != nil { return nil, err diff --git a/process/block/sovereignCrossNotarizer_test.go b/process/block/sovereignCrossNotarizer_test.go index 3db9e337e7c..9f4d1b3d698 100644 --- a/process/block/sovereignCrossNotarizer_test.go +++ b/process/block/sovereignCrossNotarizer_test.go @@ -33,7 +33,7 @@ func TestSovereignShardCrossNotarizer_getLastCrossNotarizedHeaders(t *testing.T) headers := sovereignNotarzier.getLastCrossNotarizedHeaders() expectedHeaders := []bootstrapStorage.BootstrapHeaderInfo{ { - ShardId: header.GetShardID(), + ShardId: core.MainChainShardId, Nonce: header.GetNonce(), Hash: hash, }, From cc78eb4580ce3bf21f74e174b8c42d1ce6c43781 Mon Sep 17 00:00:00 2001 From: MariusC Date: Thu, 7 Nov 2024 17:49:43 +0200 Subject: [PATCH 8/8] FEAT: Stop node on channel if notifier bootstrapper fails --- .../notifier/notifierBootstrapper.go | 10 +++++-- .../notifier/notifierBootstrapper_test.go | 27 ++++++++++--------- cmd/sovereignnode/sovereignNodeRunner.go | 9 ++++--- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/cmd/sovereignnode/notifier/notifierBootstrapper.go b/cmd/sovereignnode/notifier/notifierBootstrapper.go index 52e83ae51c2..09acaea25b3 100644 --- a/cmd/sovereignnode/notifier/notifierBootstrapper.go +++ b/cmd/sovereignnode/notifier/notifierBootstrapper.go @@ -2,6 +2,8 @@ package notifier import ( "context" + "os" + "syscall" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -20,6 +22,7 @@ type ArgsNotifierBootstrapper struct { SovereignNotifier notifierProcess.SovereignNotifier ForkDetector process.ForkDetector Bootstrapper process.Bootstrapper + SigStopNode chan os.Signal RoundDuration uint64 } @@ -27,6 +30,7 @@ type notifierBootstrapper struct { incomingHeaderHandler process.IncomingHeaderSubscriber sovereignNotifier notifierProcess.SovereignNotifier forkDetector process.ForkDetector + sigStopNode chan os.Signal nodeSyncedChan chan bool cancelFunc func() @@ -46,6 +50,7 @@ func NewNotifierBootstrapper(args ArgsNotifierBootstrapper) (*notifierBootstrapp nodeSyncedChan: make(chan bool, 1), cancelFunc: nil, roundDuration: args.RoundDuration, + sigStopNode: args.SigStopNode, } args.Bootstrapper.AddSyncStateListener(nb.receivedSyncState) @@ -105,10 +110,11 @@ func (nb *notifierBootstrapper) checkNodeState(ctx context.Context) { err := nb.sovereignNotifier.RegisterHandler(nb.incomingHeaderHandler) if err != nil { log.Error("notifierBootstrapper: sovereignNotifier.RegisterHandler", "err", err) - continue + nb.sigStopNode <- syscall.SIGTERM + } else { + log.Debug("notifierBootstrapper.checkNodeState", "is node synced", true) } - log.Debug("notifierBootstrapper.checkNodeState", "is node synced", true) return case <-ticker.C: log.Debug("notifierBootstrapper.checkNodeState", "is node synced", false) diff --git a/cmd/sovereignnode/notifier/notifierBootstrapper_test.go b/cmd/sovereignnode/notifier/notifierBootstrapper_test.go index 72091d4af35..a24cbd9ae24 100644 --- a/cmd/sovereignnode/notifier/notifierBootstrapper_test.go +++ b/cmd/sovereignnode/notifier/notifierBootstrapper_test.go @@ -3,8 +3,10 @@ package notifier import ( "context" "errors" + "os" "reflect" "runtime" + "syscall" "testing" "time" @@ -151,7 +153,10 @@ func TestNotifierBootstrapper_Start(t *testing.T) { func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) { t.Parallel() + sigStopNodeMock := make(chan os.Signal, 1) + args := createArgs() + args.SigStopNode = sigStopNodeMock args.RoundDuration = 10 registerCalledCt := 0 @@ -163,12 +168,7 @@ func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) { registerCalledCt++ }() - switch registerCalledCt { - case 0, 1: - return errors.New("local error") - } - - return nil + return errors.New("local error") }, } @@ -194,15 +194,18 @@ func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) { time.Sleep(time.Millisecond * 50) require.Equal(t, 1, registerCalledCt) - nb.receivedSyncState(true) - time.Sleep(time.Millisecond * 50) - require.Equal(t, 2, registerCalledCt) + select { + case sig := <-sigStopNodeMock: + require.Equal(t, syscall.SIGTERM, sig) + case <-time.After(time.Millisecond * 100): // Timeout to avoid hanging + t.Error("expected SIGTERM signal on sigStopNodeMock, but none received") + } - // Once registered, the waiting is done, no other register is called - for i := 3; i < 10; i++ { + // Once registration fails, the waiting is done, no other register is called + for i := 0; i < 10; i++ { nb.receivedSyncState(true) time.Sleep(time.Millisecond * 50) - require.Equal(t, 3, registerCalledCt) + require.Equal(t, 1, registerCalledCt) } } diff --git a/cmd/sovereignnode/sovereignNodeRunner.go b/cmd/sovereignnode/sovereignNodeRunner.go index c20fc77add4..6d90048e029 100644 --- a/cmd/sovereignnode/sovereignNodeRunner.go +++ b/cmd/sovereignnode/sovereignNodeRunner.go @@ -542,12 +542,16 @@ func (snr *sovereignNodeRunner) executeOneComponentCreationCycle( return true, err } + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + sovereignNotifierBootstrapper, err := startSovereignNotifierBootstrapper( incomingHeaderHandler, sovereignNotifier, managedCoreComponents.GenesisNodesSetup().GetRoundDuration(), managedProcessComponents.ForkDetector(), managedConsensusComponents.Bootstrapper(), + sigs, ) if err != nil { return true, err @@ -623,9 +627,6 @@ func (snr *sovereignNodeRunner) executeOneComponentCreationCycle( statusHandler.SetStringValue(common.MetricAreVMQueriesReady, strconv.FormatBool(true)) }(managedStatusCoreComponents.AppStatusHandler()) - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - err = waitForSignal( sigs, managedCoreComponents.ChanStopNodeProcess(), @@ -1941,6 +1942,7 @@ func startSovereignNotifierBootstrapper( roundDuration uint64, forkDetector process.ForkDetector, bootstrapper process.Bootstrapper, + sigStopNode chan os.Signal, ) (notifier.SovereignNotifierBootstrapper, error) { args := notifier.ArgsNotifierBootstrapper{ IncomingHeaderHandler: incomingHeaderHandler, @@ -1948,6 +1950,7 @@ func startSovereignNotifierBootstrapper( ForkDetector: forkDetector, Bootstrapper: bootstrapper, RoundDuration: roundDuration, + SigStopNode: sigStopNode, } notifierBootstrapper, err := notifier.NewNotifierBootstrapper(args)