BCI-2525: check all responses on transaction submission #11599
@@ -3,6 +3,7 @@ package client
import (
	"context"
	"fmt"
+	"math"
	"math/big"
	"sync"
	"time"
@@ -94,14 +95,15 @@ type multiNode[
	leaseTicker *time.Ticker
	chainFamily string
	reportInterval time.Duration
+	sendTxSoftTimeout time.Duration // defines max waiting time from first response til responses evaluation

	activeMu sync.RWMutex
	activeNode Node[CHAIN_ID, HEAD, RPC_CLIENT]

	chStop services.StopChan
	wg sync.WaitGroup

-	sendOnlyErrorParser func(err error) SendTxReturnCode
+	classifySendTxError func(tx TX, err error) SendTxReturnCode
}

func NewMultiNode[
@@ -127,13 +129,16 @@ func NewMultiNode[
	chainID CHAIN_ID,
	chainType config.ChainType,
	chainFamily string,
-	sendOnlyErrorParser func(err error) SendTxReturnCode,
+	classifySendTxError func(tx TX, err error) SendTxReturnCode,
+	sendTxSoftTimeout time.Duration,
) MultiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT] {
	nodeSelector := newNodeSelector(selectionMode, nodes)

	// Prometheus' default interval is 15s, set this to under 7.5s to avoid
	// aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency)
	const reportInterval = 6500 * time.Millisecond
+	if sendTxSoftTimeout == 0 {
+		sendTxSoftTimeout = QueryTimeout / 2
+	}
	c := &multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]{
		nodes: nodes,
		sendonlys: sendonlys,
@@ -146,8 +151,9 @@ func NewMultiNode[
		chStop: make(services.StopChan),
		leaseDuration: leaseDuration,
		chainFamily: chainFamily,
-		sendOnlyErrorParser: sendOnlyErrorParser,
+		classifySendTxError: classifySendTxError,
		reportInterval: reportInterval,
+		sendTxSoftTimeout: sendTxSoftTimeout,
	}

	c.lggr.Debugf("The MultiNode is configured to use NodeSelectionMode: %s", selectionMode)
@@ -545,45 +551,154 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
	return n.RPC().SendEmptyTransaction(ctx, newTxAttempt, seq, gasLimit, fee, fromAddress)
}

+type sendTxResult struct {
+	Err error
+	ResultCode SendTxReturnCode
+}
+
+// broadcastTxAsync - creates a goroutine that sends transaction to the node. Returns false, if MultiNode is Stopped
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) broadcastTxAsync(ctx context.Context,
+	n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX, txResults chan sendTxResult) bool {
+
+	ok := c.IfNotStopped(func() {
+		// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
+		c.wg.Add(1)
+		go func() {
+			defer c.wg.Done()
+
+			txErr := n.RPC().SendTransaction(ctx, tx)
+			c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
+			resultCode := c.classifySendTxError(tx, txErr)
+			if resultCode != Successful && resultCode != TransactionAlreadyKnown {
+				c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
+			}
+
+			// we expected txResults to have sufficient buffer, otherwise we are not interested in the response
+			// and can drop it
+			select {
+			case txResults <- sendTxResult{Err: txErr, ResultCode: resultCode}:
+			default:
+			}
+		}()
+	})
+	if !ok {
+		c.lggr.Debugw("Cannot broadcast transaction to node; MultiNode is stopped", "node", n.String())
+	}
+
+	return ok
+}
+
+// collectTxResults - reads send transaction results from the provided channel and groups them by `SendTxReturnCode.`
+// We balance the waiting time and the number of collected results. Our target is replies from 70% of nodes,
+// but we won't wait longer than sendTxSoftTimeout since the first reply to avoid waiting
+// for a timeout from slow/unhealthy nodes.
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) (map[SendTxReturnCode][]error, error) {
+	const quorum = 0.7
Review comment: How did we come up with this number? In the majority of cases, we use at most 2-3 primary nodes. Usually, it's either 1 or 2. Effectively that means you will either have to wait for all of them (not a quorum) or wait for the timeout. It seems like a good solution in theory, but what is the actual problem we are trying to solve here? Sending to a primary node feels like an eligibility operation. It might be better to wait for a single success instead, even if it's from an out-of-sync node (although selectNode already solves that to some extent).

Reply: Good point. Given how we handle the collected results, it might be a good idea to wait for the first success, unblock the caller, and do sanity checks on the rest of the responses in the background. I thought that we are not adding more complexity as we are reusing
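To make the reviewer's point concrete, here is a small illustrative calculation (not part of the PR) of what requiredResults works out to under the 0.7 quorum: with 1-3 primary nodes the "quorum" already means waiting for every node, so only larger deployments see a real quorum effect.

// Illustrative sketch only: evaluates the quorum formula used in collectTxResults
// for a few node counts.
package main

import (
	"fmt"
	"math"
)

func main() {
	const quorum = 0.7
	for _, nodes := range []int{1, 2, 3, 4, 10} {
		required := int(math.Ceil(float64(nodes) * quorum))
		fmt.Printf("nodes=%d -> requiredResults=%d\n", nodes, required)
	}
	// Output:
	// nodes=1 -> requiredResults=1
	// nodes=2 -> requiredResults=2
	// nodes=3 -> requiredResults=3
	// nodes=4 -> requiredResults=3
	// nodes=10 -> requiredResults=7
}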
+	requiredResults := int(math.Ceil(float64(len(c.nodes)) * quorum))
+	errorsByCode := map[SendTxReturnCode][]error{}
+	var softTimeoutChan <-chan time.Time
+	var resultsCount int
+	for {
+		select {
+		case <-ctx.Done():
+			c.lggr.Debugw("Failed to collect of the results before context was done", "tx", tx, "errorsByCode", errorsByCode)
+			return nil, ctx.Err()
+		case result := <-txResults:
+			errorsByCode[result.ResultCode] = append(errorsByCode[result.ResultCode], result.Err)
+			resultsCount++
+			if resultsCount >= requiredResults {
+				return errorsByCode, nil
+			}
+		case <-softTimeoutChan:
Review comment: What if we have 0 responses so far? This seems to be half of the value for a standard

Reply: If there are 0 responses, softTimeoutChan is nil and won't fire. We initialize it when we receive the first reply.
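For readers unfamiliar with this Go idiom, the following standalone sketch (not part of the PR) shows why a nil softTimeoutChan case can never fire: a receive from a nil channel blocks forever, so the select case is effectively disabled until the timer is assigned.

// Illustrative sketch only: a select case reading from a nil channel is
// effectively disabled; it becomes live once the channel is assigned.
package main

import (
	"fmt"
	"time"
)

func main() {
	var softTimeoutChan <-chan time.Time // nil until the first result arrives
	results := make(chan string, 1)
	results <- "first result"

	select {
	case r := <-results:
		fmt.Println("got:", r) // runs; the nil-channel case below cannot be chosen
	case <-softTimeoutChan:
		fmt.Println("soft timeout") // unreachable while softTimeoutChan is nil
	}

	// After the first result is seen, the timer is created and the case becomes active.
	softTimeoutChan = time.After(10 * time.Millisecond)
	<-softTimeoutChan
	fmt.Println("soft timeout fired")
}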
+			c.lggr.Debugw("Send Tx soft timeout expired - returning responses we've collected so far", "tx", tx, "resultsCount", resultsCount, "requiredResults", requiredResults)
+			return errorsByCode, nil
+		}
+
+		if softTimeoutChan == nil {
+			tm := time.NewTimer(c.sendTxSoftTimeout)
+			softTimeoutChan = tm.C
+			// we are fine with stopping timer at the end of function
+			//nolint
+			defer tm.Stop()
+		}
+	}
+}
+
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) aggregateTxResults(tx TX, resultsByCode map[SendTxReturnCode][]error) error {
+	severeErrors, hasSevereErrors := findFirstIn(resultsByCode, []SendTxReturnCode{Fatal, Underpriced, Unsupported, ExceedsMaxFee, FeeOutOfValidRange, Unknown})
Review comment: The classification here seems a bit implicit. Maybe we need to declare this in the models file.
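One way to read the suggestion (a sketch, not part of this PR; the variable names below are illustrative) is to declare the code groups once, alongside the SendTxReturnCode definitions, and reference them from aggregateTxResults:

// Hypothetical grouping of return codes, declared next to SendTxReturnCode
// (e.g. in the package's models file) instead of inline in aggregateTxResults.
var sendTxSevereErrors = []SendTxReturnCode{Fatal, Underpriced, Unsupported, ExceedsMaxFee, FeeOutOfValidRange, Unknown}
var sendTxSuccessfulCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown}

// aggregateTxResults would then read:
//	severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors)
//	successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes)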
+	successResults, hasSuccess := findFirstIn(resultsByCode, []SendTxReturnCode{Successful, TransactionAlreadyKnown})
+	if hasSuccess {
+		// We assume that primary node would never report false positive result for a transaction.
+		// Thus, if such case occurs it's probably due to misconfiguration or a bug and requires manual intervention.
+		// Our best option in such situation is to return the error.
+		if hasSevereErrors {
+			const errMsg = "found contradictions in nodes replies on SendTransaction: got Successful and severe error"
+			c.lggr.Criticalw(errMsg, "tx", tx, "resultsByCode", resultsByCode)
+			err := fmt.Errorf(errMsg)
+			c.SvcErrBuffer.Append(err)
+			return severeErrors[0]
Review comment: I think we should return success here, since at least one node has accepted our broadcast Tx, and thus it can now be included on-chain. Regarding manual intervention, you are already logging a critical log, which implies someone should be investigating it manually.

Review comment: Consider supplementing this with a Prometheus metric (or adding Prometheus metrics to this more generally), since that is more likely to get alerted on/actioned than a critical log.
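A minimal sketch of the metric suggestion (not part of this PR; the metric name, label, and the field used for the label value are illustrative assumptions), using the standard Prometheus Go client:

// Sketch, within the client package. Counts contradictory SendTransaction
// responses so the condition can be alerted on in addition to the critical log.
import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var promInvalidSendTxResponses = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "multi_node_invalid_send_tx_responses",
	Help: "Count of SendTransaction calls where nodes returned both a success and a severe error",
}, []string{"chainID"})

// Next to the Criticalw call above, something like (assuming the chain ID is
// available on the multiNode and implements fmt.Stringer):
//	promInvalidSendTxResponses.WithLabelValues(chainID.String()).Inc()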
+		}
+
+		// other errors are temporary - we are safe to return success
+		return successResults[0]
+	}
+
+	if hasSevereErrors {
+		return severeErrors[0]
+	}
+
+	// return temporary error
+	for _, result := range resultsByCode {
+		return result[0]
+	}
+
+	const errMsg = "invariant violation: expected at least one response on SendTransaction"
+	c.lggr.Criticalw(errMsg, "tx", tx)
+	err := fmt.Errorf(errMsg)
+	c.SvcErrBuffer.Append(err)
+	return err
+}
+
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SendTransaction(ctx context.Context, tx TX) error {
-	main, nodeError := c.selectNode()
-	var all []SendOnlyNode[CHAIN_ID, RPC_CLIENT]
-	for _, n := range c.nodes {
-		all = append(all, n)
+	if len(c.nodes) == 0 {
+		return ErroringNodeError
+	}
-	all = append(all, c.sendonlys...)
-	for _, n := range all {
-		if n == main {
-			// main node is used at the end for the return value
-			continue

+	for _, n := range c.sendonlys {
+		// fire-n-forget, as sendOnlyNodes can not be trusted with result reporting
+		if !c.broadcastTxAsync(ctx, n, tx, nil) {
+			return fmt.Errorf("aborted while broadcasting to sendonlys - multinode is stopped: %w", context.Canceled)
+		}
-		// Parallel send to all other nodes with ignored return value
-		// Async - we do not want to block the main thread with secondary nodes
-		// in case they are unreliable/slow.
-		// It is purely a "best effort" send.
-		// Resource is not unbounded because the default context has a timeout.
-		ok := c.IfNotStopped(func() {
-			// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
-			c.wg.Add(1)
-			go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
-				defer c.wg.Done()
-
-				txErr := n.RPC().SendTransaction(ctx, tx)
-				c.lggr.Debugw("Sendonly node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
-				sendOnlyError := c.sendOnlyErrorParser(txErr)
-				if sendOnlyError != Successful {
-					c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
-				}
-			}(n)
-		})
-		if !ok {
-			c.lggr.Debug("Cannot send transaction on sendonly node; MultiNode is stopped", "node", n.String())
-		}

+	txResults := make(chan sendTxResult, len(c.nodes))
+	for _, n := range c.nodes {
+		if !c.broadcastTxAsync(ctx, n, tx, txResults) {
+			return fmt.Errorf("aborted while broadcasting to primary - multinode is stopped: %w", context.Canceled)
+		}
+	}
-	if nodeError != nil {
-		return nodeError

+	// combine context and stop channel to ensure we are not waiting for responses that won't arrive because MultiNode was stopped
+	ctx, cancel := c.chStop.Ctx(ctx)
+	defer cancel()
+	resultsByCode, err := c.collectTxResults(ctx, tx, txResults)
+	if err != nil {
+		return fmt.Errorf("failed to collect tx results: %w", err)
+	}
+
+	return c.aggregateTxResults(tx, resultsByCode)
+}
+
+// findFirstIn - returns first existing value for the slice of keys
+func findFirstIn[K comparable, V any](set map[K]V, keys []K) (V, bool) {
+	for _, k := range keys {
+		if v, ok := set[k]; ok {
+			return v, true
+		}
+	}
-	return main.RPC().SendTransaction(ctx, tx)
+	var v V
+	return v, false
+}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SequenceAt(ctx context.Context, account ADDR, blockNumber *big.Int) (s SEQ, err error) {
Review comment: Is this supposed to be private?

Reply: Yes, we only need it to ensure that local unit tests cover all values.