BCI-2525: check all responses on transaction submission #11599

Merged
Commits (28):
41cf0be  sendTx: signal success if one of the nodes accepted transaction (dhaidashenko, Dec 18, 2023)
7bdce25  fix logger (dhaidashenko, Dec 18, 2023)
7274949  fix merge (dhaidashenko, Dec 18, 2023)
6c5915a  fix race (dhaidashenko, Dec 18, 2023)
86104a2  fixed multinode tests race (dhaidashenko, Dec 19, 2023)
ed88f18  Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a… (dhaidashenko, Dec 19, 2023)
6768c7e  improve test coverage (dhaidashenko, Dec 19, 2023)
fc41b06  Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a… (dhaidashenko, Dec 19, 2023)
e2767df  WIP: wait for 70% of nodes to reply on send TX (dhaidashenko, Dec 21, 2023)
b27afc9  Merge branch 'feature/BCI-2525-send-transaction-check-all-responses' … (dhaidashenko, Dec 21, 2023)
5db54d7  tests (dhaidashenko, Dec 22, 2023)
169a90b  Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a… (dhaidashenko, Dec 22, 2023)
6b0a7af  Report invariant violation via prom metrics (dhaidashenko, Jan 8, 2024)
16dd2b1  Merge branch 'feature/BCI-2525-send-transaction-check-all-responses' … (dhaidashenko, Jan 8, 2024)
70c7c2c  fixed sendTx tests (dhaidashenko, Jan 8, 2024)
ab5fa6c  address comments (dhaidashenko, Jan 8, 2024)
279f5f0  polish PR (dhaidashenko, Jan 9, 2024)
96f131e  Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a… (dhaidashenko, Jan 9, 2024)
d5d6ce2  Describe implementation details in the comment to SendTransaction (dhaidashenko, Jan 11, 2024)
d4268af  nit fixes (dhaidashenko, Jan 11, 2024)
d43dc50  more fixes (dhaidashenko, Jan 11, 2024)
5ed9ab7  use softTimeOut default value (dhaidashenko, Jan 11, 2024)
6369972  nit fix (dhaidashenko, Jan 15, 2024)
8256c5d  ensure all goroutines are done before Close (dhaidashenko, Jan 16, 2024)
e376873  refactor broadcast (dhaidashenko, Jan 17, 2024)
9a85e97  use sendTxSuccessfulCodes slice to identify if result is successful (dhaidashenko, Feb 8, 2024)
46f0fd3  Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a… (dhaidashenko, Feb 8, 2024)
0ee678f  Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a… (prashantkumar1982, Feb 9, 2024)
60 changes: 41 additions & 19 deletions common/client/multi_node.go
@@ -84,6 +84,7 @@ type multiNode[
services.StateMachine
nodes []Node[CHAIN_ID, HEAD, RPC_CLIENT]
sendonlys []SendOnlyNode[CHAIN_ID, RPC_CLIENT]
allNodes []SendOnlyNode[CHAIN_ID, RPC_CLIENT]
chainID CHAIN_ID
chainType config.ChainType
lggr logger.SugaredLogger
@@ -131,6 +132,11 @@ func NewMultiNode[
) MultiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT] {
nodeSelector := newNodeSelector(selectionMode, nodes)

var all []SendOnlyNode[CHAIN_ID, RPC_CLIENT]
for _, n := range nodes {
all = append(all, n)
}
all = append(all, sendonlys...)
// Prometheus' default interval is 15s, set this to under 7.5s to avoid
// aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency)
const reportInterval = 6500 * time.Millisecond
@@ -148,6 +154,7 @@
chainFamily: chainFamily,
sendOnlyErrorParser: sendOnlyErrorParser,
reportInterval: reportInterval,
allNodes: all,
}

c.lggr.Debugf("The MultiNode is configured to use NodeSelectionMode: %s", selectionMode)
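One subtlety in the constructor loop above: Go will not convert a []Node[...] to a []SendOnlyNode[...] in one step, even though every Node satisfies SendOnlyNode, so the elements must be appended one by one. A minimal sketch of the same shape (the sender, fullNode, and combine names are illustrative, not from the PR):

package client

// sender plays the role of SendOnlyNode; fullNode plays the role of Node.
type sender interface{ Send() error }

type fullNode struct{}

func (fullNode) Send() error { return nil }

// combine mirrors the constructor above: element-wise append for the typed
// slice, then a bulk append for the slice that already has the target type.
func combine(nodes []fullNode, sendonlys []sender) []sender {
    all := make([]sender, 0, len(nodes)+len(sendonlys))
    for _, n := range nodes {
        all = append(all, n) // legal per element; []fullNode is not []sender
    }
    return append(all, sendonlys...)
}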
@@ -546,21 +553,19 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SendTransaction(ctx context.Context, tx TX) error {
main, nodeError := c.selectNode()
var all []SendOnlyNode[CHAIN_ID, RPC_CLIENT]
for _, n := range c.nodes {
all = append(all, n)
if len(c.allNodes) == 0 {
return ErroringNodeError
}
all = append(all, c.sendonlys...)
for _, n := range all {
if n == main {
// main node is used at the end for the return value
continue
}
// Parallel send to all other nodes with ignored return value
// Async - we do not want to block the main thread with secondary nodes
// in case they are unreliable/slow.
// It is purely a "best effort" send.
result := make(chan error, 1)
// Even if we fail to select a main node, try sending the tx anyway and notify the caller of the failure.
// This gives the tx a chance to be applied while we are recovering.
main, nodeError := c.selectNode()
for _, n := range c.allNodes {
// Parallel send to all nodes.
// Release the caller on the first success from any node, or on the error from the main node.
// This way, we:
// * prefer the potentially healthiest node to report the error;
// * improve performance when the main node is degraded and would otherwise time out.
// Resource usage stays bounded because the default context has a timeout.
ok := c.IfNotStopped(func() {
// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
@@ -569,21 +574,38 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
defer c.wg.Done()

txErr := n.RPC().SendTransaction(ctx, tx)
c.lggr.Debugw("Sendonly node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
sendOnlyError := c.sendOnlyErrorParser(txErr)
if sendOnlyError != Successful {
c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
isSuccess := c.sendOnlyErrorParser(txErr) == Successful
Contributor: nit: The name sendOnlyErrorParser is misleading. Can you please rename it to sendTxErrorParser?

Collaborator: I think it's important to keep the sendOnly prefix, as it emphasizes the distinction between the standard ClassifySendError function and ClassifySendOnlyError.

if !isSuccess {
c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
}

if isSuccess || n == main {
Contributor: I don't really like that we still return the error as soon as the main node reports it, without waiting for the other nodes. Could we wait until the other nodes have had a chance to return success? I don't think it's a huge concern if that takes a long time. Or we could use a heuristic: if the main node has errored, wait until at least 50% of the nodes have also errored before returning that error code.
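A hedged sketch of that 50% heuristic, assuming the per-node goroutines feed their results into an errs channel; waitForQuorumError is a hypothetical helper, not code from this PR:

package client

import "context"

// waitForQuorumError holds the main node's error back until at least half of
// the total nodes have failed too, so one degraded primary cannot mask a
// success elsewhere. Any success observed while waiting wins outright.
func waitForQuorumError(ctx context.Context, mainErr error, errs <-chan error, total int) error {
    failed := 1 // the main node itself has already errored
    for 2*failed < total {
        select {
        case err := <-errs:
            if err == nil {
                return nil // some other node accepted the tx
            }
            failed++
        case <-ctx.Done():
            return mainErr // stop waiting; report the main node's error
        }
    }
    return mainErr
}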

select {
case result <- txErr:
default:
}
}

}(n)
})
if !ok {
c.lggr.Debug("Cannot send transaction on sendonly node; MultiNode is stopped", "node", n.String())
c.lggr.Debugw("Cannot send transaction on node; MultiNode is stopped", "node", n.String())
return fmt.Errorf("MulltiNode is stopped: %w", context.Canceled)
Contributor: nice catch!

}
}

if nodeError != nil {
return nodeError
}
return main.RPC().SendTransaction(ctx, tx)

select {
case err := <-result:
return err
case <-ctx.Done():
return ctx.Err()
}

}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SequenceAt(ctx context.Context, account ADDR, blockNumber *big.Int) (s SEQ, err error) {
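Stripped of the MultiNode generics, the submission flow above reduces to a small fan-out pattern: broadcast to every node in parallel, return on the first success from any node or on the main node's result. A minimal runnable sketch under stated assumptions; nodeRPC stands in for n.RPC().SendTransaction, broadcastTx for SendTransaction, and the index main for the node chosen by the node selector (none of these names exist in the PR):

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// nodeRPC stands in for a node's SendTransaction call.
type nodeRPC func(ctx context.Context) error

func broadcastTx(ctx context.Context, main int, nodes []nodeRPC) error {
    if len(nodes) == 0 {
        return errors.New("no nodes available")
    }
    // Capacity 1 plus the non-blocking send below means no goroutine can
    // ever block on this channel after the caller has returned.
    result := make(chan error, 1)
    for i, send := range nodes {
        go func(i int, send nodeRPC) {
            err := send(ctx)
            // Release the caller on any success, or on the main node's answer.
            if err == nil || i == main {
                select {
                case result <- err:
                default: // a result was already delivered; drop this one
                }
            }
        }(i, send)
    }
    select {
    case err := <-result:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    nodes := []nodeRPC{
        // Degraded main node: slow to answer, then errors.
        func(ctx context.Context) error { time.Sleep(100 * time.Millisecond); return errors.New("main degraded") },
        // Healthy secondary: accepts immediately.
        func(ctx context.Context) error { return nil },
    }
    // Expected: nil, because the secondary's success wins the race.
    fmt.Println("send result:", broadcastTx(ctx, 0, nodes))
}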
188 changes: 131 additions & 57 deletions common/client/multi_node_test.go
@@ -1,9 +1,10 @@
package client

import (
"context"
"errors"
"fmt"
big "math/big"
"math/big"
"math/rand"
"testing"
"time"
@@ -559,77 +560,150 @@ func TestMultiNode_BatchCallContextAll(t *testing.T) {

func TestMultiNode_SendTransaction(t *testing.T) {
t.Parallel()
t.Run("Fails if failed to select active node", func(t *testing.T) {
chainID := types.RandomID()
mn := newTestMultiNode(t, multiNodeOpts{
sendOnlyErrorParser := func(err error) SendTxReturnCode {
if err != nil {
return Fatal
}

return Successful
}
newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, types.Head[Hashable], multiNodeRPCClient] {
rpc := newMultiNodeRPCClient(t)
rpc.On("SendTransaction", mock.Anything, mock.Anything).Return(txErr).Run(sendTxRun).Maybe()
node := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
node.On("String").Return("node name").Maybe()
node.On("RPC").Return(rpc).Maybe()
node.On("Close").Return(nil).Once()
return node
}
newStartedMultiNode := func(t *testing.T, opts multiNodeOpts) testMultiNode {
mn := newTestMultiNode(t, opts)
err := mn.StartOnce("startedTestMultiNode", func() error { return nil })
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, mn.Close())
})
return mn
}
newNodeSelector := func(node Node[types.ID, types.Head[Hashable], multiNodeRPCClient]) *mockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient] {
ns := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
ns.On("Select").Return(node).Once()
ns.On("Name").Return("MockedNodeSelector").Maybe()
return ns
}
t.Run("Fails if there is no nodes available", func(t *testing.T) {
mn := newStartedMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: chainID,
chainID: types.RandomID(),
})
nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
nodeSelector.On("Select").Return(nil).Once()
nodeSelector.On("Name").Return("MockedNodeSelector").Once()
mn.nodeSelector = nodeSelector
err := mn.SendTransaction(tests.Context(t), nil)
assert.EqualError(t, err, ErroringNodeError.Error())
})
t.Run("Fails if failed to select active node, even if managed to send tx", func(t *testing.T) {
chainID := types.RandomID()
node := newNode(t, nil, nil)
mn := newStartedMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: chainID,
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{node},
sendOnlyErrorParser: sendOnlyErrorParser,
})

mn.nodeSelector = newNodeSelector(nil)
err := mn.SendTransaction(tests.Context(t), nil)
require.EqualError(t, err, ErroringNodeError.Error())
})
t.Run("Returns error if RPC call fails for active node", func(t *testing.T) {
t.Run("Returns error from main node, logs all", func(t *testing.T) {
chainID := types.RandomID()
rpc := newMultiNodeRPCClient(t)
expectedError := errors.New("rpc failed to do the batch call")
rpc.On("SendTransaction", mock.Anything, mock.Anything).Return(expectedError).Once()
node := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
node.On("RPC").Return(rpc)
nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
nodeSelector.On("Select").Return(node).Once()
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: chainID,
unexpectedError := errors.New("error returned by non-main node must be ignored")
mainNode := newNode(t, expectedError, nil)
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
mn := newStartedMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: chainID,
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{mainNode, newNode(t, unexpectedError, nil)},
sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{newNode(t, unexpectedError, nil)},
sendOnlyErrorParser: sendOnlyErrorParser,
logger: lggr,
})
mn.nodeSelector = nodeSelector
mn.nodeSelector = newNodeSelector(mainNode)
err := mn.SendTransaction(tests.Context(t), nil)
require.EqualError(t, err, expectedError.Error())
tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 3)
tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 3)
})
t.Run("Returns result of main node and logs secondary nodes results", func(t *testing.T) {
t.Run("Main node submission is successful", func(t *testing.T) {
// setup RPCs
failedRPC := newMultiNodeRPCClient(t)
failedRPC.On("SendTransaction", mock.Anything, mock.Anything).
Return(errors.New("rpc failed to do the batch call")).Once()
okRPC := newMultiNodeRPCClient(t)
okRPC.On("SendTransaction", mock.Anything, mock.Anything).Return(nil).Twice()

// setup ok and failed auxiliary nodes
okNode := newMockSendOnlyNode[types.ID, multiNodeRPCClient](t)
okNode.On("RPC").Return(okRPC).Once()
okNode.On("String").Return("okNode")
failedNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
failedNode.On("RPC").Return(failedRPC).Once()
failedNode.On("String").Return("failedNode")

// setup main node
mainNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
mainNode.On("RPC").Return(okRPC)
nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
nodeSelector.On("Select").Return(mainNode).Once()
failedNode := newNode(t, errors.New("rpc failed to send transaction"), nil)
failedSendOnly := newNode(t, errors.New("send only rpc failed to send transaction"), nil)
mainNode := newNode(t, nil, nil)
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{failedNode, mainNode},
sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{okNode},
logger: lggr,
sendOnlyErrorParser: func(err error) SendTxReturnCode {
if err != nil {
return Fatal
}

return Successful
},
mn := newStartedMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{failedNode, mainNode},
sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{failedSendOnly},
logger: lggr,
sendOnlyErrorParser: sendOnlyErrorParser,
})
mn.nodeSelector = nodeSelector

mn.nodeSelector = newNodeSelector(mainNode)
err := mn.SendTransaction(tests.Context(t), nil)
require.NoError(t, err)
tests.AssertLogEventually(t, observedLogs, "Sendonly node sent transaction")
tests.AssertLogEventually(t, observedLogs, "RPC returned error")
tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 3)
tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 2)
})
t.Run("Secondary node returns success before main node fails", func(t *testing.T) {
// setup RPCs
ctx, cancel := context.WithCancel(context.Background())
mainNode := newNode(t, errors.New("main node failure"), func(args mock.Arguments) {
// ensure that the main node returns its result only after the secondary has succeeded
<-ctx.Done()
})
secondary := newNode(t, nil, nil)
secondaryFailed := newNode(t, errors.New("secondary node failure"), nil)
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
mn := newStartedMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{mainNode, secondaryFailed},
sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{secondary},
logger: lggr,
sendOnlyErrorParser: sendOnlyErrorParser,
})
mn.nodeSelector = newNodeSelector(mainNode)
err := mn.SendTransaction(tests.Context(t), nil)
require.NoError(t, err)
cancel()
tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 3)
tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 2)
})
t.Run("If there are several results to report, child goroutines are not stuck forever", func(t *testing.T) {
mainNode := newNode(t, nil, nil)
mn := newStartedMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{newNode(t, nil, nil), mainNode},
sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{newNode(t, nil, nil)},
sendOnlyErrorParser: sendOnlyErrorParser,
})
mn.nodeSelector = newNodeSelector(mainNode)
err := mn.SendTransaction(tests.Context(t), nil)
require.NoError(t, err)
})
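What this subtest guards against can be shown with stdlib only; drainExtraResults is a hypothetical stand-in for the goroutines spawned by SendTransaction, not code from this PR:

package client

import "sync"

// drainExtraResults shows why multiple successful reporters cannot leak: the
// channel holds one result and every later send falls through to default.
func drainExtraResults() error {
    result := make(chan error, 1)
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ { // pretend three nodes all succeed
        wg.Add(1)
        go func() {
            defer wg.Done()
            select {
            case result <- nil: // the first success is delivered
            default: // later successes are dropped; the goroutine still exits
            }
        }()
    }
    wg.Wait() // returns promptly: nobody blocks on the full channel
    return <-result
}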
t.Run("Fails if already closed", func(t *testing.T) {
mainNode := newNode(t, nil, nil)
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{mainNode},
sendOnlyErrorParser: sendOnlyErrorParser,
})
mn.nodeSelector = newNodeSelector(mainNode)
err := mn.StartOnce("startedTestMultiNode", func() error { return nil })
require.NoError(t, err)
require.NoError(t, mn.Close())
err = mn.SendTransaction(tests.Context(t), nil)
require.EqualError(t, err, "MulltiNode is stopped: context canceled")
})
}
16 changes: 16 additions & 0 deletions core/chains/evm/client/errors.go
@@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

commonclient "github.com/smartcontractkit/chainlink/v2/common/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/label"
)
@@ -311,6 +312,17 @@ func (s *SendError) IsTimeout() bool {
return errors.Is(s.err, context.DeadlineExceeded)
}

// IsCanceled indicates if the error was caused by a context cancellation
func (s *SendError) IsCanceled() bool {
if s == nil {
return false
}
if s.err == nil {
return false
}
return errors.Is(s.err, context.Canceled)
}

func NewFatalSendError(e error) *SendError {
if e == nil {
return nil
@@ -475,6 +487,10 @@ func ClassifySendError(err error, lggr logger.SugaredLogger, tx *types.Transacti
lggr.Errorw("timeout while sending transaction %x", tx.Hash(), "err", sendError, "etx", tx)
return commonclient.Retryable
}
if sendError.IsCanceled() {
lggr.Errorw("context was canceled while sending transaction %x", tx.Hash(), "err", sendError, "etx", tx)
return commonclient.Retryable
}
if sendError.IsTxFeeExceedsCap() {
lggr.Criticalw(fmt.Sprintf("Sending transaction failed: %s", label.RPCTxFeeCapConfiguredIncorrectlyWarning),
"etx", tx,
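Why the new IsCanceled branch is needed at all: context.Canceled and context.DeadlineExceeded are distinct sentinel errors, so the existing timeout check never fires for a canceled context. A minimal stdlib sketch (the classify name is illustrative, not the PR's ClassifySendError):

package main

import (
    "context"
    "errors"
    "fmt"
)

func classify(err error) string {
    switch {
    case errors.Is(err, context.DeadlineExceeded):
        return "Retryable (timeout)"
    case errors.Is(err, context.Canceled):
        return "Retryable (canceled)" // the case this PR adds
    default:
        return "Unknown"
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // e.g. MultiNode shutting down mid-send
    fmt.Println(classify(ctx.Err())) // Retryable (canceled)

    tctx, tcancel := context.WithTimeout(context.Background(), 0)
    defer tcancel()
    <-tctx.Done()
    fmt.Println(classify(tctx.Err())) // Retryable (timeout)
}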