Skip to content

Commit

Permalink
Update MultiNode with latest changes (#15058)
Browse files Browse the repository at this point in the history
* Update MultiNode to latest

* changeset

* lint

* defer reportWg

* Use latest changes

* Update node_lifecycle.go

* lint

* Remove TxError

* Use cfg clientErrors

* Update transaction_sender_test.go

* Use require
  • Loading branch information
DylanTinianov authored Nov 20, 2024
1 parent a9c9ddf commit d4d1456
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 233 deletions.
5 changes: 5 additions & 0 deletions .changeset/silver-avocados-buy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Update MultiNode with latest changes and bug fixes. Fixes an issue that caused nodes to go OutOfSync incorrectly, and also fixed context handling for sending transactions. #internal #bugfix
39 changes: 24 additions & 15 deletions common/client/node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"math/big"
"time"

"github.com/smartcontractkit/chainlink/v2/common/types"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand All @@ -17,6 +15,7 @@ import (
bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math"

iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

var (
Expand Down Expand Up @@ -132,6 +131,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
}
}

// Get the latest chain info to use as local highest
localHighestChainInfo, _ := n.rpc.GetInterceptedChainInfo()
var pollFailures uint32

Expand Down Expand Up @@ -168,10 +168,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
n.declareUnreachable()
return
}
_, latestChainInfo := n.StateAndLatest()
if outOfSync, liveNodes := n.isOutOfSyncWithPool(latestChainInfo); outOfSync {
if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync {
// note: there must be another live node for us to be out of sync
lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", latestChainInfo.BlockNumber, "totalDifficulty", latestChainInfo.TotalDifficulty, "nodeState", n.getCachedState())
if liveNodes < 2 {
lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
continue
Expand Down Expand Up @@ -310,9 +308,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewFinalizedHead(lggr logger.SugaredLogger
}

latestFinalizedBN := latestFinalized.BlockNumber()
lggr.Tracew("Got latest finalized head", "latestFinalized", latestFinalized)
lggr.Debugw("Got latest finalized head", "latestFinalized", latestFinalized)
if latestFinalizedBN <= chainInfo.FinalizedBlockNumber {
lggr.Tracew("Ignoring previously seen finalized block number")
lggr.Debugw("Ignoring previously seen finalized block number")
return false
}

Expand All @@ -328,10 +326,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewHead(lggr logger.SugaredLogger, chainIn
}

promPoolRPCNodeNumSeenBlocks.WithLabelValues(n.chainID.String(), n.name).Inc()
lggr.Tracew("Got head", "head", head)
lggr.Debugw("Got head", "head", head)
lggr = lggr.With("latestReceivedBlockNumber", chainInfo.BlockNumber, "blockNumber", head.BlockNumber(), "nodeState", n.getCachedState())
if head.BlockNumber() <= chainInfo.BlockNumber {
lggr.Tracew("Ignoring previously seen block number")
lggr.Debugw("Ignoring previously seen block number")
return false
}

Expand All @@ -358,7 +356,7 @@ const (
// isOutOfSyncWithPool returns outOfSync true if num or td is more than SyncThresold behind the best node.
// Always returns outOfSync false for SyncThreshold 0.
// liveNodes is only included when outOfSync is true.
func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (outOfSync bool, liveNodes int) {
func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveNodes int) {
if n.poolInfoProvider == nil {
n.lfcLog.Warn("skipping sync state against the pool - should only occur in tests")
return // skip for tests
Expand All @@ -369,16 +367,22 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (o
}
// Check against best node
ln, ci := n.poolInfoProvider.LatestChainInfo()
localChainInfo, _ := n.rpc.GetInterceptedChainInfo()
mode := n.nodePoolCfg.SelectionMode()
switch mode {
case NodeSelectionModeHighestHead, NodeSelectionModeRoundRobin, NodeSelectionModePriorityLevel:
return localState.BlockNumber < ci.BlockNumber-int64(threshold), ln
outOfSync = localChainInfo.BlockNumber < ci.BlockNumber-int64(threshold)
case NodeSelectionModeTotalDifficulty:
bigThreshold := big.NewInt(int64(threshold))
return localState.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0, ln
outOfSync = localChainInfo.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0
default:
panic("unrecognized NodeSelectionMode: " + mode)
}

if outOfSync && n.getCachedState() == nodeStateAlive {
n.lfcLog.Errorw("RPC endpoint has fallen behind", "blockNumber", localChainInfo.BlockNumber, "bestLatestBlockNumber", ci.BlockNumber, "totalDifficulty", localChainInfo.TotalDifficulty)
}
return outOfSync, ln
}

// outOfSyncLoop takes an OutOfSync node and waits until isOutOfSync returns false to go back to live status
Expand Down Expand Up @@ -464,7 +468,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {

// received a new head - clear NoNewHead flag
syncIssues &= ^syncStatusNoNewHead
if outOfSync, _ := n.isOutOfSyncWithPool(localHighestChainInfo); !outOfSync {
if outOfSync, _ := n.isOutOfSyncWithPool(); !outOfSync {
// we caught up with the pool - clear NotInSyncWithPool flag
syncIssues &= ^syncStatusNotInSyncWithPool
} else {
Expand Down Expand Up @@ -515,7 +519,12 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
finalizedHeadsSub.ResetTimer(noNewFinalizedBlocksTimeoutThreshold)
}

lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "syncIssues", syncIssues)
var highestSeen ChainInfo
if n.poolInfoProvider != nil {
highestSeen = n.poolInfoProvider.HighestUserObservations()
}

lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "poolHighestBlockNumber", highestSeen.FinalizedBlockNumber, "syncIssues", syncIssues)
case err := <-finalizedHeadsSub.Errors:
lggr.Errorw("Finalized head subscription was terminated", "err", err)
n.declareUnreachable()
Expand Down Expand Up @@ -648,7 +657,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() {
case nodeStateClosed:
return
default:
panic(fmt.Sprintf("syncingLoop can only run for node in nodeStateSyncing state, got: %s", state))
panic(fmt.Sprintf("syncingLoop can only run for node in NodeStateSyncing state, got: %s", state))
}
}

Expand Down
46 changes: 32 additions & 14 deletions common/client/node_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
poolInfo.On("LatestChainInfo").Return(10, ChainInfo{
BlockNumber: syncThreshold + mostRecentBlock + 1,
TotalDifficulty: big.NewInt(10),
}).Once()
})
node.SetPoolChainInfoProvider(poolInfo)
// tries to redial in outOfSync
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
BlockNumber: syncThreshold + mostRecentBlock + 1,
TotalDifficulty: big.NewInt(10),
}).Once()
})
node.SetPoolChainInfoProvider(poolInfo)
node.declareAlive()
tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState))
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
t.Run("when no new heads received for threshold, transitions to out of sync", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{},
chainConfig: clientMocks.ChainConfig{
Expand All @@ -312,7 +312,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
t.Run("when no new heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{},
Expand Down Expand Up @@ -693,15 +693,25 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
t.Run("if fail to get chainID, transitions to unreachable", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[types.ID, Head](t)
chainID := types.RandomID()
node := newAliveNode(t, testNodeOpts{
rpc: rpc,
rpc: rpc,
chainID: chainID,
})
defer func() { assert.NoError(t, node.close()) }()

rpc.On("ChainID", mock.Anything).Return(chainID, nil)
// for out-of-sync
rpc.On("Dial", mock.Anything).Return(nil).Once()
// for unreachable
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
sub := mocks.NewSubscription(t)
errChan := make(chan error, 1)
errChan <- errors.New("subscription was terminate")
sub.On("Err").Return((<-chan error)(errChan))
sub.On("Unsubscribe").Once()
rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), sub, nil)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})

