Update MultiNode with latest changes #15058

Merged
14 commits merged on Nov 20, 2024
5 changes: 5 additions & 0 deletions .changeset/silver-avocados-buy.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Update MultiNode with latest changes and bug fixes. Fixes an issue that caused nodes to go OutOfSync incorrectly, and also fixes context handling for sending transactions. #internal #bugfix
2 changes: 0 additions & 2 deletions common/client/multi_node.go
@@ -94,8 +94,6 @@ func (c *MultiNode[CHAIN_ID, RPC]) ChainID() CHAIN_ID {
func (c *MultiNode[CHAIN_ID, RPC]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC, isSendOnly bool)) error {
var err error
ok := c.IfNotStopped(func() {
ctx, _ = c.chStop.Ctx(ctx)

callsCompleted := 0
for _, n := range c.primaryNodes {
select {
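The line removed above appears to be the context-handling fix referenced in the changeset: DoAll no longer re-derives the per-call context from the MultiNode stop channel, so shutting the client down does not cancel a send the caller is still waiting on. A minimal, self-contained sketch of the difference, using a hypothetical stopCtx helper rather than the real MultiNode types:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// stopCtx mimics the removed chStop.Ctx pattern: the returned context is
// cancelled when either the parent is done or the stop channel closes.
func stopCtx(parent context.Context, chStop <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	go func() {
		select {
		case <-chStop:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

func main() {
	// The caller gives the call a one-second budget.
	callerCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	chStop := make(chan struct{})

	// Old behaviour: the per-call context was also tied to the stop channel.
	oldCtx, oldCancel := stopCtx(callerCtx, chStop)
	defer oldCancel()

	close(chStop) // simulate shutdown racing an in-flight send
	<-oldCtx.Done()
	fmt.Println("old:", oldCtx.Err()) // context.Canceled: the call is aborted

	// New behaviour: only the caller's context governs the call.
	fmt.Println("new:", callerCtx.Err()) // <nil>: the caller's deadline still applies
}
```

Under the sketch's assumptions, the old path aborts as soon as shutdown begins, while the new path lets the caller's own deadline or cancellation decide.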
27 changes: 16 additions & 11 deletions common/client/node_lifecycle.go
@@ -168,10 +168,12 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
n.declareUnreachable()
return
}
_, latestChainInfo := n.StateAndLatest()
if outOfSync, liveNodes := n.isOutOfSyncWithPool(latestChainInfo); outOfSync {
if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync {
// note: there must be another live node for us to be out of sync
lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", latestChainInfo.BlockNumber, "totalDifficulty", latestChainInfo.TotalDifficulty, "nodeState", n.getCachedState())
_, highest := n.poolInfoProvider.LatestChainInfo()
_, latestChainInfo := n.StateAndLatest()
lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", latestChainInfo.BlockNumber, "bestLatestBlockNumber",
highest.BlockNumber, "totalDifficulty", latestChainInfo.TotalDifficulty, "nodeState", n.getCachedState())
if liveNodes < 2 {
lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
continue
@@ -196,7 +198,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
case <-headsSub.NoNewHeads:
// We haven't received a head on the channel for at least the
// threshold amount of time, mark it broken
lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold)
_, latestChainInfo := n.StateAndLatest()
lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, latestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", latestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold)
if n.poolInfoProvider != nil {
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 {
lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
@@ -310,7 +313,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewFinalizedHead(lggr logger.SugaredLogger
}

latestFinalizedBN := latestFinalized.BlockNumber()
lggr.Tracew("Got latest finalized head", "latestFinalized", latestFinalized)
lggr.Debugw("Got latest finalized head", "latestFinalized", latestFinalized)
if latestFinalizedBN <= chainInfo.FinalizedBlockNumber {
lggr.Tracew("Ignoring previously seen finalized block number")
return false
@@ -328,7 +331,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewHead(lggr logger.SugaredLogger, chainIn
}

promPoolRPCNodeNumSeenBlocks.WithLabelValues(n.chainID.String(), n.name).Inc()
lggr.Tracew("Got head", "head", head)
lggr.Debugw("Got head", "head", head)
lggr = lggr.With("latestReceivedBlockNumber", chainInfo.BlockNumber, "blockNumber", head.BlockNumber(), "nodeState", n.getCachedState())
if head.BlockNumber() <= chainInfo.BlockNumber {
lggr.Tracew("Ignoring previously seen block number")
@@ -358,7 +361,7 @@ const (
// isOutOfSyncWithPool returns outOfSync true if num or td is more than SyncThreshold behind the best node.
// Always returns outOfSync false for SyncThreshold 0.
// liveNodes is only included when outOfSync is true.
func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (outOfSync bool, liveNodes int) {
func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveNodes int) {
if n.poolInfoProvider == nil {
n.lfcLog.Warn("skipping sync state against the pool - should only occur in tests")
return // skip for tests
@@ -369,13 +372,14 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (o
}
// Check against best node
ln, ci := n.poolInfoProvider.LatestChainInfo()
_, localChainInfo := n.StateAndLatest()
mode := n.nodePoolCfg.SelectionMode()
switch mode {
case NodeSelectionModeHighestHead, NodeSelectionModeRoundRobin, NodeSelectionModePriorityLevel:
return localState.BlockNumber < ci.BlockNumber-int64(threshold), ln
return localChainInfo.BlockNumber < ci.BlockNumber-int64(threshold), ln
case NodeSelectionModeTotalDifficulty:
bigThreshold := big.NewInt(int64(threshold))
return localState.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0, ln
return localChainInfo.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0, ln
default:
panic("unrecognized NodeSelectionMode: " + mode)
}
@@ -464,7 +468,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {

// received a new head - clear NoNewHead flag
syncIssues &= ^syncStatusNoNewHead
if outOfSync, _ := n.isOutOfSyncWithPool(localHighestChainInfo); !outOfSync {
if outOfSync, _ := n.isOutOfSyncWithPool(); !outOfSync {
// we caught up with the pool - clear NotInSyncWithPool flag
syncIssues &= ^syncStatusNotInSyncWithPool
} else {
@@ -504,7 +508,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
continue
}

receivedNewHead := n.onNewFinalizedHead(lggr, &localHighestChainInfo, latestFinalized)
_, latestChainInfo := n.StateAndLatest()
receivedNewHead := n.onNewFinalizedHead(lggr, &latestChainInfo, latestFinalized)
if !receivedNewHead {
continue
}
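The central change in node_lifecycle.go is the isOutOfSyncWithPool signature: rather than comparing against a ChainInfo snapshot captured earlier by the caller, the method now reads the node's own latest state via StateAndLatest() at the moment of the check, so the comparison cannot run on stale data. A minimal, standalone sketch of the highest-head comparison it performs (a simplified free function, not the real node method; the real code also supports a total-difficulty mode and returns the pool's live-node count):

```go
package main

import "fmt"

// outOfSyncWithPool is a simplified stand-in for the highest-head check:
// a node is out of sync when its latest block is more than syncThreshold
// behind the best block the pool has seen; a threshold of 0 disables the check.
func outOfSyncWithPool(localBlock, bestBlock int64, syncThreshold uint32) bool {
	if syncThreshold == 0 {
		return false
	}
	return localBlock < bestBlock-int64(syncThreshold)
}

func main() {
	const best = 100
	fmt.Println(outOfSyncWithPool(96, best, 5)) // false: only 4 behind
	fmt.Println(outOfSyncWithPool(95, best, 5)) // false: exactly at the threshold
	fmt.Println(outOfSyncWithPool(94, best, 5)) // true: more than 5 behind
	fmt.Println(outOfSyncWithPool(50, best, 0)) // false: threshold 0 disables the check
}
```

In the real method the live-node count comes from poolInfoProvider.LatestChainInfo(), which aliveLoop uses to avoid disabling the last remaining live node.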
46 changes: 32 additions & 14 deletions common/client/node_lifecycle_test.go
@@ -224,7 +224,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
poolInfo.On("LatestChainInfo").Return(10, ChainInfo{
BlockNumber: syncThreshold + mostRecentBlock + 1,
TotalDifficulty: big.NewInt(10),
}).Once()
})
node.SetPoolChainInfoProvider(poolInfo)
// tries to redial in outOfSync
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
@@ -259,7 +259,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
BlockNumber: syncThreshold + mostRecentBlock + 1,
TotalDifficulty: big.NewInt(10),
}).Once()
})
node.SetPoolChainInfoProvider(poolInfo)
node.declareAlive()
tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState))
@@ -288,7 +288,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
t.Run("when no new heads received for threshold, transitions to out of sync", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{},
chainConfig: clientMocks.ChainConfig{
@@ -312,7 +312,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
t.Run("when no new heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{},
@@ -693,15 +693,25 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
t.Run("if fail to get chainID, transitions to unreachable", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[types.ID, Head](t)
chainID := types.RandomID()
node := newAliveNode(t, testNodeOpts{
rpc: rpc,
rpc: rpc,
chainID: chainID,
})
defer func() { assert.NoError(t, node.close()) }()

rpc.On("ChainID", mock.Anything).Return(chainID, nil)
// for out-of-sync
rpc.On("Dial", mock.Anything).Return(nil).Once()
// for unreachable
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
sub := mocks.NewSubscription(t)
errChan := make(chan error, 1)
errChan <- errors.New("subscription was terminated")
sub.On("Err").Return((<-chan error)(errChan))
sub.On("Unsubscribe").Once()
rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), sub, nil)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})

expectedError := errors.New("failed to get chain ID")
// might be called multiple times
@@ -1025,7 +1035,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
sub.On("Err").Return((<-chan error)(errChan))
sub.On("Unsubscribe").Once()
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), sub, nil).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})

// unreachable
rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe()
@@ -1056,7 +1066,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Run(func(args mock.Arguments) {
close(ch)
}).Return((<-chan Head)(ch), sub, nil).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
// unreachable
rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe()
node.declareOutOfSync(syncStatusNoNewHead)
@@ -1082,7 +1092,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
defer func() { assert.NoError(t, node.close()) }()

const highestBlock = 13
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{FinalizedBlockNumber: highestBlock}, ChainInfo{FinalizedBlockNumber: highestBlock})

outOfSyncSubscription := mocks.NewSubscription(t)
outOfSyncSubscription.On("Err").Return((<-chan error)(nil))
@@ -1119,7 +1129,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
defer func() { assert.NoError(t, node.close()) }()

const highestBlock = 13
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock})

outOfSyncSubscription := mocks.NewSubscription(t)
outOfSyncSubscription.On("Err").Return((<-chan error)(nil))
@@ -1582,7 +1592,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
t.Parallel()
t.Run("skip if nLiveNodes is not configured", func(t *testing.T) {
node := newTestNode(t, testNodeOpts{})
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{})
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, false, outOfSync)
assert.Equal(t, 0, liveNodes)
})
t.Run("skip if syncThreshold is not configured", func(t *testing.T) {
node := newTestNode(t, testNodeOpts{})
poolInfo := newMockPoolChainInfoProvider(t)
node.SetPoolChainInfoProvider(poolInfo)
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{})
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, false, outOfSync)
assert.Equal(t, 0, liveNodes)
})
@@ -1602,7 +1612,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{}).Once()
node.SetPoolChainInfoProvider(poolInfo)
assert.Panics(t, func() {
_, _ = node.isOutOfSyncWithPool(ChainInfo{})
_, _ = node.isOutOfSyncWithPool()
})
})
t.Run("block height selection mode", func(t *testing.T) {
@@ -1653,7 +1663,11 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
for _, td := range []int64{totalDifficulty - syncThreshold - 1, totalDifficulty - syncThreshold, totalDifficulty, totalDifficulty + 1} {
for _, testCase := range testCases {
t.Run(fmt.Sprintf("%s: SelectionModeVal: %s: total difficulty: %d", testCase.name, selectionMode, td), func(t *testing.T) {
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{BlockNumber: testCase.blockNumber, TotalDifficulty: big.NewInt(td)})
chainInfo := ChainInfo{BlockNumber: testCase.blockNumber, TotalDifficulty: big.NewInt(td)}
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(chainInfo, ChainInfo{}).Once()
node.rpc = rpc
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, nodesNum, liveNodes)
assert.Equal(t, testCase.outOfSync, outOfSync)
})
@@ -1709,7 +1723,11 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
for _, hb := range []int64{highestBlock - syncThreshold - 1, highestBlock - syncThreshold, highestBlock, highestBlock + 1} {
for _, testCase := range testCases {
t.Run(fmt.Sprintf("%s: SelectionModeVal: %s: highest block: %d", testCase.name, NodeSelectionModeTotalDifficulty, hb), func(t *testing.T) {
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{BlockNumber: hb, TotalDifficulty: big.NewInt(testCase.totalDifficulty)})
chainInfo := ChainInfo{BlockNumber: hb, TotalDifficulty: big.NewInt(testCase.totalDifficulty)}
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(chainInfo, ChainInfo{}).Once()
node.rpc = rpc
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, nodesNum, liveNodes)
assert.Equal(t, testCase.outOfSync, outOfSync)
})
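Most of the test updates relax mock expectations: GetInterceptedChainInfo and LatestChainInfo stubs drop their .Once() qualifier because the lifecycle loops now query chain info on every check rather than once per state transition. A small standalone sketch of the testify behaviour being relied on, using a hypothetical fakeRPC rather than the generated mockRPCClient:

```go
package main

import (
	"fmt"

	"github.com/stretchr/testify/mock"
)

// fakeRPC is a hypothetical mock, not the real mockRPCClient from these tests.
type fakeRPC struct{ mock.Mock }

func (m *fakeRPC) GetInterceptedChainInfo() (localBlock, highestBlock int64) {
	args := m.Called()
	return args.Get(0).(int64), args.Get(1).(int64)
}

func main() {
	m := &fakeRPC{}
	// Without .Once(), the expectation is open-ended and can satisfy any
	// number of calls; with .Once(), a second call would fail the mock.
	m.On("GetInterceptedChainInfo").Return(int64(10), int64(12))

	// The lifecycle loops may now hit this method on every iteration.
	for i := 0; i < 3; i++ {
		local, highest := m.GetInterceptedChainInfo()
		fmt.Println(local, highest)
	}
}
```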