Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sovereign] Disable notifier registration until fully synced #6571

5 changes: 5 additions & 0 deletions cmd/sovereignnode/notifier/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package notifier

import "errors"

var errNilSovereignNotifier = errors.New("nil sovereign notifier provided")
8 changes: 8 additions & 0 deletions cmd/sovereignnode/notifier/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package notifier

// SovereignNotifierBootstrapper defines a sovereign notifier bootstrapper
type SovereignNotifierBootstrapper interface {
Start()
Close() error
IsInterfaceNil() bool
}
139 changes: 139 additions & 0 deletions cmd/sovereignnode/notifier/notifierBootstrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package notifier

import (
"context"
"os"
"syscall"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-go/errors"
"github.com/multiversx/mx-chain-go/process"
logger "github.com/multiversx/mx-chain-logger-go"
notifierProcess "github.com/multiversx/mx-chain-sovereign-notifier-go/process"
)

var log = logger.GetOrCreate("notifier-bootstrap")

// ArgsNotifierBootstrapper defines args needed to create a new notifier bootstrapper
type ArgsNotifierBootstrapper struct {
IncomingHeaderHandler process.IncomingHeaderSubscriber
SovereignNotifier notifierProcess.SovereignNotifier
ForkDetector process.ForkDetector
Bootstrapper process.Bootstrapper
SigStopNode chan os.Signal
RoundDuration uint64
}

type notifierBootstrapper struct {
incomingHeaderHandler process.IncomingHeaderSubscriber
sovereignNotifier notifierProcess.SovereignNotifier
forkDetector process.ForkDetector
sigStopNode chan os.Signal

nodeSyncedChan chan bool
cancelFunc func()
roundDuration uint64
}

// NewNotifierBootstrapper creates a ws receiver connection registration bootstrapper
func NewNotifierBootstrapper(args ArgsNotifierBootstrapper) (*notifierBootstrapper, error) {
if err := checkArgs(args); err != nil {
return nil, err
}

nb := &notifierBootstrapper{
incomingHeaderHandler: args.IncomingHeaderHandler,
sovereignNotifier: args.SovereignNotifier,
forkDetector: args.ForkDetector,
nodeSyncedChan: make(chan bool, 1),
cancelFunc: nil,
roundDuration: args.RoundDuration,
sigStopNode: args.SigStopNode,
}

args.Bootstrapper.AddSyncStateListener(nb.receivedSyncState)

return nb, nil
}

func checkArgs(args ArgsNotifierBootstrapper) error {
if check.IfNil(args.IncomingHeaderHandler) {
return errors.ErrNilIncomingHeaderSubscriber
}
if check.IfNil(args.SovereignNotifier) {
return errNilSovereignNotifier
}
if check.IfNil(args.ForkDetector) {
return errors.ErrNilForkDetector
}
if check.IfNil(args.Bootstrapper) {
return process.ErrNilBootstrapper
}
if args.RoundDuration == 0 {
return errors.ErrInvalidRoundDuration
}

return nil
}

func (nb *notifierBootstrapper) receivedSyncState(isNodeSynchronized bool) {
if isNodeSynchronized && nb.forkDetector.GetHighestFinalBlockNonce() != 0 {
select {
case nb.nodeSyncedChan <- true:
default:
}
}
}

// Start will start waiting on a go routine to be notified via nodeSyncedChan when the sovereign node is synced.
// Meanwhile, it will print the current node state in log. When node is fully synced, it will register the incoming header
// processor to the websocket listener and exit the waiting loop.
func (nb *notifierBootstrapper) Start() {
var ctx context.Context
ctx, nb.cancelFunc = context.WithCancel(context.Background())
go nb.checkNodeState(ctx)
}

func (nb *notifierBootstrapper) checkNodeState(ctx context.Context) {
timeToWaitReSync := (process.MaxRoundsWithoutNewBlockReceived + 1) * nb.roundDuration
ticker := time.NewTicker(time.Duration(timeToWaitReSync) * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Debug("notifierBootstrapper.checkNodeState: worker's go routine is stopping...")
return
case <-nb.nodeSyncedChan:
err := nb.sovereignNotifier.RegisterHandler(nb.incomingHeaderHandler)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is quite a problem is register is failing.

Can't we design in such a way that the node stops ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it as requested to signal the node to stop if registration fails

if err != nil {
log.Error("notifierBootstrapper: sovereignNotifier.RegisterHandler", "err", err)
nb.sigStopNode <- syscall.SIGTERM
} else {
log.Debug("notifierBootstrapper.checkNodeState", "is node synced", true)
}

return
case <-ticker.C:
log.Debug("notifierBootstrapper.checkNodeState", "is node synced", false)
}
}
}

// Close cancels current context and empties channel reads
func (nb *notifierBootstrapper) Close() error {
if nb.cancelFunc != nil {
nb.cancelFunc()
}

nrReads := core.EmptyChannel(nb.nodeSyncedChan)
log.Debug("notifierBootstrapper: emptied channel", "nodeSyncedChan nrReads", nrReads)
return nil
}

// IsInterfaceNil checks if the underlying pointer is nil
func (nb *notifierBootstrapper) IsInterfaceNil() bool {
return nb == nil
}
233 changes: 233 additions & 0 deletions cmd/sovereignnode/notifier/notifierBootstrapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package notifier

