bump common; use SugaredLogger methods (#11556)
jmank88 authored Dec 13, 2023
1 parent 9b50041 commit 862f79a
Showing 34 changed files with 203 additions and 220 deletions.
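
The pattern repeated across all of the files below is the same: package-level logging helpers that took a logger.Logger as their first argument are replaced by methods on a logger.SugaredLogger, obtained once via logger.Sugared and usually chained with Named/With. A minimal before/after sketch of the call-site change, assuming chainlink-common's logger package (import path assumed):

    import "github.com/smartcontractkit/chainlink-common/pkg/logger" // path assumed

    func example(lggr logger.Logger) {
        // Before: level helpers are free functions taking the logger as an argument.
        logger.Tracew(lggr, "Got head", "blockNumber", 42)
        logger.Criticalw(lggr, "No live RPC nodes available", "NodeSelectionMode", "RoundRobin")

        // After: wrap once, then call the same levels as methods.
        slggr := logger.Sugared(lggr)
        slggr.Tracew("Got head", "blockNumber", 42)
        slggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", "RoundRobin")
    }
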
17 changes: 7 additions & 10 deletions common/client/multi_node.go
@@ -86,7 +86,7 @@ type multiNode[
sendonlys []SendOnlyNode[CHAIN_ID, RPC_CLIENT]
chainID CHAIN_ID
chainType config.ChainType
- lggr logger.Logger
+ lggr logger.SugaredLogger
selectionMode string
noNewHeadsThreshold time.Duration
nodeSelector NodeSelector[CHAIN_ID, HEAD, RPC_CLIENT]
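
For context, SugaredLogger extends the base Logger with the level helpers used throughout this diff (Trace/Tracew, Critical/Criticalf/Criticalw) plus chainable Named and With. A rough, illustrative approximation of its shape (not the exact chainlink-common definition):

    // Illustrative only; the real interface lives in chainlink-common's logger package.
    type SugaredLogger interface {
        Logger                                   // Debugw, Infow, Warnw, Errorw, ...
        Named(name string) SugaredLogger         // child logger with a name segment appended
        With(keysAndValues ...any) SugaredLogger // child logger with extra structured fields
        Trace(args ...any)
        Tracew(msg string, keysAndValues ...any)
        Critical(args ...any)
        Criticalf(format string, args ...any)
        Criticalw(msg string, keysAndValues ...any)
    }
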
@@ -118,7 +118,7 @@ func NewMultiNode[
HEAD types.Head[BLOCK_HASH],
RPC_CLIENT RPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD],
](
- l logger.Logger,
+ lggr logger.Logger,
selectionMode string,
leaseDuration time.Duration,
noNewHeadsThreshold time.Duration,
@@ -131,9 +131,6 @@
) MultiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT] {
nodeSelector := newNodeSelector(selectionMode, nodes)

- lggr := logger.Named(l, "MultiNode")
- lggr = logger.With(lggr, "chainID", chainID.String())

// Prometheus' default interval is 15s, set this to under 7.5s to avoid
// aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency)
const reportInterval = 6500 * time.Millisecond
@@ -142,7 +139,7 @@
sendonlys: sendonlys,
chainID: chainID,
chainType: chainType,
- lggr: lggr,
+ lggr: logger.Sugared(lggr).Named("MultiNode").With("chainID", chainID.String()),
selectionMode: selectionMode,
noNewHeadsThreshold: noNewHeadsThreshold,
nodeSelector: nodeSelector,
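
Every constructor touched by this change follows the same consolidation: the two-statement Named/With setup built from package helpers becomes a single chained expression at the point of assignment. A side-by-side sketch of the two styles, taken from the removed and added lines above:

    // Old style (removed above): package helpers, reassigning a plain Logger.
    lggr := logger.Named(l, "MultiNode")
    lggr = logger.With(lggr, "chainID", chainID.String())

    // New style (added above): wrap once and chain SugaredLogger methods.
    slggr := logger.Sugared(l).Named("MultiNode").With("chainID", chainID.String())
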
@@ -249,7 +246,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
c.activeNode = c.nodeSelector.Select()

if c.activeNode == nil {
- logger.Criticalw(c.lggr, "No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name())
+ c.lggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name())
errmsg := fmt.Errorf("no live nodes available for chain %s", c.chainID.String())
c.SvcErrBuffer.Append(errmsg)
err = ErroringNodeError
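
Note the recurring pairing here and in broadcaster.go below: a Criticalw log plus an append to SvcErrBuffer, so the condition is both logged and surfaced through the service's health report. A generic sketch of that pairing (the buffer type is a toy stand-in; the standard errors package and the logger import are assumed):

    // Toy stand-in for the service error buffer consulted by health checks.
    type errBuffer struct{ errs []error }

    func (b *errBuffer) Append(err error) { b.errs = append(b.errs, err) }

    func selectNode(lggr logger.SugaredLogger, buf *errBuffer, activeNode any) error {
        if activeNode == nil {
            lggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", "RoundRobin")
            err := errors.New("no live nodes available")
            buf.Append(err) // the health report now reflects the failure as well
            return err
        }
        return nil
    }
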
@@ -351,10 +348,10 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
}

live := total - dead
- logger.Tracew(c.lggr, fmt.Sprintf("MultiNode state: %d/%d nodes are alive", live, total), "nodeStates", nodeStates)
+ c.lggr.Tracew(fmt.Sprintf("MultiNode state: %d/%d nodes are alive", live, total), "nodeStates", nodeStates)
if total == dead {
rerr := fmt.Errorf("no primary nodes available: 0/%d nodes are alive", total)
- logger.Criticalw(c.lggr, rerr.Error(), "nodeStates", nodeStates)
+ c.lggr.Criticalw(rerr.Error(), "nodeStates", nodeStates)
c.SvcErrBuffer.Append(rerr)
} else if dead > 0 {
c.lggr.Errorw(fmt.Sprintf("At least one primary node is dead: %d/%d nodes are alive", live, total), "nodeStates", nodeStates)
@@ -405,7 +402,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
if err != nil {
c.lggr.Debugw("Secondary node BatchCallContext failed", "err", err)
} else {
- logger.Trace(c.lggr, "Secondary node BatchCallContext success")
+ c.lggr.Trace("Secondary node BatchCallContext success")
}
}(n)
}
29 changes: 14 additions & 15 deletions common/client/node_lifecycle.go
@@ -93,9 +93,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
pollFailureThreshold := n.nodePoolCfg.PollFailureThreshold()
pollInterval := n.nodePoolCfg.PollInterval()

lggr := logger.Named(n.lfcLog, "Alive")
lggr = logger.With(lggr, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold, "pollInterval", pollInterval, "pollFailureThreshold", pollFailureThreshold)
logger.Tracew(lggr, "Alive loop starting", "nodeState", n.State())
lggr := logger.Sugared(n.lfcLog).Named("Alive").With("noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold, "pollInterval", pollInterval, "pollFailureThreshold", pollFailureThreshold)
lggr.Tracew("Alive loop starting", "nodeState", n.State())

headsC := make(chan HEAD)
sub, err := n.rpc.Subscribe(n.nodeCtx, headsC, rpcSubscriptionMethodNewHeads)
@@ -146,7 +145,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
case <-pollCh:
var version string
promPoolRPCNodePolls.WithLabelValues(n.chainID.String(), n.name).Inc()
logger.Tracew(lggr, "Polling for version", "nodeState", n.State(), "pollFailures", pollFailures)
lggr.Tracew("Polling for version", "nodeState", n.State(), "pollFailures", pollFailures)
ctx, cancel := context.WithTimeout(n.nodeCtx, pollInterval)
version, err := n.RPC().ClientVersion(ctx)
cancel()
@@ -166,7 +165,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.State())
if n.nLiveNodes != nil {
if l, _, _ := n.nLiveNodes(); l < 2 {
logger.Criticalf(lggr, "RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState)
lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState)
continue
}
}
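
This hunk, and several like it below, guard on the number of live nodes before escalating: if the failing node is the only live one it cannot be taken out of rotation, so the loop logs at critical level and keeps running in a degraded state instead of declaring the node unreachable. A simplified sketch of that decision (function name and message are illustrative):

    func onPollFailure(lggr logger.SugaredLogger, liveNodes int, declareUnreachable func()) {
        if liveNodes < 2 {
            // Disabling the only live node would leave no RPC at all; stay up, degraded.
            lggr.Criticalw("RPC endpoint failed to respond to polls; cannot disable the only live node")
            return
        }
        declareUnreachable()
    }
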
@@ -178,7 +177,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
// note: there must be another live node for us to be out of sync
lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", num, "totalDifficulty", td, "nodeState", n.State())
if liveNodes < 2 {
logger.Criticalf(lggr, "RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
continue
}
n.declareOutOfSync(n.isOutOfSync)
@@ -191,13 +190,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
return
}
promPoolRPCNodeNumSeenBlocks.WithLabelValues(n.chainID.String(), n.name).Inc()
logger.Tracew(lggr, "Got head", "head", bh)
lggr.Tracew("Got head", "head", bh)
if bh.BlockNumber() > highestReceivedBlockNumber {
promPoolRPCNodeHighestSeenBlock.WithLabelValues(n.chainID.String(), n.name).Set(float64(bh.BlockNumber()))
logger.Tracew(lggr, "Got higher block number, resetting timer", "latestReceivedBlockNumber", highestReceivedBlockNumber, "blockNumber", bh.BlockNumber(), "nodeState", n.State())
lggr.Tracew("Got higher block number, resetting timer", "latestReceivedBlockNumber", highestReceivedBlockNumber, "blockNumber", bh.BlockNumber(), "nodeState", n.State())
highestReceivedBlockNumber = bh.BlockNumber()
} else {
logger.Tracew(lggr, "Ignoring previously seen block number", "latestReceivedBlockNumber", highestReceivedBlockNumber, "blockNumber", bh.BlockNumber(), "nodeState", n.State())
lggr.Tracew("Ignoring previously seen block number", "latestReceivedBlockNumber", highestReceivedBlockNumber, "blockNumber", bh.BlockNumber(), "nodeState", n.State())
}
if outOfSyncT != nil {
outOfSyncT.Reset(noNewHeadsTimeoutThreshold)
@@ -213,7 +212,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, highestReceivedBlockNumber), "nodeState", n.State(), "latestReceivedBlockNumber", highestReceivedBlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold)
if n.nLiveNodes != nil {
if l, _, _ := n.nLiveNodes(); l < 2 {
logger.Criticalf(lggr, "RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
// We don't necessarily want to wait the full timeout to check again, we should
// check regularly and log noisily in this state
outOfSyncT.Reset(zombieNodeCheckInterval(n.noNewHeadsThreshold))
@@ -279,7 +278,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td

outOfSyncAt := time.Now()

lggr := logger.Named(n.lfcLog, "OutOfSync")
lggr := logger.Sugared(logger.Named(n.lfcLog, "OutOfSync"))
lggr.Debugw("Trying to revive out-of-sync RPC node", "nodeState", n.State())

// Need to redial since out-of-sync nodes are automatically disconnected
@@ -296,7 +295,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td
return
}

logger.Tracew(lggr, "Successfully subscribed to heads feed on out-of-sync RPC node", "nodeState", n.State())
lggr.Tracew("Successfully subscribed to heads feed on out-of-sync RPC node", "nodeState", n.State())

ch := make(chan HEAD)
sub, err := n.rpc.Subscribe(n.nodeCtx, ch, rpcSubscriptionMethodNewHeads)
@@ -328,7 +327,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td
case <-time.After(zombieNodeCheckInterval(n.noNewHeadsThreshold)):
if n.nLiveNodes != nil {
if l, _, _ := n.nLiveNodes(); l < 1 {
logger.Critical(lggr, "RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state")
lggr.Critical("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state")
n.declareInSync()
return
}
@@ -358,7 +357,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {

unreachableAt := time.Now()

lggr := logger.Named(n.lfcLog, "Unreachable")
lggr := logger.Sugared(logger.Named(n.lfcLog, "Unreachable"))
lggr.Debugw("Trying to revive unreachable RPC node", "nodeState", n.State())

dialRetryBackoff := iutils.NewRedialBackoff()
@@ -368,7 +367,7 @@
case <-n.nodeCtx.Done():
return
case <-time.After(dialRetryBackoff.Duration()):
logger.Tracew(lggr, "Trying to re-dial RPC node", "nodeState", n.State())
lggr.Tracew("Trying to re-dial RPC node", "nodeState", n.State())

err := n.rpc.Dial(n.nodeCtx)
if err != nil {
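
The sugared method names mirror go.uber.org/zap's SugaredLogger (Named, With, and the ...w structured variants); the Trace and Critical levels are extensions layered on top by the wrapping package. For readers who know zap, a minimal plain-zap sketch of the same call style (stock zap, not the chainlink wrapper):

    package main

    import "go.uber.org/zap"

    func main() {
        base, _ := zap.NewDevelopment()
        defer base.Sync()

        // Named and With chain and return a new *SugaredLogger, as in the diff above.
        lggr := base.Sugar().Named("Unreachable").With("nodeState", "Unreachable")

        lggr.Debugw("Trying to revive unreachable RPC node")
        lggr.Errorw("dial failed", "err", "connection refused")
        // Stock zap has no Tracew/Criticalw; those levels come from the wrapper.
    }
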
8 changes: 4 additions & 4 deletions common/headtracker/head_tracker.go
@@ -40,7 +40,7 @@ type HeadTracker[
BLOCK_HASH types.Hashable,
] struct {
services.StateMachine
- log logger.Logger
+ log logger.SugaredLogger
headBroadcaster types.HeadBroadcaster[HTH, BLOCK_HASH]
headSaver types.HeadSaver[HTH, BLOCK_HASH]
mailMon *mailbox.Monitor
@@ -81,7 +81,7 @@ func NewHeadTracker[
chainID: client.ConfiguredChainID(),
config: config,
htConfig: htConfig,
- log: lggr,
+ log: logger.Sugared(lggr),
backfillMB: mailbox.NewSingle[HTH](),
broadcastMB: mailbox.New[HTH](HeadsBufferSize),
chStop: chStop,
@@ -227,7 +227,7 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context
prevUnFinalizedHead := prevHead.BlockNumber() - int64(ht.config.FinalityDepth())
if head.BlockNumber() < prevUnFinalizedHead {
promOldHead.WithLabelValues(ht.chainID.String()).Inc()
logger.Criticalf(ht.log, "Got very old block with number %d (highest seen was %d). This is a problem and either means a very deep re-org occurred, one of the RPC nodes has gotten far out of sync, or the chain went backwards in block numbers. This node may not function correctly without manual intervention.", head.BlockNumber(), prevHead.BlockNumber())
ht.log.Criticalf("Got very old block with number %d (highest seen was %d). This is a problem and either means a very deep re-org occurred, one of the RPC nodes has gotten far out of sync, or the chain went backwards in block numbers. This node may not function correctly without manual intervention.", head.BlockNumber(), prevHead.BlockNumber())
ht.SvcErrBuffer.Append(errors.New("got very old block"))
}
}
@@ -310,7 +310,7 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, hea
}
mark := time.Now()
fetched := 0
- l := logger.With(ht.log, "blockNumber", headBlockNumber,
+ l := ht.log.With("blockNumber", headBlockNumber,
"n", headBlockNumber-baseHeight,
"fromBlockHeight", baseHeight,
"toBlockHeight", headBlockNumber-1)
18 changes: 9 additions & 9 deletions common/txmgr/broadcaster.go
@@ -82,7 +82,7 @@ type TransmitChecker[
// is returned. Errors should only be returned if the checker can confirm that a transaction
// should not be sent, other errors (for example connection or other unexpected errors) should
// be logged and swallowed.
- Check(ctx context.Context, l logger.Logger, tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], a txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
+ Check(ctx context.Context, l logger.SugaredLogger, tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], a txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
}

// Broadcaster monitors txes for transactions that need to
Expand All @@ -108,7 +108,7 @@ type Broadcaster[
FEE feetypes.Fee,
] struct {
services.StateMachine
- lggr logger.Logger
+ lggr logger.SugaredLogger
txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]
client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
@@ -172,7 +172,7 @@ func NewBroadcaster[
) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
lggr = logger.Named(lggr, "Broadcaster")
b := &Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{
- lggr: lggr,
+ lggr: logger.Sugared(lggr),
txStore: txStore,
client: client,
TxAttemptBuilder: txAttemptBuilder,
@@ -311,7 +311,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getS
if err == nil {
return seq, nil
}
logger.Criticalw(eb.lggr, "failed to retrieve next sequence from on-chain for address: ", "address", address.String())
eb.lggr.Criticalw("failed to retrieve next sequence from on-chain for address: ", "address", address.String())
return seq, err

}
@@ -399,7 +399,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Sync
localSequence, err := eb.GetNextSequence(ctx, addr)
// Address not found in map so skip sync
if err != nil {
logger.Criticalw(eb.lggr, "Failed to retrieve local next sequence for address", "address", addr.String(), "err", err)
eb.lggr.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err)
return
}

@@ -414,7 +414,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Sync
newNextSequence, err := eb.sequenceSyncer.Sync(ctx, addr, localSequence)
if err != nil {
if attempt > 5 {
logger.Criticalw(eb.lggr, "Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
eb.lggr.Criticalw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
eb.SvcErrBuffer.Append(err)
} else {
eb.lggr.Warnw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
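
The sequence-sync retry above escalates severity with the attempt count: warnings for the first few failures, then a critical log plus an entry in the service error buffer once the failure looks persistent. A generic sketch of that escalation (the threshold of 5 is taken from the diff; the attempt cap and everything else are illustrative):

    // syncOnce stands in for sequenceSyncer.Sync; reportErr for SvcErrBuffer.Append.
    func syncWithEscalation(lggr logger.SugaredLogger, syncOnce func() error, reportErr func(error)) {
        for attempt := 1; attempt <= 10; attempt++ {
            err := syncOnce()
            if err == nil {
                return
            }
            if attempt > 5 {
                lggr.Criticalw("Failed to sync with on-chain sequence", "attempt", attempt, "err", err)
                reportErr(err)
            } else {
                lggr.Warnw("Failed to sync with on-chain sequence", "attempt", attempt, "err", err)
            }
            // The real loop waits on a backoff timer or context cancellation between attempts.
        }
    }
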
@@ -537,7 +537,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
return fmt.Errorf("building transmit checker: %w", err), false
}

- lgr := etx.GetLogger(logger.With(eb.lggr, "fee", attempt.TxFee))
+ lgr := etx.GetLogger(eb.lggr.With("fee", attempt.TxFee))

// If the transmit check does not complete within the timeout, the transaction will be sent
// anyway.
@@ -647,14 +647,14 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// If there is only one RPC node, or all RPC nodes have the same
// configured cap, this transaction will get stuck and keep repeating
// forever until the issue is resolved.
- logger.Criticalw(lgr, `RPC node rejected this tx as outside Fee Cap`)
+ lgr.Criticalw(`RPC node rejected this tx as outside Fee Cap`)
fallthrough
default:
// Every error that doesn't fall under one of the above categories will be treated as Unknown.
fallthrough
case client.Unknown:
eb.SvcErrBuffer.Append(err)
- logger.Criticalw(lgr, `Unknown error occurred while handling tx queue in ProcessUnstartedTxs. This chain/RPC client may not be supported. `+
+ lgr.Criticalw(`Unknown error occurred while handling tx queue in ProcessUnstartedTxs. This chain/RPC client may not be supported. `+
`Urgent resolution required, Chainlink is currently operating in a degraded state and may miss transactions`, "err", err, "etx", etx, "attempt", attempt)
nextSequence, e := eb.client.PendingSequenceAt(ctx, etx.FromAddress)
if e != nil {
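
Because the TransmitChecker interface above now takes a logger.SugaredLogger, every checker implementation's Check method changes signature in the same way. A simplified, non-generic stand-in showing only that change (the types and bodies here are invented for illustration):

    // Simplified stand-in for the generic TransmitChecker interface.
    type transmitChecker interface {
        Check(ctx context.Context, l logger.SugaredLogger, txID int64) error
    }

    type noopChecker struct{}

    func (noopChecker) Check(ctx context.Context, l logger.SugaredLogger, txID int64) error {
        // Implementations can now call the sugared helpers directly on l.
        l.Tracew("transmit check skipped", "txID", txID)
        return nil
    }
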
(The remaining 30 changed files in this commit are not shown here.)
