From 8c96682617ad90b57b759a95f1555c51b842ee00 Mon Sep 17 00:00:00 2001 From: Jim W Date: Tue, 31 Oct 2023 16:53:02 -0400 Subject: [PATCH] remove dependency of postgres trigger for broadcaster (#11109) * remove dependency of postgres trigger for broadcaster * remove extra argument in NewBroadcaster call * fixes for extra args * fix some failing tests and remove some error wraps * remove pkgerrors from txm * remove parseAddr which is now dead code * fix error handling * remove trigger from postgres via migration; use error wrapping in txmgr * fix naming of new migration --- common/txmgr/broadcaster.go | 40 ------------- common/txmgr/txmgr.go | 50 ++++++++++------ core/chains/evm/evm_txm.go | 1 - core/chains/evm/txmgr/broadcaster_test.go | 60 ++----------------- core/chains/evm/txmgr/builder.go | 7 +-- core/chains/evm/txmgr/common.go | 7 --- core/chains/evm/txmgr/txmgr_test.go | 28 +++------ core/services/pg/channels.go | 1 - .../0206_remove_tx_insert_trigger.sql | 18 ++++++ 9 files changed, 67 insertions(+), 145 deletions(-) create mode 100644 core/store/migrate/migrations/0206_remove_tx_insert_trigger.sql diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 011866bf39d..4f6ffae2ad8 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -21,7 +21,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/common/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/label" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -123,8 +122,6 @@ type Broadcaster[ // when Start is called autoSyncSequence bool - txInsertListener pg.Subscription - eventBroadcaster pg.EventBroadcaster processUnstartedTxsImpl ProcessUnstartedTxs[ADDR] ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] @@ -143,8 +140,6 @@ type Broadcaster[ initSync sync.Mutex isStarted bool - parseAddr func(string) (ADDR, error) - sequenceLock sync.RWMutex nextSequenceMap map[ADDR]SEQ generateNextSequence types.GenerateNextSequenceFunc[SEQ] @@ -166,13 +161,11 @@ func NewBroadcaster[ txConfig txmgrtypes.BroadcasterTransactionsConfig, listenerConfig txmgrtypes.BroadcasterListenerConfig, keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], - eventBroadcaster pg.EventBroadcaster, txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ], logger logger.Logger, checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], autoSyncSequence bool, - parseAddress func(string) (ADDR, error), generateNextSequence types.GenerateNextSequenceFunc[SEQ], ) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { logger = logger.Named("Broadcaster") @@ -187,11 +180,9 @@ func NewBroadcaster[ feeConfig: feeConfig, txConfig: txConfig, listenerConfig: listenerConfig, - eventBroadcaster: eventBroadcaster, ks: keystore, checkerFactory: checkerFactory, autoSyncSequence: autoSyncSequence, - parseAddr: parseAddress, } b.processUnstartedTxsImpl = b.processUnstartedTxs @@ -215,10 +206,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star return errors.New("Broadcaster is already started") } var err error - eb.txInsertListener, err = eb.eventBroadcaster.Subscribe(pg.ChannelInsertOnTx, "") - if err != nil { - return errors.Wrap(err, "Broadcaster could not start") - } eb.enabledAddresses, err = eb.ks.EnabledAddressesForChain(eb.chainID) if err != nil { return errors.Wrap(err, "Broadcaster: failed to load EnabledAddressesForChain") @@ -239,9 +226,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star go eb.monitorTxs(addr, triggerCh) } - eb.wg.Add(1) - go eb.txInsertTriggerer() - eb.sequenceLock.Lock() defer eb.sequenceLock.Unlock() eb.nextSequenceMap, err = eb.loadNextSequenceMap(eb.enabledAddresses) @@ -266,9 +250,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) clos if !eb.isStarted { return errors.Wrap(utils.ErrAlreadyStopped, "Broadcaster is not started") } - if eb.txInsertListener != nil { - eb.txInsertListener.Close() - } close(eb.chStop) eb.wg.Wait() eb.isStarted = false @@ -305,27 +286,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trig } } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) txInsertTriggerer() { - defer eb.wg.Done() - for { - select { - case ev, ok := <-eb.txInsertListener.Events(): - if !ok { - eb.logger.Debug("txInsertListener channel closed, exiting trigger loop") - return - } - addr, err := eb.parseAddr(ev.Payload) - if err != nil { - eb.logger.Errorw("failed to parse address in trigger", "err", err) - continue - } - eb.Trigger(addr) - case <-eb.chStop: - return - } - } -} - // Load the next sequence map using the tx table or on-chain (if not found in tx table) func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) (map[ADDR]SEQ, error) { ctx, cancel := eb.chStop.NewCtx() diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 0c7117afab0..d80f534ad26 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -10,7 +10,6 @@ import ( "time" "github.com/google/uuid" - pkgerrors "github.com/pkg/errors" "github.com/smartcontractkit/chainlink-relay/pkg/services" @@ -166,14 +165,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx return b.StartOnce("Txm", func() error { var ms services.MultiStart if err := ms.Start(ctx, b.broadcaster); err != nil { - return pkgerrors.Wrap(err, "Txm: Broadcaster failed to start") + return fmt.Errorf("Txm: Broadcaster failed to start: %w", err) } if err := ms.Start(ctx, b.confirmer); err != nil { - return pkgerrors.Wrap(err, "Txm: Confirmer failed to start") + return fmt.Errorf("Txm: Confirmer failed to start: %w", err) } if err := ms.Start(ctx, b.txAttemptBuilder); err != nil { - return pkgerrors.Wrap(err, "Txm: Estimator failed to start") + return fmt.Errorf("Txm: Estimator failed to start: %w", err) } b.wg.Add(1) @@ -190,7 +189,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx if b.fwdMgr != nil { if err := ms.Start(ctx, b.fwdMgr); err != nil { - return pkgerrors.Wrap(err, "Txm: ForwarderManager failed to start") + return fmt.Errorf("Txm: ForwarderManager failed to start: %w", err) } } @@ -223,8 +222,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandon(addr ADDR) (err error) { ctx, cancel := utils.StopChan(b.chStop).NewCtx() defer cancel() - err = b.txStore.Abandon(ctx, b.chainID, addr) - return pkgerrors.Wrapf(err, "abandon failed to update txes for key %s", addr.String()) + if err = b.txStore.Abandon(ctx, b.chainID, addr); err != nil { + return fmt.Errorf("abandon failed to update txes for key %s: %w", addr.String(), err) + } + return nil } func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() (merr error) { @@ -241,14 +242,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() (m } if b.fwdMgr != nil { if err := b.fwdMgr.Close(); err != nil { - merr = errors.Join(merr, pkgerrors.Wrap(err, "Txm: failed to stop ForwarderManager")) + merr = errors.Join(merr, fmt.Errorf("Txm: failed to stop ForwarderManager: %w", err)) } } b.wg.Wait() if err := b.txAttemptBuilder.Close(); err != nil { - merr = errors.Join(merr, pkgerrors.Wrap(err, "Txm: failed to close TxAttemptBuilder")) + merr = errors.Join(merr, fmt.Errorf("Txm: failed to close TxAttemptBuilder: %w", err)) } return nil @@ -444,7 +445,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID) if err != nil && !errors.Is(err, sql.ErrNoRows) { - return tx, pkgerrors.Wrap(err, "Failed to search for transaction with IdempotencyKey") + return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err) } if existingTx != nil { b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey) @@ -470,31 +471,40 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran txRequest.ToAddress = txRequest.ForwarderAddress txRequest.EncodedPayload = fwdPayload } else { - b.logger.Errorf("Failed to use forwarder set upstream: %s", fwdErr.Error()) + b.logger.Errorf("Failed to use forwarder set upstream: %w", fwdErr.Error()) } } err = b.txStore.CheckTxQueueCapacity(ctx, txRequest.FromAddress, b.txConfig.MaxQueued(), b.chainID) if err != nil { - return tx, pkgerrors.Wrap(err, "Txm#CreateTransaction") + return tx, fmt.Errorf("Txm#CreateTransaction: %w", err) } tx, err = b.txStore.CreateTransaction(ctx, txRequest, b.chainID) - return + if err != nil { + return tx, err + } + + // Trigger the Broadcaster to check for new transaction + b.broadcaster.Trigger(txRequest.FromAddress) + + return tx, nil } // Calls forwarderMgr to get a proper forwarder for a given EOA. func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error) { if !b.txConfig.ForwardersEnabled() { - return forwarder, pkgerrors.Errorf("Forwarding is not enabled, to enable set Transactions.ForwardersEnabled =true") + return forwarder, fmt.Errorf("forwarding is not enabled, to enable set Transactions.ForwardersEnabled =true") } forwarder, err = b.fwdMgr.ForwarderFor(eoa) return } func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) checkEnabled(addr ADDR) error { - err := b.keyStore.CheckEnabled(addr, b.chainID) - return pkgerrors.Wrapf(err, "cannot send transaction from %s on chain ID %s", addr, b.chainID.String()) + if err := b.keyStore.CheckEnabled(addr, b.chainID); err != nil { + return fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", addr, b.chainID.String(), err) + } + return nil } // SendNativeToken creates a transaction that transfers the given value of native tokens @@ -511,7 +521,13 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative Strategy: NewSendEveryStrategy(), } etx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID) - return etx, pkgerrors.Wrap(err, "SendNativeToken failed to insert tx") + if err != nil { + return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err) + } + + // Trigger the Broadcaster to check for new transaction + b.broadcaster.Trigger(from) + return etx, nil } type NullTxManager[ diff --git a/core/chains/evm/evm_txm.go b/core/chains/evm/evm_txm.go index d2f4178c7d9..a8673e954a6 100644 --- a/core/chains/evm/evm_txm.go +++ b/core/chains/evm/evm_txm.go @@ -61,7 +61,6 @@ func newEvmTxm( lggr, logPoller, opts.KeyStore, - opts.EventBroadcaster, estimator) } else { txm = opts.GenTxManager(chainID) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 3901da59eeb..61a230c21b1 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -15,7 +15,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" gethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/google/uuid" - "github.com/onsi/gomega" "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -44,9 +43,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" ksmocks "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pg/datatypes" - pgmocks "github.com/smartcontractkit/chainlink/v2/core/services/pg/mocks" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -61,17 +58,14 @@ func NewTestEthBroadcaster( nonceAutoSync bool, ) *txmgr.Broadcaster { t.Helper() - eb := cltest.NewEventBroadcaster(t, config.Database().URL()) ctx := testutils.Context(t) - require.NoError(t, eb.Start(ctx)) - t.Cleanup(func() { assert.NoError(t, eb.Close()) }) lggr := logger.TestLogger(t) ge := config.EVM().GasEstimator() estimator := gas.NewWrappedEvmEstimator(gas.NewFixedPriceEstimator(config.EVM().GasEstimator(), ge.BlockHistory(), lggr), ge.EIP1559DynamicFees(), nil) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), ge, keyStore, estimator) txNonceSyncer := txmgr.NewNonceSyncer(txStore, lggr, ethClient) - ethBroadcaster := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTxmConfig(config.EVM()), txmgr.NewEvmTxmFeeConfig(config.EVM().GasEstimator()), config.EVM().Transactions(), config.Database().Listener(), keyStore, eb, txBuilder, txNonceSyncer, lggr, checkerFactory, nonceAutoSync) + ethBroadcaster := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTxmConfig(config.EVM()), txmgr.NewEvmTxmFeeConfig(config.EVM().GasEstimator()), config.EVM().Transactions(), config.Database().Listener(), keyStore, txBuilder, txNonceSyncer, lggr, checkerFactory, nonceAutoSync) // Mark instance as test ethBroadcaster.XXXTestDisableUnstartedTxAutoProcessing() @@ -82,10 +76,6 @@ func NewTestEthBroadcaster( func TestEthBroadcaster_Lifecycle(t *testing.T) { cfg, db := heavyweight.FullTestDBV2(t, "eth_broadcaster_optimistic_locking", nil) - eventBroadcaster := cltest.NewEventBroadcaster(t, cfg.Database().URL()) - err := eventBroadcaster.Start(testutils.Context(t)) - require.NoError(t, err) - t.Cleanup(func() { assert.NoError(t, eventBroadcaster.Close()) }) txStore := cltest.NewTestTxStore(t, db, cfg.Database()) evmcfg := evmtest.NewChainScopedConfig(t, cfg) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) @@ -102,7 +92,6 @@ func TestEthBroadcaster_Lifecycle(t *testing.T) { evmcfg.EVM().Transactions(), evmcfg.Database().Listener(), ethKeyStore, - eventBroadcaster, txBuilder, nil, logger.TestLogger(t), @@ -111,7 +100,7 @@ func TestEthBroadcaster_Lifecycle(t *testing.T) { ) // Can't close an unstarted instance - err = eb.Close() + err := eb.Close() require.Error(t, err) ctx := testutils.Context(t) @@ -577,9 +566,6 @@ func TestEthBroadcaster_TransmitChecking(t *testing.T) { func TestEthBroadcaster_ProcessUnstartedEthTxs_OptimisticLockingOnEthTx(t *testing.T) { // non-transactional DB needed because we deliberately test for FK violation cfg, db := heavyweight.FullTestDBV2(t, "eth_broadcaster_optimistic_locking", nil) - eventBroadcaster := cltest.NewEventBroadcaster(t, cfg.Database().URL()) - require.NoError(t, eventBroadcaster.Start(testutils.Context(t))) - t.Cleanup(func() { assert.NoError(t, eventBroadcaster.Close()) }) txStore := cltest.NewTestTxStore(t, db, cfg.Database()) ccfg := evmtest.NewChainScopedConfig(t, cfg) evmcfg := txmgr.NewEvmTxmConfig(ccfg.EVM()) @@ -605,7 +591,6 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_OptimisticLockingOnEthTx(t *testi ccfg.EVM().Transactions(), cfg.Database().Listener(), ethKeyStore, - eventBroadcaster, txBuilder, nil, logger.TestLogger(t), @@ -1113,17 +1098,12 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // same as the parent test, but callback is set by ctor t.Run("callback set by ctor", func(t *testing.T) { - eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), 0, 0, logger.TestLogger(t), uuid.New()) - err := eventBroadcaster.Start(testutils.Context(t)) - require.NoError(t, err) - t.Cleanup(func() { assert.NoError(t, eventBroadcaster.Close()) }) lggr := logger.TestLogger(t) estimator := gas.NewWrappedEvmEstimator(gas.NewFixedPriceEstimator(evmcfg.EVM().GasEstimator(), evmcfg.EVM().GasEstimator().BlockHistory(), lggr), evmcfg.EVM().GasEstimator().EIP1559DynamicFees(), nil) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), evmcfg.EVM().GasEstimator(), ethKeyStore, estimator) localNextNonce = getLocalNextNonce(t, eb, fromAddress) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(localNextNonce), nil).Once() - eb2 := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), evmcfg.EVM().Transactions(), evmcfg.Database().Listener(), ethKeyStore, eventBroadcaster, txBuilder, nil, lggr, &testCheckerFactory{}, false) - require.NoError(t, err) + eb2 := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), evmcfg.EVM().Transactions(), evmcfg.Database().Listener(), ethKeyStore, txBuilder, nil, lggr, &testCheckerFactory{}, false) retryable, err := eb2.ProcessUnstartedTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) @@ -1724,29 +1704,6 @@ func TestEthBroadcaster_Trigger(t *testing.T) { eb.Trigger(testutils.NewAddress()) } -func TestEthBroadcaster_EthTxInsertEventCausesTriggerToFire(t *testing.T) { - // NOTE: Testing triggers requires committing transactions and does not work with transactional tests - cfg, db := heavyweight.FullTestDBV2(t, "eth_tx_triggers", nil) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - - evmcfg := evmtest.NewChainScopedConfig(t, cfg) - - ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) - eventBroadcaster := cltest.NewEventBroadcaster(t, evmcfg.Database().URL()) - require.NoError(t, eventBroadcaster.Start(testutils.Context(t))) - t.Cleanup(func() { require.NoError(t, eventBroadcaster.Close()) }) - - ethTxInsertListener, err := eventBroadcaster.Subscribe(pg.ChannelInsertOnTx, "") - require.NoError(t, err) - - // Give it some time to start listening - time.Sleep(100 * time.Millisecond) - - cltest.MustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID) - gomega.NewWithT(t).Eventually(ethTxInsertListener.Events()).Should(gomega.Receive()) -} - func TestEthBroadcaster_SyncNonce(t *testing.T) { db := pgtest.NewSqlxDB(t) ctx := testutils.Context(t) @@ -1765,11 +1722,6 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { ethNodeNonce := uint64(22) - eventBroadcaster := pgmocks.NewEventBroadcaster(t) - sub := pgmocks.NewSubscription(t) - sub.On("Events").Return(make(<-chan pg.Event)) - sub.On("Close") - eventBroadcaster.On("Subscribe", "evm.insert_on_txes", "").Return(sub, nil) estimator := gas.NewWrappedEvmEstimator(gas.NewFixedPriceEstimator(evmcfg.EVM().GasEstimator(), evmcfg.EVM().GasEstimator().BlockHistory(), lggr), evmcfg.EVM().GasEstimator().EIP1559DynamicFees(), nil) checkerFactory := &testCheckerFactory{} @@ -1783,7 +1735,7 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { addresses := []gethCommon.Address{fromAddress} kst.On("EnabledAddressesForChain", &cltest.FixtureChainID).Return(addresses, nil).Once() ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), evmTxmCfg, txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, eventBroadcaster, txBuilder, nil, lggr, checkerFactory, false) + eb := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), evmTxmCfg, txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, txBuilder, nil, lggr, checkerFactory, false) err := eb.Start(testutils.Context(t)) assert.NoError(t, err) @@ -1801,7 +1753,7 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { addresses := []gethCommon.Address{fromAddress} kst.On("EnabledAddressesForChain", &cltest.FixtureChainID).Return(addresses, nil).Once() ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), evmTxmCfg, txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, eventBroadcaster, txBuilder, txNonceSyncer, lggr, checkerFactory, true) + eb := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), evmTxmCfg, txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, txBuilder, txNonceSyncer, lggr, checkerFactory, true) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(ethNodeNonce), nil).Once() require.NoError(t, eb.Start(ctx)) @@ -1832,7 +1784,7 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { kst.On("EnabledAddressesForChain", &cltest.FixtureChainID).Return(addresses, nil).Once() ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), evmTxmCfg, txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, eventBroadcaster, txBuilder, txNonceSyncer, lggr, checkerFactory, true) + eb := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), evmTxmCfg, txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, txBuilder, txNonceSyncer, lggr, checkerFactory, true) eb.XXXTestDisableUnstartedTxAutoProcessing() ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), errors.New("something exploded")).Once() diff --git a/core/chains/evm/txmgr/builder.go b/core/chains/evm/txmgr/builder.go index 39781e83f4c..9123d1dfc03 100644 --- a/core/chains/evm/txmgr/builder.go +++ b/core/chains/evm/txmgr/builder.go @@ -16,7 +16,6 @@ import ( evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) // NewTxm constructs the necessary dependencies for the EvmTxm (broadcaster, confirmer, etc) and returns a new EvmTxManager @@ -31,7 +30,6 @@ func NewTxm( lggr logger.Logger, logPoller logpoller.LogPoller, keyStore keystore.Eth, - eventBroadcaster pg.EventBroadcaster, estimator gas.EvmFeeEstimator, ) (txm TxManager, err error, @@ -52,7 +50,7 @@ func NewTxm( txmCfg := NewEvmTxmConfig(chainConfig) // wrap Evm specific config feeCfg := NewEvmTxmFeeConfig(fCfg) // wrap Evm specific config txmClient := NewEvmTxmClient(client) // wrap Evm specific client - ethBroadcaster := NewEvmBroadcaster(txStore, txmClient, txmCfg, feeCfg, txConfig, listenerConfig, keyStore, eventBroadcaster, txAttemptBuilder, txNonceSyncer, lggr, checker, chainConfig.NonceAutoSync()) + ethBroadcaster := NewEvmBroadcaster(txStore, txmClient, txmCfg, feeCfg, txConfig, listenerConfig, keyStore, txAttemptBuilder, txNonceSyncer, lggr, checker, chainConfig.NonceAutoSync()) ethConfirmer := NewEvmConfirmer(txStore, txmClient, txmCfg, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr) var ethResender *Resender if txConfig.ResendAfterThreshold() > 0 { @@ -123,12 +121,11 @@ func NewEvmBroadcaster( txConfig txmgrtypes.BroadcasterTransactionsConfig, listenerConfig txmgrtypes.BroadcasterListenerConfig, keystore KeyStore, - eventBroadcaster pg.EventBroadcaster, txAttemptBuilder TxAttemptBuilder, nonceSyncer NonceSyncer, logger logger.Logger, checkerFactory TransmitCheckerFactory, autoSyncNonce bool, ) *Broadcaster { - return txmgr.NewBroadcaster(txStore, client, chainConfig, feeConfig, txConfig, listenerConfig, keystore, eventBroadcaster, txAttemptBuilder, nonceSyncer, logger, checkerFactory, autoSyncNonce, stringToGethAddress, evmtypes.GenerateNextNonce) + return txmgr.NewBroadcaster(txStore, client, chainConfig, feeConfig, txConfig, listenerConfig, keystore, txAttemptBuilder, nonceSyncer, logger, checkerFactory, autoSyncNonce, evmtypes.GenerateNextNonce) } diff --git a/core/chains/evm/txmgr/common.go b/core/chains/evm/txmgr/common.go index 5dbb2ef9611..37cc89dd7ac 100644 --- a/core/chains/evm/txmgr/common.go +++ b/core/chains/evm/txmgr/common.go @@ -69,10 +69,3 @@ func batchSendTransactions( } return reqs, now, successfulBroadcast, nil } - -func stringToGethAddress(s string) (common.Address, error) { - if !common.IsHexAddress(s) { - return common.Address{}, fmt.Errorf("invalid hex address: %s", s) - } - return common.HexToAddress(s), nil -} diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 6cb43b27716..e9823ee0214 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -37,12 +37,11 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/keystore" ksmocks "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/pg" - pgmocks "github.com/smartcontractkit/chainlink/v2/core/services/pg/mocks" "github.com/smartcontractkit/chainlink/v2/core/utils" ) func makeTestEvmTxm( - t *testing.T, db *sqlx.DB, ethClient evmclient.Client, estimator gas.EvmFeeEstimator, ccfg txmgr.ChainConfig, fcfg txmgr.FeeConfig, txConfig evmconfig.Transactions, dbConfig txmgr.DatabaseConfig, listenerConfig txmgr.ListenerConfig, keyStore keystore.Eth, eventBroadcaster pg.EventBroadcaster) (txmgr.TxManager, error) { + t *testing.T, db *sqlx.DB, ethClient evmclient.Client, estimator gas.EvmFeeEstimator, ccfg txmgr.ChainConfig, fcfg txmgr.FeeConfig, txConfig evmconfig.Transactions, dbConfig txmgr.DatabaseConfig, listenerConfig txmgr.ListenerConfig, keyStore keystore.Eth) (txmgr.TxManager, error) { lggr := logger.TestLogger(t) lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000) @@ -66,7 +65,6 @@ func makeTestEvmTxm( lggr, lp, keyStore, - eventBroadcaster, estimator) } @@ -83,7 +81,7 @@ func TestTxm_SendNativeToken_DoesNotSendToZero(t *testing.T) { keyStore := cltest.NewKeyStore(t, db, dbConfig).Eth() ethClient := evmtest.NewEthClientMockWithDefaultChain(t) estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) - txm, err := makeTestEvmTxm(t, db, ethClient, estimator, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), dbConfig, dbConfig.Listener(), keyStore, nil) + txm, err := makeTestEvmTxm(t, db, ethClient, estimator, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), dbConfig, dbConfig.Listener(), keyStore) require.NoError(t, err) _, err = txm.SendNativeToken(testutils.Context(t), big.NewInt(0), from, to, *value, 21000) @@ -109,7 +107,7 @@ func TestTxm_CreateTransaction(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) - txm, err := makeTestEvmTxm(t, db, ethClient, estimator, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), dbConfig, dbConfig.Listener(), kst.Eth(), nil) + txm, err := makeTestEvmTxm(t, db, ethClient, estimator, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), dbConfig, dbConfig.Listener(), kst.Eth()) require.NoError(t, err) t.Run("with queue under capacity inserts eth_tx", func(t *testing.T) { @@ -523,7 +521,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) - txm, err := makeTestEvmTxm(t, db, ethClient, estimator, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), dbConfig, dbConfig.Listener(), etKeyStore, nil) + txm, err := makeTestEvmTxm(t, db, ethClient, estimator, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), dbConfig, dbConfig.Listener(), etKeyStore) require.NoError(t, err) t.Run("if another key has any transactions with insufficient eth errors, transmits as normal", func(t *testing.T) { @@ -567,7 +565,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) { Meta: nil, Strategy: strategy, }) - assert.NoError(t, err) + require.NoError(t, err) require.Equal(t, payload, etx.EncodedPayload) }) @@ -589,7 +587,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) { Meta: nil, Strategy: strategy, }) - assert.NoError(t, err) + require.NoError(t, err) require.Equal(t, payload, etx.EncodedPayload) }) } @@ -599,7 +597,6 @@ func TestTxm_Lifecycle(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) kst := ksmocks.NewEth(t) - eventBroadcaster := pgmocks.NewEventBroadcaster(t) config, dbConfig, evmConfig := makeConfigs(t) config.finalityDepth = uint32(42) @@ -615,16 +612,13 @@ func TestTxm_Lifecycle(t *testing.T) { unsub := cltest.NewAwaiter() kst.On("SubscribeToKeyChanges").Return(keyChangeCh, unsub.ItHappened) estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) - txm, err := makeTestEvmTxm(t, db, ethClient, estimator, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), dbConfig, dbConfig.Listener(), kst, eventBroadcaster) + txm, err := makeTestEvmTxm(t, db, ethClient, estimator, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), dbConfig, dbConfig.Listener(), kst) require.NoError(t, err) head := cltest.Head(42) // It should not hang or panic txm.OnNewLongestChain(testutils.Context(t), head) - sub := pgmocks.NewSubscription(t) - sub.On("Events").Return(make(<-chan pg.Event)) - eventBroadcaster.On("Subscribe", "evm.insert_on_txes", "").Return(sub, nil) evmConfig.bumpThreshold = uint64(1) require.NoError(t, txm.Start(testutils.Context(t))) @@ -638,7 +632,6 @@ func TestTxm_Lifecycle(t *testing.T) { addr := []gethcommon.Address{keyState.Address.Address()} kst.On("EnabledAddressesForChain", &cltest.FixtureChainID).Return(addr, nil) - sub.On("Close").Return() ethClient.On("PendingNonceAt", mock.AnythingOfType("*context.cancelCtx"), gethcommon.Address{}).Return(uint64(0), nil).Maybe() keyChangeCh <- struct{}{} @@ -670,14 +663,9 @@ func TestTxm_Reset(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(nil, nil) ethClient.On("BatchCallContextAll", mock.Anything, mock.Anything).Return(nil).Maybe() - eventBroadcaster := pgmocks.NewEventBroadcaster(t) - sub := pgmocks.NewSubscription(t) - sub.On("Events").Return(make(<-chan pg.Event)) - sub.On("Close") - eventBroadcaster.On("Subscribe", "evm.insert_on_txes", "").Return(sub, nil) estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, cfg.EVM(), cfg.EVM().GasEstimator()) - txm, err := makeTestEvmTxm(t, db, ethClient, estimator, cfg.EVM(), cfg.EVM().GasEstimator(), cfg.EVM().Transactions(), cfg.Database(), cfg.Database().Listener(), kst.Eth(), eventBroadcaster) + txm, err := makeTestEvmTxm(t, db, ethClient, estimator, cfg.EVM(), cfg.EVM().GasEstimator(), cfg.EVM().Transactions(), cfg.Database(), cfg.Database().Listener(), kst.Eth()) require.NoError(t, err) cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, addr2) diff --git a/core/services/pg/channels.go b/core/services/pg/channels.go index 736cd407962..1d67dabe523 100644 --- a/core/services/pg/channels.go +++ b/core/services/pg/channels.go @@ -2,7 +2,6 @@ package pg // Postgres channel to listen for new evm.txes const ( - ChannelInsertOnTx = "evm.insert_on_txes" ChannelInsertOnCosmosMsg = "insert_on_cosmos_msg" ChannelInsertOnEVMLogs = "evm.insert_on_logs" ) diff --git a/core/store/migrate/migrations/0206_remove_tx_insert_trigger.sql b/core/store/migrate/migrations/0206_remove_tx_insert_trigger.sql new file mode 100644 index 00000000000..94b2e4aa8a6 --- /dev/null +++ b/core/store/migrate/migrations/0206_remove_tx_insert_trigger.sql @@ -0,0 +1,18 @@ +-- +goose Up +DROP TRIGGER IF EXISTS notify_tx_insertion on evm.txes; +DROP FUNCTION IF EXISTS evm.notifyethtxinsertion(); + + +-- +goose Down +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION evm.notifytxinsertion() RETURNS trigger + LANGUAGE plpgsql + AS $$ + BEGIN + PERFORM pg_notify('evm.insert_on_txes'::text, encode(NEW.from_address, 'hex')); + RETURN NULL; + END + $$; + +CREATE TRIGGER notify_tx_insertion AFTER INSERT ON evm.txes FOR EACH ROW EXECUTE PROCEDURE evm.notifytxinsertion(); +-- +goose StatementEnd \ No newline at end of file