Skip to content

Commit

Permalink
Merge branch 'feat/sovereign-mainchain-header-sync' into MX-15984-fix…
Browse files Browse the repository at this point in the history
…-request-incoming-header
  • Loading branch information
mariusmihaic authored Nov 8, 2024
2 parents 47378d2 + 536725b commit d477412
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 66 deletions.
10 changes: 8 additions & 2 deletions cmd/sovereignnode/notifier/notifierBootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package notifier

import (
"context"
"os"
"syscall"
"time"

"github.com/multiversx/mx-chain-core-go/core"
Expand All @@ -20,13 +22,15 @@ type ArgsNotifierBootstrapper struct {
SovereignNotifier notifierProcess.SovereignNotifier
ForkDetector process.ForkDetector
Bootstrapper process.Bootstrapper
SigStopNode chan os.Signal
RoundDuration uint64
}

type notifierBootstrapper struct {
incomingHeaderHandler process.IncomingHeaderSubscriber
sovereignNotifier notifierProcess.SovereignNotifier
forkDetector process.ForkDetector
sigStopNode chan os.Signal

nodeSyncedChan chan bool
cancelFunc func()
Expand All @@ -46,6 +50,7 @@ func NewNotifierBootstrapper(args ArgsNotifierBootstrapper) (*notifierBootstrapp
nodeSyncedChan: make(chan bool, 1),
cancelFunc: nil,
roundDuration: args.RoundDuration,
sigStopNode: args.SigStopNode,
}

args.Bootstrapper.AddSyncStateListener(nb.receivedSyncState)
Expand Down Expand Up @@ -105,10 +110,11 @@ func (nb *notifierBootstrapper) checkNodeState(ctx context.Context) {
err := nb.sovereignNotifier.RegisterHandler(nb.incomingHeaderHandler)
if err != nil {
log.Error("notifierBootstrapper: sovereignNotifier.RegisterHandler", "err", err)
continue
nb.sigStopNode <- syscall.SIGTERM
} else {
log.Debug("notifierBootstrapper.checkNodeState", "is node synced", true)
}

log.Debug("notifierBootstrapper.checkNodeState", "is node synced", true)
return
case <-ticker.C:
log.Debug("notifierBootstrapper.checkNodeState", "is node synced", false)
Expand Down
27 changes: 15 additions & 12 deletions cmd/sovereignnode/notifier/notifierBootstrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package notifier
import (
"context"
"errors"
"os"
"reflect"
"runtime"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -151,7 +153,10 @@ func TestNotifierBootstrapper_Start(t *testing.T) {
func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) {
t.Parallel()

sigStopNodeMock := make(chan os.Signal, 1)

args := createArgs()
args.SigStopNode = sigStopNodeMock
args.RoundDuration = 10

registerCalledCt := 0
Expand All @@ -163,12 +168,7 @@ func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) {
registerCalledCt++
}()

switch registerCalledCt {
case 0, 1:
return errors.New("local error")
}

return nil
return errors.New("local error")
},
}

Expand All @@ -194,15 +194,18 @@ func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) {
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)

nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 2, registerCalledCt)
select {
case sig := <-sigStopNodeMock:
require.Equal(t, syscall.SIGTERM, sig)
case <-time.After(time.Millisecond * 100): // Timeout to avoid hanging
t.Error("expected SIGTERM signal on sigStopNodeMock, but none received")
}

// Once registered, the waiting is done, no other register is called
for i := 3; i < 10; i++ {
// Once registration fails, the waiting is done, no other register is called
for i := 0; i < 10; i++ {
nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 3, registerCalledCt)
require.Equal(t, 1, registerCalledCt)
}
}

Expand Down
14 changes: 11 additions & 3 deletions cmd/sovereignnode/sovereignNodeRunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,13 +529,20 @@ func (snr *sovereignNodeRunner) executeOneComponentCreationCycle(
return true, err
}

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

notifierServices, err := createNotifierWSReceiverServicesIfNeeded(
&configs.SovereignExtraConfig.NotifierConfig,
incomingHeaderHandler,
managedCoreComponents.GenesisNodesSetup().GetRoundDuration(),
managedProcessComponents.ForkDetector(),
managedConsensusComponents.Bootstrapper(),
sigs,
)
if err != nil {
return true, err
}

log.Debug("creating node structure")

Expand Down Expand Up @@ -605,9 +612,6 @@ func (snr *sovereignNodeRunner) executeOneComponentCreationCycle(
statusHandler.SetStringValue(common.MetricAreVMQueriesReady, strconv.FormatBool(true))
}(managedStatusCoreComponents.AppStatusHandler())

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

