Skip to content

Commit

Permalink
Txmv2_from_2.18
Browse files Browse the repository at this point in the history
  • Loading branch information
dimriou committed Nov 29, 2024
1 parent fb7d6e8 commit 37f1edf
Show file tree
Hide file tree
Showing 57 changed files with 4,443 additions and 16 deletions.
6 changes: 6 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ packages:
BalanceMonitor:
config:
dir: "{{ .InterfaceDir }}/../mocks"
github.com/smartcontractkit/chainlink/v2/core/chains/evm/txm:
interfaces:
Client:
TxStore:
AttemptBuilder:
Keystore:
github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr:
interfaces:
ChainConfig:
Expand Down
33 changes: 32 additions & 1 deletion common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ type TxManager[
GetTransactionStatus(ctx context.Context, transactionID string) (state commontypes.TransactionStatus, err error)
}

type TxmV2Wrapper[
CHAIN_ID types.ID,
HEAD types.Head[BLOCK_HASH],
ADDR types.Hashable,
TX_HASH types.Hashable,
BLOCK_HASH types.Hashable,
SEQ types.Sequence,
FEE feetypes.Fee,
] interface {
services.Service
CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
}

type reset struct {
// f is the function to execute between stopping/starting the
// Broadcaster and Confirmer
Expand Down Expand Up @@ -112,6 +125,7 @@ type Txm[
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newErrorClassifier NewErrorClassifier
txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -146,6 +160,7 @@ func NewTxm[
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD],
newErrorClassifierFunc NewErrorClassifier,
txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
logger: logger.Sugared(lggr),
Expand All @@ -168,6 +183,7 @@ func NewTxm[
tracker: tracker,
newErrorClassifier: newErrorClassifierFunc,
finalizer: finalizer,
txmv2wrapper: txmv2wrapper,
}

if txCfg.ResendAfterThreshold() <= 0 {
Expand Down Expand Up @@ -206,6 +222,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return fmt.Errorf("Txm: Finalizer failed to start: %w", err)
}

if b.txmv2wrapper != nil {
if err := ms.Start(ctx, b.txmv2wrapper); err != nil {
return fmt.Errorf("Txm: Txmv2 failed to start: %w", err)
}
}

b.logger.Info("Txm starting runLoop")
b.wg.Add(1)
go b.runLoop()
Expand Down Expand Up @@ -459,6 +481,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
if b.txmv2wrapper != nil {
err = b.txmv2wrapper.Close()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
}
return
case <-keysChanged:
// This check prevents the weird edge-case where you can select
Expand Down Expand Up @@ -512,11 +540,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
// Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing
// Skipping CreateTransaction to avoid double send
if b.txmv2wrapper != nil && txRequest.Meta != nil && txRequest.Meta.DualBroadcast != nil && *txRequest.Meta.DualBroadcast {
return b.txmv2wrapper.CreateTransaction(ctx, txRequest)
}
if txRequest.IdempotencyKey != nil {
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID)
if err != nil {
return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err)
return tx, fmt.Errorf("failed to search for transaction with IdempotencyKey: %w", err)
}
if existingTx != nil {
b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)
Expand Down
4 changes: 4 additions & 0 deletions common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ type TxMeta[ADDR types.Hashable, TX_HASH types.Hashable] struct {
MessageIDs []string `json:"MessageIDs,omitempty"`
// SeqNumbers is used by CCIP for tx to committed sequence numbers correlation in logs
SeqNumbers []uint64 `json:"SeqNumbers,omitempty"`

// Dual Broadcast
DualBroadcast *bool `json:"DualBroadcast,omitempty"`
DualBroadcastParams *string `json:"DualBroadcastParams,omitempty"`
}

type TxAttempt[
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/ccip/ocrimpls/contract_transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ func makeTestEvmTxm(
lp,
keyStore,
estimator,
ht)
ht,
nil)
require.NoError(t, err, "can't create tx manager")

_, unsub := broadcaster.Subscribe(txm)
Expand Down
4 changes: 4 additions & 0 deletions core/chains/evm/config/chain_scoped.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (e *EVMConfig) BalanceMonitor() BalanceMonitor {
return &balanceMonitorConfig{c: e.C.BalanceMonitor}
}

func (e *EVMConfig) TxmV2() TxmV2 {
return &txmv2Config{c: e.C.TxmV2}
}

func (e *EVMConfig) Transactions() Transactions {
return &transactionsConfig{c: e.C.Transactions}
}
Expand Down
25 changes: 25 additions & 0 deletions core/chains/evm/config/chain_scoped_txmv2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package config

import (
"net/url"
"time"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml"
)

type txmv2Config struct {
c toml.TxmV2
}

func (t *txmv2Config) Enabled() bool {
return *t.c.Enabled
}

func (t *txmv2Config) BlockTime() *time.Duration {
d := t.c.BlockTime.Duration()
return &d
}

func (t *txmv2Config) CustomURL() *url.URL {
return t.c.CustomURL.URL()
}
6 changes: 5 additions & 1 deletion core/chains/evm/config/chaintype/chaintype.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
ChainZkEvm ChainType = "zkevm"
ChainZkSync ChainType = "zksync"
ChainZircuit ChainType = "zircuit"
ChainDualBroadcast ChainType = "dualBroadcast"
)

