diff --git a/.changeset/silver-avocados-buy.md b/.changeset/silver-avocados-buy.md new file mode 100644 index 00000000000..6b636ee267d --- /dev/null +++ b/.changeset/silver-avocados-buy.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Update MultiNode with latest changes and bug fixes. Fixes an issue that caused nodes to go OutOfSync incorrectly, and also fixes context handling for sending transactions. #internal #bugfix diff --git a/common/client/node_lifecycle.go b/common/client/node_lifecycle.go index ce508a43dde..6ec6a598eb2 100644 --- a/common/client/node_lifecycle.go +++ b/common/client/node_lifecycle.go @@ -7,8 +7,6 @@ import ( "math/big" "time" - "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -17,6 +15,7 @@ import ( bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math" iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils" + "github.com/smartcontractkit/chainlink/v2/common/types" ) var ( @@ -132,6 +131,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { } } + // Get the latest chain info to use as local highest localHighestChainInfo, _ := n.rpc.GetInterceptedChainInfo() var pollFailures uint32 @@ -168,10 +168,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.declareUnreachable() return } - _, latestChainInfo := n.StateAndLatest() - if outOfSync, liveNodes := n.isOutOfSyncWithPool(latestChainInfo); outOfSync { + if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { // note: there must be another live node for us to be out of sync - lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", latestChainInfo.BlockNumber, "totalDifficulty", latestChainInfo.TotalDifficulty, "nodeState", n.getCachedState()) if liveNodes < 2 { lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) continue @@ -310,9 +308,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewFinalizedHead(lggr logger.SugaredLogger } latestFinalizedBN := latestFinalized.BlockNumber() - lggr.Tracew("Got latest finalized head", "latestFinalized", latestFinalized) + lggr.Debugw("Got latest finalized head", "latestFinalized", latestFinalized) if latestFinalizedBN <= chainInfo.FinalizedBlockNumber { - lggr.Tracew("Ignoring previously seen finalized block number") + lggr.Debugw("Ignoring previously seen finalized block number") return false } @@ -328,10 +326,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewHead(lggr logger.SugaredLogger, chainIn } promPoolRPCNodeNumSeenBlocks.WithLabelValues(n.chainID.String(), n.name).Inc() - lggr.Tracew("Got head", "head", head) + lggr.Debugw("Got head", "head", head) lggr = lggr.With("latestReceivedBlockNumber", chainInfo.BlockNumber, "blockNumber", head.BlockNumber(), "nodeState", n.getCachedState()) if head.BlockNumber() <= chainInfo.BlockNumber { - lggr.Tracew("Ignoring previously seen block number") + lggr.Debugw("Ignoring previously seen block number") return false } @@ -358,7 +356,7 @@ const ( // isOutOfSyncWithPool returns outOfSync true if num or td is more than SyncThresold behind the best node. // Always returns outOfSync false for SyncThreshold 0. // liveNodes is only included when outOfSync is true. 
-func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (outOfSync bool, liveNodes int) { +func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveNodes int) { if n.poolInfoProvider == nil { n.lfcLog.Warn("skipping sync state against the pool - should only occur in tests") return // skip for tests @@ -369,16 +367,22 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (o } // Check against best node ln, ci := n.poolInfoProvider.LatestChainInfo() + localChainInfo, _ := n.rpc.GetInterceptedChainInfo() mode := n.nodePoolCfg.SelectionMode() switch mode { case NodeSelectionModeHighestHead, NodeSelectionModeRoundRobin, NodeSelectionModePriorityLevel: - return localState.BlockNumber < ci.BlockNumber-int64(threshold), ln + outOfSync = localChainInfo.BlockNumber < ci.BlockNumber-int64(threshold) case NodeSelectionModeTotalDifficulty: bigThreshold := big.NewInt(int64(threshold)) - return localState.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0, ln + outOfSync = localChainInfo.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0 default: panic("unrecognized NodeSelectionMode: " + mode) } + + if outOfSync && n.getCachedState() == nodeStateAlive { + n.lfcLog.Errorw("RPC endpoint has fallen behind", "blockNumber", localChainInfo.BlockNumber, "bestLatestBlockNumber", ci.BlockNumber, "totalDifficulty", localChainInfo.TotalDifficulty) + } + return outOfSync, ln } // outOfSyncLoop takes an OutOfSync node and waits until isOutOfSync returns false to go back to live status @@ -464,7 +468,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { // received a new head - clear NoNewHead flag syncIssues &= ^syncStatusNoNewHead - if outOfSync, _ := n.isOutOfSyncWithPool(localHighestChainInfo); !outOfSync { + if outOfSync, _ := n.isOutOfSyncWithPool(); !outOfSync { // we caught up with the pool - clear NotInSyncWithPool flag syncIssues &= ^syncStatusNotInSyncWithPool } else { @@ -515,7 +519,12 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { finalizedHeadsSub.ResetTimer(noNewFinalizedBlocksTimeoutThreshold) } - lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "syncIssues", syncIssues) + var highestSeen ChainInfo + if n.poolInfoProvider != nil { + highestSeen = n.poolInfoProvider.HighestUserObservations() + } + + lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "poolHighestBlockNumber", highestSeen.FinalizedBlockNumber, "syncIssues", syncIssues) case err := <-finalizedHeadsSub.Errors: lggr.Errorw("Finalized head subscription was terminated", "err", err) n.declareUnreachable() @@ -648,7 +657,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() { case nodeStateClosed: return default: - panic(fmt.Sprintf("syncingLoop can only run for node in nodeStateSyncing state, got: %s", state)) + panic(fmt.Sprintf("syncingLoop can only run for node in NodeStateSyncing state, got: %s", state)) } } diff --git a/common/client/node_lifecycle_test.go b/common/client/node_lifecycle_test.go index e510e0a308a..39c39e318ef 100644 --- a/common/client/node_lifecycle_test.go +++ b/common/client/node_lifecycle_test.go @@ -224,7 +224,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { poolInfo.On("LatestChainInfo").Return(10, ChainInfo{ BlockNumber: syncThreshold + mostRecentBlock + 1, TotalDifficulty: big.NewInt(10), - }).Once() + }) node.SetPoolChainInfoProvider(poolInfo) // tries to redial in 
outOfSync rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { @@ -259,7 +259,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ BlockNumber: syncThreshold + mostRecentBlock + 1, TotalDifficulty: big.NewInt(10), - }).Once() + }) node.SetPoolChainInfoProvider(poolInfo) node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)) @@ -288,7 +288,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { t.Run("when no new heads received for threshold, transitions to out of sync", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[types.ID, Head](t) - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) node := newSubscribedNode(t, testNodeOpts{ config: testNodeConfig{}, chainConfig: clientMocks.ChainConfig{ @@ -312,7 +312,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { t.Run("when no new heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[types.ID, Head](t) - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) node := newSubscribedNode(t, testNodeOpts{ config: testNodeConfig{}, @@ -693,15 +693,25 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { t.Run("if fail to get chainID, transitions to unreachable", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[types.ID, Head](t) + chainID := types.RandomID() node := newAliveNode(t, testNodeOpts{ - rpc: rpc, + rpc: rpc, + chainID: chainID, }) defer func() { assert.NoError(t, node.close()) }() + rpc.On("ChainID", mock.Anything).Return(chainID, nil) // for out-of-sync rpc.On("Dial", mock.Anything).Return(nil).Once() // for unreachable rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() + sub := mocks.NewSubscription(t) + errChan := make(chan error, 1) + errChan <- errors.New("subscription was terminate") + sub.On("Err").Return((<-chan error)(errChan)) + sub.On("Unsubscribe").Once() + rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), sub, nil) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) expectedError := errors.New("failed to get chain ID") // might be called multiple times @@ -1025,7 +1035,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { sub.On("Err").Return((<-chan error)(errChan)) sub.On("Unsubscribe").Once() rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), sub, nil).Once() - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) // unreachable rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe() @@ -1056,7 +1066,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { rpc.On("SubscribeToFinalizedHeads", mock.Anything).Run(func(args mock.Arguments) { close(ch) }).Return((<-chan Head)(ch), sub, nil).Once() - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) // unreachable rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe() 
node.declareOutOfSync(syncStatusNoNewHead) @@ -1082,7 +1092,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { defer func() { assert.NoError(t, node.close()) }() const highestBlock = 13 - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{FinalizedBlockNumber: highestBlock}, ChainInfo{FinalizedBlockNumber: highestBlock}) outOfSyncSubscription := mocks.NewSubscription(t) outOfSyncSubscription.On("Err").Return((<-chan error)(nil)) @@ -1119,7 +1129,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { defer func() { assert.NoError(t, node.close()) }() const highestBlock = 13 - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}).Once() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}) outOfSyncSubscription := mocks.NewSubscription(t) outOfSyncSubscription.On("Err").Return((<-chan error)(nil)) @@ -1582,7 +1592,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { t.Parallel() t.Run("skip if nLiveNodes is not configured", func(t *testing.T) { node := newTestNode(t, testNodeOpts{}) - outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{}) + outOfSync, liveNodes := node.isOutOfSyncWithPool() assert.Equal(t, false, outOfSync) assert.Equal(t, 0, liveNodes) }) @@ -1590,7 +1600,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { node := newTestNode(t, testNodeOpts{}) poolInfo := newMockPoolChainInfoProvider(t) node.SetPoolChainInfoProvider(poolInfo) - outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{}) + outOfSync, liveNodes := node.isOutOfSyncWithPool() assert.Equal(t, false, outOfSync) assert.Equal(t, 0, liveNodes) }) @@ -1602,7 +1612,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { poolInfo.On("LatestChainInfo").Return(1, ChainInfo{}).Once() node.SetPoolChainInfoProvider(poolInfo) assert.Panics(t, func() { - _, _ = node.isOutOfSyncWithPool(ChainInfo{}) + _, _ = node.isOutOfSyncWithPool() }) }) t.Run("block height selection mode", func(t *testing.T) { @@ -1653,7 +1663,11 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { for _, td := range []int64{totalDifficulty - syncThreshold - 1, totalDifficulty - syncThreshold, totalDifficulty, totalDifficulty + 1} { for _, testCase := range testCases { t.Run(fmt.Sprintf("%s: SelectionModeVal: %s: total difficulty: %d", testCase.name, selectionMode, td), func(t *testing.T) { - outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{BlockNumber: testCase.blockNumber, TotalDifficulty: big.NewInt(td)}) + chainInfo := ChainInfo{BlockNumber: testCase.blockNumber, TotalDifficulty: big.NewInt(td)} + rpc := newMockRPCClient[types.ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(chainInfo, ChainInfo{}).Once() + node.rpc = rpc + outOfSync, liveNodes := node.isOutOfSyncWithPool() assert.Equal(t, nodesNum, liveNodes) assert.Equal(t, testCase.outOfSync, outOfSync) }) @@ -1709,7 +1723,11 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { for _, hb := range []int64{highestBlock - syncThreshold - 1, highestBlock - syncThreshold, highestBlock, highestBlock + 1} { for _, testCase := range testCases { t.Run(fmt.Sprintf("%s: SelectionModeVal: %s: highest block: %d", testCase.name, NodeSelectionModeTotalDifficulty, hb), func(t *testing.T) { - outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{BlockNumber: hb, TotalDifficulty: 
big.NewInt(testCase.totalDifficulty)}) + chainInfo := ChainInfo{BlockNumber: hb, TotalDifficulty: big.NewInt(testCase.totalDifficulty)} + rpc := newMockRPCClient[types.ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(chainInfo, ChainInfo{}).Once() + node.rpc = rpc + outOfSync, liveNodes := node.isOutOfSyncWithPool() assert.Equal(t, nodesNum, liveNodes) assert.Equal(t, testCase.outOfSync, outOfSync) }) diff --git a/common/client/transaction_sender.go b/common/client/transaction_sender.go index 9365a82b290..cd2ce96c5b2 100644 --- a/common/client/transaction_sender.go +++ b/common/client/transaction_sender.go @@ -3,7 +3,6 @@ package client import ( "context" "errors" - "fmt" "math" "slices" "sync" @@ -14,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/common/types" ) @@ -25,52 +25,48 @@ var ( }, []string{"network", "chainId", "invariant"}) ) -// TxErrorClassifier - defines interface of a function that transforms raw RPC error into the SendTxReturnCode enum -// (e.g. Successful, Fatal, Retryable, etc.) -type TxErrorClassifier[TX any] func(tx TX, err error) SendTxReturnCode - -type sendTxResult struct { - Err error - ResultCode SendTxReturnCode +type SendTxResult interface { + Code() SendTxReturnCode + Error() error } const sendTxQuorum = 0.7 // SendTxRPCClient - defines interface of an RPC used by TransactionSender to broadcast transaction -type SendTxRPCClient[TX any] interface { +type SendTxRPCClient[TX any, RESULT SendTxResult] interface { // SendTransaction errors returned should include name or other unique identifier of the RPC - SendTransaction(ctx context.Context, tx TX) error + SendTransaction(ctx context.Context, tx TX) RESULT } -func NewTransactionSender[TX any, CHAIN_ID types.ID, RPC SendTxRPCClient[TX]]( +func NewTransactionSender[TX any, RESULT SendTxResult, CHAIN_ID types.ID, RPC SendTxRPCClient[TX, RESULT]]( lggr logger.Logger, chainID CHAIN_ID, chainFamily string, multiNode *MultiNode[CHAIN_ID, RPC], - txErrorClassifier TxErrorClassifier[TX], + newResult func(err error) RESULT, sendTxSoftTimeout time.Duration, -) *TransactionSender[TX, CHAIN_ID, RPC] { +) *TransactionSender[TX, RESULT, CHAIN_ID, RPC] { if sendTxSoftTimeout == 0 { sendTxSoftTimeout = QueryTimeout / 2 } - return &TransactionSender[TX, CHAIN_ID, RPC]{ + return &TransactionSender[TX, RESULT, CHAIN_ID, RPC]{ chainID: chainID, chainFamily: chainFamily, lggr: logger.Sugared(lggr).Named("TransactionSender").With("chainID", chainID.String()), multiNode: multiNode, - txErrorClassifier: txErrorClassifier, + newResult: newResult, sendTxSoftTimeout: sendTxSoftTimeout, chStop: make(services.StopChan), } } -type TransactionSender[TX any, CHAIN_ID types.ID, RPC SendTxRPCClient[TX]] struct { +type TransactionSender[TX any, RESULT SendTxResult, CHAIN_ID types.ID, RPC SendTxRPCClient[TX, RESULT]] struct { services.StateMachine chainID CHAIN_ID chainFamily string lggr logger.SugaredLogger multiNode *MultiNode[CHAIN_ID, RPC] - txErrorClassifier TxErrorClassifier[TX] + newResult func(err error) RESULT sendTxSoftTimeout time.Duration // defines max waiting time from first response til responses evaluation wg sync.WaitGroup // waits for all reporting goroutines to finish @@ -95,131 +91,138 @@ type TransactionSender[TX any, CHAIN_ID types.ID, RPC SendTxRPCClient[TX]] struc // * If there is at least one terminal error - returns terminal error // * If there is both success and terminal error - 
returns success and reports invariant violation // * Otherwise, returns any (effectively random) of the errors. -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) (SendTxReturnCode, error) { - txResults := make(chan sendTxResult) - txResultsToReport := make(chan sendTxResult) - primaryNodeWg := sync.WaitGroup{} +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) RESULT { + var result RESULT + if !txSender.IfStarted(func() { + txResults := make(chan RESULT) + txResultsToReport := make(chan RESULT) + primaryNodeWg := sync.WaitGroup{} + + healthyNodesNum := 0 + err := txSender.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPC, isSendOnly bool) { + if isSendOnly { + txSender.wg.Add(1) + go func(ctx context.Context) { + ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx)) + defer cancel() + defer txSender.wg.Done() + // Send-only nodes' results are ignored as they tend to return false-positive responses. + // Broadcast to them is necessary to speed up the propagation of TX in the network. + _ = txSender.broadcastTxAsync(ctx, rpc, tx) + }(ctx) + return + } - if txSender.State() != "Started" { - return Retryable, errors.New("TransactionSender not started") - } + // Primary Nodes + healthyNodesNum++ + primaryNodeWg.Add(1) + go func(ctx context.Context) { + ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx)) + defer cancel() + defer primaryNodeWg.Done() + r := txSender.broadcastTxAsync(ctx, rpc, tx) + select { + case <-ctx.Done(): + txSender.lggr.Debugw("Failed to send tx results", "err", ctx.Err()) + return + case txResults <- r: + } + + select { + case <-ctx.Done(): + txSender.lggr.Debugw("Failed to send tx results to report", "err", ctx.Err()) + return + case txResultsToReport <- r: + } + }(ctx) + }) + + // This needs to be done in parallel so the reporting knows when it's done (when the channel is closed) + txSender.wg.Add(1) + go func() { + defer txSender.wg.Done() + primaryNodeWg.Wait() + close(txResultsToReport) + close(txResults) + }() - healthyNodesNum := 0 - err := txSender.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPC, isSendOnly bool) { - if isSendOnly { - txSender.wg.Add(1) - go func() { - defer txSender.wg.Done() - // Send-only nodes' results are ignored as they tend to return false-positive responses. - // Broadcast to them is necessary to speed up the propagation of TX in the network. 
- _ = txSender.broadcastTxAsync(ctx, rpc, tx) - }() + if err != nil { + result = txSender.newResult(err) return } - // Primary Nodes - healthyNodesNum++ - primaryNodeWg.Add(1) - go func() { - defer primaryNodeWg.Done() - result := txSender.broadcastTxAsync(ctx, rpc, tx) - select { - case <-ctx.Done(): - return - case txResults <- result: - } - - select { - case <-ctx.Done(): - return - case txResultsToReport <- result: - } - }() - }) + txSender.wg.Add(1) + go txSender.reportSendTxAnomalies(ctx, tx, txResultsToReport) - // This needs to be done in parallel so the reporting knows when it's done (when the channel is closed) - txSender.wg.Add(1) - go func() { - defer txSender.wg.Done() - primaryNodeWg.Wait() - close(txResultsToReport) - close(txResults) - }() - - if err != nil { - return Retryable, err + result = txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) + }) { + result = txSender.newResult(errors.New("TransactionSender not started")) } - txSender.wg.Add(1) - go txSender.reportSendTxAnomalies(tx, txResultsToReport) - - return txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) + return result } -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) sendTxResult { - txErr := rpc.SendTransaction(ctx, tx) - txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", txErr) - resultCode := txSender.txErrorClassifier(tx, txErr) - if !slices.Contains(sendTxSuccessfulCodes, resultCode) { - txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", txErr) +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) RESULT { + result := rpc.SendTransaction(ctx, tx) + txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.Error()) + if !slices.Contains(sendTxSuccessfulCodes, result.Code()) && ctx.Err() == nil { + txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", result.Error()) } - return sendTxResult{Err: txErr, ResultCode: resultCode} + return result } -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(ctx context.Context, tx TX, txResults <-chan RESULT) { defer txSender.wg.Done() - resultsByCode := sendTxResults{} + resultsByCode := sendTxResults[RESULT]{} // txResults eventually will be closed for txResult := range txResults { - resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err) + resultsByCode[txResult.Code()] = append(resultsByCode[txResult.Code()], txResult) } - _, _, criticalErr := aggregateTxResults(resultsByCode) - if criticalErr != nil { + _, criticalErr := aggregateTxResults[RESULT](resultsByCode) + if criticalErr != nil && ctx.Err() == nil { txSender.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr) PromMultiNodeInvariantViolations.WithLabelValues(txSender.chainFamily, txSender.chainID.String(), criticalErr.Error()).Inc() } } -type sendTxResults map[SendTxReturnCode][]error +type sendTxResults[RESULT any] map[SendTxReturnCode][]RESULT -func aggregateTxResults(resultsByCode sendTxResults) (returnCode SendTxReturnCode, txResult error, err error) { - severeCode, severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors) - successCode, successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes) +func 
aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result RESULT, criticalErr error) { + severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors) + successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes) if hasSuccess { // We assume that primary node would never report false positive txResult for a transaction. // Thus, if such case occurs it's probably due to misconfiguration or a bug and requires manual intervention. if hasSevereErrors { const errMsg = "found contradictions in nodes replies on SendTransaction: got success and severe error" // return success, since at least 1 node has accepted our broadcasted Tx, and thus it can now be included onchain - return successCode, successResults[0], errors.New(errMsg) + return successResults[0], errors.New(errMsg) } // other errors are temporary - we are safe to return success - return successCode, successResults[0], nil + return successResults[0], nil } if hasSevereErrors { - return severeCode, severeErrors[0], nil + return severeErrors[0], nil } // return temporary error - for code, result := range resultsByCode { - return code, result[0], nil + for _, r := range resultsByCode { + return r[0], nil } - err = fmt.Errorf("expected at least one response on SendTransaction") - return Retryable, err, err + criticalErr = errors.New("expected at least one response on SendTransaction") + return result, criticalErr } -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan sendTxResult) (SendTxReturnCode, error) { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan RESULT) RESULT { if healthyNodesNum == 0 { - return Retryable, ErroringNodeError + return txSender.newResult(ErroringNodeError) } - ctx, cancel := txSender.chStop.Ctx(ctx) - defer cancel() requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum)) - errorsByCode := sendTxResults{} + errorsByCode := sendTxResults[RESULT]{} var softTimeoutChan <-chan time.Time var resultsCount int loop: @@ -227,11 +230,11 @@ loop: select { case <-ctx.Done(): txSender.lggr.Debugw("Failed to collect of the results before context was done", "tx", tx, "errorsByCode", errorsByCode) - return Retryable, ctx.Err() - case result := <-txResults: - errorsByCode[result.ResultCode] = append(errorsByCode[result.ResultCode], result.Err) + return txSender.newResult(ctx.Err()) + case r := <-txResults: + errorsByCode[r.Code()] = append(errorsByCode[r.Code()], r) resultsCount++ - if slices.Contains(sendTxSuccessfulCodes, result.ResultCode) || resultsCount >= requiredResults { + if slices.Contains(sendTxSuccessfulCodes, r.Code()) || resultsCount >= requiredResults { break loop } case <-softTimeoutChan: @@ -249,18 +252,20 @@ loop: } // ignore critical error as it's reported in reportSendTxAnomalies - returnCode, result, _ := aggregateTxResults(errorsByCode) - return returnCode, result + result, _ := aggregateTxResults(errorsByCode) + txSender.lggr.Debugw("Collected results", "errorsByCode", errorsByCode, "result", result) + return result } -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Start(ctx context.Context) error { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Start(ctx context.Context) error { return txSender.StartOnce("TransactionSender", func() error { return nil }) } -func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Close() error { +func 
(txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Close() error { return txSender.StopOnce("TransactionSender", func() error { + txSender.lggr.Debug("Closing TransactionSender") close(txSender.chStop) txSender.wg.Wait() return nil @@ -268,13 +273,12 @@ func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Close() error { } // findFirstIn - returns the first existing key and value for the slice of keys -func findFirstIn[K comparable, V any](set map[K]V, keys []K) (K, V, bool) { +func findFirstIn[K comparable, V any](set map[K]V, keys []K) (V, bool) { for _, k := range keys { if v, ok := set[k]; ok { - return k, v, true + return v, true } } - var zeroK K var zeroV V - return zeroK, zeroV, false + return zeroV, false } diff --git a/common/client/transaction_sender_test.go b/common/client/transaction_sender_test.go index 3844a4536ff..e9869610828 100644 --- a/common/client/transaction_sender_test.go +++ b/common/client/transaction_sender_test.go @@ -17,8 +17,10 @@ import ( "github.com/smartcontractkit/chainlink/v2/common/types" ) +type TestSendTxRPCClient SendTxRPCClient[any, *sendTxResult] + type sendTxMultiNode struct { - *MultiNode[types.ID, SendTxRPCClient[any]] + *MultiNode[types.ID, TestSendTxRPCClient] } type sendTxRPC struct { @@ -26,29 +28,51 @@ type sendTxRPC struct { sendTxErr error } -var _ SendTxRPCClient[any] = (*sendTxRPC)(nil) +type sendTxResult struct { + err error + code SendTxReturnCode +} + +var _ SendTxResult = (*sendTxResult)(nil) + +func NewSendTxResult(err error) *sendTxResult { + result := &sendTxResult{ + err: err, + } + return result +} + +func (r *sendTxResult) Error() error { + return r.err +} + +func (r *sendTxResult) Code() SendTxReturnCode { + return r.code +} + +var _ TestSendTxRPCClient = (*sendTxRPC)(nil) func newSendTxRPC(sendTxErr error, sendTxRun func(args mock.Arguments)) *sendTxRPC { return &sendTxRPC{sendTxErr: sendTxErr, sendTxRun: sendTxRun} } -func (rpc *sendTxRPC) SendTransaction(ctx context.Context, _ any) error { +func (rpc *sendTxRPC) SendTransaction(ctx context.Context, _ any) *sendTxResult { if rpc.sendTxRun != nil { rpc.sendTxRun(mock.Arguments{ctx}) } - return rpc.sendTxErr + return &sendTxResult{err: rpc.sendTxErr, code: classifySendTxError(nil, rpc.sendTxErr)} } // newTestTransactionSender returns a sendTxMultiNode and TransactionSender. // Only the TransactionSender is run via Start/Close. 
func newTestTransactionSender(t *testing.T, chainID types.ID, lggr logger.Logger, - nodes []Node[types.ID, SendTxRPCClient[any]], - sendOnlyNodes []SendOnlyNode[types.ID, SendTxRPCClient[any]], -) (*sendTxMultiNode, *TransactionSender[any, types.ID, SendTxRPCClient[any]]) { - mn := sendTxMultiNode{NewMultiNode[types.ID, SendTxRPCClient[any]]( + nodes []Node[types.ID, TestSendTxRPCClient], + sendOnlyNodes []SendOnlyNode[types.ID, TestSendTxRPCClient], +) (*sendTxMultiNode, *TransactionSender[any, *sendTxResult, types.ID, TestSendTxRPCClient]) { + mn := sendTxMultiNode{NewMultiNode[types.ID, TestSendTxRPCClient]( lggr, NodeSelectionModeRoundRobin, 0, nodes, sendOnlyNodes, chainID, "chainFamily", 0)} - txSender := NewTransactionSender[any, types.ID, SendTxRPCClient[any]](lggr, chainID, mn.chainFamily, mn.MultiNode, classifySendTxError, tests.TestInterval) + txSender := NewTransactionSender[any, *sendTxResult, types.ID, TestSendTxRPCClient](lggr, chainID, mn.chainFamily, mn.MultiNode, NewSendTxResult, tests.TestInterval) servicetest.Run(t, txSender) return &mn, txSender } @@ -63,9 +87,9 @@ func classifySendTxError(_ any, err error) SendTxReturnCode { func TestTransactionSender_SendTransaction(t *testing.T) { t.Parallel() - newNodeWithState := func(t *testing.T, state nodeState, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, SendTxRPCClient[any]] { + newNodeWithState := func(t *testing.T, state nodeState, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, TestSendTxRPCClient] { rpc := newSendTxRPC(txErr, sendTxRun) - node := newMockNode[types.ID, SendTxRPCClient[any]](t) + node := newMockNode[types.ID, TestSendTxRPCClient](t) node.On("String").Return("node name").Maybe() node.On("RPC").Return(rpc).Maybe() node.On("State").Return(state).Maybe() @@ -75,15 +99,15 @@ func TestTransactionSender_SendTransaction(t *testing.T) { return node } - newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, SendTxRPCClient[any]] { + newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, TestSendTxRPCClient] { return newNodeWithState(t, nodeStateAlive, txErr, sendTxRun) } t.Run("Fails if there is no nodes available", func(t *testing.T) { lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, nil, nil) - _, err := txSender.SendTransaction(tests.Context(t), nil) - assert.ErrorIs(t, err, ErroringNodeError) + result := txSender.SendTransaction(tests.Context(t), nil) + assert.EqualError(t, result.Error(), ErroringNodeError.Error()) }) t.Run("Transaction failure happy path", func(t *testing.T) { @@ -92,12 +116,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, - []Node[types.ID, SendTxRPCClient[any]]{mainNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{newNode(t, errors.New("unexpected error"), nil)}) + []Node[types.ID, TestSendTxRPCClient]{mainNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{newNode(t, errors.New("unexpected error"), nil)}) - result, sendErr := txSender.SendTransaction(tests.Context(t), nil) - require.ErrorIs(t, sendErr, expectedError) - require.Equal(t, Fatal, result) + result := txSender.SendTransaction(tests.Context(t), nil) + require.ErrorIs(t, result.Error(), expectedError) + require.Equal(t, Fatal, result.Code()) tests.AssertLogCountEventually(t, observedLogs, "Node sent 
transaction", 2) tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 2) }) @@ -107,12 +131,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, - []Node[types.ID, SendTxRPCClient[any]]{mainNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{newNode(t, errors.New("unexpected error"), nil)}) + []Node[types.ID, TestSendTxRPCClient]{mainNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{newNode(t, errors.New("unexpected error"), nil)}) - result, sendErr := txSender.SendTransaction(tests.Context(t), nil) - require.NoError(t, sendErr) - require.Equal(t, Successful, result) + result := txSender.SendTransaction(tests.Context(t), nil) + require.NoError(t, result.Error()) + require.Equal(t, Successful, result.Code()) tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2) tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 1) }) @@ -129,12 +153,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, - []Node[types.ID, SendTxRPCClient[any]]{mainNode}, nil) + []Node[types.ID, TestSendTxRPCClient]{mainNode}, nil) requestContext, cancel := context.WithCancel(tests.Context(t)) cancel() - _, sendErr := txSender.SendTransaction(requestContext, nil) - require.EqualError(t, sendErr, "context canceled") + result := txSender.SendTransaction(requestContext, nil) + require.EqualError(t, result.Error(), "context canceled") }) t.Run("Soft timeout stops results collection", func(t *testing.T) { @@ -152,9 +176,9 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr := logger.Test(t) - _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, nil) - _, sendErr := txSender.SendTransaction(tests.Context(t), nil) - require.ErrorIs(t, sendErr, expectedError) + _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, TestSendTxRPCClient]{fastNode, slowNode}, nil) + result := txSender.SendTransaction(tests.Context(t), nil) + require.EqualError(t, result.Error(), expectedError.Error()) }) t.Run("Returns success without waiting for the rest of the nodes", func(t *testing.T) { chainID := types.RandomID() @@ -170,14 +194,14 @@ func TestTransactionSender_SendTransaction(t *testing.T) { // block caller til end of the test <-testContext.Done() }) - lggr := logger.Test(t) + lggr, _ := logger.TestObserved(t, zap.WarnLevel) _, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{slowSendOnly}) + []Node[types.ID, TestSendTxRPCClient]{fastNode, slowNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{slowSendOnly}) - rtnCode, err := txSender.SendTransaction(tests.Context(t), nil) - require.NoError(t, err) - require.Equal(t, Successful, rtnCode) + result := txSender.SendTransaction(tests.Context(t), nil) + require.NoError(t, result.Error()) + require.Equal(t, Successful, result.Code()) }) t.Run("Fails when multinode is closed", func(t *testing.T) { chainID := types.RandomID() @@ -197,14 +221,16 @@ func TestTransactionSender_SendTransaction(t *testing.T) { }) slowSendOnly.On("ConfiguredChainID").Return(chainID).Maybe() - mn, txSender := newTestTransactionSender(t, chainID, logger.Test(t), - []Node[types.ID, 
SendTxRPCClient[any]]{fastNode, slowNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{slowSendOnly}) + lggr, _ := logger.TestObserved(t, zap.DebugLevel) + + mn, txSender := newTestTransactionSender(t, chainID, lggr, + []Node[types.ID, TestSendTxRPCClient]{fastNode, slowNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{slowSendOnly}) require.NoError(t, mn.Start(tests.Context(t))) require.NoError(t, mn.Close()) - _, err := txSender.SendTransaction(tests.Context(t), nil) - require.ErrorContains(t, err, "service is stopped") + result := txSender.SendTransaction(tests.Context(t), nil) + require.EqualError(t, result.Error(), "service is stopped") }) t.Run("Fails when closed", func(t *testing.T) { chainID := types.RandomID() @@ -221,16 +247,16 @@ func TestTransactionSender_SendTransaction(t *testing.T) { <-testContext.Done() }) - var txSender *TransactionSender[any, types.ID, SendTxRPCClient[any]] + var txSender *TransactionSender[any, *sendTxResult, types.ID, TestSendTxRPCClient] t.Cleanup(func() { // after txSender.Close() - _, err := txSender.SendTransaction(tests.Context(t), nil) - assert.EqualError(t, err, "TransactionSender not started") + result := txSender.SendTransaction(tests.Context(t), nil) + assert.EqualError(t, result.err, "TransactionSender not started") }) _, txSender = newTestTransactionSender(t, chainID, logger.Test(t), - []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{slowSendOnly}) + []Node[types.ID, TestSendTxRPCClient]{fastNode, slowNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{slowSendOnly}) }) t.Run("Returns error if there is no healthy primary nodes", func(t *testing.T) { @@ -241,11 +267,11 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any]]{primary}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{sendOnly}) + []Node[types.ID, TestSendTxRPCClient]{primary}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{sendOnly}) - _, sendErr := txSender.SendTransaction(tests.Context(t), nil) - assert.EqualError(t, sendErr, ErroringNodeError.Error()) + result := txSender.SendTransaction(tests.Context(t), nil) + assert.EqualError(t, result.Error(), ErroringNodeError.Error()) }) t.Run("Transaction success even if one of the nodes is unhealthy", func(t *testing.T) { @@ -260,12 +286,12 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, chainID, lggr, - []Node[types.ID, SendTxRPCClient[any]]{mainNode, unhealthyNode}, - []SendOnlyNode[types.ID, SendTxRPCClient[any]]{unhealthySendOnlyNode}) + []Node[types.ID, TestSendTxRPCClient]{mainNode, unhealthyNode}, + []SendOnlyNode[types.ID, TestSendTxRPCClient]{unhealthySendOnlyNode}) - returnCode, sendErr := txSender.SendTransaction(tests.Context(t), nil) - require.NoError(t, sendErr) - require.Equal(t, Successful, returnCode) + result := txSender.SendTransaction(tests.Context(t), nil) + require.NoError(t, result.Error()) + require.Equal(t, Successful, result.Code()) }) } @@ -281,64 +307,62 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { Name string ExpectedTxResult string ExpectedCriticalErr string - ResultsByCode sendTxResults + ResultsByCode sendTxResults[*sendTxResult] }{ { Name: "Returns success and logs critical error on success and Fatal", ExpectedTxResult: "success", ExpectedCriticalErr: "found contradictions in nodes 
replies on SendTransaction: got success and severe error", - ResultsByCode: sendTxResults{ - Successful: {errors.New("success")}, - Fatal: {errors.New("fatal")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + Successful: {NewSendTxResult(errors.New("success"))}, + Fatal: {NewSendTxResult(errors.New("fatal"))}, }, }, { Name: "Returns TransactionAlreadyKnown and logs critical error on TransactionAlreadyKnown and Fatal", ExpectedTxResult: "tx_already_known", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: sendTxResults{ - TransactionAlreadyKnown: {errors.New("tx_already_known")}, - Unsupported: {errors.New("unsupported")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + TransactionAlreadyKnown: {NewSendTxResult(errors.New("tx_already_known"))}, + Unsupported: {NewSendTxResult(errors.New("unsupported"))}, }, }, { Name: "Prefers sever error to temporary", ExpectedTxResult: "underpriced", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults{ - Retryable: {errors.New("retryable")}, - Underpriced: {errors.New("underpriced")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + Retryable: {NewSendTxResult(errors.New("retryable"))}, + Underpriced: {NewSendTxResult(errors.New("underpriced"))}, }, }, { Name: "Returns temporary error", ExpectedTxResult: "retryable", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults{ - Retryable: {errors.New("retryable")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + Retryable: {NewSendTxResult(errors.New("retryable"))}, }, }, { Name: "Insufficient funds is treated as error", - ExpectedTxResult: "", + ExpectedTxResult: "insufficientFunds", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults{ - Successful: {nil}, - InsufficientFunds: {errors.New("insufficientFunds")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + InsufficientFunds: {NewSendTxResult(errors.New("insufficientFunds"))}, }, }, { Name: "Logs critical error on empty ResultsByCode", - ExpectedTxResult: "expected at least one response on SendTransaction", ExpectedCriticalErr: "expected at least one response on SendTransaction", - ResultsByCode: sendTxResults{}, + ResultsByCode: sendTxResults[*sendTxResult]{}, }, { Name: "Zk terminally stuck", ExpectedTxResult: "not enough keccak counters to continue the execution", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults{ - TerminallyStuck: {errors.New("not enough keccak counters to continue the execution")}, + ResultsByCode: sendTxResults[*sendTxResult]{ + TerminallyStuck: {NewSendTxResult(errors.New("not enough keccak counters to continue the execution"))}, }, }, } @@ -349,20 +373,18 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { } t.Run(testCase.Name, func(t *testing.T) { - _, txResult, err := aggregateTxResults(testCase.ResultsByCode) - if testCase.ExpectedTxResult == "" { - assert.NoError(t, err) - } else { - assert.EqualError(t, txResult, testCase.ExpectedTxResult) + txResult, err := aggregateTxResults(testCase.ResultsByCode) + if testCase.ExpectedTxResult != "" { + require.EqualError(t, txResult.Error(), testCase.ExpectedTxResult) } logger.Sugared(logger.Test(t)).Info("Map: " + fmt.Sprint(testCase.ResultsByCode)) logger.Sugared(logger.Test(t)).Criticalw("observed invariant violation on SendTransaction", "resultsByCode", testCase.ResultsByCode, "err", err) if testCase.ExpectedCriticalErr == "" { - assert.NoError(t, err) + require.NoError(t, err) } else { - assert.EqualError(t, err, testCase.ExpectedCriticalErr) + 
require.EqualError(t, err, testCase.ExpectedCriticalErr) } }) } diff --git a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go index 79c2eef9769..0835b4c0ed8 100644 --- a/core/chains/evm/client/chain_client.go +++ b/core/chains/evm/client/chain_client.go @@ -2,6 +2,7 @@ package client import ( "context" + "errors" "math/big" "sync" "time" @@ -100,7 +101,7 @@ type chainClient struct { *big.Int, *RPCClient, ] - txSender *commonclient.TransactionSender[*types.Transaction, *big.Int, *RPCClient] + txSender *commonclient.TransactionSender[*types.Transaction, *SendTxResult, *big.Int, *RPCClient] logger logger.SugaredLogger chainType chaintype.ChainType clientErrors evmconfig.ClientErrors @@ -129,16 +130,12 @@ func NewChainClient( deathDeclarationDelay, ) - classifySendError := func(tx *types.Transaction, err error) commonclient.SendTxReturnCode { - return ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, chainType.IsL2()) - } - - txSender := commonclient.NewTransactionSender[*types.Transaction, *big.Int, *RPCClient]( + txSender := commonclient.NewTransactionSender[*types.Transaction, *SendTxResult, *big.Int, *RPCClient]( lggr, chainID, chainFamily, multiNode, - classifySendError, + NewSendTxResult, 0, // use the default value provided by the implementation ) @@ -376,15 +373,20 @@ func (c *chainClient) PendingNonceAt(ctx context.Context, account common.Address } func (c *chainClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { + var result *SendTxResult if c.chainType == chaintype.ChainHedera { activeRPC, err := c.multiNode.SelectRPC() if err != nil { return err } - return activeRPC.SendTransaction(ctx, tx) + result = activeRPC.SendTransaction(ctx, tx) + } else { + result = c.txSender.SendTransaction(ctx, tx) + } + if result == nil { + return errors.New("SendTransaction failed: result is nil") } - _, err := c.txSender.SendTransaction(ctx, tx) - return err + return result.Error() } func (c *chainClient) SendTransactionReturnCode(ctx context.Context, tx *types.Transaction, fromAddress common.Address) (commonclient.SendTxReturnCode, error) { diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index 8d6e25d5540..97046b4eff2 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -98,6 +98,7 @@ type RPCClient struct { newHeadsPollInterval time.Duration rpcTimeout time.Duration chainType chaintype.ChainType + clientErrors config.ClientErrors ws *rawclient http *rawclient @@ -122,7 +123,7 @@ type RPCClient struct { } var _ commonclient.RPCClient[*big.Int, *evmtypes.Head] = (*RPCClient)(nil) -var _ commonclient.SendTxRPCClient[*types.Transaction] = (*RPCClient)(nil) +var _ commonclient.SendTxRPCClient[*types.Transaction, *SendTxResult] = (*RPCClient)(nil) func NewRPCClient( cfg config.NodePool, @@ -141,6 +142,7 @@ func NewRPCClient( largePayloadRPCTimeout: largePayloadRPCTimeout, rpcTimeout: rpcTimeout, chainType: chainType, + clientErrors: cfg.Errors(), } r.cfg = cfg r.name = name @@ -802,7 +804,29 @@ func (r *RPCClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (blo return } -func (r *RPCClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { +type SendTxResult struct { + err error + code commonclient.SendTxReturnCode +} + +var _ commonclient.SendTxResult = (*SendTxResult)(nil) + +func NewSendTxResult(err error) *SendTxResult { + result := &SendTxResult{ + err: err, + } + return result +} + +func (r 
*SendTxResult) Error() error { + return r.err +} + +func (r *SendTxResult) Code() commonclient.SendTxReturnCode { + return r.code +} + +func (r *RPCClient) SendTransaction(ctx context.Context, tx *types.Transaction) *SendTxResult { ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.largePayloadRPCTimeout) defer cancel() lggr := r.newRqLggr().With("tx", tx) @@ -819,7 +843,10 @@ func (r *RPCClient) SendTransaction(ctx context.Context, tx *types.Transaction) r.logResult(lggr, err, duration, r.getRPCDomain(), "SendTransaction") - return err + return &SendTxResult{ + err: err, + code: ClassifySendError(err, r.clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, r.chainType.IsL2()), + } } func (r *RPCClient) SimulateTransaction(ctx context.Context, tx *types.Transaction) error { diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index c8edc7c557c..109a49d6e2f 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -518,7 +518,8 @@ func TestRpcClientLargePayloadTimeout(t *testing.T) { { Name: "SendTransaction", Fn: func(ctx context.Context, rpc *client.RPCClient) error { - return rpc.SendTransaction(ctx, types.NewTx(&types.LegacyTx{})) + result := rpc.SendTransaction(ctx, types.NewTx(&types.LegacyTx{})) + return result.Error() }, }, {
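
Reviewer note: the sketch below is not part of the diff. It is a minimal illustration, assuming only the interfaces introduced above (commonclient.SendTxResult, SendTxRPCClient, and the reworked NewTransactionSender), of how another chain integration might adopt the new result-based SendTransaction API in the same way this PR adapts the EVM RPCClient. The Tx, txResult, rpcClient, classifyErr, and newSender names are hypothetical.

package examplechain

import (
	"context"
	"errors"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"

	commonclient "github.com/smartcontractkit/chainlink/v2/common/client"
	"github.com/smartcontractkit/chainlink/v2/common/types"
)

// Tx is a placeholder transaction type for this hypothetical chain family.
type Tx struct{}

// txResult mirrors the EVM SendTxResult added in this PR and satisfies commonclient.SendTxResult.
type txResult struct {
	err  error
	code commonclient.SendTxReturnCode
}

var _ commonclient.SendTxResult = (*txResult)(nil)

func (r *txResult) Error() error                        { return r.err }
func (r *txResult) Code() commonclient.SendTxReturnCode { return r.code }

// newTxResult is passed to NewTransactionSender so that internal errors
// (context cancellation, "TransactionSender not started", DoAll failures)
// are wrapped into the chain-specific result type.
func newTxResult(err error) *txResult {
	return &txResult{err: err, code: classifyErr(err)}
}

// classifyErr stands in for a chain-specific error classifier; a real mapping
// would inspect RPC error payloads the way the EVM ClassifySendError does.
func classifyErr(err error) commonclient.SendTxReturnCode {
	if err == nil {
		return commonclient.Successful
	}
	return commonclient.Retryable
}

// rpcClient satisfies commonclient.SendTxRPCClient[*Tx, *txResult]: with this PR,
// classification happens inside the RPC layer instead of via the removed TxErrorClassifier callback.
type rpcClient struct{}

func (r *rpcClient) SendTransaction(ctx context.Context, tx *Tx) *txResult {
	// A real implementation would broadcast tx here and classify the RPC error.
	err := errors.New("broadcast not implemented in this sketch")
	return &txResult{err: err, code: classifyErr(err)}
}

// newSender wires an existing MultiNode into the generic TransactionSender.
func newSender(
	lggr logger.Logger,
	chainID types.ID,
	multiNode *commonclient.MultiNode[types.ID, *rpcClient],
) *commonclient.TransactionSender[*Tx, *txResult, types.ID, *rpcClient] {
	return commonclient.NewTransactionSender[*Tx, *txResult, types.ID, *rpcClient](
		lggr, chainID, "examplechain", multiNode, newTxResult,
		0, // 0 falls back to the default soft timeout (QueryTimeout / 2)
	)
}

Callers then branch on result.Code() and result.Error() rather than the old (SendTxReturnCode, error) pair, which is the same change this diff applies to chainClient.SendTransaction.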