diff --git a/common/client/mock_node_client_test.go b/common/client/mock_node_client_test.go index 5643dcde90..120e6c0b68 100644 --- a/common/client/mock_node_client_test.go +++ b/common/client/mock_node_client_test.go @@ -400,6 +400,39 @@ func (_c *mockNodeClient_IsSyncing_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(c return _c } +// SetAliveLoopFinalizedHeadSub provides a mock function with given fields: _a0 +func (_m *mockNodeClient[CHAIN_ID, HEAD]) SetAliveLoopFinalizedHeadSub(_a0 types.Subscription) { + _m.Called(_a0) +} + +// mockNodeClient_SetAliveLoopFinalizedHeadSub_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetAliveLoopFinalizedHeadSub' +type mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID types.ID, HEAD Head] struct { + *mock.Call +} + +// SetAliveLoopFinalizedHeadSub is a helper method to define mock.On call +// - _a0 types.Subscription +func (_e *mockNodeClient_Expecter[CHAIN_ID, HEAD]) SetAliveLoopFinalizedHeadSub(_a0 interface{}) *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD] { + return &mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("SetAliveLoopFinalizedHeadSub", _a0)} +} + +func (_c *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD]) Run(run func(_a0 types.Subscription)) *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(types.Subscription)) + }) + return _c +} + +func (_c *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD]) Return() *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD] { + _c.Call.Return() + return _c +} + +func (_c *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(types.Subscription)) *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD] { + _c.Call.Return(run) + return _c +} + // SetAliveLoopSub provides a mock function with given fields: _a0 func (_m *mockNodeClient[CHAIN_ID, HEAD]) SetAliveLoopSub(_a0 types.Subscription) { _m.Called(_a0) diff --git a/common/client/mock_rpc_test.go b/common/client/mock_rpc_test.go index 00473c6636..9e25c99dbf 100644 --- a/common/client/mock_rpc_test.go +++ b/common/client/mock_rpc_test.go @@ -1379,6 +1379,39 @@ func (_c *mockRPC_SequenceAt_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, return _c } +// SetAliveLoopFinalizedHeadSub provides a mock function with given fields: _a0 +func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) SetAliveLoopFinalizedHeadSub(_a0 types.Subscription) { + _m.Called(_a0) +} + +// mockRPC_SetAliveLoopFinalizedHeadSub_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetAliveLoopFinalizedHeadSub' +type mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID types.ID, SEQ types.Sequence, ADDR types.Hashable, BLOCK_HASH types.Hashable, TX interface{}, TX_HASH types.Hashable, EVENT interface{}, EVENT_OPS interface{}, TX_RECEIPT types.Receipt[TX_HASH, BLOCK_HASH], FEE feetypes.Fee, HEAD types.Head[BLOCK_HASH], BATCH_ELEM interface{}] struct { + *mock.Call +} + +// SetAliveLoopFinalizedHeadSub is a helper method to define mock.On call +// - _a0 types.Subscription +func (_e *mockRPC_Expecter[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) SetAliveLoopFinalizedHeadSub(_a0 interface{}) *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM] { + return &mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]{Call: _e.mock.On("SetAliveLoopFinalizedHeadSub", _a0)} +} + +func (_c *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) Run(run func(_a0 types.Subscription)) *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(types.Subscription)) + }) + return _c +} + +func (_c *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) Return() *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM] { + _c.Call.Return() + return _c +} + +func (_c *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) RunAndReturn(run func(types.Subscription)) *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM] { + _c.Call.Return(run) + return _c +} + // SetAliveLoopSub provides a mock function with given fields: _a0 func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) SetAliveLoopSub(_a0 types.Subscription) { _m.Called(_a0) diff --git a/common/client/node_lifecycle.go b/common/client/node_lifecycle.go index 40d9a9ef6e..bc87b25148 100644 --- a/common/client/node_lifecycle.go +++ b/common/client/node_lifecycle.go @@ -119,6 +119,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { } defer finalizedHeadsSub.Unsubscribe() + n.rpc.SetAliveLoopFinalizedHeadSub(finalizedHeadsSub.sub) } var pollCh <-chan time.Time diff --git a/common/client/node_lifecycle_test.go b/common/client/node_lifecycle_test.go index 833bccf7f2..a5681eda96 100644 --- a/common/client/node_lifecycle_test.go +++ b/common/client/node_lifecycle_test.go @@ -446,6 +446,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once() rpc.On("SetAliveLoopSub", mock.Anything).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) node := newDialedNode(t, testNodeOpts{ config: testNodeConfig{}, @@ -467,6 +468,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { ch := make(chan Head) rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once() rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() name := "node-" + rand.Str(5) node := newSubscribedNode(t, testNodeOpts{ config: testNodeConfig{}, @@ -501,6 +503,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { ch := make(chan Head) close(ch) rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) node := newSubscribedNode(t, testNodeOpts{ chainConfig: clientMocks.ChainConfig{ @@ -527,6 +530,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { ch := make(chan Head, 1) ch <- head{BlockNumber: 10}.ToMockHead(t) rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() lggr, observed := logger.TestObserved(t, zap.DebugLevel) noNewFinalizedHeadsThreshold := tests.TestInterval node := newSubscribedNode(t, testNodeOpts{ @@ -560,6 +564,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc := newMockNodeClient[types.ID, Head](t) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() lggr, observed := logger.TestObserved(t, zap.DebugLevel) noNewFinalizedHeadsThreshold := tests.TestInterval node := newSubscribedNode(t, testNodeOpts{ @@ -593,6 +598,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { sub.On("Err").Return((<-chan error)(errCh)) sub.On("Unsubscribe").Once() rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(nil), sub, nil).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) node := newSubscribedNode(t, testNodeOpts{ chainConfig: clientMocks.ChainConfig{ @@ -1116,6 +1122,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { outOfSyncSubscription.On("Unsubscribe").Once() ch := make(chan Head) rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), outOfSyncSubscription, nil).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() setupRPCForAliveLoop(t, rpc) diff --git a/common/client/types.go b/common/client/types.go index c9b6a3580e..806a3eaef1 100644 --- a/common/client/types.go +++ b/common/client/types.go @@ -66,6 +66,7 @@ type NodeClient[ ClientVersion(context.Context) (string, error) SubscribersCount() int32 SetAliveLoopSub(types.Subscription) + SetAliveLoopFinalizedHeadSub(types.Subscription) UnsubscribeAllExceptAliveLoop() IsSyncing(ctx context.Context) (bool, error) SubscribeToFinalizedHeads(_ context.Context) (<-chan HEAD, types.Subscription, error) diff --git a/core/chains/evm/client/mocks/rpc_client.go b/core/chains/evm/client/mocks/rpc_client.go index 5567b3f897..695698eb05 100644 --- a/core/chains/evm/client/mocks/rpc_client.go +++ b/core/chains/evm/client/mocks/rpc_client.go @@ -1754,6 +1754,39 @@ func (_c *RPCClient_SequenceAt_Call) RunAndReturn(run func(context.Context, comm return _c } +// SetAliveLoopFinalizedHeadSub provides a mock function with given fields: _a0 +func (_m *RPCClient) SetAliveLoopFinalizedHeadSub(_a0 commontypes.Subscription) { + _m.Called(_a0) +} + +// RPCClient_SetAliveLoopFinalizedHeadSub_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetAliveLoopFinalizedHeadSub' +type RPCClient_SetAliveLoopFinalizedHeadSub_Call struct { + *mock.Call +} + +// SetAliveLoopFinalizedHeadSub is a helper method to define mock.On call +// - _a0 commontypes.Subscription +func (_e *RPCClient_Expecter) SetAliveLoopFinalizedHeadSub(_a0 interface{}) *RPCClient_SetAliveLoopFinalizedHeadSub_Call { + return &RPCClient_SetAliveLoopFinalizedHeadSub_Call{Call: _e.mock.On("SetAliveLoopFinalizedHeadSub", _a0)} +} + +func (_c *RPCClient_SetAliveLoopFinalizedHeadSub_Call) Run(run func(_a0 commontypes.Subscription)) *RPCClient_SetAliveLoopFinalizedHeadSub_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(commontypes.Subscription)) + }) + return _c +} + +func (_c *RPCClient_SetAliveLoopFinalizedHeadSub_Call) Return() *RPCClient_SetAliveLoopFinalizedHeadSub_Call { + _c.Call.Return() + return _c +} + +func (_c *RPCClient_SetAliveLoopFinalizedHeadSub_Call) RunAndReturn(run func(commontypes.Subscription)) *RPCClient_SetAliveLoopFinalizedHeadSub_Call { + _c.Call.Return(run) + return _c +} + // SetAliveLoopSub provides a mock function with given fields: _a0 func (_m *RPCClient) SetAliveLoopSub(_a0 commontypes.Subscription) { _m.Called(_a0) diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index f55c35980d..ea324f6db7 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -137,7 +137,8 @@ type rpcClient struct { subs []ethereum.Subscription // Need to track the aliveLoop subscription, so we do not cancel it when checking lease on the MultiNode - aliveLoopSub ethereum.Subscription + aliveLoopHeadsSub ethereum.Subscription + aliveLoopFinalizedHeadsSub ethereum.Subscription // chStopInFlight can be closed to immediately cancel all in-flight requests on // this rpcClient. Closing and replacing should be serialized through @@ -368,11 +369,18 @@ func (r *rpcClient) unsubscribeAll() { } r.subs = nil } -func (r *rpcClient) SetAliveLoopSub(sub commontypes.Subscription) { +func (r *rpcClient) SetAliveLoopSub(headsSub commontypes.Subscription) { r.stateMu.Lock() defer r.stateMu.Unlock() - r.aliveLoopSub = sub + r.aliveLoopHeadsSub = headsSub +} + +func (r *rpcClient) SetAliveLoopFinalizedHeadSub(finalizedHeads commontypes.Subscription) { + r.stateMu.Lock() + defer r.stateMu.Unlock() + + r.aliveLoopFinalizedHeadsSub = finalizedHeads } // SubscribersCount returns the number of client subscribed to the node @@ -389,7 +397,7 @@ func (r *rpcClient) UnsubscribeAllExceptAliveLoop() { defer r.stateMu.Unlock() for _, s := range r.subs { - if s != r.aliveLoopSub { + if s != r.aliveLoopHeadsSub && s != r.aliveLoopFinalizedHeadsSub { s.Unsubscribe() } } diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index 662c757ffb..7e97bd2aa2 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -318,6 +318,25 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { require.NoError(t, err) checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc) }) + t.Run("UnsubscribeAllExceptAliveLoop should keep finalized heads subscription open", func(t *testing.T) { + server := testutils.NewWSServer(t, chainId, serverCallBack) + wsURL := server.WSURL() + + rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 1, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + + _, sub, err := rpc.SubscribeToFinalizedHeads(tests.Context(t)) + require.NoError(t, err) + rpc.SetAliveLoopFinalizedHeadSub(sub) + rpc.UnsubscribeAllExceptAliveLoop() + select { + case <-sub.Err(): + t.Fatal("Expected subscription to remain open") + default: + } + checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc) + }) t.Run("Subscription error is properly wrapper", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL()