Skip to content

Commit

Permalink
Merge pull request #11391 from smartcontractkit/merge-base271-to-auto…
Browse files Browse the repository at this point in the history
…mation

Merge base release/2.7.1 to automation branch
  • Loading branch information
infiloop2 authored Nov 27, 2023
2 parents 3b9585e + 902dcd1 commit 1c3c6da
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 90 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ jobs:
uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
with:
repository: smartcontractkit/chainlink-solana
ref: 23816fcf7d380a30c87b6d87e4fb0ca94419b259 # swtich back to this after the next solana release${{ needs.get_solana_sha.outputs.sha }}
ref: a28100b7f2954604a8ca2ff9ec7bccc6ec952953 # swtich back to this after the next solana release${{ needs.get_solana_sha.outputs.sha }}
- name: Build Test Image
if: needs.changes.outputs.src == 'true' && needs.solana-test-image-exists.outputs.exists == 'false'
uses: ./.github/actions/build-test-image
Expand Down Expand Up @@ -805,7 +805,7 @@ jobs:
uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
with:
repository: smartcontractkit/chainlink-solana
ref: ${{ needs.get_solana_sha.outputs.sha }}
ref: a28100b7f2954604a8ca2ff9ec7bccc6ec952953 # temporarily using specific commit for release branch ${{ needs.get_solana_sha.outputs.sha }}
- name: Run Setup
if: needs.changes.outputs.src == 'true'
uses: smartcontractkit/chainlink-github-actions/chainlink-testing-framework/setup-run-tests-environment@eccde1970eca69f079d3efb3409938a72ade8497 # v2.2.13
Expand Down
77 changes: 50 additions & 27 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -243,10 +244,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star

eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap, err = eb.loadNextSequenceMap(eb.enabledAddresses)
if err != nil {
return errors.Wrap(err, "Broadcaster: failed to load next sequence map")
}
eb.nextSequenceMap = eb.loadNextSequenceMap(eb.enabledAddresses)

eb.isStarted = true
return nil
Expand Down Expand Up @@ -326,30 +324,38 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) txIn
}

// Load the next sequence map using the tx table or on-chain (if not found in tx table)
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) (map[ADDR]SEQ, error) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) map[ADDR]SEQ {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()

nextSequenceMap := make(map[ADDR]SEQ)
for _, address := range addresses {
// Get the highest sequence from the tx table
// Will need to be incremented since this sequence is already used
seq, err := eb.txStore.FindLatestSequence(ctx, address, eb.chainID)
if err != nil {
// Look for nonce on-chain if no tx found for address in TxStore or if error occurred
// Returns the nonce that should be used for the next transaction so no need to increment
seq, err = eb.client.PendingSequenceAt(ctx, address)
if err != nil {
return nil, errors.New("failed to retrieve next sequence from on-chain causing failure to load next sequence map on broadcaster startup")
}

seq, err := eb.getSequenceForAddr(ctx, address)
if err == nil {
nextSequenceMap[address] = seq
} else {
nextSequenceMap[address] = eb.generateNextSequence(seq)
}
}

return nextSequenceMap, nil
return nextSequenceMap
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getSequenceForAddr(ctx context.Context, address ADDR) (seq SEQ, err error) {
// Get the highest sequence from the tx table
// Will need to be incremented since this sequence is already used
seq, err = eb.txStore.FindLatestSequence(ctx, address, eb.chainID)
if err == nil {
seq = eb.generateNextSequence(seq)
return seq, nil
}
// Look for nonce on-chain if no tx found for address in TxStore or if error occurred
// Returns the nonce that should be used for the next transaction so no need to increment
seq, err = eb.client.PendingSequenceAt(ctx, address)
if err == nil {
return seq, nil
}
eb.logger.Criticalw("failed to retrieve next sequence from on-chain for address: ", "address", address.String())
return seq, err

}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newSequenceSyncBackoff() backoff.Backoff {
Expand Down Expand Up @@ -432,7 +438,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni
// syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR) {
sequenceSyncRetryBackoff := eb.newSequenceSyncBackoff()
localSequence, err := eb.GetNextSequence(addr)
localSequence, err := eb.GetNextSequence(ctx, addr)
// Address not found in map so skip sync
if err != nil {
eb.logger.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err)
Expand Down Expand Up @@ -646,7 +652,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now())
// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(etx.FromAddress)
sequence, err = eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return err, true
}
Expand Down Expand Up @@ -704,7 +710,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand

// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(etx.FromAddress)
sequence, err = eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return err, true
}
Expand Down Expand Up @@ -741,7 +747,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
return nil, errors.Wrap(err, "findNextUnstartedTransactionFromAddress failed")
}

sequence, err := eb.GetNextSequence(etx.FromAddress)
sequence, err := eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -826,15 +832,32 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
}

// Used to get the next usable sequence for a transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(address ADDR) (seq SEQ, err error) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(ctx context.Context, address ADDR) (seq SEQ, err error) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
// Get next sequence from map
seq, exists := eb.nextSequenceMap[address]
if !exists {
return seq, errors.New(fmt.Sprint("address not found in next sequence map: ", address))
if exists {
return seq, nil
}

eb.logger.Infow("address not found in local next sequence map. Attempting to search and populate sequence.", "address", address.String())
// Check if address is in the enabled address list
if !slices.Contains(eb.enabledAddresses, address) {
return seq, fmt.Errorf("address disabled: %s", address)
}
return seq, nil

// Try to retrieve next sequence from tx table or on-chain to load the map
// A scenario could exist where loading the map during startup failed (e.g. All configured RPC's are unreachable at start)
// The expectation is that the node does not fail startup so sequences need to be loaded during runtime
foundSeq, err := eb.getSequenceForAddr(ctx, address)
if err != nil {
return seq, fmt.Errorf("failed to find next sequence for address: %s", address)
}

// Set sequence in map
eb.nextSequenceMap[address] = foundSeq
return foundSeq, nil
}

// Used to increment the sequence in the mapping to have the next usable one available for the next transaction
Expand Down
Loading

0 comments on commit 1c3c6da

Please sign in to comment.