Txmv2 #15467
base: develop
Conversation
return fmt.Sprintf(`{txID:%d, IdempotencyKey:%v, ChainID:%v, Nonce:%d, FromAddress:%v, ToAddress:%v, Value:%v, `+
	`Data:%v, SpecifiedGasLimit:%d, CreatedAt:%v, InitialBroadcastAt:%v, LastBroadcastAt:%v, State:%v, IsPurgeable:%v, AttemptCount:%d, `+
	`Meta:%v, Subject:%v}`,
	t.ID, *t.IdempotencyKey, t.ChainID, t.Nonce, t.FromAddress, t.ToAddress, t.Value, t.Data, t.SpecifiedGasLimit, t.CreatedAt, t.InitialBroadcastAt,
Is this a risk if an IdempotencyKey wasn't provided?
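A minimal sketch of a nil-safe alternative to dereferencing *t.IdempotencyKey directly (the helper name is hypothetical, assuming the field is a *string):

// idempotencyKeyOrEmpty avoids a nil-pointer panic when the transaction was
// created without an idempotency key.
func idempotencyKeyOrEmpty(key *string) string {
	if key == nil {
		return ""
	}
	return *key
}

The Sprintf above would then pass idempotencyKeyOrEmpty(t.IdempotencyKey) instead of *t.IdempotencyKey.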
core/chains/evm/txm/txm.go (Outdated)
}

// Optimistically send up to 1/3 of the maxInFlightTransactions. After that threshold, broadcast more cautiously
// by checking the pending nonce so no more than maxInFlightTransactions/3 can get stuck simultaneously i.e. due
Not really feedback, but I'm curious: why maxInFlightTransactions/3?
A third of the maxInFlightTransactions seemed like a reasonable number of transactions to be potentially stuck before "paying" for an RPC request.
Also noticed maxInFlightTransactions is a static number now. Will it wire up to the config in a later iteration?
No, that was on purpose. I've never seen this value change, so I want to remove the config and only add it if needed.
In that case, 1/3 of maxInFlightTransactions is also a static number. Could we set an explicit limit instead? I think it might be clearer if we say we broadcast more carefully after 5 transactions.
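A minimal sketch of the thresholded broadcast check under discussion (the constant value and the helper name are assumptions for illustration, not the PR's identifiers):

const maxInFlightTransactions = 16 // assumed value, for illustration only

// optimisticBroadcastLimit is how many transactions we are willing to send
// without re-checking the pending nonce against the RPC.
const optimisticBroadcastLimit = maxInFlightTransactions / 3

// canBroadcastOptimistically is a hypothetical helper: below the limit we skip the
// extra RPC call; above it we verify the pending nonce before sending more.
func (t *Txm) canBroadcastOptimistically(inFlightCount int) bool {
	return inFlightCount < optimisticBroadcastLimit
}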
var nonce uint64
// Start
client.On("PendingNonceAt", mock.Anything, address).Return(nonce, nil).Once()
servicetest.Run(t, txm)
Shouldn't we have a txm.Trigger(address) call here?
core/chains/evm/txm/txm_test.go (Outdated)
assert.Len(t, tx.Attempts, 1)
var zeroTime time.Time
assert.Greater(t, tx.LastBroadcastAt, zeroTime)
assert.Greater(t, tx.Attempts[0].BroadcastAt, zeroTime)
Could you also check that it properly updated tx.InitialBroadcastAt?
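A sketch of the extra assertion (assuming InitialBroadcastAt is a plain time.Time like the fields asserted above; if it is a pointer, compare the dereferenced value):

// Hypothetical addition to the test: verify the first broadcast timestamp was set as well.
assert.Greater(t, tx.InitialBroadcastAt, zeroTime)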
We need some deeper discussion around a few things I mentioned in the comments. I still haven't managed to review the complete core logic of the broadcaster/backfiller.
core/chains/evm/txm/txm.go (Outdated)
func (t *Txm) startAddress(address common.Address) error {
	triggerCh := make(chan struct{}, 1)
	t.triggerCh[address] = triggerCh
	pendingNonce, err := t.client.PendingNonceAt(context.TODO(), address)
I think we've previously been bitten by making RPC calls during txm initialization. What if this call times out? It will block the txm from starting, and likely also block the core node from starting.
I would suggest keeping the nonce for the address empty at startup, and during regular use, fetching it from the RPC if it is empty.
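A minimal sketch of that lazy-fetch approach (the nonces map and helper name are assumptions, not the PR's code):

// getOrFetchNonce leaves the nonce unset at startup and only fetches it from the
// RPC the first time it is actually needed.
func (t *Txm) getOrFetchNonce(ctx context.Context, address common.Address) (uint64, error) {
	if nonce, ok := t.nonces[address]; ok { // t.nonces is a hypothetical per-address cache
		return nonce, nil
	}
	pendingNonce, err := t.client.PendingNonceAt(ctx, address)
	if err != nil {
		return 0, fmt.Errorf("failed to fetch pending nonce for %s: %w", address, err)
	}
	t.nonces[address] = pendingNonce
	return pendingNonce, nil
}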
}

func (t *Txm) startAddress(address common.Address) error {
	triggerCh := make(chan struct{}, 1)
Hmm, this seems wrong. You are creating a map of just one element and setting the current address in it. The variable will be reset every time startAddress(addr) is called with a new address.
Also, what are you using triggerCh for? It seems like a no-op in the code logic.
	}
}

func (o *Orchestrator[BLOCK_HASH, HEAD]) CreateTransaction(ctx context.Context, request txmgrtypes.TxRequest[common.Address, common.Hash]) (tx txmgrtypes.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], err error) {
Why is this function not exactly the same as CreateTransaction() in txmgr.go? That one had a few more things, like:
- checking checkEnabled()
- txStore.CheckTxQueueCapacity()
- txRequest.Strategy.PruneQueue
Also, I didn't fully understand why we need a wrapped Tx, or the extra logic for pipelineTaskRunID and meta JSON marshaling.
If possible, could the common code between txmv1 and v2 here be extracted into a util function and shared between both? That avoids accidental code differences.
Unfortunately, we can't do that because the two methods use two different underlying tx structures. TXM has generics whereas TXMv2 doesn't. We would have to make changes to the existing TXM to do that, which is something we don't want to do. The difference in the pruning mechanism comes from the fact that as of now, the InMemory layer TXMv2 uses already has an embedded pruning mechanism.
@@ -23,6 +23,7 @@ const (
	ChainZkEvm         ChainType = "zkevm"
	ChainZkSync        ChainType = "zksync"
	ChainZircuit       ChainType = "zircuit"
	ChainDualBroadcast ChainType = "dualBroadcast"
What is this dual broadcast type chain? Is there more info on this new chain type?
Also, just curious, why should this be in the TXMv2 PR?
chainID := client.ConfiguredChainID()

var stuckTxDetector txm.StuckTxDetector
Any reason you have all the extra code around StuckTxDetector to check whether it is enabled or not? It just adds more complexity here and in the txm. In TXMv1, all that logic lives inside the StuckTxDetector, which keeps the txm code cleaner. Can't we use the same pattern here too?
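A sketch of the pattern being suggested, where the enabled check lives inside the detector so the txm can call it unconditionally (the method shape is an assumption, not TXMv2's actual interface):

// Hypothetical: a disabled detector simply reports nothing as stuck, so callers
// never need to check whether detection is enabled.
func (d *stuckTxDetector) DetectStuckTransactions(ctx context.Context, txs []*types.Transaction) ([]*types.Transaction, error) {
	if !d.enabled {
		return nil, nil // detection disabled: no transaction is ever reported as stuck
	}
	// ... actual detection logic ...
	return nil, nil
}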
	SignMessage(ctx context.Context, address common.Address, message []byte) ([]byte, error)
}

type DualBroadcastClient struct {
Is there more info on this feature? It's hard to understand the code without knowing the context.
Also, it seems worrying to me that we are writing a new raw client here instead of reusing our MultiNode/Node code. Even if this is a separate single RPC, why not wrap a Node around it?
Thinking more: couldn't we instead create a new instance of EvmClient with a single node that uses the custom URL? That way we wouldn't need to reimplement PendingNonceAt() and SendTransaction() here. You could still keep the DualBroadcastClient as a wrapper over the old client and the new one, and route to whichever you need.
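A sketch of that wrap-and-route idea (the field types and which calls go where are assumptions, not the PR's code; gethtypes stands for the go-ethereum core/types package):

// Hypothetical routing wrapper: most calls go to the existing client; calls that must
// hit the custom endpoint go to a second client built around the custom URL.
type DualBroadcastClient struct {
	primary   evmclient.Client // existing multinode-backed client (assumed type)
	secondary evmclient.Client // single-node client pointed at the custom URL (assumed)
}

func (c *DualBroadcastClient) PendingNonceAt(ctx context.Context, addr common.Address) (uint64, error) {
	return c.primary.PendingNonceAt(ctx, addr)
}

func (c *DualBroadcastClient) SendTransaction(ctx context.Context, tx *gethtypes.Transaction) error {
	return c.secondary.SendTransaction(ctx, tx)
}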
There is a lot to unpack here. The problem is that the custom URL is public and unreliable in terms of responses (perhaps it doesn't even support all calls), so it doesn't seem optimal to create a new multinode instance for it. We wouldn't be getting accurate indications, and we'd still only have a single RPC endpoint. To make matters worse, multinode is RPC intensive and I don't want us to be rate-limited at any point for no reason.
We already have custom TXM clients that wrap some necessary behavior for the existing TXM, so I think this is no different. Let me know if there are any benefits I'm missing.
}

attemptBuilder := txm.NewAttemptBuilder(chainID, fCfg.PriceMax(), estimator, keyStore)
inMemoryStoreManager := storage.NewInMemoryStoreManager(lggr, chainID)
So we are losing our persistence layer entirely in TXMv2? This seems like a huge regression to me, and one we won't be able to explain to others.
	*ethclient.Client
}

func NewGethClient(client *ethclient.Client) *GethClient {
What is this for? I don't see it used anywhere.
This decouples the TXM from the core node and lets users run TXMv2 against a standard Geth client RPC.
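A sketch of the wiring that decoupling enables (the URL is a placeholder and the setup is an assumption based on the snippet above; error handling kept minimal):

// Hypothetical standalone setup: dial any standard Geth-style RPC and hand the
// wrapped client to TXMv2, with no dependency on the core node's client stack.
rpcClient, err := ethclient.Dial("wss://example-rpc.invalid") // placeholder URL
if err != nil {
	log.Fatal(err)
}
gethClient := NewGethClient(rpcClient)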
	})
}

func (o *Orchestrator[BLOCK_HASH, HEAD]) Trigger(addr common.Address) {
This function might not be needed. I think it can be removed from the TxManager interface too, and from TXMv1 as well. It's likely a legacy leftover we forgot to clean up.
How so? This tells the TXM to look for unstarted transactions instead of waiting for the 30s interval.
@@ -0,0 +1,203 @@
package types
I see many types here are just copies of existing ones in TXMv1. Why did we define them again instead of reusing them?
A few reasons:
- Cleaning up: some things are not needed anymore
- Removing generics: we wanted to degeneralize the TXM anyway, so this seemed like the way to do it
- Cyclic references: I wasn't sure whether I would hit cyclic reference errors, so I tried to avoid dependencies on the existing TXM
switch apiResponse.Status {
case APIStatusPending, APIStatusIncluded:
	return false, nil
case APIStatusFailed, APIStatusCancelled:
If a tx has 2 attempts, one was included, and the other therefore failed, can't this logic return an incorrect error? We likely need to return true if any of the attempts got included, right?
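A minimal sketch of an "any attempt included" check (the helper name and signature are hypothetical; how its result maps onto the return values above is left to the surrounding function):

// anyAttemptIncluded reports whether at least one attempt of the transaction
// made it on-chain, regardless of other failed or cancelled attempts.
func anyAttemptIncluded(statuses []APIStatus) bool {
	for _, s := range statuses {
		if s == APIStatusIncluded {
			return true
		}
	}
	return false
}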
func (t *Txm) pollForPendingNonce(ctx context.Context, address common.Address) (pendingNonce uint64, err error) {
	ctxWithTimeout, cancel := context.WithTimeout(ctx, pendingNonceDefaultTimeout)
	defer cancel()
	for {
This will block the node startup if the RPC isn't returning success for some reason.
I can move this inside the broadcastLoop.
core/chains/evm/txm/txm.go (Outdated)
}
if pendingNonce <= *tx.Nonce {
	t.lggr.Debugf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d", tx.ID, pendingNonce, *tx.Nonce)
	return nil
Why return nil? This means there was a failure, which needs to be addressed somehow, right?
Any issues will be addressed by the backfill loop. We return here because we don't want to update the broadcast times, and the issue has already been logged above. If we want custom handling for this, we need to implement the HandleError mentioned above.
}

if tx.LastBroadcastAt == nil || time.Since(*tx.LastBroadcastAt) > (t.config.BlockTime*time.Duration(t.config.RetryBlockThreshold)) {
	// TODO: add optional graceful bumping strategy
So this version has no gas bumping as of now?
No, the default strategy should be to rely on the estimator and retry. Custom strategies should be marked as optional and only added if the product wants to.
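A sketch of that default retry flow with no bumping (the attempt-builder call and the broadcast helper are assumptions layered on the snippet above):

// Hypothetical: once the retry threshold elapses, ask the estimator for a fresh fee
// and rebroadcast, instead of bumping the previous attempt's price.
if tx.LastBroadcastAt == nil || time.Since(*tx.LastBroadcastAt) > t.config.BlockTime*time.Duration(t.config.RetryBlockThreshold) {
	attempt, err := t.attemptBuilder.NewAttempt(ctx, t.lggr, tx, t.config.EIP1559) // assumed signature
	if err != nil {
		return err
	}
	return t.broadcastAttempt(ctx, tx, attempt) // hypothetical helper
}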
core/chains/evm/txm/txm.go (Outdated)
	return err
}
if pendingNonce <= *tx.Nonce {
	t.lggr.Debugf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d", tx.ID, pendingNonce, *tx.Nonce)
So here's the part where we don't look at the error reason, right? What happens if:
- the RPC says funds are insufficient
- the RPC says the gas limit is too low
- the RPC says the tx is invalid or censored
- the RPC says the tx causes an overflow
This was my main concern about not reading the error responses. How do we decide on further action if we don't know why the tx failed?
Is the concern around logging or taking action? All of these errors are being logged. If we want improved logging, we can always implement the same error parsing we have now. But in terms of functionality, pretty much every error that doesn't get fixed by a retry will leave the node stuck, whether it's TXMv1 or v2. With TXMv2, after a few retries you'll get an error message and notice that the node is stuck. To me, it seems unreasonable to keep a dependency that slows down integration and is very unreliable just so we can potentially notice an issue a few txs earlier on a node that will eventually get stuck anyway.
## Configs
- `EIP1559`: enables EIP-1559 mode. This means the transaction manager will create and broadcast Dynamic attempts. Set this to false to broadcast Legacy transactions.
- `BlockTime`: controls the interval of the backfill loop. This dictates how frequently the transaction manager will check for confirmed transactions, rebroadcast stuck ones, and fill any nonce gaps. Transactions are getting confirmed only during new blocks so it's best if you set this to a value close to the block time. At least one RPC call is made during each BlockTime interval so the absolute minimum should be 2s. A small jitter is applied so the timeout won't be exactly the same each time.
If the absolute minimum should be 2s, is it maybe worth adding a config validation so we can fail fast? If we still want to leave it up to the user, maybe we should phrase this as a recommended minimum?
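A sketch of the fail-fast validation being suggested (the config shape and validation hook are assumptions):

// Hypothetical validation: reject BlockTime values below the documented 2s minimum
// at startup instead of letting a misconfigured node run.
const minBlockTime = 2 * time.Second

func validateConfig(cfg Config) error {
	if cfg.BlockTime < minBlockTime {
		return fmt.Errorf("BlockTime %s is below the minimum of %s", cfg.BlockTime, minBlockTime)
	}
	return nil
}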
	}
}

func (s *stuckTxDetector) timeBasedDetection(tx *types.Transaction) bool {
Would we be dropping the heuristic and custom detection logic we have for ZK chains, or is this just a temporary measure? If we are moving forward with this, I think we need to be careful about marking all in-flight transactions as stuck here. If we broadcast multiple in quick succession, they can all have similar broadcast timestamps, but just the lowest-nonce one could be stuck, holding up the rest of them.
I was hoping we could merge the logic, because currently the Stuck Tx Detector handles multiple txs at the same time and ideally I want us to process one transaction at a time.
As for this logic, you bring up a good point. However, the broadcasted txs with a higher nonce are already in the mempool, so if the current nonce gets filled they should be picked up right away, up to our current threshold. I do agree, though, that we could use some of the more complex mechanisms of the Heuristic method to improve this, or somehow combine them.
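A sketch of the lowest-nonce refinement on top of the time-based check (the surrounding function and fields are assumptions; only timeBasedDetection comes from the snippet above):

// Hypothetical refinement: only the in-flight transaction with the lowest nonce is
// eligible for time-based stuck detection; higher-nonce txs are assumed to be queued
// behind it in the mempool.
func (s *stuckTxDetector) detectStuck(txs []*types.Transaction) *types.Transaction {
	var lowest *types.Transaction
	for _, tx := range txs {
		if lowest == nil || *tx.Nonce < *lowest.Nonce {
			lowest = tx
		}
	}
	if lowest != nil && s.timeBasedDetection(lowest) {
		return lowest
	}
	return nil
}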
Flakeguard Summary
Ran new or updated tests. Found flaky tests ❌
Artifacts: for detailed logs of the failed tests, please refer to the artifact failed-test-results-with-logs.json.
No description provided.