Skip to content

Commit

Permalink
Merge pull request #6571 from multiversx/MX-15963-disable-notifier-un…
Browse files Browse the repository at this point in the history
…til-synced

[sovereign] Disable notifier registration until fully synced
  • Loading branch information
mariusmihaic authored Nov 8, 2024
2 parents 49c93d9 + cc78eb4 commit deaafbd
Show file tree
Hide file tree
Showing 7 changed files with 451 additions and 21 deletions.
5 changes: 5 additions & 0 deletions cmd/sovereignnode/notifier/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package notifier

import "errors"

var errNilSovereignNotifier = errors.New("nil sovereign notifier provided")
8 changes: 8 additions & 0 deletions cmd/sovereignnode/notifier/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package notifier

// SovereignNotifierBootstrapper defines a sovereign notifier bootstrapper
type SovereignNotifierBootstrapper interface {
Start()
Close() error
IsInterfaceNil() bool
}
139 changes: 139 additions & 0 deletions cmd/sovereignnode/notifier/notifierBootstrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package notifier

import (
"context"
"os"
"syscall"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-go/errors"
"github.com/multiversx/mx-chain-go/process"
logger "github.com/multiversx/mx-chain-logger-go"
notifierProcess "github.com/multiversx/mx-chain-sovereign-notifier-go/process"
)

var log = logger.GetOrCreate("notifier-bootstrap")

// ArgsNotifierBootstrapper defines args needed to create a new notifier bootstrapper
type ArgsNotifierBootstrapper struct {
IncomingHeaderHandler process.IncomingHeaderSubscriber
SovereignNotifier notifierProcess.SovereignNotifier
ForkDetector process.ForkDetector
Bootstrapper process.Bootstrapper
SigStopNode chan os.Signal
RoundDuration uint64
}

type notifierBootstrapper struct {
incomingHeaderHandler process.IncomingHeaderSubscriber
sovereignNotifier notifierProcess.SovereignNotifier
forkDetector process.ForkDetector
sigStopNode chan os.Signal

nodeSyncedChan chan bool
cancelFunc func()
roundDuration uint64
}

// NewNotifierBootstrapper creates a ws receiver connection registration bootstrapper
func NewNotifierBootstrapper(args ArgsNotifierBootstrapper) (*notifierBootstrapper, error) {
if err := checkArgs(args); err != nil {
return nil, err
}

nb := &notifierBootstrapper{
incomingHeaderHandler: args.IncomingHeaderHandler,
sovereignNotifier: args.SovereignNotifier,
forkDetector: args.ForkDetector,
nodeSyncedChan: make(chan bool, 1),
cancelFunc: nil,
roundDuration: args.RoundDuration,
sigStopNode: args.SigStopNode,
}

args.Bootstrapper.AddSyncStateListener(nb.receivedSyncState)

return nb, nil
}

func checkArgs(args ArgsNotifierBootstrapper) error {
if check.IfNil(args.IncomingHeaderHandler) {
return errors.ErrNilIncomingHeaderSubscriber
}
if check.IfNil(args.SovereignNotifier) {
return errNilSovereignNotifier
}
if check.IfNil(args.ForkDetector) {
return errors.ErrNilForkDetector
}
if check.IfNil(args.Bootstrapper) {
return process.ErrNilBootstrapper
}
if args.RoundDuration == 0 {
return errors.ErrInvalidRoundDuration
}

return nil
}

func (nb *notifierBootstrapper) receivedSyncState(isNodeSynchronized bool) {
if isNodeSynchronized && nb.forkDetector.GetHighestFinalBlockNonce() != 0 {
select {
case nb.nodeSyncedChan <- true:
default:
}
}
}

// Start will start waiting on a go routine to be notified via nodeSyncedChan when the sovereign node is synced.
// Meanwhile, it will print the current node state in log. When node is fully synced, it will register the incoming header
// processor to the websocket listener and exit the waiting loop.
func (nb *notifierBootstrapper) Start() {
var ctx context.Context
ctx, nb.cancelFunc = context.WithCancel(context.Background())
go nb.checkNodeState(ctx)
}

func (nb *notifierBootstrapper) checkNodeState(ctx context.Context) {
timeToWaitReSync := (process.MaxRoundsWithoutNewBlockReceived + 1) * nb.roundDuration
ticker := time.NewTicker(time.Duration(timeToWaitReSync) * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Debug("notifierBootstrapper.checkNodeState: worker's go routine is stopping...")
return
case <-nb.nodeSyncedChan:
err := nb.sovereignNotifier.RegisterHandler(nb.incomingHeaderHandler)
if err != nil {
log.Error("notifierBootstrapper: sovereignNotifier.RegisterHandler", "err", err)
nb.sigStopNode <- syscall.SIGTERM
} else {
log.Debug("notifierBootstrapper.checkNodeState", "is node synced", true)
}

return
case <-ticker.C:
log.Debug("notifierBootstrapper.checkNodeState", "is node synced", false)
}
}
}

// Close cancels current context and empties channel reads
func (nb *notifierBootstrapper) Close() error {
if nb.cancelFunc != nil {
nb.cancelFunc()
}

nrReads := core.EmptyChannel(nb.nodeSyncedChan)
log.Debug("notifierBootstrapper: emptied channel", "nodeSyncedChan nrReads", nrReads)
return nil
}

// IsInterfaceNil checks if the underlying pointer is nil
func (nb *notifierBootstrapper) IsInterfaceNil() bool {
return nb == nil
}
233 changes: 233 additions & 0 deletions cmd/sovereignnode/notifier/notifierBootstrapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package notifier