err = waitForSignal(
sigs,
managedCoreComponents.ChanStopNodeProcess(),
Expand Down Expand Up @@ -1892,6 +1896,7 @@ func createNotifierWSReceiverServicesIfNeeded(
roundDuration uint64,
forkDetector process.ForkDetector,
bootstrapper process.Bootstrapper,
sigStopNode chan os.Signal,
) ([]mainFactory.Closer, error) {
if !config.Enabled {
log.Info("running without any notifier attached")
Expand All @@ -1916,6 +1921,7 @@ func createNotifierWSReceiverServicesIfNeeded(
roundDuration,
forkDetector,
bootstrapper,
sigStopNode,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1961,13 +1967,15 @@ func startSovereignNotifierBootstrapper(
roundDuration uint64,
forkDetector process.ForkDetector,
bootstrapper process.Bootstrapper,
sigStopNode chan os.Signal,
) (notifier.SovereignNotifierBootstrapper, error) {
args := notifier.ArgsNotifierBootstrapper{
IncomingHeaderHandler: incomingHeaderHandler,
SovereignNotifier: sovereignNotifier,
ForkDetector: forkDetector,
Bootstrapper: bootstrapper,
RoundDuration: roundDuration,
SigStopNode: sigStopNode,
}

notifierBootstrapper, err := notifier.NewNotifierBootstrapper(args)
Expand Down
24 changes: 8 additions & 16 deletions process/track/blockProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type blockProcessor struct {
highestRoundInReceivedHeaders uint64,
shardID uint32,
)
getBlockFinalityFunc func(shardID uint32) uint64
}

// NewBlockProcessor creates a block processor object which implements blockProcessorHandler interface
Expand Down Expand Up @@ -65,7 +64,6 @@ func NewBlockProcessor(arguments ArgBlockProcessor) (*blockProcessor, error) {
bp.doJobOnReceivedCrossNotarizedHeaderFunc = bp.doJobOnReceivedCrossNotarizedHeader
bp.requestHeaderWithShardAndNonceFunc = bp.requestHeaderWithShardAndNonce
bp.requestHeadersIfNothingNewIsReceivedFunc = bp.requestHeadersIfNothingNewIsReceived
bp.getBlockFinalityFunc = bp.getBlockFinality

return &bp, nil
}
Expand Down Expand Up @@ -182,7 +180,7 @@ func (bp *blockProcessor) doJobOnReceivedMetachainHeader() {
finalMetachainHeaders := make([]data.HeaderHandler, 0)
finalMetachainHeadersHashes := make([][]byte, 0)

err = bp.checkHeaderFinality(header, sortedHeaders, 0, core.MetachainShardId)
err = bp.checkHeaderFinality(header, sortedHeaders, 0)
if err == nil {
finalMetachainHeaders = append(finalMetachainHeaders, header)
finalMetachainHeadersHashes = append(finalMetachainHeadersHashes, headerHash)
Expand Down Expand Up @@ -261,7 +259,7 @@ func (bp *blockProcessor) ComputeLongestChain(shardID uint32, header data.Header

longestChainHeadersIndexes := make([]int, 0)
headersIndexes := make([]int, 0)
bp.getNextHeader(&longestChainHeadersIndexes, headersIndexes, header, sortedHeaders, 0, shardID)
bp.getNextHeader(&longestChainHeadersIndexes, headersIndexes, header, sortedHeaders, 0)

for _, index := range longestChainHeadersIndexes {
headers = append(headers, sortedHeaders[index])
Expand All @@ -277,7 +275,6 @@ func (bp *blockProcessor) getNextHeader(
prevHeader data.HeaderHandler,
sortedHeaders []data.HeaderHandler,
index int,
shardID uint32,
) {
defer func() {
if len(headersIndexes) > len(*longestChainHeadersIndexes) {
Expand All @@ -300,13 +297,13 @@ func (bp *blockProcessor) getNextHeader(
continue
}

err = bp.checkHeaderFinality(currHeader, sortedHeaders, i+1, shardID)
err = bp.checkHeaderFinality(currHeader, sortedHeaders, i+1)
if err != nil {
continue
}

headersIndexes = append(headersIndexes, i)
bp.getNextHeader(longestChainHeadersIndexes, headersIndexes, currHeader, sortedHeaders, i+1, shardID)
bp.getNextHeader(longestChainHeadersIndexes, headersIndexes, currHeader, sortedHeaders, i+1)
headersIndexes = headersIndexes[:len(headersIndexes)-1]
}
}
Expand All @@ -315,7 +312,6 @@ func (bp *blockProcessor) checkHeaderFinality(
header data.HeaderHandler,
sortedHeaders []data.HeaderHandler,
index int,
shardID uint32,
) error {

if check.IfNil(header) {
Expand All @@ -327,7 +323,7 @@ func (bp *blockProcessor) checkHeaderFinality(

for i := index; i < len(sortedHeaders); i++ {
currHeader := sortedHeaders[i]
if numFinalityAttestingHeaders >= bp.getBlockFinalityFunc(shardID) || currHeader.GetNonce() > prevHeader.GetNonce()+1 {
if numFinalityAttestingHeaders >= bp.blockFinality || currHeader.GetNonce() > prevHeader.GetNonce()+1 {
break
}

Expand All @@ -340,7 +336,7 @@ func (bp *blockProcessor) checkHeaderFinality(
numFinalityAttestingHeaders += 1
}

if numFinalityAttestingHeaders < bp.getBlockFinalityFunc(shardID) {
if numFinalityAttestingHeaders < bp.blockFinality {
return process.ErrHeaderNotFinal
}

Expand Down Expand Up @@ -379,7 +375,7 @@ func (bp *blockProcessor) requestHeadersIfNeeded(
highestNonceInLongestChain = longestChainHeaders[numLongestChainHeaders-1].GetNonce()
}

shouldRequestHeaders = highestNonceReceived > highestNonceInLongestChain+bp.getBlockFinalityFunc(shardID) && numLongestChainHeaders == 0
shouldRequestHeaders = highestNonceReceived > highestNonceInLongestChain+bp.blockFinality && numLongestChainHeaders == 0
if !shouldRequestHeaders {
return
}
Expand Down Expand Up @@ -462,7 +458,7 @@ func (bp *blockProcessor) baseRequestHeadersIfNothingNewIsReceived(
}

func (bp *blockProcessor) requestHeaders(shardID uint32, fromNonce uint64) {
toNonce := fromNonce + bp.getBlockFinalityFunc(shardID)
toNonce := fromNonce + bp.blockFinality
for nonce := fromNonce; nonce <= toNonce; nonce++ {
log.Trace("requestHeaders.RequestHeaderByNonce",
"shard", shardID,
Expand All @@ -481,10 +477,6 @@ func (bp *blockProcessor) requestHeaderWithShardAndNonce(shardID uint32, nonce u
}
}

func (bp *blockProcessor) getBlockFinality(_ uint32) uint64 {
return bp.blockFinality
}

// IsInterfaceNil returns true if there is no value under the interface
func (bp *blockProcessor) IsInterfaceNil() bool {
return bp == nil
Expand Down
8 changes: 4 additions & 4 deletions process/track/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,11 @@ func (bp *blockProcessor) ComputeSelfNotarizedHeaders(headers []data.HeaderHandl
}

func (bp *blockProcessor) GetNextHeader(longestChainHeadersIndexes *[]int, headersIndexes []int, prevHeader data.HeaderHandler, sortedHeaders []data.HeaderHandler, index int) {
bp.getNextHeader(longestChainHeadersIndexes, headersIndexes, prevHeader, sortedHeaders, index, 0)
bp.getNextHeader(longestChainHeadersIndexes, headersIndexes, prevHeader, sortedHeaders, index)
}

func (bp *blockProcessor) CheckHeaderFinality(header data.HeaderHandler, sortedHeaders []data.HeaderHandler, index int) error {
return bp.checkHeaderFinality(header, sortedHeaders, index, 0)
return bp.checkHeaderFinality(header, sortedHeaders, index)
}

func (bp *blockProcessor) RequestHeadersIfNeeded(lastNotarizedHeader data.HeaderHandler, sortedHeaders []data.HeaderHandler, longestChainHeaders []data.HeaderHandler, shardID uint32) {
Expand Down Expand Up @@ -274,8 +274,8 @@ func (scbp *sovereignChainBlockProcessor) ShouldProcessReceivedHeader(headerHand
return scbp.shouldProcessReceivedHeaderFunc(headerHandler)
}

func (scbp *sovereignChainBlockProcessor) GetBlockFinality(shardID uint32) uint64 {
return scbp.getBlockFinality(shardID)
func (scbp *sovereignChainBlockProcessor) GetBlockFinality() uint64 {
return scbp.blockFinality
}

// miniBlockTrack
Expand Down
10 changes: 1 addition & 9 deletions process/track/sovereignChainBlockProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewSovereignChainBlockProcessor(blockProcessor *blockProcessor) (*sovereign
scbp.doJobOnReceivedCrossNotarizedHeaderFunc = scbp.doJobOnReceivedCrossNotarizedHeader
scbp.requestHeaderWithShardAndNonceFunc = scbp.requestHeaderWithShardAndNonce
scbp.requestHeadersIfNothingNewIsReceivedFunc = scbp.requestHeadersIfNothingNewIsReceived
scbp.getBlockFinalityFunc = scbp.getBlockFinality
scbp.blockFinality = 0

extendedShardHeaderRequester, ok := scbp.requestHandler.(extendedShardHeaderRequestHandler)
if !ok {
Expand Down Expand Up @@ -122,11 +122,3 @@ func (scbp *sovereignChainBlockProcessor) requestHeadersIfNothingNewIsReceived(

scbp.baseRequestHeadersIfNothingNewIsReceived(lastNotarizedHeaderNonce, latestValidHeader, highestRoundInReceivedHeaders, shardID)
}

func (scbp *sovereignChainBlockProcessor) getBlockFinality(shardID uint32) uint64 {
if shardID == core.MainChainShardId {
return 0
}

return scbp.blockFinality
}
31 changes: 11 additions & 20 deletions process/track/sovereignChainBlockProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ func TestNewSovereignChainBlockProcessor_ShouldWork(t *testing.T) {
bp, _ := track.NewBlockProcessor(blockProcessorArguments)

scpb, err := track.NewSovereignChainBlockProcessor(bp)
assert.NotNil(t, scpb)
assert.Nil(t, err)
require.NotNil(t, scpb)
require.Zero(t, scpb.GetBlockFinality())
require.Nil(t, err)
}

func TestSovereignChainBlockProcessor_ShouldProcessReceivedHeaderShouldReturnFalseWhenGetLastNotarizedHeaderFails(t *testing.T) {
Expand Down Expand Up @@ -335,15 +336,15 @@ func TestSovereignChainBlockProcessor_RequestHeadersShouldAddAndRequestForShardH
})
mutRequest.Unlock()

require.Equal(t, 2, len(shardIDAddCalled))
require.Equal(t, 2, len(nonceAddCalled))
require.Equal(t, 2, len(shardIDRequestCalled))
require.Equal(t, 2, len(nonceRequestCalled))
require.Equal(t, 1, len(shardIDAddCalled))
require.Equal(t, 1, len(nonceAddCalled))
require.Equal(t, 1, len(shardIDRequestCalled))
require.Equal(t, 1, len(nonceRequestCalled))

assert.Equal(t, []uint32{shardID, shardID}, shardIDAddCalled)
assert.Equal(t, []uint64{fromNonce, fromNonce + 1}, nonceAddCalled)
assert.Equal(t, []uint32{shardID, shardID}, shardIDRequestCalled)
assert.Equal(t, []uint64{fromNonce, fromNonce + 1}, nonceRequestCalled)
assert.Equal(t, []uint32{shardID}, shardIDAddCalled)
assert.Equal(t, []uint64{fromNonce}, nonceAddCalled)
assert.Equal(t, []uint32{shardID}, shardIDRequestCalled)
assert.Equal(t, []uint64{fromNonce}, nonceRequestCalled)
}

func TestSovereignChainBlockProcessor_RequestHeadersShouldAddAndRequestForExtendedShardHeaders(t *testing.T) {
Expand Down Expand Up @@ -400,13 +401,3 @@ func TestSovereignChainBlockProcessor_RequestHeadersShouldAddAndRequestForExtend
assert.Equal(t, []uint32{shardID}, shardIDRequestCalled)
assert.Equal(t, []uint64{fromNonce}, nonceRequestCalled)
}

func TestSovereignChainShardBlockTrack_getBlockFinality(t *testing.T) {
t.Parallel()

blockProcessorArguments := CreateSovereignChainBlockProcessorMockArguments()
bp, _ := track.NewBlockProcessor(blockProcessorArguments)
scpb, _ := track.NewSovereignChainBlockProcessor(bp)
require.Equal(t, uint64(process.BlockFinality), scpb.GetBlockFinality(core.SovereignChainShardId))
require.Zero(t, scpb.GetBlockFinality(core.MainChainShardId))
}

0 comments on commit d477412

Please sign in to comment.