From e649c9371e3ee7a37d24777b6f7113e856af383e Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 31 Oct 2024 14:55:57 -0400 Subject: [PATCH 01/11] Update MultiNode to latest --- common/client/multi_node.go | 2 - common/client/node_lifecycle.go | 27 ++-- common/client/node_lifecycle_test.go | 46 ++++-- common/client/transaction_sender.go | 138 ++++++++-------- common/client/transaction_sender_test.go | 183 +++++++++++++--------- core/chains/evm/client/chain_client.go | 26 +-- core/chains/evm/client/evm_client.go | 4 +- core/chains/evm/client/helpers_test.go | 4 +- core/chains/evm/client/rpc_client.go | 41 ++++- core/chains/evm/client/rpc_client_test.go | 41 ++--- 10 files changed, 307 insertions(+), 205 deletions(-) diff --git a/common/client/multi_node.go b/common/client/multi_node.go index 9594743f6bd..012729f0f7e 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -94,8 +94,6 @@ func (c *MultiNode[CHAIN_ID, RPC]) ChainID() CHAIN_ID { func (c *MultiNode[CHAIN_ID, RPC]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC, isSendOnly bool)) error { var err error ok := c.IfNotStopped(func() { - ctx, _ = c.chStop.Ctx(ctx) - callsCompleted := 0 for _, n := range c.primaryNodes { select { diff --git a/common/client/node_lifecycle.go b/common/client/node_lifecycle.go index ce508a43dde..1dd59a0c0c0 100644 --- a/common/client/node_lifecycle.go +++ b/common/client/node_lifecycle.go @@ -168,10 +168,12 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.declareUnreachable() return } - _, latestChainInfo := n.StateAndLatest() - if outOfSync, liveNodes := n.isOutOfSyncWithPool(latestChainInfo); outOfSync { + if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { // note: there must be another live node for us to be out of sync - lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", latestChainInfo.BlockNumber, "totalDifficulty", latestChainInfo.TotalDifficulty, "nodeState", n.getCachedState()) + _, highest := n.poolInfoProvider.LatestChainInfo() + _, latestChainInfo := n.StateAndLatest() + lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", latestChainInfo.BlockNumber, "bestLatestBlockNumber", + highest.BlockNumber, "totalDifficulty", latestChainInfo.TotalDifficulty, "nodeState", n.getCachedState()) if liveNodes < 2 { lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) continue @@ -196,7 +198,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { case <-headsSub.NoNewHeads: // We haven't received a head on the channel for at least the // threshold amount of time, mark it broken - lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) + _, latestChainInfo := n.StateAndLatest() + lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, latestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", latestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) if n.poolInfoProvider != nil { if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 { lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) @@ -310,7 +313,7 @@ func (n 
*node[CHAIN_ID, HEAD, RPC]) onNewFinalizedHead(lggr logger.SugaredLogger } latestFinalizedBN := latestFinalized.BlockNumber() - lggr.Tracew("Got latest finalized head", "latestFinalized", latestFinalized) + lggr.Debugw("Got latest finalized head", "latestFinalized", latestFinalized) if latestFinalizedBN <= chainInfo.FinalizedBlockNumber { lggr.Tracew("Ignoring previously seen finalized block number") return false @@ -328,7 +331,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewHead(lggr logger.SugaredLogger, chainIn } promPoolRPCNodeNumSeenBlocks.WithLabelValues(n.chainID.String(), n.name).Inc() - lggr.Tracew("Got head", "head", head) + lggr.Debugw("Got head", "head", head) lggr = lggr.With("latestReceivedBlockNumber", chainInfo.BlockNumber, "blockNumber", head.BlockNumber(), "nodeState", n.getCachedState()) if head.BlockNumber() <= chainInfo.BlockNumber { lggr.Tracew("Ignoring previously seen block number") @@ -358,7 +361,7 @@ const ( // isOutOfSyncWithPool returns outOfSync true if num or td is more than SyncThresold behind the best node. // Always returns outOfSync false for SyncThreshold 0. // liveNodes is only included when outOfSync is true. -func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (outOfSync bool, liveNodes int) { +func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveNodes int) { if n.poolInfoProvider == nil { n.lfcLog.Warn("skipping sync state against the pool - should only occur in tests") return // skip for tests @@ -369,13 +372,14 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (o } // Check against best node ln, ci := n.poolInfoProvider.LatestChainInfo() + _, localChainInfo := n.StateAndLatest() mode := n.nodePoolCfg.SelectionMode() switch mode { case NodeSelectionModeHighestHead, NodeSelectionModeRoundRobin, NodeSelectionModePriorityLevel: - return localState.BlockNumber < ci.BlockNumber-int64(threshold), ln + return localChainInfo.BlockNumber < ci.BlockNumber-int64(threshold), ln case NodeSelectionModeTotalDifficulty: bigThreshold := big.NewInt(int64(threshold)) - return localState.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0, ln + return localChainInfo.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0, ln default: panic("unrecognized NodeSelectionMode: " + mode) } @@ -464,7 +468,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { // received a new head - clear NoNewHead flag syncIssues &= ^syncStatusNoNewHead - if outOfSync, _ := n.isOutOfSyncWithPool(localHighestChainInfo); !outOfSync { + if outOfSync, _ := n.isOutOfSyncWithPool(); !outOfSync { // we caught up with the pool - clear NotInSyncWithPool flag syncIssues &= ^syncStatusNotInSyncWithPool } else { @@ -504,7 +508,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { continue } - receivedNewHead := n.onNewFinalizedHead(lggr, &localHighestChainInfo, latestFinalized) + _, latestChainInfo := n.StateAndLatest() + receivedNewHead := n.onNewFinalizedHead(lggr, &latestChainInfo, latestFinalized) if !receivedNewHead { continue } diff --git a/common/client/node_lifecycle_test.go b/common/client/node_lifecycle_test.go index 6f9b4653393..60d1553a621 100644 --- a/common/client/node_lifecycle_test.go +++ b/common/client/node_lifecycle_test.go @@ -224,7 +224,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { poolInfo.On("LatestChainInfo").Return(10, ChainInfo{ BlockNumber: syncThreshold + mostRecentBlock + 1, TotalDifficulty: 
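// Illustrative sketch (not part of the patch): after this change, isOutOfSyncWithPool
// no longer takes a ChainInfo argument; the node reads its own latest state
// (via StateAndLatest/GetInterceptedChainInfo) and compares it to the pool's best
// node under the configured selection mode. The types and names below are
// simplified stand-ins, not the real commonclient types.
package main

import (
	"fmt"
	"math/big"
)

type chainInfo struct {
	BlockNumber     int64
	TotalDifficulty *big.Int
}

const (
	modeHighestHead     = "HighestHead"
	modeTotalDifficulty = "TotalDifficulty"
)

// isOutOfSync mirrors the shape of node.isOutOfSyncWithPool: local state is
// fetched by the node itself, and a threshold of 0 disables the check.
func isOutOfSync(local, best chainInfo, mode string, threshold uint32) bool {
	if threshold == 0 {
		return false
	}
	switch mode {
	case modeHighestHead:
		return local.BlockNumber < best.BlockNumber-int64(threshold)
	case modeTotalDifficulty:
		limit := new(big.Int).Sub(best.TotalDifficulty, big.NewInt(int64(threshold)))
		return local.TotalDifficulty.Cmp(limit) < 0
	default:
		panic("unrecognized selection mode: " + mode)
	}
}

func main() {
	local := chainInfo{BlockNumber: 95, TotalDifficulty: big.NewInt(95)}
	best := chainInfo{BlockNumber: 100, TotalDifficulty: big.NewInt(100)}
	fmt.Println(isOutOfSync(local, best, modeHighestHead, 5))     // false: exactly at the threshold
	fmt.Println(isOutOfSync(local, best, modeHighestHead, 4))     // true: more than 4 blocks behind
	fmt.Println(isOutOfSync(local, best, modeTotalDifficulty, 4)) // true: difficulty too far behind
}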
big.NewInt(10), - }).Once() + }) node.SetPoolChainInfoProvider(poolInfo) // tries to redial in outOfSync rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { @@ -259,7 +259,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ BlockNumber: syncThreshold + mostRecentBlock + 1, TotalDifficulty: big.NewInt(10), - }).Once() + }) node.SetPoolChainInfoProvider(poolInfo) node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)) @@ -288,7 +288,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { t.Run("when no new heads received for threshold, transitions to out of sync", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[types.ID, Head](t) - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) node := newSubscribedNode(t, testNodeOpts{ config: testNodeConfig{}, chainConfig: clientMocks.ChainConfig{ @@ -312,7 +312,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { t.Run("when no new heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[types.ID, Head](t) - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) node := newSubscribedNode(t, testNodeOpts{ config: testNodeConfig{}, @@ -693,15 +693,25 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { t.Run("if fail to get chainID, transitions to unreachable", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[types.ID, Head](t) + chainID := types.RandomID() node := newAliveNode(t, testNodeOpts{ - rpc: rpc, + rpc: rpc, + chainID: chainID, }) defer func() { assert.NoError(t, node.close()) }() + rpc.On("ChainID", mock.Anything).Return(chainID, nil) // for out-of-sync rpc.On("Dial", mock.Anything).Return(nil).Once() // for unreachable rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() + sub := mocks.NewSubscription(t) + errChan := make(chan error, 1) + errChan <- errors.New("subscription was terminate") + sub.On("Err").Return((<-chan error)(errChan)) + sub.On("Unsubscribe").Once() + rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), sub, nil) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) expectedError := errors.New("failed to get chain ID") // might be called multiple times @@ -1025,7 +1035,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { sub.On("Err").Return((<-chan error)(errChan)) sub.On("Unsubscribe").Once() rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), sub, nil).Once() - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) // unreachable rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe() @@ -1056,7 +1066,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { rpc.On("SubscribeToFinalizedHeads", mock.Anything).Run(func(args mock.Arguments) { close(ch) }).Return((<-chan Head)(ch), sub, nil).Once() - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) // unreachable rpc.On("Dial", 
mock.Anything).Return(errors.New("failed to redial")).Maybe() node.declareOutOfSync(syncStatusNoNewHead) @@ -1082,7 +1092,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { defer func() { assert.NoError(t, node.close()) }() const highestBlock = 13 - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{FinalizedBlockNumber: highestBlock}, ChainInfo{FinalizedBlockNumber: highestBlock}) outOfSyncSubscription := mocks.NewSubscription(t) outOfSyncSubscription.On("Err").Return((<-chan error)(nil)) @@ -1119,7 +1129,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { defer func() { assert.NoError(t, node.close()) }() const highestBlock = 13 - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}) outOfSyncSubscription := mocks.NewSubscription(t) outOfSyncSubscription.On("Err").Return((<-chan error)(nil)) @@ -1582,7 +1592,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { t.Parallel() t.Run("skip if nLiveNodes is not configured", func(t *testing.T) { node := newTestNode(t, testNodeOpts{}) - outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{}) + outOfSync, liveNodes := node.isOutOfSyncWithPool() assert.Equal(t, false, outOfSync) assert.Equal(t, 0, liveNodes) }) @@ -1590,7 +1600,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { node := newTestNode(t, testNodeOpts{}) poolInfo := newMockPoolChainInfoProvider(t) node.SetPoolChainInfoProvider(poolInfo) - outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{}) + outOfSync, liveNodes := node.isOutOfSyncWithPool() assert.Equal(t, false, outOfSync) assert.Equal(t, 0, liveNodes) }) @@ -1602,7 +1612,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { poolInfo.On("LatestChainInfo").Return(1, ChainInfo{}).Once() node.SetPoolChainInfoProvider(poolInfo) assert.Panics(t, func() { - _, _ = node.isOutOfSyncWithPool(ChainInfo{}) + _, _ = node.isOutOfSyncWithPool() }) }) t.Run("block height selection mode", func(t *testing.T) { @@ -1653,7 +1663,11 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { for _, td := range []int64{totalDifficulty - syncThreshold - 1, totalDifficulty - syncThreshold, totalDifficulty, totalDifficulty + 1} { for _, testCase := range testCases { t.Run(fmt.Sprintf("%s: SelectionModeVal: %s: total difficulty: %d", testCase.name, selectionMode, td), func(t *testing.T) { - outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{BlockNumber: testCase.blockNumber, TotalDifficulty: big.NewInt(td)}) + chainInfo := ChainInfo{BlockNumber: testCase.blockNumber, TotalDifficulty: big.NewInt(td)} + rpc := newMockRPCClient[types.ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(chainInfo, ChainInfo{}).Once() + node.rpc = rpc + outOfSync, liveNodes := node.isOutOfSyncWithPool() assert.Equal(t, nodesNum, liveNodes) assert.Equal(t, testCase.outOfSync, outOfSync) }) @@ -1709,7 +1723,11 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { for _, hb := range []int64{highestBlock - syncThreshold - 1, highestBlock - syncThreshold, highestBlock, highestBlock + 1} { for _, testCase := range testCases { t.Run(fmt.Sprintf("%s: SelectionModeVal: %s: highest block: %d", testCase.name, NodeSelectionModeTotalDifficulty, hb), func(t *testing.T) { - outOfSync, liveNodes := 
node.isOutOfSyncWithPool(ChainInfo{BlockNumber: hb, TotalDifficulty: big.NewInt(testCase.totalDifficulty)}) + chainInfo := ChainInfo{BlockNumber: hb, TotalDifficulty: big.NewInt(testCase.totalDifficulty)} + rpc := newMockRPCClient[types.ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(chainInfo, ChainInfo{}).Once() + node.rpc = rpc + outOfSync, liveNodes := node.isOutOfSyncWithPool() assert.Equal(t, nodesNum, liveNodes) assert.Equal(t, testCase.outOfSync, outOfSync) }) diff --git a/common/client/transaction_sender.go b/common/client/transaction_sender.go index 9365a82b290..5a746e538c0 100644 --- a/common/client/transaction_sender.go +++ b/common/client/transaction_sender.go @@ -14,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/common/types" ) @@ -25,52 +26,49 @@ var ( }, []string{"network", "chainId", "invariant"}) ) -// TxErrorClassifier - defines interface of a function that transforms raw RPC error into the SendTxReturnCode enum -// (e.g. Successful, Fatal, Retryable, etc.) -type TxErrorClassifier[TX any] func(tx TX, err error) SendTxReturnCode - -type sendTxResult struct { - Err error - ResultCode SendTxReturnCode +type SendTxResult interface { + Code() SendTxReturnCode + TxError() error + Error() error } const sendTxQuorum = 0.7 // SendTxRPCClient - defines interface of an RPC used by TransactionSender to broadcast transaction -type SendTxRPCClient[TX any] interface { +type SendTxRPCClient[TX any, RESULT SendTxResult] interface { // SendTransaction errors returned should include name or other unique identifier of the RPC - SendTransaction(ctx context.Context, tx TX) error + SendTransaction(ctx context.Context, tx TX) RESULT } -func NewTransactionSender[TX any, CHAIN_ID types.ID, RPC SendTxRPCClient[TX]]( +func NewTransactionSender[TX any, RESULT SendTxResult, CHAIN_ID types.ID, RPC SendTxRPCClient[TX, RESULT]]( lggr logger.Logger, chainID CHAIN_ID, chainFamily string, multiNode *MultiNode[CHAIN_ID, RPC], - txErrorClassifier TxErrorClassifier[TX], + newResult func(err error) RESULT, sendTxSoftTimeout time.Duration, -) *TransactionSender[TX, CHAIN_ID, RPC] { +) *TransactionSender[TX, RESULT, CHAIN_ID, RPC] { if sendTxSoftTimeout == 0 { sendTxSoftTimeout = QueryTimeout / 2 } - return &TransactionSender[TX, CHAIN_ID, RPC]{ + return &TransactionSender[TX, RESULT, CHAIN_ID, RPC]{ chainID: chainID, chainFamily: chainFamily, lggr: logger.Sugared(lggr).Named("TransactionSender").With("chainID", chainID.String()), multiNode: multiNode, - txErrorClassifier: txErrorClassifier, + newResult: newResult, sendTxSoftTimeout: sendTxSoftTimeout, chStop: make(services.StopChan), } } -type TransactionSender[TX any, CHAIN_ID types.ID, RPC SendTxRPCClient[TX]] struct { +type TransactionSender[TX any, RESULT SendTxResult, CHAIN_ID types.ID, RPC SendTxRPCClient[TX, RESULT]] struct { services.StateMachine chainID CHAIN_ID chainFamily string lggr logger.SugaredLogger multiNode *MultiNode[CHAIN_ID, RPC] - txErrorClassifier TxErrorClassifier[TX] + newResult func(err error) RESULT sendTxSoftTimeout time.Duration // defines max waiting time from first response til responses evaluation wg sync.WaitGroup // waits for all reporting goroutines to finish @@ -95,17 +93,27 @@ type TransactionSender[TX any, CHAIN_ID types.ID, RPC SendTxRPCClient[TX]] struc // * If there is at least one terminal error - returns terminal error // * If there is both success and terminal error - returns 
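// Illustrative sketch (not part of the patch): SendTransaction now returns a
// chain-specific result value instead of (SendTxReturnCode, error). A minimal
// implementation of the new SendTxResult interface looks like the stand-in
// below; the field and constructor names are assumptions for illustration,
// while the real EVM version appears later in this diff in rpc_client.go.
package main

import (
	"errors"
	"fmt"
)

// SendTxReturnCode stands in for commonclient.SendTxReturnCode.
type SendTxReturnCode int

const (
	Successful SendTxReturnCode = iota
	Fatal
	Retryable
)

// sendTxResult mirrors the shape used by the tests in this patch: err carries
// sender-level failures, txErr carries the raw RPC error, and code is the
// classified outcome.
type sendTxResult struct {
	err   error
	txErr error
	code  SendTxReturnCode
}

func (r *sendTxResult) Error() error           { return r.err }
func (r *sendTxResult) TxError() error         { return r.txErr }
func (r *sendTxResult) Code() SendTxReturnCode { return r.code }

// newResult is the kind of factory handed to NewTransactionSender; it wraps
// sender-level failures such as "TransactionSender not started".
func newResult(err error) *sendTxResult {
	return &sendTxResult{err: err, txErr: err}
}

func main() {
	r := &sendTxResult{txErr: errors.New("nonce too low"), code: Retryable}
	fmt.Println(r.Code(), r.TxError(), r.Error()) // 2 nonce too low <nil>
	fmt.Println(newResult(errors.New("TransactionSender not started")).Error())
}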
success and reports invariant violation // * Otherwise, returns any (effectively random) of the errors. -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) (SendTxReturnCode, error) { - txResults := make(chan sendTxResult) - txResultsToReport := make(chan sendTxResult) +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) RESULT { + txResults := make(chan RESULT) + txResultsToReport := make(chan RESULT) primaryNodeWg := sync.WaitGroup{} if txSender.State() != "Started" { - return Retryable, errors.New("TransactionSender not started") + return txSender.newResult(errors.New("TransactionSender not started")) } + // Must wait for reportSendTxAnomalies and collectTxResults to complete before cancelling the context + txSenderCtx, cancel := txSender.chStop.NewCtx() + reportWg := sync.WaitGroup{} + defer func() { + go func() { + reportWg.Wait() + cancel() + }() + }() + healthyNodesNum := 0 - err := txSender.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPC, isSendOnly bool) { + err := txSender.multiNode.DoAll(txSenderCtx, func(ctx context.Context, rpc RPC, isSendOnly bool) { if isSendOnly { txSender.wg.Add(1) go func() { @@ -122,17 +130,17 @@ func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) SendTransaction(ctx contex primaryNodeWg.Add(1) go func() { defer primaryNodeWg.Done() - result := txSender.broadcastTxAsync(ctx, rpc, tx) + r := txSender.broadcastTxAsync(ctx, rpc, tx) select { case <-ctx.Done(): return - case txResults <- result: + case txResults <- r: } select { case <-ctx.Done(): return - case txResultsToReport <- result: + case txResultsToReport <- r: } }() }) @@ -147,79 +155,80 @@ func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) SendTransaction(ctx contex }() if err != nil { - return Retryable, err + return txSender.newResult(err) } txSender.wg.Add(1) - go txSender.reportSendTxAnomalies(tx, txResultsToReport) + reportWg.Add(1) + go func() { + txSender.reportSendTxAnomalies(tx, txResultsToReport) + reportWg.Done() + }() return txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) } -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) sendTxResult { - txErr := rpc.SendTransaction(ctx, tx) - txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", txErr) - resultCode := txSender.txErrorClassifier(tx, txErr) - if !slices.Contains(sendTxSuccessfulCodes, resultCode) { - txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", txErr) +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) RESULT { + result := rpc.SendTransaction(ctx, tx) + txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.TxError()) + if !slices.Contains(sendTxSuccessfulCodes, result.Code()) { + txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", result.TxError()) } - return sendTxResult{Err: txErr, ResultCode: resultCode} + return result } -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan RESULT) { defer txSender.wg.Done() - resultsByCode := sendTxResults{} + resultsByCode := sendTxResults[RESULT]{} // txResults eventually will be closed for txResult := range txResults { - resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err) + 
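// Illustrative sketch (not part of the patch): the change above keeps the
// broadcast context alive until background result reporting has drained its
// channel, instead of cancelling it as soon as SendTransaction returns. The
// pattern, reduced to its essentials (the real code derives the context from
// txSender.chStop and reports via reportSendTxAnomalies):
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func sendAll(parent context.Context) {
	ctx, cancel := context.WithCancel(parent)

	var reportWg sync.WaitGroup
	// Cancel only after reporting has finished, and do it off the hot path so
	// the caller can return as soon as enough results are collected.
	defer func() {
		go func() {
			reportWg.Wait()
			cancel()
		}()
	}()

	results := make(chan string)
	reportWg.Add(1)
	go func() {
		defer reportWg.Done()
		for r := range results {
			fmt.Println("reported:", r) // stand-in for anomaly reporting
		}
	}()

	go func() {
		defer close(results)
		select {
		case <-ctx.Done():
		case results <- "ok":
		}
	}()
	time.Sleep(10 * time.Millisecond) // stand-in for collecting the quorum of results
}

func main() {
	sendAll(context.Background())
	time.Sleep(50 * time.Millisecond) // give the reporter time to flush
}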
resultsByCode[txResult.Code()] = append(resultsByCode[txResult.Code()], txResult) } - _, _, criticalErr := aggregateTxResults(resultsByCode) + _, criticalErr := aggregateTxResults[RESULT](resultsByCode) if criticalErr != nil { txSender.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr) PromMultiNodeInvariantViolations.WithLabelValues(txSender.chainFamily, txSender.chainID.String(), criticalErr.Error()).Inc() } } -type sendTxResults map[SendTxReturnCode][]error +type sendTxResults[RESULT any] map[SendTxReturnCode][]RESULT -func aggregateTxResults(resultsByCode sendTxResults) (returnCode SendTxReturnCode, txResult error, err error) { - severeCode, severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors) - successCode, successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes) +func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result RESULT, criticalErr error) { + severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors) + successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes) if hasSuccess { // We assume that primary node would never report false positive txResult for a transaction. // Thus, if such case occurs it's probably due to misconfiguration or a bug and requires manual intervention. if hasSevereErrors { const errMsg = "found contradictions in nodes replies on SendTransaction: got success and severe error" // return success, since at least 1 node has accepted our broadcasted Tx, and thus it can now be included onchain - return successCode, successResults[0], errors.New(errMsg) + return successResults[0], errors.New(errMsg) } // other errors are temporary - we are safe to return success - return successCode, successResults[0], nil + return successResults[0], nil } if hasSevereErrors { - return severeCode, severeErrors[0], nil + return severeErrors[0], nil } // return temporary error - for code, result := range resultsByCode { - return code, result[0], nil + for _, r := range resultsByCode { + return r[0], nil } - err = fmt.Errorf("expected at least one response on SendTransaction") - return Retryable, err, err + criticalErr = fmt.Errorf("expected at least one response on SendTransaction") + return result, criticalErr } -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan sendTxResult) (SendTxReturnCode, error) { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan RESULT) RESULT { if healthyNodesNum == 0 { - return Retryable, ErroringNodeError + return txSender.newResult(ErroringNodeError) } - ctx, cancel := txSender.chStop.Ctx(ctx) - defer cancel() requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum)) - errorsByCode := sendTxResults{} + errorsByCode := sendTxResults[RESULT]{} var softTimeoutChan <-chan time.Time var resultsCount int loop: @@ -227,11 +236,11 @@ loop: select { case <-ctx.Done(): txSender.lggr.Debugw("Failed to collect of the results before context was done", "tx", tx, "errorsByCode", errorsByCode) - return Retryable, ctx.Err() - case result := <-txResults: - errorsByCode[result.ResultCode] = append(errorsByCode[result.ResultCode], result.Err) + return txSender.newResult(ctx.Err()) + case r := <-txResults: + errorsByCode[r.Code()] = append(errorsByCode[r.Code()], r) resultsCount++ - if 
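// Illustrative sketch (not part of the patch): how the reworked
// aggregateTxResults ranks replies. A success wins over a severe error (and
// that contradiction is surfaced as a critical error), a severe error wins
// over a temporary one, and an empty result set is itself a critical error.
// The types below are simplified stand-ins.
package main

import (
	"errors"
	"fmt"
)

type code int

const (
	successful code = iota
	fatal
	retryable
)

var (
	successCodes = []code{successful}
	severeCodes  = []code{fatal}
)

// findFirstIn returns the value for the first key present in the map.
func findFirstIn[K comparable, V any](set map[K]V, keys []K) (V, bool) {
	for _, k := range keys {
		if v, ok := set[k]; ok {
			return v, true
		}
	}
	var zero V
	return zero, false
}

func aggregate(byCode map[code][]string) (result string, criticalErr error) {
	severe, hasSevere := findFirstIn(byCode, severeCodes)
	success, hasSuccess := findFirstIn(byCode, successCodes)
	if hasSuccess {
		if hasSevere {
			return success[0], errors.New("got success and severe error")
		}
		return success[0], nil
	}
	if hasSevere {
		return severe[0], nil
	}
	for _, r := range byCode {
		return r[0], nil // any temporary error
	}
	return "", errors.New("expected at least one response")
}

func main() {
	fmt.Println(aggregate(map[code][]string{successful: {"ok"}, fatal: {"boom"}})) // ok + invariant violation
	fmt.Println(aggregate(map[code][]string{retryable: {"try again"}}))            // try again, <nil>
}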
slices.Contains(sendTxSuccessfulCodes, result.ResultCode) || resultsCount >= requiredResults { + if slices.Contains(sendTxSuccessfulCodes, r.Code()) || resultsCount >= requiredResults { break loop } case <-softTimeoutChan: @@ -249,17 +258,17 @@ loop: } // ignore critical error as it's reported in reportSendTxAnomalies - returnCode, result, _ := aggregateTxResults(errorsByCode) - return returnCode, result + result, _ := aggregateTxResults(errorsByCode) + return result } -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Start(ctx context.Context) error { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Start(ctx context.Context) error { return txSender.StartOnce("TransactionSender", func() error { return nil }) } -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Close() error { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Close() error { return txSender.StopOnce("TransactionSender", func() error { close(txSender.chStop) txSender.wg.Wait() @@ -268,13 +277,12 @@ func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Close() error { } // findFirstIn - returns the first existing key and value for the slice of keys -func findFirstIn[K comparable, V any](set map[K]V, keys []K) (K, V, bool) { +func findFirstIn[K comparable, V any](set map[K]V, keys []K) (V, bool) { for _, k := range keys { if v, ok := set[k]; ok { - return k, v, true + return v, true } } - var zeroK K var zeroV V - return zeroK, zeroV, false + return zeroV, false } diff --git a/common/client/transaction_sender_test.go b/common/client/transaction_sender_test.go index 5517a0c8dda..2d8e533783c 100644 --- a/common/client/transaction_sender_test.go +++ b/common/client/transaction_sender_test.go @@ -17,7 +17,7 @@ import ( ) type sendTxMultiNode struct { - *MultiNode[types.ID, SendTxRPCClient[any]] + *MultiNode[types.ID, SendTxRPCClient[any, *sendTxResult]] } type sendTxRPC struct { @@ -25,29 +25,57 @@ type sendTxRPC struct { sendTxErr error } -var _ SendTxRPCClient[any] = (*sendTxRPC)(nil) +type sendTxResult struct { + err error + txErr error + code SendTxReturnCode +} + +var _ SendTxResult = (*sendTxResult)(nil) + +func NewSendTxResult(err error) *sendTxResult { + result := &sendTxResult{ + err: err, + txErr: err, + } + return result +} + +func (r *sendTxResult) Error() error { + return r.err +} + +func (r *sendTxResult) TxError() error { + return r.txErr +} + +func (r *sendTxResult) Code() SendTxReturnCode { + return r.code +} + +var _ SendTxRPCClient[any, *sendTxResult] = (*sendTxRPC)(nil) func newSendTxRPC(sendTxErr error, sendTxRun func(args mock.Arguments)) *sendTxRPC { return &sendTxRPC{sendTxErr: sendTxErr, sendTxRun: sendTxRun} } -func (rpc *sendTxRPC) SendTransaction(ctx context.Context, _ any) error { +func (rpc *sendTxRPC) SendTransaction(ctx context.Context, _ any) *sendTxResult { if rpc.sendTxRun != nil { rpc.sendTxRun(mock.Arguments{ctx}) } - return rpc.sendTxErr + return &sendTxResult{err: rpc.sendTxErr, txErr: rpc.sendTxErr, code: classifySendTxError(nil, rpc.sendTxErr)} } func newTestTransactionSender(t *testing.T, chainID types.ID, lggr logger.Logger, - nodes []Node[types.ID, SendTxRPCClient[any]], - sendOnlyNodes []SendOnlyNode[types.ID, SendTxRPCClient[any]], -) (*sendTxMultiNode, *TransactionSender[any, types.ID, SendTxRPCClient[any]]) { - mn := sendTxMultiNode{NewMultiNode[types.ID, SendTxRPCClient[any]]( + nodes []Node[types.ID, SendTxRPCClient[any, *sendTxResult]], + sendOnlyNodes []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]], +) (*sendTxMultiNode, 
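// Illustrative sketch (not part of the patch): collectTxResults stops as soon
// as it sees a successful reply or has gathered a 70% quorum of the healthy
// primary nodes (sendTxQuorum = 0.7); a soft timeout can also cut collection
// short. The required-result arithmetic:
package main

import (
	"fmt"
	"math"
)

func requiredResults(healthyNodes int) int {
	const sendTxQuorum = 0.7
	return int(math.Ceil(float64(healthyNodes) * sendTxQuorum))
}

func main() {
	for _, n := range []int{1, 2, 3, 4, 10} {
		fmt.Printf("healthy=%d -> wait for %d results\n", n, requiredResults(n))
	}
	// healthy=1 -> 1, healthy=2 -> 2, healthy=3 -> 3, healthy=4 -> 3, healthy=10 -> 7
}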
*TransactionSender[any, *sendTxResult, types.ID, SendTxRPCClient[any, *sendTxResult]]) { + mn := sendTxMultiNode{NewMultiNode[types.ID, SendTxRPCClient[any, *sendTxResult]]( lggr, NodeSelectionModeRoundRobin, 0, nodes, sendOnlyNodes, chainID, "chainFamily", 0)} err := mn.StartOnce("startedTestMultiNode", func() error { return nil }) require.NoError(t, err) - txSender := NewTransactionSender[any, types.ID, SendTxRPCClient[any]](lggr, chainID, mn.chainFamily, mn.MultiNode, classifySendTxError, tests.TestInterval) + txSender := NewTransactionSender[any, *sendTxResult, types.ID, SendTxRPCClient[any, *sendTxResult]](lggr, chainID, mn.chainFamily, mn.MultiNode, NewSendTxResult, tests.TestInterval) err = txSender.Start(tests.Context(t)) require.NoError(t, err) @@ -76,9 +104,9 @@ func classifySendTxError(_ any, err error) SendTxReturnCode { func TestTransactionSender_SendTransaction(t *testing.T) { t.Parallel() - newNodeWithState := func(t *testing.T, state nodeState, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, SendTxRPCClient[any]] { + newNodeWithState := func(t *testing.T, state nodeState, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, SendTxRPCClient[any, *sendTxResult]] { rpc := newSendTxRPC(txErr, sendTxRun) - node := newMockNode[types.ID, SendTxRPCClient[any]](t) + node := newMockNode[types.ID, SendTxRPCClient[any, *sendTxResult]](t) node.On("String").Return("node name").Maybe() node.On("RPC").Return(rpc).Maybe() node.On("State").Return(state).Maybe() @@ -86,15 +114,15 @@ func TestTransactionSender_SendTransaction(t *testing.T) { return node } - newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, SendTxRPCClient[any]] { + newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, SendTxRPCClient[any, *sendTxResult]] { return newNodeWithState(t, nodeStateAlive, txErr, sendTxRun) } t.Run("Fails if there is no nodes available", func(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, nil, nil) - _, err := txSender.SendTransaction(tests.Context(t), nil) - assert.EqualError(t, err, ErroringNodeError.Error()) + result := txSender.SendTransaction(tests.Context(t), nil) + assert.EqualError(t, result.Error(), ErroringNodeError.Error()) }) t.Run("Transaction failure happy path", func(t *testing.T) { @@ -103,12 +131,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, - []Node[types.ID, SendTxRPCClient[any]]{mainNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{newNode(t, errors.New("unexpected error"), nil)}) + []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{mainNode}, + []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{newNode(t, errors.New("unexpected error"), nil)}) - result, sendErr := txSender.SendTransaction(tests.Context(t), nil) - require.ErrorIs(t, sendErr, expectedError) - require.Equal(t, Fatal, result) + result := txSender.SendTransaction(tests.Context(t), nil) + require.ErrorIs(t, result.TxError(), expectedError) + require.Equal(t, Fatal, result.Code()) tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2) tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 2) }) @@ -118,12 +146,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, observedLogs := 
logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, - []Node[types.ID, SendTxRPCClient[any]]{mainNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{newNode(t, errors.New("unexpected error"), nil)}) + []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{mainNode}, + []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{newNode(t, errors.New("unexpected error"), nil)}) - result, sendErr := txSender.SendTransaction(tests.Context(t), nil) - require.NoError(t, sendErr) - require.Equal(t, Successful, result) + result := txSender.SendTransaction(tests.Context(t), nil) + require.NoError(t, result.TxError()) + require.Equal(t, Successful, result.Code()) tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2) tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 1) }) @@ -140,12 +168,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, - []Node[types.ID, SendTxRPCClient[any]]{mainNode}, nil) + []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{mainNode}, nil) requestContext, cancel := context.WithCancel(tests.Context(t)) cancel() - _, sendErr := txSender.SendTransaction(requestContext, nil) - require.EqualError(t, sendErr, "context canceled") + result := txSender.SendTransaction(requestContext, nil) + require.EqualError(t, result.TxError(), "context canceled") }) t.Run("Soft timeout stops results collection", func(t *testing.T) { @@ -163,9 +191,9 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) - _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, nil) - _, sendErr := txSender.SendTransaction(tests.Context(t), nil) - require.EqualError(t, sendErr, expectedError.Error()) + _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{fastNode, slowNode}, nil) + result := txSender.SendTransaction(tests.Context(t), nil) + require.EqualError(t, result.TxError(), expectedError.Error()) }) t.Run("Returns success without waiting for the rest of the nodes", func(t *testing.T) { chainID := types.RandomID() @@ -183,12 +211,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { }) lggr, _ := logger.TestObserved(t, zap.WarnLevel) mn, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{slowSendOnly}) + []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{fastNode, slowNode}, + []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{slowSendOnly}) - rtnCode, err := txSender.SendTransaction(tests.Context(t), nil) - require.NoError(t, err) - require.Equal(t, Successful, rtnCode) + result := txSender.SendTransaction(tests.Context(t), nil) + require.NoError(t, result.Error()) + require.Equal(t, Successful, result.Code()) require.NoError(t, mn.Close()) }) t.Run("Fails when multinode is closed", func(t *testing.T) { @@ -209,12 +237,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) mn, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{slowSendOnly}) + []Node[types.ID, SendTxRPCClient[any, 
*sendTxResult]]{fastNode, slowNode}, + []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{slowSendOnly}) require.NoError(t, mn.Close()) - _, err := txSender.SendTransaction(tests.Context(t), nil) - require.EqualError(t, err, "MultiNode is stopped") + result := txSender.SendTransaction(tests.Context(t), nil) + require.EqualError(t, result.Error(), "MultiNode is stopped") }) t.Run("Fails when closed", func(t *testing.T) { chainID := types.RandomID() @@ -234,12 +262,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{slowSendOnly}) + []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{fastNode, slowNode}, + []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{slowSendOnly}) require.NoError(t, txSender.Close()) - _, err := txSender.SendTransaction(tests.Context(t), nil) - require.EqualError(t, err, "TransactionSender not started") + result := txSender.SendTransaction(tests.Context(t), nil) + require.EqualError(t, result.Error(), "TransactionSender not started") }) t.Run("Returns error if there is no healthy primary nodes", func(t *testing.T) { chainID := types.RandomID() @@ -249,11 +277,11 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any]]{primary}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{sendOnly}) + []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{primary}, + []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{sendOnly}) - _, sendErr := txSender.SendTransaction(tests.Context(t), nil) - assert.EqualError(t, sendErr, ErroringNodeError.Error()) + result := txSender.SendTransaction(tests.Context(t), nil) + assert.EqualError(t, result.TxError(), ErroringNodeError.Error()) }) t.Run("Transaction success even if one of the nodes is unhealthy", func(t *testing.T) { @@ -268,12 +296,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any]]{mainNode, unhealthyNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{unhealthySendOnlyNode}) + []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{mainNode, unhealthyNode}, + []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{unhealthySendOnlyNode}) - returnCode, sendErr := txSender.SendTransaction(tests.Context(t), nil) - require.NoError(t, sendErr) - require.Equal(t, Successful, returnCode) + result := txSender.SendTransaction(tests.Context(t), nil) + require.NoError(t, result.Error()) + require.Equal(t, Successful, result.Code()) }) } @@ -288,65 +316,66 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { testCases := []struct { Name string ExpectedTxResult string + ExpectedNilResult bool ExpectedCriticalErr string - ResultsByCode sendTxResults + ResultsByCode sendTxResults[*sendTxResult] }{ { Name: "Returns success and logs critical error on success and Fatal", ExpectedTxResult: "success", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: sendTxResults{ - Successful: {errors.New("success")}, - Fatal: {errors.New("fatal")}, + ResultsByCode: 
sendTxResults[*sendTxResult]{ + Successful: {NewSendTxResult(errors.New("success"))}, + Fatal: {NewSendTxResult(errors.New("fatal"))}, }, }, { Name: "Returns TransactionAlreadyKnown and logs critical error on TransactionAlreadyKnown and Fatal", ExpectedTxResult: "tx_already_known", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: sendTxResults{ - TransactionAlreadyKnown: {errors.New("tx_already_known")}, - Unsupported: {errors.New("unsupported")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + TransactionAlreadyKnown: {NewSendTxResult(errors.New("tx_already_known"))}, + Unsupported: {NewSendTxResult(errors.New("unsupported"))}, }, }, { Name: "Prefers sever error to temporary", ExpectedTxResult: "underpriced", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults{ - Retryable: {errors.New("retryable")}, - Underpriced: {errors.New("underpriced")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + Retryable: {NewSendTxResult(errors.New("retryable"))}, + Underpriced: {NewSendTxResult(errors.New("underpriced"))}, }, }, { Name: "Returns temporary error", ExpectedTxResult: "retryable", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults{ - Retryable: {errors.New("retryable")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + Retryable: {NewSendTxResult(errors.New("retryable"))}, }, }, { Name: "Insufficient funds is treated as error", ExpectedTxResult: "", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults{ - Successful: {nil}, - InsufficientFunds: {errors.New("insufficientFunds")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + Successful: {NewSendTxResult(nil)}, + InsufficientFunds: {NewSendTxResult(errors.New("insufficientFunds"))}, }, }, { Name: "Logs critical error on empty ResultsByCode", - ExpectedTxResult: "expected at least one response on SendTransaction", + ExpectedNilResult: true, ExpectedCriticalErr: "expected at least one response on SendTransaction", - ResultsByCode: sendTxResults{}, + ResultsByCode: sendTxResults[*sendTxResult]{}, }, { Name: "Zk terminally stuck", ExpectedTxResult: "not enough keccak counters to continue the execution", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults{ - TerminallyStuck: {errors.New("not enough keccak counters to continue the execution")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + TerminallyStuck: {NewSendTxResult(errors.New("not enough keccak counters to continue the execution"))}, }, }, } @@ -357,11 +386,13 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { } t.Run(testCase.Name, func(t *testing.T) { - _, txResult, err := aggregateTxResults(testCase.ResultsByCode) - if testCase.ExpectedTxResult == "" { - assert.NoError(t, err) - } else { - assert.EqualError(t, txResult, testCase.ExpectedTxResult) + txResult, err := aggregateTxResults(testCase.ResultsByCode) + if !testCase.ExpectedNilResult { + if testCase.ExpectedTxResult == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, txResult.TxError(), testCase.ExpectedTxResult) + } } logger.Sugared(logger.Test(t)).Info("Map: " + fmt.Sprint(testCase.ResultsByCode)) diff --git a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go index ee45226ab16..43d31d891a9 100644 --- a/core/chains/evm/client/chain_client.go +++ b/core/chains/evm/client/chain_client.go @@ -2,6 +2,7 @@ package client import ( "context" + "fmt" "math/big" "sync" "time" @@ -100,7 +101,7 @@ type chainClient struct { *big.Int, *RPCClient, ] - txSender 
*commonclient.TransactionSender[*types.Transaction, *big.Int, *RPCClient] + txSender *commonclient.TransactionSender[*types.Transaction, *SendTxResult, *big.Int, *RPCClient] logger logger.SugaredLogger chainType chaintype.ChainType clientErrors evmconfig.ClientErrors @@ -129,16 +130,12 @@ func NewChainClient( deathDeclarationDelay, ) - classifySendError := func(tx *types.Transaction, err error) commonclient.SendTxReturnCode { - return ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, chainType.IsL2()) - } - - txSender := commonclient.NewTransactionSender[*types.Transaction, *big.Int, *RPCClient]( + txSender := commonclient.NewTransactionSender[*types.Transaction, *SendTxResult, *big.Int, *RPCClient]( lggr, chainID, chainFamily, multiNode, - classifySendError, + NewSendTxResult, 0, // use the default value provided by the implementation ) @@ -376,15 +373,24 @@ func (c *chainClient) PendingNonceAt(ctx context.Context, account common.Address } func (c *chainClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { + var result *SendTxResult if c.chainType == chaintype.ChainHedera { activeRPC, err := c.multiNode.SelectRPC() if err != nil { return err } - return activeRPC.SendTransaction(ctx, tx) + result = activeRPC.SendTransaction(ctx, tx) + + } else { + result = c.txSender.SendTransaction(ctx, tx) + } + if result == nil { + return fmt.Errorf("SendTransaction failed: result is nil") + } + if result.Error() != nil { + return result.Error() } - _, err := c.txSender.SendTransaction(ctx, tx) - return err + return result.TxError() } func (c *chainClient) SendTransactionReturnCode(ctx context.Context, tx *types.Transaction, fromAddress common.Address) (commonclient.SendTxReturnCode, error) { diff --git a/core/chains/evm/client/evm_client.go b/core/chains/evm/client/evm_client.go index 18206265fd7..1f81981cca4 100644 --- a/core/chains/evm/client/evm_client.go +++ b/core/chains/evm/client/evm_client.go @@ -21,13 +21,13 @@ func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, cli for i, node := range nodes { if node.SendOnly != nil && *node.SendOnly { rpc := NewRPCClient(cfg, lggr, nil, node.HTTPURL.URL(), *node.Name, i, chainID, - commonclient.Secondary, largePayloadRPCTimeout, defaultRPCTimeout, chainType) + commonclient.Secondary, largePayloadRPCTimeout, defaultRPCTimeout, chainType, clientErrors) sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL), *node.Name, chainID, rpc) sendonlys = append(sendonlys, sendonly) } else { rpc := NewRPCClient(cfg, lggr, node.WSURL.URL(), node.HTTPURL.URL(), *node.Name, i, - chainID, commonclient.Primary, largePayloadRPCTimeout, defaultRPCTimeout, chainType) + chainID, commonclient.Primary, largePayloadRPCTimeout, defaultRPCTimeout, chainType, clientErrors) primaryNode := commonclient.NewNode(cfg, chainCfg, lggr, node.WSURL.URL(), node.HTTPURL.URL(), *node.Name, i, chainID, *node.Order, rpc, "EVM") diff --git a/core/chains/evm/client/helpers_test.go b/core/chains/evm/client/helpers_test.go index f9751be765c..5d793e46765 100644 --- a/core/chains/evm/client/helpers_test.go +++ b/core/chains/evm/client/helpers_test.go @@ -150,7 +150,7 @@ func NewChainClientWithTestNode( nodePoolCfg := TestNodePoolConfig{ NodeFinalizedBlockPollInterval: 1 * time.Second, } - rpc := NewRPCClient(nodePoolCfg, lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := NewRPCClient(nodePoolCfg, lggr, 
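// Illustrative sketch (not part of the patch): what the chain client's unwrap
// order means for callers. Error() carries sender-level failures (sender not
// started, no healthy nodes, cancelled context); TxError() carries the RPC
// error that was classified into Code(). chainClient.SendTransaction checks
// them in that order. Names below are simplified stand-ins.
package main

import (
	"errors"
	"fmt"
)

type sendTxResult struct {
	err   error // sender-level failure, set via the newResult factory
	txErr error // raw RPC error, classified into a return code
}

func (r *sendTxResult) Error() error   { return r.err }
func (r *sendTxResult) TxError() error { return r.txErr }

// unwrap mirrors chainClient.SendTransaction: surface sender failures first,
// then fall back to the transaction-level error (nil when the RPC accepted it).
func unwrap(r *sendTxResult) error {
	if r == nil {
		return errors.New("SendTransaction failed: result is nil")
	}
	if r.Error() != nil {
		return r.Error()
	}
	return r.TxError()
}

func main() {
	fmt.Println(unwrap(&sendTxResult{}))                                        // <nil>: accepted by the RPC
	fmt.Println(unwrap(&sendTxResult{txErr: errors.New("nonce too low")}))      // RPC rejected the tx
	fmt.Println(unwrap(&sendTxResult{err: errors.New("MultiNode is stopped")})) // sender-level failure
}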
parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) n := commonclient.NewNode[*big.Int, *evmtypes.Head, *RPCClient]( nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM") @@ -161,7 +161,7 @@ func NewChainClientWithTestNode( if u.Scheme != "http" && u.Scheme != "https" { return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String()) } - rpc := NewRPCClient(nodePoolCfg, lggr, nil, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := NewRPCClient(nodePoolCfg, lggr, nil, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) s := commonclient.NewSendOnlyNode[*big.Int, *RPCClient]( lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc) sendonlys = append(sendonlys, s) diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index d2247b0343e..21e7fa720b4 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -98,6 +98,7 @@ type RPCClient struct { newHeadsPollInterval time.Duration rpcTimeout time.Duration chainType chaintype.ChainType + clientErrors config.ClientErrors ws *rawclient http *rawclient @@ -122,7 +123,7 @@ type RPCClient struct { } var _ commonclient.RPCClient[*big.Int, *evmtypes.Head] = (*RPCClient)(nil) -var _ commonclient.SendTxRPCClient[*types.Transaction] = (*RPCClient)(nil) +var _ commonclient.SendTxRPCClient[*types.Transaction, *SendTxResult] = (*RPCClient)(nil) func NewRPCClient( cfg config.NodePool, @@ -136,11 +137,13 @@ func NewRPCClient( largePayloadRPCTimeout time.Duration, rpcTimeout time.Duration, chainType chaintype.ChainType, + clientErrors config.ClientErrors, ) *RPCClient { r := &RPCClient{ largePayloadRPCTimeout: largePayloadRPCTimeout, rpcTimeout: rpcTimeout, chainType: chainType, + clientErrors: clientErrors, } r.cfg = cfg r.name = name @@ -790,7 +793,35 @@ func (r *RPCClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (blo return } -func (r *RPCClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { +type SendTxResult struct { + err error + txErr error + code commonclient.SendTxReturnCode +} + +var _ commonclient.SendTxResult = (*SendTxResult)(nil) + +func NewSendTxResult(err error) *SendTxResult { + result := &SendTxResult{ + err: err, + txErr: err, + } + return result +} + +func (r *SendTxResult) Error() error { + return r.err +} + +func (r *SendTxResult) TxError() error { + return r.txErr +} + +func (r *SendTxResult) Code() commonclient.SendTxReturnCode { + return r.code +} + +func (r *RPCClient) SendTransaction(ctx context.Context, tx *types.Transaction) *SendTxResult { ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.largePayloadRPCTimeout) defer cancel() lggr := r.newRqLggr().With("tx", tx) @@ -807,7 +838,11 @@ func (r *RPCClient) SendTransaction(ctx context.Context, tx *types.Transaction) r.logResult(lggr, err, duration, r.getRPCDomain(), "SendTransaction") - return err + return &SendTxResult{ + err: nil, + txErr: err, + code: ClassifySendError(err, r.clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, r.chainType.IsL2()), + } } func (r *RPCClient) SimulateTransaction(ctx 
context.Context, tx *types.Transaction) error { diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index edbb10cc36f..67a35a33481 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -106,7 +106,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { t.Run("WS and HTTP URL cannot be both empty", func(t *testing.T) { // ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) - rpcClient := client.NewRPCClient(nodePoolCfgHeadPolling, observedLggr, nil, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpcClient := client.NewRPCClient(nodePoolCfgHeadPolling, observedLggr, nil, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) require.Equal(t, errors.New("cannot dial rpc client when both ws and http info are missing"), rpcClient.Dial(ctx)) }) @@ -114,7 +114,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) // set to default values @@ -164,7 +164,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -189,7 +189,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -215,7 +215,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) var wg sync.WaitGroup @@ -238,7 +238,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { t.Run("Block's chain ID matched configured", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := 
server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) ch, sub, err := rpc.SubscribeToHeads(tests.Context(t)) @@ -254,7 +254,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { }) wsURL := server.WSURL() observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) - rpc := client.NewRPCClient(nodePoolCfgNoPolling, observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgNoPolling, observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) require.NoError(t, rpc.Dial(ctx)) server.Close() _, _, err := rpc.SubscribeToHeads(ctx) @@ -264,7 +264,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { t.Run("Closed rpc client should remove existing SubscribeToHeads subscription with WS", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgNoPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgNoPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -276,7 +276,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -288,7 +288,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgNoPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgNoPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -300,7 +300,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -312,7 +312,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := 
client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -323,7 +323,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { t.Run("Subscription error is properly wrapper", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgNoPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfgNoPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) SetNextRPCHead(nil) @@ -355,7 +355,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { t.Run("Failed SubscribeFilterLogs when WSURL is empty", func(t *testing.T) { // ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) - rpcClient := client.NewRPCClient(nodePoolCfg, observedLggr, nil, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpcClient := client.NewRPCClient(nodePoolCfg, observedLggr, nil, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) require.Nil(t, rpcClient.Dial(ctx)) _, err := rpcClient.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -367,7 +367,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { }) wsURL := server.WSURL() observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) - rpc := client.NewRPCClient(nodePoolCfg, observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfg, observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) require.NoError(t, rpc.Dial(ctx)) server.Close() _, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -384,7 +384,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { return resp }) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfg, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfg, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) sub, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -438,7 +438,7 @@ func TestRPCClient_LatestFinalizedBlock(t *testing.T) { } server := createRPCServer() - rpc := client.NewRPCClient(nodePoolCfg, lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(nodePoolCfg, lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) require.NoError(t, rpc.Dial(ctx)) defer rpc.Close() 
server.Head = &evmtypes.Head{Number: 128} @@ -502,7 +502,8 @@ func TestRpcClientLargePayloadTimeout(t *testing.T) { { Name: "SendTransaction", Fn: func(ctx context.Context, rpc *client.RPCClient) error { - return rpc.SendTransaction(ctx, types.NewTx(&types.LegacyTx{})) + result := rpc.SendTransaction(ctx, types.NewTx(&types.LegacyTx{})) + return result.TxError() }, }, { @@ -553,7 +554,7 @@ func TestRpcClientLargePayloadTimeout(t *testing.T) { // use something unreasonably large for RPC timeout to ensure that we use largePayloadRPCTimeout const rpcTimeout = time.Hour const largePayloadRPCTimeout = tests.TestInterval - rpc := client.NewRPCClient(nodePoolCfg, logger.Test(t), rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, largePayloadRPCTimeout, rpcTimeout, "") + rpc := client.NewRPCClient(nodePoolCfg, logger.Test(t), rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, largePayloadRPCTimeout, rpcTimeout, "", nil) require.NoError(t, rpc.Dial(ctx)) defer rpc.Close() err := testCase.Fn(ctx, rpc) @@ -598,7 +599,7 @@ func TestAstarCustomFinality(t *testing.T) { const expectedFinalizedBlockNumber = int64(4) const expectedFinalizedBlockHash = "0x7441e97acf83f555e0deefef86db636bc8a37eb84747603412884e4df4d22804" - rpcClient := client.NewRPCClient(nodePoolCfg, logger.Test(t), wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar) + rpcClient := client.NewRPCClient(nodePoolCfg, logger.Test(t), wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar, nil) defer rpcClient.Close() err := rpcClient.Dial(tests.Context(t)) require.NoError(t, err) From 492e57ff1a22959b539285b49e61930872a533ce Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 31 Oct 2024 15:07:18 -0400 Subject: [PATCH 02/11] changeset --- .changeset/silver-avocados-buy.md | 5 +++++ core/chains/evm/client/chain_client.go | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 .changeset/silver-avocados-buy.md diff --git a/.changeset/silver-avocados-buy.md b/.changeset/silver-avocados-buy.md new file mode 100644 index 00000000000..6b636ee267d --- /dev/null +++ b/.changeset/silver-avocados-buy.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Update MultiNode with latest changes and bug fixes. Fixes an issue that caused nodes to go OutOfSync incorrectly, and also fixed context handling for sending transactions. 
#internal #bugfix diff --git a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go index 43d31d891a9..14892a78744 100644 --- a/core/chains/evm/client/chain_client.go +++ b/core/chains/evm/client/chain_client.go @@ -380,7 +380,6 @@ func (c *chainClient) SendTransaction(ctx context.Context, tx *types.Transaction return err } result = activeRPC.SendTransaction(ctx, tx) - } else { result = c.txSender.SendTransaction(ctx, tx) } From adc9c1d52fc26ea78d354334e80cd4872efc26cd Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 31 Oct 2024 15:12:59 -0400 Subject: [PATCH 03/11] lint --- common/client/transaction_sender.go | 3 +-- common/client/transaction_sender_test.go | 4 ++-- core/chains/evm/client/chain_client.go | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/common/client/transaction_sender.go b/common/client/transaction_sender.go index 5a746e538c0..3b7315c577a 100644 --- a/common/client/transaction_sender.go +++ b/common/client/transaction_sender.go @@ -3,7 +3,6 @@ package client import ( "context" "errors" - "fmt" "math" "slices" "sync" @@ -219,7 +218,7 @@ func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result return r[0], nil } - criticalErr = fmt.Errorf("expected at least one response on SendTransaction") + criticalErr = errors.New("expected at least one response on SendTransaction") return result, criticalErr } diff --git a/common/client/transaction_sender_test.go b/common/client/transaction_sender_test.go index 2d8e533783c..75bd4999e5b 100644 --- a/common/client/transaction_sender_test.go +++ b/common/client/transaction_sender_test.go @@ -389,9 +389,9 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { txResult, err := aggregateTxResults(testCase.ResultsByCode) if !testCase.ExpectedNilResult { if testCase.ExpectedTxResult == "" { - assert.NoError(t, err) + require.NoError(t, err) } else { - assert.EqualError(t, txResult.TxError(), testCase.ExpectedTxResult) + require.EqualError(t, txResult.TxError(), testCase.ExpectedTxResult) } } diff --git a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go index 14892a78744..1db4e2ba7b7 100644 --- a/core/chains/evm/client/chain_client.go +++ b/core/chains/evm/client/chain_client.go @@ -2,7 +2,7 @@ package client import ( "context" - "fmt" + "errors" "math/big" "sync" "time" @@ -384,7 +384,7 @@ func (c *chainClient) SendTransaction(ctx context.Context, tx *types.Transaction result = c.txSender.SendTransaction(ctx, tx) } if result == nil { - return fmt.Errorf("SendTransaction failed: result is nil") + return errors.New("SendTransaction failed: result is nil") } if result.Error() != nil { return result.Error() From 630ea8da90e4995000e6e488ead101a855bb9d0b Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 5 Nov 2024 09:35:58 -0500 Subject: [PATCH 04/11] defer reportWg --- common/client/transaction_sender.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/client/transaction_sender.go b/common/client/transaction_sender.go index 3b7315c577a..610709189b8 100644 --- a/common/client/transaction_sender.go +++ b/common/client/transaction_sender.go @@ -160,8 +160,8 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct txSender.wg.Add(1) reportWg.Add(1) go func() { + defer reportWg.Done() txSender.reportSendTxAnomalies(tx, txResultsToReport) - reportWg.Done() }() return txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) From 
7b201524ee6113c4a7657c419392c9ef9168f29e Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 7 Nov 2024 12:28:47 -0500 Subject: [PATCH 05/11] Use latest changes --- common/client/node_lifecycle.go | 35 ++++---- common/client/transaction_sender.go | 133 ++++++++++++++-------------- 2 files changed, 84 insertions(+), 84 deletions(-) diff --git a/common/client/node_lifecycle.go b/common/client/node_lifecycle.go index 1dd59a0c0c0..3407a432ec6 100644 --- a/common/client/node_lifecycle.go +++ b/common/client/node_lifecycle.go @@ -7,8 +7,6 @@ import ( "math/big" "time" - "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -17,6 +15,7 @@ import ( bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math" iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils" + "github.com/smartcontractkit/chainlink/v2/common/types" ) var ( @@ -132,6 +131,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { } } + // Get the latest chain info to use as local highest localHighestChainInfo, _ := n.rpc.GetInterceptedChainInfo() var pollFailures uint32 @@ -170,10 +170,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { } if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { // note: there must be another live node for us to be out of sync - _, highest := n.poolInfoProvider.LatestChainInfo() - _, latestChainInfo := n.StateAndLatest() - lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", latestChainInfo.BlockNumber, "bestLatestBlockNumber", - highest.BlockNumber, "totalDifficulty", latestChainInfo.TotalDifficulty, "nodeState", n.getCachedState()) if liveNodes < 2 { lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) continue @@ -198,8 +194,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { case <-headsSub.NoNewHeads: // We haven't received a head on the channel for at least the // threshold amount of time, mark it broken - _, latestChainInfo := n.StateAndLatest() - lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, latestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", latestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) + lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) if n.poolInfoProvider != nil { if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 { lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) @@ -315,7 +310,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewFinalizedHead(lggr logger.SugaredLogger latestFinalizedBN := latestFinalized.BlockNumber() lggr.Debugw("Got latest finalized head", "latestFinalized", latestFinalized) if latestFinalizedBN <= chainInfo.FinalizedBlockNumber { - lggr.Tracew("Ignoring previously seen finalized block number") + lggr.Debugw("Ignoring previously seen finalized block number") return false } @@ -334,7 +329,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewHead(lggr logger.SugaredLogger, chainIn lggr.Debugw("Got head", "head", head) lggr = lggr.With("latestReceivedBlockNumber", 
chainInfo.BlockNumber, "blockNumber", head.BlockNumber(), "nodeState", n.getCachedState()) if head.BlockNumber() <= chainInfo.BlockNumber { - lggr.Tracew("Ignoring previously seen block number") + lggr.Debugw("Ignoring previously seen block number") return false } @@ -372,17 +367,22 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveN } // Check against best node ln, ci := n.poolInfoProvider.LatestChainInfo() - _, localChainInfo := n.StateAndLatest() + localChainInfo, _ := n.rpc.GetInterceptedChainInfo() mode := n.nodePoolCfg.SelectionMode() switch mode { case NodeSelectionModeHighestHead, NodeSelectionModeRoundRobin, NodeSelectionModePriorityLevel: - return localChainInfo.BlockNumber < ci.BlockNumber-int64(threshold), ln + outOfSync = localChainInfo.BlockNumber < ci.BlockNumber-int64(threshold) case NodeSelectionModeTotalDifficulty: bigThreshold := big.NewInt(int64(threshold)) - return localChainInfo.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0, ln + outOfSync = localChainInfo.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0 default: panic("unrecognized NodeSelectionMode: " + mode) } + + if outOfSync && n.getCachedState() == nodeStateAlive { + n.lfcLog.Errorw("RPC endpoint has fallen behind", "blockNumber", localChainInfo.BlockNumber, "bestLatestBlockNumber", ci.BlockNumber, "totalDifficulty", localChainInfo.TotalDifficulty) + } + return outOfSync, ln } // outOfSyncLoop takes an OutOfSync node and waits until isOutOfSync returns false to go back to live status @@ -508,8 +508,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { continue } - _, latestChainInfo := n.StateAndLatest() - receivedNewHead := n.onNewFinalizedHead(lggr, &latestChainInfo, latestFinalized) + receivedNewHead := n.onNewFinalizedHead(lggr, &localHighestChainInfo, latestFinalized) if !receivedNewHead { continue } @@ -520,7 +519,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { finalizedHeadsSub.ResetTimer(noNewFinalizedBlocksTimeoutThreshold) } - lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "syncIssues", syncIssues) + highestSeen := n.poolInfoProvider.HighestUserObservations() + + lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "poolHighestBlockNumber", highestSeen.FinalizedBlockNumber, "syncIssues", syncIssues) case err := <-finalizedHeadsSub.Errors: lggr.Errorw("Finalized head subscription was terminated", "err", err) n.declareUnreachable() @@ -653,7 +654,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() { case nodeStateClosed: return default: - panic(fmt.Sprintf("syncingLoop can only run for node in nodeStateSyncing state, got: %s", state)) + panic(fmt.Sprintf("syncingLoop can only run for node in NodeStateSyncing state, got: %s", state)) } } diff --git a/common/client/transaction_sender.go b/common/client/transaction_sender.go index 610709189b8..b5cc37ae121 100644 --- a/common/client/transaction_sender.go +++ b/common/client/transaction_sender.go @@ -3,6 +3,7 @@ package client import ( "context" "errors" + "fmt" "math" "slices" "sync" @@ -93,90 +94,86 @@ type TransactionSender[TX any, RESULT SendTxResult, CHAIN_ID types.ID, RPC SendT // * If there is both success and terminal error - returns success and reports invariant violation // * Otherwise, returns any (effectively random) of the errors. 
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) RESULT { - txResults := make(chan RESULT) - txResultsToReport := make(chan RESULT) - primaryNodeWg := sync.WaitGroup{} - - if txSender.State() != "Started" { - return txSender.newResult(errors.New("TransactionSender not started")) - } + var result RESULT + if !txSender.IfStarted(func() { + txResults := make(chan RESULT) + txResultsToReport := make(chan RESULT) + primaryNodeWg := sync.WaitGroup{} + + healthyNodesNum := 0 + err := txSender.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPC, isSendOnly bool) { + if isSendOnly { + txSender.wg.Add(1) + go func(ctx context.Context) { + ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx)) + defer cancel() + defer txSender.wg.Done() + // Send-only nodes' results are ignored as they tend to return false-positive responses. + // Broadcast to them is necessary to speed up the propagation of TX in the network. + _ = txSender.broadcastTxAsync(ctx, rpc, tx) + }(ctx) + return + } - // Must wait for reportSendTxAnomalies and collectTxResults to complete before cancelling the context - txSenderCtx, cancel := txSender.chStop.NewCtx() - reportWg := sync.WaitGroup{} - defer func() { + // Primary Nodes + healthyNodesNum++ + primaryNodeWg.Add(1) + go func(ctx context.Context) { + ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx)) + defer cancel() + defer primaryNodeWg.Done() + r := txSender.broadcastTxAsync(ctx, rpc, tx) + select { + case <-ctx.Done(): + txSender.lggr.Debugw("Failed to send tx results", "err", ctx.Err()) + return + case txResults <- r: + } + + select { + case <-ctx.Done(): + txSender.lggr.Debugw("Failed to send tx results to report", "err", ctx.Err()) + return + case txResultsToReport <- r: + } + }(ctx) + }) + + // This needs to be done in parallel so the reporting knows when it's done (when the channel is closed) + txSender.wg.Add(1) go func() { - reportWg.Wait() - cancel() + defer txSender.wg.Done() + primaryNodeWg.Wait() + close(txResultsToReport) + close(txResults) }() - }() - - healthyNodesNum := 0 - err := txSender.multiNode.DoAll(txSenderCtx, func(ctx context.Context, rpc RPC, isSendOnly bool) { - if isSendOnly { - txSender.wg.Add(1) - go func() { - defer txSender.wg.Done() - // Send-only nodes' results are ignored as they tend to return false-positive responses. - // Broadcast to them is necessary to speed up the propagation of TX in the network. 
- _ = txSender.broadcastTxAsync(ctx, rpc, tx) - }() + + if err != nil { + result = txSender.newResult(err) return } - // Primary Nodes - healthyNodesNum++ - primaryNodeWg.Add(1) - go func() { - defer primaryNodeWg.Done() - r := txSender.broadcastTxAsync(ctx, rpc, tx) - select { - case <-ctx.Done(): - return - case txResults <- r: - } + txSender.wg.Add(1) + go txSender.reportSendTxAnomalies(ctx, tx, txResultsToReport) - select { - case <-ctx.Done(): - return - case txResultsToReport <- r: - } - }() - }) - - // This needs to be done in parallel so the reporting knows when it's done (when the channel is closed) - txSender.wg.Add(1) - go func() { - defer txSender.wg.Done() - primaryNodeWg.Wait() - close(txResultsToReport) - close(txResults) - }() - - if err != nil { - return txSender.newResult(err) + result = txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) + }) { + result = txSender.newResult(errors.New("TransactionSender not started")) } - txSender.wg.Add(1) - reportWg.Add(1) - go func() { - defer reportWg.Done() - txSender.reportSendTxAnomalies(tx, txResultsToReport) - }() - - return txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) + return result } func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) RESULT { result := rpc.SendTransaction(ctx, tx) txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.TxError()) - if !slices.Contains(sendTxSuccessfulCodes, result.Code()) { + if !slices.Contains(sendTxSuccessfulCodes, result.Code()) && ctx.Err() == nil { txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", result.TxError()) } return result } -func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan RESULT) { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(ctx context.Context, tx TX, txResults <-chan RESULT) { defer txSender.wg.Done() resultsByCode := sendTxResults[RESULT]{} // txResults eventually will be closed @@ -185,7 +182,7 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomal } _, criticalErr := aggregateTxResults[RESULT](resultsByCode) - if criticalErr != nil { + if criticalErr != nil && ctx.Err() == nil { txSender.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr) PromMultiNodeInvariantViolations.WithLabelValues(txSender.chainFamily, txSender.chainID.String(), criticalErr.Error()).Inc() } @@ -218,7 +215,7 @@ func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result return r[0], nil } - criticalErr = errors.New("expected at least one response on SendTransaction") + criticalErr = fmt.Errorf("expected at least one response on SendTransaction") return result, criticalErr } @@ -258,6 +255,7 @@ loop: // ignore critical error as it's reported in reportSendTxAnomalies result, _ := aggregateTxResults(errorsByCode) + txSender.lggr.Debugw("Collected results", "errorsByCode", errorsByCode, "result", result) return result } @@ -269,6 +267,7 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Start(ctx context. 
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Close() error { return txSender.StopOnce("TransactionSender", func() error { + txSender.lggr.Debug("Closing TransactionSender") close(txSender.chStop) txSender.wg.Wait() return nil From cd49ba250275b90850cde261d89afbdfd4dc10d8 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 7 Nov 2024 12:46:36 -0500 Subject: [PATCH 06/11] Update node_lifecycle.go --- common/client/node_lifecycle.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/client/node_lifecycle.go b/common/client/node_lifecycle.go index 3407a432ec6..6ec6a598eb2 100644 --- a/common/client/node_lifecycle.go +++ b/common/client/node_lifecycle.go @@ -519,7 +519,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { finalizedHeadsSub.ResetTimer(noNewFinalizedBlocksTimeoutThreshold) } - highestSeen := n.poolInfoProvider.HighestUserObservations() + var highestSeen ChainInfo + if n.poolInfoProvider != nil { + highestSeen = n.poolInfoProvider.HighestUserObservations() + } lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "poolHighestBlockNumber", highestSeen.FinalizedBlockNumber, "syncIssues", syncIssues) case err := <-finalizedHeadsSub.Errors: From c77b3b5b635f7c6194b424906cbe006a01966c41 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 7 Nov 2024 12:57:20 -0500 Subject: [PATCH 07/11] lint --- common/client/transaction_sender.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/common/client/transaction_sender.go b/common/client/transaction_sender.go index b5cc37ae121..f9211812af1 100644 --- a/common/client/transaction_sender.go +++ b/common/client/transaction_sender.go @@ -3,7 +3,6 @@ package client import ( "context" "errors" - "fmt" "math" "slices" "sync" @@ -215,7 +214,7 @@ func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result return r[0], nil } - criticalErr = fmt.Errorf("expected at least one response on SendTransaction") + criticalErr = errors.New("expected at least one response on SendTransaction") return result, criticalErr } From fffd8334041bb21f11f9f6452265b34126b13e90 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 13 Nov 2024 11:17:54 -0500 Subject: [PATCH 08/11] Remove TxError --- common/client/transaction_sender.go | 5 ++--- core/chains/evm/client/chain_client.go | 5 +---- core/chains/evm/client/rpc_client.go | 17 +++++------------ 3 files changed, 8 insertions(+), 19 deletions(-) diff --git a/common/client/transaction_sender.go b/common/client/transaction_sender.go index f9211812af1..cd2ce96c5b2 100644 --- a/common/client/transaction_sender.go +++ b/common/client/transaction_sender.go @@ -27,7 +27,6 @@ var ( type SendTxResult interface { Code() SendTxReturnCode - TxError() error Error() error } @@ -165,9 +164,9 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) RESULT { result := rpc.SendTransaction(ctx, tx) - txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.TxError()) + txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.Error()) if !slices.Contains(sendTxSuccessfulCodes, result.Code()) && ctx.Err() == nil { - txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", result.TxError()) + txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", result.Error()) } return result } diff --git 
a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go index 1db4e2ba7b7..23edb3d3f9e 100644 --- a/core/chains/evm/client/chain_client.go +++ b/core/chains/evm/client/chain_client.go @@ -386,10 +386,7 @@ func (c *chainClient) SendTransaction(ctx context.Context, tx *types.Transaction if result == nil { return errors.New("SendTransaction failed: result is nil") } - if result.Error() != nil { - return result.Error() - } - return result.TxError() + return result.Error() } func (c *chainClient) SendTransactionReturnCode(ctx context.Context, tx *types.Transaction, fromAddress common.Address) (commonclient.SendTxReturnCode, error) { diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index 0a1c50e7c81..fbf4026c793 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -806,17 +806,15 @@ func (r *RPCClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (blo } type SendTxResult struct { - err error - txErr error - code commonclient.SendTxReturnCode + err error + code commonclient.SendTxReturnCode } var _ commonclient.SendTxResult = (*SendTxResult)(nil) func NewSendTxResult(err error) *SendTxResult { result := &SendTxResult{ - err: err, - txErr: err, + err: err, } return result } @@ -825,10 +823,6 @@ func (r *SendTxResult) Error() error { return r.err } -func (r *SendTxResult) TxError() error { - return r.txErr -} - func (r *SendTxResult) Code() commonclient.SendTxReturnCode { return r.code } @@ -851,9 +845,8 @@ func (r *RPCClient) SendTransaction(ctx context.Context, tx *types.Transaction) r.logResult(lggr, err, duration, r.getRPCDomain(), "SendTransaction") return &SendTxResult{ - err: nil, - txErr: err, - code: ClassifySendError(err, r.clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, r.chainType.IsL2()), + err: err, + code: ClassifySendError(err, r.clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, r.chainType.IsL2()), } } From 9ae86dcf7fcd530fb72205bf575514820b1953f8 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 13 Nov 2024 11:50:56 -0500 Subject: [PATCH 09/11] Use cfg clientErrors --- common/client/transaction_sender_test.go | 29 ++++++++---------- core/chains/evm/client/evm_client.go | 4 +-- core/chains/evm/client/helpers_test.go | 4 +-- core/chains/evm/client/rpc_client.go | 3 +- core/chains/evm/client/rpc_client_test.go | 36 +++++++++++------------ 5 files changed, 36 insertions(+), 40 deletions(-) diff --git a/common/client/transaction_sender_test.go b/common/client/transaction_sender_test.go index 75bd4999e5b..6db9719f9de 100644 --- a/common/client/transaction_sender_test.go +++ b/common/client/transaction_sender_test.go @@ -26,17 +26,16 @@ type sendTxRPC struct { } type sendTxResult struct { - err error - txErr error - code SendTxReturnCode + err error + code SendTxReturnCode } +/* var _ SendTxResult = (*sendTxResult)(nil) func NewSendTxResult(err error) *sendTxResult { result := &sendTxResult{ - err: err, - txErr: err, + err: err, } return result } @@ -45,14 +44,12 @@ func (r *sendTxResult) Error() error { return r.err } -func (r *sendTxResult) TxError() error { - return r.txErr -} - func (r *sendTxResult) Code() SendTxReturnCode { return r.code } +*/ + var _ SendTxRPCClient[any, *sendTxResult] = (*sendTxRPC)(nil) func newSendTxRPC(sendTxErr error, sendTxRun func(args mock.Arguments)) *sendTxRPC { @@ -63,7 +60,7 @@ func (rpc *sendTxRPC) SendTransaction(ctx context.Context, _ any) *sendTxResult if rpc.sendTxRun != nil { 
rpc.sendTxRun(mock.Arguments{ctx}) } - return &sendTxResult{err: rpc.sendTxErr, txErr: rpc.sendTxErr, code: classifySendTxError(nil, rpc.sendTxErr)} + return &sendTxResult{err: rpc.sendTxErr, code: classifySendTxError(nil, rpc.sendTxErr)} } func newTestTransactionSender(t *testing.T, chainID types.ID, lggr logger.Logger, @@ -135,7 +132,7 @@ func TestTransactionSender_SendTransaction(t *testing.T) { []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{newNode(t, errors.New("unexpected error"), nil)}) result := txSender.SendTransaction(tests.Context(t), nil) - require.ErrorIs(t, result.TxError(), expectedError) + require.ErrorIs(t, result.Error(), expectedError) require.Equal(t, Fatal, result.Code()) tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2) tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 2) @@ -150,7 +147,7 @@ func TestTransactionSender_SendTransaction(t *testing.T) { []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{newNode(t, errors.New("unexpected error"), nil)}) result := txSender.SendTransaction(tests.Context(t), nil) - require.NoError(t, result.TxError()) + require.NoError(t, result.Error()) require.Equal(t, Successful, result.Code()) tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2) tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 1) @@ -173,7 +170,7 @@ func TestTransactionSender_SendTransaction(t *testing.T) { requestContext, cancel := context.WithCancel(tests.Context(t)) cancel() result := txSender.SendTransaction(requestContext, nil) - require.EqualError(t, result.TxError(), "context canceled") + require.EqualError(t, result.Error(), "context canceled") }) t.Run("Soft timeout stops results collection", func(t *testing.T) { @@ -193,7 +190,7 @@ func TestTransactionSender_SendTransaction(t *testing.T) { _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{fastNode, slowNode}, nil) result := txSender.SendTransaction(tests.Context(t), nil) - require.EqualError(t, result.TxError(), expectedError.Error()) + require.EqualError(t, result.Error(), expectedError.Error()) }) t.Run("Returns success without waiting for the rest of the nodes", func(t *testing.T) { chainID := types.RandomID() @@ -281,7 +278,7 @@ func TestTransactionSender_SendTransaction(t *testing.T) { []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{sendOnly}) result := txSender.SendTransaction(tests.Context(t), nil) - assert.EqualError(t, result.TxError(), ErroringNodeError.Error()) + assert.EqualError(t, result.Error(), ErroringNodeError.Error()) }) t.Run("Transaction success even if one of the nodes is unhealthy", func(t *testing.T) { @@ -391,7 +388,7 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { if testCase.ExpectedTxResult == "" { require.NoError(t, err) } else { - require.EqualError(t, txResult.TxError(), testCase.ExpectedTxResult) + require.EqualError(t, txResult.Error(), testCase.ExpectedTxResult) } } diff --git a/core/chains/evm/client/evm_client.go b/core/chains/evm/client/evm_client.go index 1f81981cca4..18206265fd7 100644 --- a/core/chains/evm/client/evm_client.go +++ b/core/chains/evm/client/evm_client.go @@ -21,13 +21,13 @@ func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, cli for i, node := range nodes { if node.SendOnly != nil && *node.SendOnly { rpc := NewRPCClient(cfg, lggr, nil, node.HTTPURL.URL(), *node.Name, i, chainID, - commonclient.Secondary, 
largePayloadRPCTimeout, defaultRPCTimeout, chainType, clientErrors) + commonclient.Secondary, largePayloadRPCTimeout, defaultRPCTimeout, chainType) sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL), *node.Name, chainID, rpc) sendonlys = append(sendonlys, sendonly) } else { rpc := NewRPCClient(cfg, lggr, node.WSURL.URL(), node.HTTPURL.URL(), *node.Name, i, - chainID, commonclient.Primary, largePayloadRPCTimeout, defaultRPCTimeout, chainType, clientErrors) + chainID, commonclient.Primary, largePayloadRPCTimeout, defaultRPCTimeout, chainType) primaryNode := commonclient.NewNode(cfg, chainCfg, lggr, node.WSURL.URL(), node.HTTPURL.URL(), *node.Name, i, chainID, *node.Order, rpc, "EVM") diff --git a/core/chains/evm/client/helpers_test.go b/core/chains/evm/client/helpers_test.go index 5d793e46765..f9751be765c 100644 --- a/core/chains/evm/client/helpers_test.go +++ b/core/chains/evm/client/helpers_test.go @@ -150,7 +150,7 @@ func NewChainClientWithTestNode( nodePoolCfg := TestNodePoolConfig{ NodeFinalizedBlockPollInterval: 1 * time.Second, } - rpc := NewRPCClient(nodePoolCfg, lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := NewRPCClient(nodePoolCfg, lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") n := commonclient.NewNode[*big.Int, *evmtypes.Head, *RPCClient]( nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM") @@ -161,7 +161,7 @@ func NewChainClientWithTestNode( if u.Scheme != "http" && u.Scheme != "https" { return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String()) } - rpc := NewRPCClient(nodePoolCfg, lggr, nil, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := NewRPCClient(nodePoolCfg, lggr, nil, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") s := commonclient.NewSendOnlyNode[*big.Int, *RPCClient]( lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc) sendonlys = append(sendonlys, s) diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index fbf4026c793..e67c1101381 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -137,13 +137,12 @@ func NewRPCClient( largePayloadRPCTimeout time.Duration, rpcTimeout time.Duration, chainType chaintype.ChainType, - clientErrors config.ClientErrors, ) *RPCClient { r := &RPCClient{ largePayloadRPCTimeout: largePayloadRPCTimeout, rpcTimeout: rpcTimeout, chainType: chainType, - clientErrors: clientErrors, + clientErrors: cfg.Errors(), } r.cfg = cfg r.name = name diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index 1f95417e3db..b91add1d2b1 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -87,7 +87,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { t.Run("WS and HTTP URL cannot be both empty", func(t *testing.T) { // ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) - rpcClient 
:= client.NewRPCClient(nodePoolCfgHeadPolling, observedLggr, nil, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpcClient := client.NewRPCClient(nodePoolCfgHeadPolling, observedLggr, nil, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.Equal(t, errors.New("cannot dial rpc client when both ws and http info are missing"), rpcClient.Dial(ctx)) }) @@ -95,7 +95,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) // set to default values @@ -144,7 +144,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -187,7 +187,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { } server := createRPCServer() - rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) latest, highestUserObservations := rpc.GetInterceptedChainInfo() @@ -226,7 +226,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) var wg sync.WaitGroup @@ -249,7 +249,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { t.Run("Block's chain ID matched configured", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) ch, sub, err := rpc.SubscribeToHeads(tests.Context(t)) @@ -265,7 +265,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { }) wsURL := server.WSURL() observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) - rpc := 
client.NewRPCClient(nodePoolCfgWSSub, observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfgWSSub, observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.NoError(t, rpc.Dial(ctx)) server.Close() _, _, err := rpc.SubscribeToHeads(ctx) @@ -275,7 +275,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { t.Run("Closed rpc client should remove existing SubscribeToHeads subscription with WS", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -287,7 +287,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -299,7 +299,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -310,7 +310,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { t.Run("Subscription error is properly wrapper", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) _, sub, err := rpc.SubscribeToHeads(ctx) @@ -340,7 +340,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { t.Run("Failed SubscribeFilterLogs when WSURL is empty", func(t *testing.T) { // ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) - rpcClient := client.NewRPCClient(nodePoolCfg, observedLggr, nil, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpcClient := client.NewRPCClient(nodePoolCfg, observedLggr, nil, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.Nil(t, 
rpcClient.Dial(ctx)) _, err := rpcClient.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -352,7 +352,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { }) wsURL := server.WSURL() observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) - rpc := client.NewRPCClient(nodePoolCfg, observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfg, observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.NoError(t, rpc.Dial(ctx)) server.Close() _, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -369,7 +369,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { return resp }) wsURL := server.WSURL() - rpc := client.NewRPCClient(nodePoolCfg, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfg, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) sub, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -423,7 +423,7 @@ func TestRPCClient_LatestFinalizedBlock(t *testing.T) { } server := createRPCServer() - rpc := client.NewRPCClient(nodePoolCfg, lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfg, lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.NoError(t, rpc.Dial(ctx)) defer rpc.Close() server.Head = &evmtypes.Head{Number: 128} @@ -519,7 +519,7 @@ func TestRpcClientLargePayloadTimeout(t *testing.T) { Name: "SendTransaction", Fn: func(ctx context.Context, rpc *client.RPCClient) error { result := rpc.SendTransaction(ctx, types.NewTx(&types.LegacyTx{})) - return result.TxError() + return result.Error() }, }, { @@ -570,7 +570,7 @@ func TestRpcClientLargePayloadTimeout(t *testing.T) { // use something unreasonably large for RPC timeout to ensure that we use largePayloadRPCTimeout const rpcTimeout = time.Hour const largePayloadRPCTimeout = tests.TestInterval - rpc := client.NewRPCClient(nodePoolCfg, logger.Test(t), rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, largePayloadRPCTimeout, rpcTimeout, "", nil) + rpc := client.NewRPCClient(nodePoolCfg, logger.Test(t), rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, largePayloadRPCTimeout, rpcTimeout, "") require.NoError(t, rpc.Dial(ctx)) defer rpc.Close() err := testCase.Fn(ctx, rpc) @@ -615,7 +615,7 @@ func TestAstarCustomFinality(t *testing.T) { const expectedFinalizedBlockNumber = int64(4) const expectedFinalizedBlockHash = "0x7441e97acf83f555e0deefef86db636bc8a37eb84747603412884e4df4d22804" - rpcClient := client.NewRPCClient(nodePoolCfg, logger.Test(t), wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar, nil) + rpcClient := client.NewRPCClient(nodePoolCfg, logger.Test(t), wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar) defer rpcClient.Close() err := rpcClient.Dial(tests.Context(t)) require.NoError(t, err) From 94e77fcc53889060733aaea4fc7c168de0cbfb3a Mon Sep 17 00:00:00 2001 
From: Dylan Tinianov Date: Wed, 13 Nov 2024 12:35:38 -0500 Subject: [PATCH 10/11] Update transaction_sender_test.go --- common/client/transaction_sender_test.go | 70 +++++++++++------------- 1 file changed, 31 insertions(+), 39 deletions(-) diff --git a/common/client/transaction_sender_test.go b/common/client/transaction_sender_test.go index 6db9719f9de..b545720f224 100644 --- a/common/client/transaction_sender_test.go +++ b/common/client/transaction_sender_test.go @@ -16,8 +16,10 @@ import ( "github.com/smartcontractkit/chainlink/v2/common/types" ) +type TestSendTxRPCClient SendTxRPCClient[any, *sendTxResult] + type sendTxMultiNode struct { - *MultiNode[types.ID, SendTxRPCClient[any, *sendTxResult]] + *MultiNode[types.ID, TestSendTxRPCClient] } type sendTxRPC struct { @@ -30,7 +32,6 @@ type sendTxResult struct { code SendTxReturnCode } -/* var _ SendTxResult = (*sendTxResult)(nil) func NewSendTxResult(err error) *sendTxResult { @@ -48,9 +49,7 @@ func (r *sendTxResult) Code() SendTxReturnCode { return r.code } -*/ - -var _ SendTxRPCClient[any, *sendTxResult] = (*sendTxRPC)(nil) +var _ TestSendTxRPCClient = (*sendTxRPC)(nil) func newSendTxRPC(sendTxErr error, sendTxRun func(args mock.Arguments)) *sendTxRPC { return &sendTxRPC{sendTxErr: sendTxErr, sendTxRun: sendTxRun} @@ -64,15 +63,15 @@ func (rpc *sendTxRPC) SendTransaction(ctx context.Context, _ any) *sendTxResult } func newTestTransactionSender(t *testing.T, chainID types.ID, lggr logger.Logger, - nodes []Node[types.ID, SendTxRPCClient[any, *sendTxResult]], - sendOnlyNodes []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]], -) (*sendTxMultiNode, *TransactionSender[any, *sendTxResult, types.ID, SendTxRPCClient[any, *sendTxResult]]) { - mn := sendTxMultiNode{NewMultiNode[types.ID, SendTxRPCClient[any, *sendTxResult]]( + nodes []Node[types.ID, TestSendTxRPCClient], + sendOnlyNodes []SendOnlyNode[types.ID, TestSendTxRPCClient], +) (*sendTxMultiNode, *TransactionSender[any, *sendTxResult, types.ID, TestSendTxRPCClient]) { + mn := sendTxMultiNode{NewMultiNode[types.ID, TestSendTxRPCClient]( lggr, NodeSelectionModeRoundRobin, 0, nodes, sendOnlyNodes, chainID, "chainFamily", 0)} err := mn.StartOnce("startedTestMultiNode", func() error { return nil }) require.NoError(t, err) - txSender := NewTransactionSender[any, *sendTxResult, types.ID, SendTxRPCClient[any, *sendTxResult]](lggr, chainID, mn.chainFamily, mn.MultiNode, NewSendTxResult, tests.TestInterval) + txSender := NewTransactionSender[any, *sendTxResult, types.ID, TestSendTxRPCClient](lggr, chainID, mn.chainFamily, mn.MultiNode, NewSendTxResult, tests.TestInterval) err = txSender.Start(tests.Context(t)) require.NoError(t, err) @@ -101,9 +100,9 @@ func classifySendTxError(_ any, err error) SendTxReturnCode { func TestTransactionSender_SendTransaction(t *testing.T) { t.Parallel() - newNodeWithState := func(t *testing.T, state nodeState, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, SendTxRPCClient[any, *sendTxResult]] { + newNodeWithState := func(t *testing.T, state nodeState, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, TestSendTxRPCClient] { rpc := newSendTxRPC(txErr, sendTxRun) - node := newMockNode[types.ID, SendTxRPCClient[any, *sendTxResult]](t) + node := newMockNode[types.ID, TestSendTxRPCClient](t) node.On("String").Return("node name").Maybe() node.On("RPC").Return(rpc).Maybe() node.On("State").Return(state).Maybe() @@ -111,7 +110,7 @@ func TestTransactionSender_SendTransaction(t *testing.T) { return node } - newNode := 
func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, SendTxRPCClient[any, *sendTxResult]] { + newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, TestSendTxRPCClient] { return newNodeWithState(t, nodeStateAlive, txErr, sendTxRun) } @@ -128,8 +127,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, - []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{mainNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{newNode(t, errors.New("unexpected error"), nil)}) + []Node[types.ID, TestSendTxRPCClient]{mainNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{newNode(t, errors.New("unexpected error"), nil)}) result := txSender.SendTransaction(tests.Context(t), nil) require.ErrorIs(t, result.Error(), expectedError) @@ -143,8 +142,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, - []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{mainNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{newNode(t, errors.New("unexpected error"), nil)}) + []Node[types.ID, TestSendTxRPCClient]{mainNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{newNode(t, errors.New("unexpected error"), nil)}) result := txSender.SendTransaction(tests.Context(t), nil) require.NoError(t, result.Error()) @@ -165,7 +164,7 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, - []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{mainNode}, nil) + []Node[types.ID, TestSendTxRPCClient]{mainNode}, nil) requestContext, cancel := context.WithCancel(tests.Context(t)) cancel() @@ -188,7 +187,7 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) - _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{fastNode, slowNode}, nil) + _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, TestSendTxRPCClient]{fastNode, slowNode}, nil) result := txSender.SendTransaction(tests.Context(t), nil) require.EqualError(t, result.Error(), expectedError.Error()) }) @@ -208,8 +207,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { }) lggr, _ := logger.TestObserved(t, zap.WarnLevel) mn, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{fastNode, slowNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{slowSendOnly}) + []Node[types.ID, TestSendTxRPCClient]{fastNode, slowNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{slowSendOnly}) result := txSender.SendTransaction(tests.Context(t), nil) require.NoError(t, result.Error()) @@ -234,8 +233,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) mn, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{fastNode, slowNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{slowSendOnly}) + []Node[types.ID, TestSendTxRPCClient]{fastNode, slowNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{slowSendOnly}) require.NoError(t, 
mn.Close()) result := txSender.SendTransaction(tests.Context(t), nil) @@ -259,8 +258,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{fastNode, slowNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{slowSendOnly}) + []Node[types.ID, TestSendTxRPCClient]{fastNode, slowNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{slowSendOnly}) require.NoError(t, txSender.Close()) result := txSender.SendTransaction(tests.Context(t), nil) @@ -274,8 +273,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{primary}, - []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{sendOnly}) + []Node[types.ID, TestSendTxRPCClient]{primary}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{sendOnly}) result := txSender.SendTransaction(tests.Context(t), nil) assert.EqualError(t, result.Error(), ErroringNodeError.Error()) @@ -293,8 +292,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, _ := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any, *sendTxResult]]{mainNode, unhealthyNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any, *sendTxResult]]{unhealthySendOnlyNode}) + []Node[types.ID, TestSendTxRPCClient]{mainNode, unhealthyNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{unhealthySendOnlyNode}) result := txSender.SendTransaction(tests.Context(t), nil) require.NoError(t, result.Error()) @@ -313,7 +312,6 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { testCases := []struct { Name string ExpectedTxResult string - ExpectedNilResult bool ExpectedCriticalErr string ResultsByCode sendTxResults[*sendTxResult] }{ @@ -354,16 +352,14 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { }, { Name: "Insufficient funds is treated as error", - ExpectedTxResult: "", + ExpectedTxResult: "insufficientFunds", ExpectedCriticalErr: "", ResultsByCode: sendTxResults[*sendTxResult]{ - Successful: {NewSendTxResult(nil)}, InsufficientFunds: {NewSendTxResult(errors.New("insufficientFunds"))}, }, }, { Name: "Logs critical error on empty ResultsByCode", - ExpectedNilResult: true, ExpectedCriticalErr: "expected at least one response on SendTransaction", ResultsByCode: sendTxResults[*sendTxResult]{}, }, @@ -384,12 +380,8 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { t.Run(testCase.Name, func(t *testing.T) { txResult, err := aggregateTxResults(testCase.ResultsByCode) - if !testCase.ExpectedNilResult { - if testCase.ExpectedTxResult == "" { - require.NoError(t, err) - } else { - require.EqualError(t, txResult.Error(), testCase.ExpectedTxResult) - } + if testCase.ExpectedTxResult != "" { + assert.EqualError(t, txResult.Error(), testCase.ExpectedTxResult) } logger.Sugared(logger.Test(t)).Info("Map: " + fmt.Sprint(testCase.ResultsByCode)) From 8a5d75cd7a3ac4ed6b53e490a95c2aad27a1879d Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 13 Nov 2024 12:45:32 -0500 Subject: [PATCH 11/11] Use require --- common/client/transaction_sender_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/client/transaction_sender_test.go 
b/common/client/transaction_sender_test.go index b545720f224..c7f3affc67a 100644 --- a/common/client/transaction_sender_test.go +++ b/common/client/transaction_sender_test.go @@ -381,16 +381,16 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { t.Run(testCase.Name, func(t *testing.T) { txResult, err := aggregateTxResults(testCase.ResultsByCode) if testCase.ExpectedTxResult != "" { - assert.EqualError(t, txResult.Error(), testCase.ExpectedTxResult) + require.EqualError(t, txResult.Error(), testCase.ExpectedTxResult) } logger.Sugared(logger.Test(t)).Info("Map: " + fmt.Sprint(testCase.ResultsByCode)) logger.Sugared(logger.Test(t)).Criticalw("observed invariant violation on SendTransaction", "resultsByCode", testCase.ResultsByCode, "err", err) if testCase.ExpectedCriticalErr == "" { - assert.NoError(t, err) + require.NoError(t, err) } else { - assert.EqualError(t, err, testCase.ExpectedCriticalErr) + require.EqualError(t, err, testCase.ExpectedCriticalErr) } }) }
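
Taken together, the SendTransaction changes in this series reduce result handling to a single pattern: callers treat a nil result as an error and otherwise rely on Error() alone, since TxError() is removed from both the common interface and the EVM implementation. A stand-alone sketch of that pattern follows; the package and type declarations are local stand-ins that mirror, but are not, the real common/client types, included only so the snippet compiles on its own, while the nil-check and single Error() accessor are taken from the chain_client.go and rpc_client.go hunks above.

package clientexample // hypothetical package name; stand-alone sketch only

import "errors"

// SendTxReturnCode and SendTxResult are local stand-ins mirroring the reduced
// interface left in common/client after the TxError() removal; they exist
// only so this sketch compiles by itself.
type SendTxReturnCode int

type SendTxResult interface {
	Code() SendTxReturnCode
	Error() error
}

// checkSendResult shows the calling pattern chainClient.SendTransaction
// settles on in these patches: a nil result is itself an error, and Error()
// alone carries both internal and RPC failures.
func checkSendResult(result SendTxResult) error {
	if result == nil {
		return errors.New("SendTransaction failed: result is nil")
	}
	return result.Error()
}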