// IsL2 returns true if this chain is a Layer 2 chain. Notably:
Expand All @@ -39,7 +40,7 @@ func (c ChainType) IsL2() bool {

func (c ChainType) IsValid() bool {
switch c {
case "", ChainArbitrum, ChainAstar, ChainCelo, ChainGnosis, ChainHedera, ChainKroma, ChainMantle, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync, ChainZircuit:
case "", ChainArbitrum, ChainAstar, ChainCelo, ChainGnosis, ChainHedera, ChainKroma, ChainMantle, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync, ChainZircuit, ChainDualBroadcast:
return true
}
return false
Expand Down Expand Up @@ -77,6 +78,8 @@ func FromSlug(slug string) ChainType {
return ChainZkSync
case "zircuit":
return ChainZircuit
case "dualBroadcast":
return ChainDualBroadcast
default:
return ChainType(slug)
}
Expand Down Expand Up @@ -144,4 +147,5 @@ var ErrInvalid = fmt.Errorf("must be one of %s or omitted", strings.Join([]strin
string(ChainZkEvm),
string(ChainZkSync),
string(ChainZircuit),
string(ChainDualBroadcast),
}, ", "))
7 changes: 7 additions & 0 deletions core/chains/evm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type EVM interface {
HeadTracker() HeadTracker
BalanceMonitor() BalanceMonitor
TxmV2() TxmV2
Transactions() Transactions
GasEstimator() GasEstimator
OCR() OCR
Expand Down Expand Up @@ -102,6 +103,12 @@ type ClientErrors interface {
TooManyResults() string
}

type TxmV2 interface {
Enabled() bool
BlockTime() *time.Duration
CustomURL() *url.URL
}

type Transactions interface {
ForwardersEnabled() bool
ReaperInterval() time.Duration
Expand Down
27 changes: 25 additions & 2 deletions core/chains/evm/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,10 @@ func (c *EVMConfig) ValidateConfig() (err error) {
is := c.ChainType.ChainType()
if is != must {
if must == "" {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "ChainType", Value: c.ChainType.ChainType(),
Msg: "must not be set with this chain id"})
if c.ChainType.ChainType() != chaintype.ChainDualBroadcast {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "ChainType", Value: c.ChainType.ChainType(),
Msg: "must not be set with this chain id"})
}
} else {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "ChainType", Value: c.ChainType.ChainType(),
Msg: fmt.Sprintf("only %q can be used with this chain id", must)})
Expand Down Expand Up @@ -376,6 +378,7 @@ type Chain struct {
FinalizedBlockOffset *uint32
NoNewFinalizedHeadsThreshold *commonconfig.Duration

TxmV2 TxmV2 `toml:",omitempty"`
Transactions Transactions `toml:",omitempty"`
BalanceMonitor BalanceMonitor `toml:",omitempty"`
GasEstimator GasEstimator `toml:",omitempty"`
Expand Down Expand Up @@ -460,6 +463,26 @@ func (c *Chain) ValidateConfig() (err error) {
return
}

type TxmV2 struct {
Enabled *bool `toml:",omitempty"`
BlockTime *commonconfig.Duration `toml:",omitempty"`
CustomURL *commonconfig.URL `toml:",omitempty"`
}

func (t *TxmV2) setFrom(f *TxmV2) {
if v := f.Enabled; v != nil {
t.Enabled = f.Enabled
}

if v := f.BlockTime; v != nil {
t.BlockTime = f.BlockTime
}

if v := f.CustomURL; v != nil {
t.CustomURL = f.CustomURL
}
}

type Transactions struct {
ForwardersEnabled *bool
MaxInFlight *uint32
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (c *Chain) SetFrom(f *Chain) {
c.NoNewFinalizedHeadsThreshold = v
}

c.TxmV2.setFrom(&f.TxmV2)
c.Transactions.setFrom(&f.Transactions)
c.BalanceMonitor.setFrom(&f.BalanceMonitor)
c.GasEstimator.setFrom(&f.GasEstimator)
Expand Down
3 changes: 3 additions & 0 deletions core/chains/evm/config/toml/defaults/fallback.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ FinalizedBlockOffset = 0
NoNewFinalizedHeadsThreshold = '0'
LogBroadcasterEnabled = true

[TxmV2]
Enabled = false

[Transactions]
ForwardersEnabled = false
MaxInFlight = 16
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/keystore/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ type Eth interface {
CheckEnabled(ctx context.Context, address common.Address, chainID *big.Int) error
EnabledAddressesForChain(ctx context.Context, chainID *big.Int) (addresses []common.Address, err error)
SignTx(ctx context.Context, fromAddress common.Address, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error)
SignMessage(ctx context.Context, address common.Address, message []byte) ([]byte, error)
SubscribeToKeyChanges(ctx context.Context) (ch chan struct{}, unsub func())
}
60 changes: 60 additions & 0 deletions core/chains/evm/keystore/mocks/eth.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 37f1edf

Please sign in to comment.