expectedError := errors.New("failed to get chain ID")
// might be called multiple times
Expand Down Expand Up @@ -1025,7 +1035,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
sub.On("Err").Return((<-chan error)(errChan))
sub.On("Unsubscribe").Once()
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), sub, nil).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})

// unreachable
rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe()
Expand Down Expand Up @@ -1056,7 +1066,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Run(func(args mock.Arguments) {
close(ch)
}).Return((<-chan Head)(ch), sub, nil).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
// unreachable
rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe()
node.declareOutOfSync(syncStatusNoNewHead)
Expand All @@ -1082,7 +1092,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
defer func() { assert.NoError(t, node.close()) }()

const highestBlock = 13
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{FinalizedBlockNumber: highestBlock}, ChainInfo{FinalizedBlockNumber: highestBlock})

outOfSyncSubscription := mocks.NewSubscription(t)
outOfSyncSubscription.On("Err").Return((<-chan error)(nil))
Expand Down Expand Up @@ -1119,7 +1129,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
defer func() { assert.NoError(t, node.close()) }()

const highestBlock = 13
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock})

outOfSyncSubscription := mocks.NewSubscription(t)
outOfSyncSubscription.On("Err").Return((<-chan error)(nil))
Expand Down Expand Up @@ -1582,15 +1592,15 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
t.Parallel()
t.Run("skip if nLiveNodes is not configured", func(t *testing.T) {
node := newTestNode(t, testNodeOpts{})
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{})
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, false, outOfSync)
assert.Equal(t, 0, liveNodes)
})
t.Run("skip if syncThreshold is not configured", func(t *testing.T) {
node := newTestNode(t, testNodeOpts{})
poolInfo := newMockPoolChainInfoProvider(t)
node.SetPoolChainInfoProvider(poolInfo)
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{})
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, false, outOfSync)
assert.Equal(t, 0, liveNodes)
})
Expand All @@ -1602,7 +1612,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{}).Once()
node.SetPoolChainInfoProvider(poolInfo)
assert.Panics(t, func() {
_, _ = node.isOutOfSyncWithPool(ChainInfo{})
_, _ = node.isOutOfSyncWithPool()
})
})
t.Run("block height selection mode", func(t *testing.T) {
Expand Down Expand Up @@ -1653,7 +1663,11 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
for _, td := range []int64{totalDifficulty - syncThreshold - 1, totalDifficulty - syncThreshold, totalDifficulty, totalDifficulty + 1} {
for _, testCase := range testCases {
t.Run(fmt.Sprintf("%s: SelectionModeVal: %s: total difficulty: %d", testCase.name, selectionMode, td), func(t *testing.T) {
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{BlockNumber: testCase.blockNumber, TotalDifficulty: big.NewInt(td)})
chainInfo := ChainInfo{BlockNumber: testCase.blockNumber, TotalDifficulty: big.NewInt(td)}
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(chainInfo, ChainInfo{}).Once()
node.rpc = rpc
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, nodesNum, liveNodes)
assert.Equal(t, testCase.outOfSync, outOfSync)
})
Expand Down Expand Up @@ -1709,7 +1723,11 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
for _, hb := range []int64{highestBlock - syncThreshold - 1, highestBlock - syncThreshold, highestBlock, highestBlock + 1} {
for _, testCase := range testCases {
t.Run(fmt.Sprintf("%s: SelectionModeVal: %s: highest block: %d", testCase.name, NodeSelectionModeTotalDifficulty, hb), func(t *testing.T) {
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{BlockNumber: hb, TotalDifficulty: big.NewInt(testCase.totalDifficulty)})
chainInfo := ChainInfo{BlockNumber: hb, TotalDifficulty: big.NewInt(testCase.totalDifficulty)}
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(chainInfo, ChainInfo{}).Once()
node.rpc = rpc
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, nodesNum, liveNodes)
assert.Equal(t, testCase.outOfSync, outOfSync)
})
Expand Down
Loading

0 comments on commit d4d1456

Please sign in to comment.