Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jtw/initial in memory work confirmer #11625

Closed
wants to merge 84 commits into from

Conversation

poopoothegorilla
Copy link
Contributor

WIP

Comment on lines 336 to 338
if as.inprogress != nil {
return fmt.Errorf("find_next_unstarted_transaction_from_address: address %s is already busy with a transaction in progress", fromAddress)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is just a Find function, I don't think we need validations like this in the memory store. I feel like the responsibility to check this should be with the Broadcaster

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great point!

Comment on lines 518 to 521
// sort by tx_id ASC, gas_price DESC, gas_tip_cap DESC
sort.SliceStable(attempts, func(i, j int) bool {
return attempts[i].TxID < attempts[j].TxID
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks we only sort by tx ID here and don't include gas price and gas tip cap.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, can we add them here for now.
Later on, we need to look for different design where the generic TXM doesn't sort these. Instead the chain specific logic should do any sorting by chain-specific params like gas price etc.

wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
for _, txID := range txIDs {
if err := as.MoveConfirmedMissingReceiptToUnconfirmed(txID); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method is assuming some logic from the method is function is called from. I noticed UpdateTxsUnconfirmed is currently only called in the context of moving txs from TxConfirmedMissingReceipt to TxUnconfirmed but the memory store function to mark tx as unconfirmed only cares about txIDs. To maintain existing behavior, think we should apply this change across all tx states as long as their IDs match.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, so the caller here just want to move from ConfirmedMissingReceipt --> Unconfirmed.
However the underlying method is generic unnecessarily.

I would prefer to just rename the TxStore.UpdateTxsUnconfirmed() method to TxStore. MoveConfirmedMissingReceiptToUnconfirmed().

Let the actual SQL query be untouched, but if nobody wants a generic enough implementation, lets not worry about adding it here. The code logic here seems sufficient to only look inside this state.

Comment on lines 592 to 595
if tx.TxAttempts != nil && len(tx.TxAttempts) > 0 {
attempt := tx.TxAttempts[0]
return attempt.State != txmgrtypes.TxAttemptInsufficientFunds
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't a tx have attempts with different states? In which case, I think we'd have to traverse the whole attempts list for at least one to match the TxAttemptInsufficientFunds state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch will add 👍

Comment on lines 269 to 271
if filter(tx) {
txAttempts = append(txAttempts, tx.TxAttempts...)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might need your context on all of the methods that use this. At least for FindTxAttemptsRequiringReceiptFetch, I believe the query would only return the specific attempts whose state does not equal insufficient_eth. Looks like we're returning all of the attempts for a tx here. Maybe the filter should be on TxAttempts instead of Txs for this method so we could narrow down specific attempts to return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great point added an addition filter for filtering attempts as well


if tx.PipelineTaskRunID.Valid && tx.SignalCallback && !tx.CallbackCompleted &&
tx.TxAttempts[0].Receipts[0].GetBlockNumber() != nil &&
big.NewInt(blockNum-int64(tx.MinConfirmations.Uint32)).Cmp(tx.TxAttempts[0].Receipts[0].GetBlockNumber()) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'd also want to allow these to be equal

Suggested change
big.NewInt(blockNum-int64(tx.MinConfirmations.Uint32)).Cmp(tx.TxAttempts[0].Receipts[0].GetBlockNumber()) > 0 {
big.NewInt(blockNum-int64(tx.MinConfirmations.Uint32)).Cmp(tx.TxAttempts[0].Receipts[0].GetBlockNumber()) >= 0 {

if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 {
return false
}
attempt := tx.TxAttempts[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we traverse all of the attempts to find the one that has receipts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, any attempt could have the receipt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great catch

Comment on lines +883 to +885
filterFn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool {
return true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how to improve this with the current setup but feel like we shouldn't have to do this. For future contributors, I feel like having to pass a filter even when it's not needed could cause some confusion.

if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 {
return false
}
attempt := tx.TxAttempts[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another place I think we should be traversing all attempts for the receipt

return false
}

return tx.TxAttempts[0].ID == attempt.ID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just adding a note to track but think we should traverse attempts here too

if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 {
return false
}
attempt := tx.TxAttempts[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just marking another attempt filter

Copy link
Contributor

@prashantkumar1982 prashantkumar1982 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great!
Added some comments. I might do another detailed pass of inmemory_store.go later, since that is the most sensitive part.

@@ -487,7 +486,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
if txRequest.IdempotencyKey != nil {
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
if err != nil && !errors.Is(err, ErrTxnNotFound) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this error type changed in this PR. Was this a bug previously?
Do we have a unit-test to cover this case? If not, can you please add it, for coverage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was trying to remove the sql dependency slowly, but will take it out of this PR and do it in a separate attempt

)

// TxPriorityQueue is a priority queue of transactions prioritized by creation time. The oldest transaction is at the front of the queue.
type TxPriorityQueue[
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's many functions in this, to just satisfy the heap package's use.
Seems confusing for the caller to know which method to call.

Is it possible to create a different struct for callers, and let that contain an inner struct for our inner implementation for heap package, and hide it from callers?
That will just make things easier to read and understand for the caller.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in #12121

@@ -709,7 +709,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
defer cancel()
etx := &txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
if err := eb.txStore.FindNextUnstartedTransactionFromAddress(ctx, etx, fromAddress, eb.chainID); err != nil {
if errors.Is(err, sql.ErrNoRows) {
if errors.Is(err, ErrTxnNotFound) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be sure, please ensure there's a unit-test for this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the unit test https://github.com/smartcontractkit/chainlink/pull/11625/files#diff-e740fd5bcf523cd12057fd17ccdcf9d65101d1c247d0b4b447bd4340e5f4e890L1266 for that method.

I am just going to break this into a separate PR which removes the PG dependancy for the txm

return txAttempts
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fetchTxAttemptsFromStorage(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why not just rename to fetchTxAttempts?
The FromStorage seems confusing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weird thought i had done that before. thanks for reminding me

delete(as.allTransactions, txID)
delete(as.unconfirmed, txID)
delete(as.confirmedMissingReceipt, txID)
delete(as.allTransactions, txID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
This needs to be as. unstarted

wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
for _, txID := range txIDs {
if err := as.MoveConfirmedMissingReceiptToUnconfirmed(txID); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, so the caller here just want to move from ConfirmedMissingReceipt --> Unconfirmed.
However the underlying method is generic unnecessarily.

I would prefer to just rename the TxStore.UpdateTxsUnconfirmed() method to TxStore. MoveConfirmedMissingReceiptToUnconfirmed().

Let the actual SQL query be untouched, but if nobody wants a generic enough implementation, lets not worry about adding it here. The code logic here seems sufficient to only look inside this state.

wg := sync.WaitGroup{}
for _, as := range ms.addressStates {
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Up to you, but just pointing out that running a sequential loop over in-memory code would also be quick enough, so as to not require multiple threads.
It is ok to leave it as it is though.

for _, as := range ms.addressStates {
attempts = append(attempts, as.FetchTxAttempts(states, filterFn)...)
}
// sort by sequence ASC, gas_price DESC, gas_tip_cap DESC
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to also sort by gas price and gas tip cap

return err
}

// Update in memory store
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes to in-memory store same as the persistent store?
In the EvmTxStore, I see many things being done for this.
We update fields in attempt, we change Tx state,

if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 {
return false
}
attempt := tx.TxAttempts[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, any attempt could have the receipt

@cl-sonarqube-production
Copy link

Quality Gate failed Quality Gate failed

Failed conditions
4.4% Duplication on New Code (required ≤ 3%)
7 New Major Issues (required ≤ 5)
C Security Rating on New Code (required ≥ A)
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube

Catch issues before they fail your Quality Gate with our IDE extension SonarLint SonarLint

}

// SaveReplacementInProgressAttempt saves a replacement attempt for a transaction that is in_progress.
func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveReplacementInProgressAttempt(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think alot of this modification logic might make sense in the address store... not in this component

Copy link
Contributor

This PR is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale label Apr 28, 2024
@github-actions github-actions bot closed this May 6, 2024
@amit-momin amit-momin reopened this Jun 26, 2024
@amit-momin amit-momin removed the Stale label Jun 26, 2024
Copy link
Contributor

This PR is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale label Aug 30, 2024
@github-actions github-actions bot closed this Sep 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants