Skip to content

Commit

Permalink
remove dependency of postgres trigger for broadcaster (#11109)
Browse files Browse the repository at this point in the history
* remove dependency of postgres trigger for broadcaster

* remove extra argument in NewBroadcaster call

* fixes for extra args

* fix some failing tests and remove some error wraps

* remove pkgerrors from txm

* remove parseAddr which is now dead code

* fix error handling

* remove trigger from postgres via migration; use error wrapping in txmgr

* fix naming of new migration
  • Loading branch information
poopoothegorilla authored Oct 31, 2023
1 parent 921a89c commit 8c96682
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 145 deletions.
40 changes: 0 additions & 40 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/label"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -123,8 +122,6 @@ type Broadcaster[
// when Start is called
autoSyncSequence bool

txInsertListener pg.Subscription
eventBroadcaster pg.EventBroadcaster
processUnstartedTxsImpl ProcessUnstartedTxs[ADDR]

ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
Expand All @@ -143,8 +140,6 @@ type Broadcaster[
initSync sync.Mutex
isStarted bool

parseAddr func(string) (ADDR, error)

sequenceLock sync.RWMutex
nextSequenceMap map[ADDR]SEQ
generateNextSequence types.GenerateNextSequenceFunc[SEQ]
Expand All @@ -166,13 +161,11 @@ func NewBroadcaster[
txConfig txmgrtypes.BroadcasterTransactionsConfig,
listenerConfig txmgrtypes.BroadcasterListenerConfig,
keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ],
eventBroadcaster pg.EventBroadcaster,
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ],
logger logger.Logger,
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
autoSyncSequence bool,
parseAddress func(string) (ADDR, error),
generateNextSequence types.GenerateNextSequenceFunc[SEQ],
) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
logger = logger.Named("Broadcaster")
Expand All @@ -187,11 +180,9 @@ func NewBroadcaster[
feeConfig: feeConfig,
txConfig: txConfig,
listenerConfig: listenerConfig,
eventBroadcaster: eventBroadcaster,
ks: keystore,
checkerFactory: checkerFactory,
autoSyncSequence: autoSyncSequence,
parseAddr: parseAddress,
}

b.processUnstartedTxsImpl = b.processUnstartedTxs
Expand All @@ -215,10 +206,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
return errors.New("Broadcaster is already started")
}
var err error
eb.txInsertListener, err = eb.eventBroadcaster.Subscribe(pg.ChannelInsertOnTx, "")
if err != nil {
return errors.Wrap(err, "Broadcaster could not start")
}
eb.enabledAddresses, err = eb.ks.EnabledAddressesForChain(eb.chainID)
if err != nil {
return errors.Wrap(err, "Broadcaster: failed to load EnabledAddressesForChain")
Expand All @@ -239,9 +226,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
go eb.monitorTxs(addr, triggerCh)
}

eb.wg.Add(1)
go eb.txInsertTriggerer()

eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap, err = eb.loadNextSequenceMap(eb.enabledAddresses)
Expand All @@ -266,9 +250,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) clos
if !eb.isStarted {
return errors.Wrap(utils.ErrAlreadyStopped, "Broadcaster is not started")
}
if eb.txInsertListener != nil {
eb.txInsertListener.Close()
}
close(eb.chStop)
eb.wg.Wait()
eb.isStarted = false
Expand Down Expand Up @@ -305,27 +286,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trig
}
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) txInsertTriggerer() {
defer eb.wg.Done()
for {
select {
case ev, ok := <-eb.txInsertListener.Events():
if !ok {
eb.logger.Debug("txInsertListener channel closed, exiting trigger loop")
return
}
addr, err := eb.parseAddr(ev.Payload)
if err != nil {
eb.logger.Errorw("failed to parse address in trigger", "err", err)
continue
}
eb.Trigger(addr)
case <-eb.chStop:
return
}
}
}

// Load the next sequence map using the tx table or on-chain (if not found in tx table)
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) (map[ADDR]SEQ, error) {
ctx, cancel := eb.chStop.NewCtx()
Expand Down
50 changes: 33 additions & 17 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/google/uuid"
pkgerrors "github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-relay/pkg/services"

Expand Down Expand Up @@ -166,14 +165,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return b.StartOnce("Txm", func() error {
var ms services.MultiStart
if err := ms.Start(ctx, b.broadcaster); err != nil {
return pkgerrors.Wrap(err, "Txm: Broadcaster failed to start")
return fmt.Errorf("Txm: Broadcaster failed to start: %w", err)
}
if err := ms.Start(ctx, b.confirmer); err != nil {
return pkgerrors.Wrap(err, "Txm: Confirmer failed to start")
return fmt.Errorf("Txm: Confirmer failed to start: %w", err)
}

if err := ms.Start(ctx, b.txAttemptBuilder); err != nil {
return pkgerrors.Wrap(err, "Txm: Estimator failed to start")
return fmt.Errorf("Txm: Estimator failed to start: %w", err)
}

b.wg.Add(1)
Expand All @@ -190,7 +189,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx

if b.fwdMgr != nil {
if err := ms.Start(ctx, b.fwdMgr); err != nil {
return pkgerrors.Wrap(err, "Txm: ForwarderManager failed to start")
return fmt.Errorf("Txm: ForwarderManager failed to start: %w", err)
}
}

Expand Down Expand Up @@ -223,8 +222,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandon(addr ADDR) (err error) {
ctx, cancel := utils.StopChan(b.chStop).NewCtx()
defer cancel()
err = b.txStore.Abandon(ctx, b.chainID, addr)
return pkgerrors.Wrapf(err, "abandon failed to update txes for key %s", addr.String())
if err = b.txStore.Abandon(ctx, b.chainID, addr); err != nil {
return fmt.Errorf("abandon failed to update txes for key %s: %w", addr.String(), err)
}
return nil
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() (merr error) {
Expand All @@ -241,14 +242,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() (m
}
if b.fwdMgr != nil {
if err := b.fwdMgr.Close(); err != nil {
merr = errors.Join(merr, pkgerrors.Wrap(err, "Txm: failed to stop ForwarderManager"))
merr = errors.Join(merr, fmt.Errorf("Txm: failed to stop ForwarderManager: %w", err))
}
}

b.wg.Wait()

if err := b.txAttemptBuilder.Close(); err != nil {
merr = errors.Join(merr, pkgerrors.Wrap(err, "Txm: failed to close TxAttemptBuilder"))
merr = errors.Join(merr, fmt.Errorf("Txm: failed to close TxAttemptBuilder: %w", err))
}

return nil
Expand Down Expand Up @@ -444,7 +445,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return tx, pkgerrors.Wrap(err, "Failed to search for transaction with IdempotencyKey")
return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err)
}
if existingTx != nil {
b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)
Expand All @@ -470,31 +471,40 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
txRequest.ToAddress = txRequest.ForwarderAddress
txRequest.EncodedPayload = fwdPayload
} else {
b.logger.Errorf("Failed to use forwarder set upstream: %s", fwdErr.Error())
b.logger.Errorf("Failed to use forwarder set upstream: %w", fwdErr.Error())
}
}

err = b.txStore.CheckTxQueueCapacity(ctx, txRequest.FromAddress, b.txConfig.MaxQueued(), b.chainID)
if err != nil {
return tx, pkgerrors.Wrap(err, "Txm#CreateTransaction")
return tx, fmt.Errorf("Txm#CreateTransaction: %w", err)
}

tx, err = b.txStore.CreateTransaction(ctx, txRequest, b.chainID)
return
if err != nil {
return tx, err
}

// Trigger the Broadcaster to check for new transaction
b.broadcaster.Trigger(txRequest.FromAddress)

return tx, nil
}

// Calls forwarderMgr to get a proper forwarder for a given EOA.
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error) {
if !b.txConfig.ForwardersEnabled() {
return forwarder, pkgerrors.Errorf("Forwarding is not enabled, to enable set Transactions.ForwardersEnabled =true")
return forwarder, fmt.Errorf("forwarding is not enabled, to enable set Transactions.ForwardersEnabled =true")
}
forwarder, err = b.fwdMgr.ForwarderFor(eoa)
return
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) checkEnabled(addr ADDR) error {
err := b.keyStore.CheckEnabled(addr, b.chainID)
return pkgerrors.Wrapf(err, "cannot send transaction from %s on chain ID %s", addr, b.chainID.String())
if err := b.keyStore.CheckEnabled(addr, b.chainID); err != nil {
return fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", addr, b.chainID.String(), err)
}
return nil
}

// SendNativeToken creates a transaction that transfers the given value of native tokens
Expand All @@ -511,7 +521,13 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative
Strategy: NewSendEveryStrategy(),
}
etx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID)
return etx, pkgerrors.Wrap(err, "SendNativeToken failed to insert tx")
if err != nil {
return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err)
}

// Trigger the Broadcaster to check for new transaction
b.broadcaster.Trigger(from)
return etx, nil
}

type NullTxManager[
Expand Down
1 change: 0 additions & 1 deletion core/chains/evm/evm_txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func newEvmTxm(
lggr,
logPoller,
opts.KeyStore,
opts.EventBroadcaster,
estimator)
} else {
txm = opts.GenTxManager(chainID)
Expand Down
Loading

0 comments on commit 8c96682

Please sign in to comment.