import (
"context"
"errors"
"os"
"reflect"
"runtime"
"syscall"
"testing"
"time"

errorsMx "github.com/multiversx/mx-chain-go/errors"
"github.com/multiversx/mx-chain-go/integrationTests/mock"
"github.com/multiversx/mx-chain-go/process"
processMocks "github.com/multiversx/mx-chain-go/process/mock"
"github.com/multiversx/mx-chain-go/testscommon/sovereign"
notifierProcess "github.com/multiversx/mx-chain-sovereign-notifier-go/process"
"github.com/multiversx/mx-chain-sovereign-notifier-go/testscommon"
"github.com/stretchr/testify/require"
)

func createArgs() ArgsNotifierBootstrapper {
return ArgsNotifierBootstrapper{
IncomingHeaderHandler: &sovereign.IncomingHeaderSubscriberStub{},
SovereignNotifier: &testscommon.SovereignNotifierStub{},
ForkDetector: &mock.ForkDetectorStub{},
Bootstrapper: &processMocks.BootstrapperStub{},
RoundDuration: 100,
}
}

func getFunctionName(i interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}

func TestNewNotifierBootstrapper(t *testing.T) {
t.Parallel()

t.Run("nil incoming header processor", func(t *testing.T) {
args := createArgs()
args.IncomingHeaderHandler = nil
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, nb)
require.Equal(t, errorsMx.ErrNilIncomingHeaderSubscriber, err)
})
t.Run("nil sovereign notifier", func(t *testing.T) {
args := createArgs()
args.SovereignNotifier = nil
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, nb)
require.Equal(t, errNilSovereignNotifier, err)
})
t.Run("nil fork detector", func(t *testing.T) {
args := createArgs()
args.ForkDetector = nil
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, nb)
require.Equal(t, errorsMx.ErrNilForkDetector, err)
})
t.Run("nil bootstrapper", func(t *testing.T) {
args := createArgs()
args.Bootstrapper = nil
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, nb)
require.Equal(t, process.ErrNilBootstrapper, err)
})
t.Run("zero value round duration", func(t *testing.T) {
args := createArgs()
args.RoundDuration = 0
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, nb)
require.Equal(t, errorsMx.ErrInvalidRoundDuration, err)
})
t.Run("should work", func(t *testing.T) {
args := createArgs()
nb, err := NewNotifierBootstrapper(args)
require.Nil(t, err)
require.False(t, nb.IsInterfaceNil())
})
}

func TestNotifierBootstrapper_Start(t *testing.T) {
t.Parallel()

args := createArgs()

wasRegisteredToStateSync := false
args.Bootstrapper = &processMocks.BootstrapperStub{
AddSyncStateListenerCalled: func(f func(bool)) {
require.Contains(t, getFunctionName(f), "(*notifierBootstrapper).receivedSyncState")
wasRegisteredToStateSync = true
},
}

registerCalledCt := 0
args.SovereignNotifier = &testscommon.SovereignNotifierStub{
RegisterHandlerCalled: func(handler notifierProcess.IncomingHeaderSubscriber) error {
require.Equal(t, args.IncomingHeaderHandler, handler)
registerCalledCt++
return nil
},
}

getHighestNonceCalledCt := 0
args.ForkDetector = &mock.ForkDetectorStub{
GetHighestFinalBlockNonceCalled: func() uint64 {
defer func() {
getHighestNonceCalledCt++
}()

return uint64(getHighestNonceCalledCt)
},
}

nb, _ := NewNotifierBootstrapper(args)
require.True(t, wasRegisteredToStateSync)

nb.Start()

defer func() {
err := nb.Close()
require.Nil(t, err)
}()

time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt)
require.Zero(t, getHighestNonceCalledCt)

nb.receivedSyncState(false)
time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt)
require.Zero(t, registerCalledCt)

nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt)
require.Equal(t, 1, getHighestNonceCalledCt)

nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)
require.Equal(t, 2, getHighestNonceCalledCt)

for i := 3; i < 10; i++ {
nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)
require.Equal(t, i, getHighestNonceCalledCt)
}
}

func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) {
t.Parallel()

sigStopNodeMock := make(chan os.Signal, 1)

args := createArgs()
args.SigStopNode = sigStopNodeMock
args.RoundDuration = 10

registerCalledCt := 0
args.SovereignNotifier = &testscommon.SovereignNotifierStub{
RegisterHandlerCalled: func(handler notifierProcess.IncomingHeaderSubscriber) error {
require.Equal(t, args.IncomingHeaderHandler, handler)

defer func() {
registerCalledCt++
}()

return errors.New("local error")
},
}

args.ForkDetector = &mock.ForkDetectorStub{
GetHighestFinalBlockNonceCalled: func() uint64 {
return 1
},
}

nb, _ := NewNotifierBootstrapper(args)

nb.Start()

defer func() {
err := nb.Close()
require.Nil(t, err)
}()

time.Sleep(time.Millisecond * 200)
require.Zero(t, registerCalledCt)

nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)

select {
case sig := <-sigStopNodeMock:
require.Equal(t, syscall.SIGTERM, sig)
case <-time.After(time.Millisecond * 100): // Timeout to avoid hanging
t.Error("expected SIGTERM signal on sigStopNodeMock, but none received")
}

// Once registration fails, the waiting is done, no other register is called
for i := 0; i < 10; i++ {
nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)
}
}

func TestCheckNodeState_CtxDone(t *testing.T) {
t.Parallel()

args := createArgs()
nb, _ := NewNotifierBootstrapper(args)

ctx, cancel := context.WithCancel(context.Background())
doneChan := make(chan struct{})

go func() {
nb.checkNodeState(ctx)
close(doneChan)
}()

cancel()

select {
case <-doneChan:
case <-time.After(time.Second):
require.Fail(t, "checkNodeState did not exit on ctx.Done() as expected")
}
}
Loading

0 comments on commit deaafbd

Please sign in to comment.