Skip to content

Commit

Permalink
Remove core utils dependencies from common (#11425)
Browse files Browse the repository at this point in the history
* Change difficulty from Big to BigInt

* Fix headtracker mock head

* Remove EsnureClosed

* Fix mock heads

* Migrate to common Mailbox

* Fix Tracker close on txm

* Change to EnsureHexPrefix

* Change names to mailbox

* Remove core/null dependency from common

* Remove core mailbox

* Fix dependencies

* Tidy

* Fix dependencies

* Change path to internal utils

* Minor fixes

* Rename MinKey function

* Update MinFunc

* Fix utils conflicts
  • Loading branch information
dimriou authored Dec 12, 2023
1 parent 0c63446 commit 306eadc
Show file tree
Hide file tree
Showing 56 changed files with 200 additions and 550 deletions.
7 changes: 4 additions & 3 deletions common/client/node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math"

"github.com/smartcontractkit/chainlink/v2/core/utils"
iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils"
)

var (
Expand Down Expand Up @@ -360,7 +361,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
lggr := logger.Named(n.lfcLog, "Unreachable")
lggr.Debugw("Trying to revive unreachable RPC node", "nodeState", n.State())

dialRetryBackoff := utils.NewRedialBackoff()
dialRetryBackoff := iutils.NewRedialBackoff()

for {
select {
Expand Down Expand Up @@ -416,7 +417,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) invalidChainIDLoop() {
lggr := logger.Named(n.lfcLog, "InvalidChainID")
lggr.Debugw(fmt.Sprintf("Periodically re-checking RPC node %s with invalid chain ID", n.String()), "nodeState", n.State())

chainIDRecheckBackoff := utils.NewRedialBackoff()
chainIDRecheckBackoff := iutils.NewRedialBackoff()

for {
select {
Expand Down
2 changes: 1 addition & 1 deletion common/client/send_only_node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"time"

"github.com/smartcontractkit/chainlink/v2/core/utils"
"github.com/smartcontractkit/chainlink/v2/common/internal/utils"
)

// verifyLoop may only be triggered once, on Start, if initial chain ID check
Expand Down
6 changes: 3 additions & 3 deletions common/headtracker/head_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const TrackableCallbackTimeout = 2 * time.Second
Expand All @@ -30,7 +30,7 @@ type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct
services.StateMachine
logger logger.Logger
callbacks callbackSet[H, BLOCK_HASH]
mailbox *utils.Mailbox[H]
mailbox *mailbox.Mailbox[H]
mutex sync.Mutex
chClose services.StopChan
wgDone sync.WaitGroup
Expand All @@ -48,7 +48,7 @@ func NewHeadBroadcaster[
return &HeadBroadcaster[H, BLOCK_HASH]{
logger: logger.Named(lggr, "HeadBroadcaster"),
callbacks: make(callbackSet[H, BLOCK_HASH]),
mailbox: utils.NewSingleMailbox[H](),
mailbox: mailbox.NewSingle[H](),
chClose: make(chan struct{}),
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/headtracker/head_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"

htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/common/internal/utils"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
Expand Down
14 changes: 7 additions & 7 deletions common/headtracker/head_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
Expand Down Expand Up @@ -43,14 +43,14 @@ type HeadTracker[
log logger.Logger
headBroadcaster types.HeadBroadcaster[HTH, BLOCK_HASH]
headSaver types.HeadSaver[HTH, BLOCK_HASH]
mailMon *utils.MailboxMonitor
mailMon *mailbox.Monitor
client htrktypes.Client[HTH, S, ID, BLOCK_HASH]
chainID ID
config htrktypes.Config
htConfig htrktypes.HeadTrackerConfig

backfillMB *utils.Mailbox[HTH]
broadcastMB *utils.Mailbox[HTH]
backfillMB *mailbox.Mailbox[HTH]
broadcastMB *mailbox.Mailbox[HTH]
headListener types.HeadListener[HTH, BLOCK_HASH]
chStop services.StopChan
wgDone sync.WaitGroup
Expand All @@ -70,7 +70,7 @@ func NewHeadTracker[
htConfig htrktypes.HeadTrackerConfig,
headBroadcaster types.HeadBroadcaster[HTH, BLOCK_HASH],
headSaver types.HeadSaver[HTH, BLOCK_HASH],
mailMon *utils.MailboxMonitor,
mailMon *mailbox.Monitor,
getNilHead func() HTH,
) types.HeadTracker[HTH, BLOCK_HASH] {
chStop := make(chan struct{})
Expand All @@ -82,8 +82,8 @@ func NewHeadTracker[
config: config,
htConfig: htConfig,
log: lggr,
backfillMB: utils.NewSingleMailbox[HTH](),
broadcastMB: utils.NewMailbox[HTH](HeadsBufferSize),
backfillMB: mailbox.NewSingle[HTH](),
broadcastMB: mailbox.New[HTH](HeadsBufferSize),
chStop: chStop,
headListener: NewHeadListener[HTH, S, ID, BLOCK_HASH](lggr, client, config, chStop),
headSaver: headSaver,
Expand Down
36 changes: 36 additions & 0 deletions common/internal/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package utils

import (
"cmp"
"slices"
"time"

"github.com/jpillora/backoff"
"golang.org/x/exp/constraints"
)

// NewRedialBackoff is a standard backoff to use for redialling or reconnecting to
// unreachable network endpoints
func NewRedialBackoff() backoff.Backoff {
return backoff.Backoff{
Min: 1 * time.Second,
Max: 15 * time.Second,
Jitter: true,
}

}

// MinFunc returns the minimum value of the given element array with respect
// to the given key function. In the event U is not a compound type (e.g a
// struct) an identity function can be provided.
func MinFunc[U any, T constraints.Ordered](elems []U, f func(U) T) T {
var min T
if len(elems) == 0 {
return min
}

e := slices.MinFunc(elems, func(a, b U) int {
return cmp.Compare(f(a), f(b))
})
return f(e)
}
2 changes: 1 addition & 1 deletion common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/chains/label"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/common/client"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const (
Expand Down
14 changes: 8 additions & 6 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/chains/label"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

"github.com/smartcontractkit/chainlink/v2/common/client"
commonfee "github.com/smartcontractkit/chainlink/v2/common/fee"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const (
Expand Down Expand Up @@ -129,7 +131,7 @@ type Confirmer[
ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
enabledAddresses []ADDR

mb *utils.Mailbox[HEAD]
mb *mailbox.Mailbox[HEAD]
ctx context.Context
ctxCancel context.CancelFunc
wg sync.WaitGroup
Expand Down Expand Up @@ -174,7 +176,7 @@ func NewConfirmer[
dbConfig: dbConfig,
chainID: client.ConfiguredChainID(),
ks: keystore,
mb: utils.NewSingleMailbox[HEAD](),
mb: mailbox.NewSingle[HEAD](),
isReceiptNil: isReceiptNil,
}
}
Expand Down Expand Up @@ -223,7 +225,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) clo
ec.initSync.Lock()
defer ec.initSync.Unlock()
if !ec.isStarted {
return fmt.Errorf("Confirmer is not started: %w", utils.ErrAlreadyStopped)
return fmt.Errorf("Confirmer is not started: %w", services.ErrAlreadyStopped)
}
ec.ctxCancel()
ec.wg.Wait()
Expand Down Expand Up @@ -869,7 +871,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
"err", sendError,
"fee", attempt.TxFee,
"feeLimit", etx.FeeLimit,
"signedRawTx", utils.AddHexPrefix(hex.EncodeToString(attempt.SignedRawTx)),
"signedRawTx", utils.EnsureHexPrefix(hex.EncodeToString(attempt.SignedRawTx)),
"blockHeight", blockHeight,
)
ec.SvcErrBuffer.Append(sendError)
Expand Down Expand Up @@ -1147,7 +1149,7 @@ func observeUntilTxConfirmed[

// Since a tx can have many attempts, we take the number of blocks to confirm as the block number
// of the receipt minus the block number of the first ever broadcast for this transaction.
broadcastBefore := utils.MinKey(attempt.Tx.TxAttempts, func(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) int64 {
broadcastBefore := iutils.MinFunc(attempt.Tx.TxAttempts, func(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) int64 {
if attempt.BroadcastBeforeBlockNum != nil {
return *attempt.BroadcastBeforeBlockNum
}
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// Reaper handles periodic database cleanup for Txm
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/resender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/chains/label"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/common/client"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const (
Expand Down
8 changes: 4 additions & 4 deletions common/txmgr/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"

"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const (
Expand Down Expand Up @@ -56,7 +56,7 @@ type Tracker[
txCache map[int64]AbandonedTx[ADDR]
ttl time.Duration
lock sync.Mutex
mb *utils.Mailbox[int64]
mb *mailbox.Mailbox[int64]
wg sync.WaitGroup
isStarted bool
ctx context.Context
Expand Down Expand Up @@ -85,7 +85,7 @@ func NewTracker[
enabledAddrs: map[ADDR]bool{},
txCache: map[int64]AbandonedTx[ADDR]{},
ttl: defaultTTL,
mb: utils.NewSingleMailbox[int64](),
mb: mailbox.NewSingle[int64](),
lock: sync.Mutex{},
wg: sync.WaitGroup{},
}
Expand Down
8 changes: 5 additions & 3 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// For more information about the Txm architecture, see the design doc:
Expand Down Expand Up @@ -342,7 +344,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
ctx, cancel := b.chStop.NewCtx()
defer cancel()
// Retry indefinitely on failure
backoff := utils.NewRedialBackoff()
backoff := iutils.NewRedialBackoff()
for {
select {
case <-time.After(backoff.Duration()):
Expand All @@ -361,7 +363,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
go func() {
defer wg.Done()
// Retry indefinitely on failure
backoff := utils.NewRedialBackoff()
backoff := iutils.NewRedialBackoff()
for {
select {
case <-time.After(backoff.Duration()):
Expand Down
3 changes: 2 additions & 1 deletion common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
clnull "github.com/smartcontractkit/chainlink-common/pkg/utils/null"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
clnull "github.com/smartcontractkit/chainlink/v2/core/null"
)

// TxStrategy controls how txes are queued and sent
Expand Down
6 changes: 3 additions & 3 deletions core/chains/evm/gas/block_history_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"

"github.com/smartcontractkit/chainlink/v2/common/config"
Expand All @@ -25,7 +26,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// MaxStartTime is the maximum amount of time we are allowed to spend
Expand Down Expand Up @@ -109,7 +109,7 @@ type (
blocks []evmtypes.Block
blocksMu sync.RWMutex
size int64
mb *utils.Mailbox[*evmtypes.Head]
mb *mailbox.Mailbox[*evmtypes.Head]
wg *sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc
Expand Down Expand Up @@ -139,7 +139,7 @@ func NewBlockHistoryEstimator(lggr logger.Logger, ethClient evmclient.Client, cf
blocks: make([]evmtypes.Block, 0),
// Must have enough blocks for both estimator and connectivity checker
size: int64(mathutil.Max(bhCfg.BlockHistorySize(), bhCfg.CheckInclusionBlocks())),
mb: utils.NewSingleMailbox[*evmtypes.Head](),
mb: mailbox.NewSingle[*evmtypes.Head](),
wg: new(sync.WaitGroup),
ctx: ctx,
ctxCancel: cancel,
Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/headtracker/head_broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

commonhtrk "github.com/smartcontractkit/chainlink/v2/common/headtracker"
commonmocks "github.com/smartcontractkit/chainlink/v2/common/types/mocks"
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestHeadBroadcaster_Subscribe(t *testing.T) {

orm := headtracker.NewORM(db, logger, cfg.Database(), *ethClient.ConfiguredChainID())
hs := headtracker.NewHeadSaver(logger, orm, evmCfg.EVM(), evmCfg.EVM().HeadTracker())
mailMon := utils.NewMailboxMonitor(t.Name())
mailMon := mailbox.NewMonitor(t.Name())
servicetest.Run(t, mailMon)
hb := headtracker.NewHeadBroadcaster(logger)
servicetest.Run(t, hb)
Expand Down
Loading

0 comments on commit 306eadc

Please sign in to comment.