import (
"context"
"errors"
"os"
"reflect"
"runtime"
"syscall"
"testing"
"time"

errorsMx "github.com/multiversx/mx-chain-go/errors"
"github.com/multiversx/mx-chain-go/integrationTests/mock"
"github.com/multiversx/mx-chain-go/process"
processMocks "github.com/multiversx/mx-chain-go/process/mock"
"github.com/multiversx/mx-chain-go/testscommon/sovereign"
notifierProcess "github.com/multiversx/mx-chain-sovereign-notifier-go/process"
"github.com/multiversx/mx-chain-sovereign-notifier-go/testscommon"
"github.com/stretchr/testify/require"
)

func createArgs() ArgsNotifierBootstrapper {
return ArgsNotifierBootstrapper{
IncomingHeaderHandler: &sovereign.IncomingHeaderSubscriberStub{},
SovereignNotifier: &testscommon.SovereignNotifierStub{},
ForkDetector: &mock.ForkDetectorStub{},
Bootstrapper: &processMocks.BootstrapperStub{},
RoundDuration: 100,
}
}

func getFunctionName(i interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}

func TestNewNotifierBootstrapper(t *testing.T) {
t.Parallel()

t.Run("nil incoming header processor", func(t *testing.T) {
args := createArgs()
args.IncomingHeaderHandler = nil
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, nb)
require.Equal(t, errorsMx.ErrNilIncomingHeaderSubscriber, err)
})
t.Run("nil sovereign notifier", func(t *testing.T) {
args := createArgs()
args.SovereignNotifier = nil
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, nb)
require.Equal(t, errNilSovereignNotifier, err)
})
t.Run("nil fork detector", func(t *testing.T) {
args := createArgs()
args.ForkDetector = nil
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, nb)
require.Equal(t, errorsMx.ErrNilForkDetector, err)
})
t.Run("nil bootstrapper", func(t *testing.T) {
args := createArgs()
args.Bootstrapper = nil
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, nb)
require.Equal(t, process.ErrNilBootstrapper, err)
})
t.Run("zero value round duration", func(t *testing.T) {
args := createArgs()
args.RoundDuration = 0
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, nb)
require.Equal(t, errorsMx.ErrInvalidRoundDuration, err)
})
t.Run("should work", func(t *testing.T) {
args := createArgs()
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, err)
require.False(t, nb.IsInterfaceNil())
})
}

func TestNotifierBootstrapper_Start(t *testing.T) {
t.Parallel()

args := createArgs()

wasRegisteredToStateSync := false
args.Bootstrapper = &processMocks.BootstrapperStub{
AddSyncStateListenerCalled: func(f func(bool)) {
require.Contains(t, getFunctionName(f), "(*notifierBootstrapper).receivedSyncState")
wasRegisteredToStateSync = true
},
}

registerCalledCt := 0
args.SovereignNotifier = &testscommon.SovereignNotifierStub{
RegisterHandlerCalled: func(handler notifierProcess.IncomingHeaderSubscriber) error {
require.Equal(t, args.IncomingHeaderHandler, handler)
registerCalledCt++
return nil
},
}

getHighestNonceCalledCt := 0
args.ForkDetector = &mock.ForkDetectorStub{
GetHighestFinalBlockNonceCalled: func() uint64 {
defer func() {
getHighestNonceCalledCt++
}()

return uint64(getHighestNonceCalledCt)
},
}

nb, _ := NewNotifierBootstrapper(args)
require.True(t, wasRegisteredToStateSync)

nb.Start()

defer func() {
err := nb.Close()
require.Nil(t, err)
}()

time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt)
require.Zero(t, getHighestNonceCalledCt)

nb.receivedSyncState(false)
time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt)
require.Zero(t, registerCalledCt)

nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt)
require.Equal(t, 1, getHighestNonceCalledCt)

nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)
require.Equal(t, 2, getHighestNonceCalledCt)

for i := 3; i < 10; i++ {
nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)
require.Equal(t, i, getHighestNonceCalledCt)
}
}

func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) {
t.Parallel()

sigStopNodeMock := make(chan os.Signal, 1)

args := createArgs()
args.SigStopNode = sigStopNodeMock
args.RoundDuration = 10

registerCalledCt := 0
args.SovereignNotifier = &testscommon.SovereignNotifierStub{
RegisterHandlerCalled: func(handler notifierProcess.IncomingHeaderSubscriber) error {
require.Equal(t, args.IncomingHeaderHandler, handler)

defer func() {
registerCalledCt++
}()

return errors.New("local error")
},
}

args.ForkDetector = &mock.ForkDetectorStub{
GetHighestFinalBlockNonceCalled: func() uint64 {
return 1
},
}

nb, _ := NewNotifierBootstrapper(args)

nb.Start()

defer func() {
err := nb.Close()
require.Nil(t, err)
}()

time.Sleep(time.Millisecond * 200)
require.Zero(t, registerCalledCt)

nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)

select {
case sig := <-sigStopNodeMock:
require.Equal(t, syscall.SIGTERM, sig)
case <-time.After(time.Millisecond * 100): // Timeout to avoid hanging
t.Error("expected SIGTERM signal on sigStopNodeMock, but none received")
}

// Once registration fails, the waiting is done, no other register is called
for i := 0; i < 10; i++ {
nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)
}
}

func TestCheckNodeState_CtxDone(t *testing.T) {
t.Parallel()

args := createArgs()
nb, _ := NewNotifierBootstrapper(args)

ctx, cancel := context.WithCancel(context.Background())
doneChan := make(chan struct{})

go func() {
nb.checkNodeState(ctx)
close(doneChan)
}()

cancel()

select {
case <-doneChan:
case <-time.After(time.Second):
require.Fail(t, "checkNodeState did not exit on ctx.Done() as expected")
}
}
Loading
Loading