Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic on mercury server error (#13231) #13245

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/eighty-hotels-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix panic when the mercury server returns an error #bugfix
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
You may disable if this results in excessive log volume. Disable like so:

```
[Pipeline]
[JobPipeline]
VerboseLogging = false
```

Expand Down Expand Up @@ -79,7 +79,7 @@

- [#12404](https://github.com/smartcontractkit/chainlink/pull/12404) [`b74079b672`](https://github.com/smartcontractkit/chainlink/commit/b74079b672f36fb0c241f90ea1e875ea3a9524da) Thanks [@HenryNguyen5](https://github.com/HenryNguyen5)! - Add OCR3 capability contract wrapper

- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option Pipeline.VerboseLogging
- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option JobPipeline.VerboseLogging

VerboseLogging enables detailed logging of pipeline execution steps. This is
disabled by default because it increases log volume for pipeline runs, but can
Expand All @@ -90,7 +90,7 @@
Set it like the following example:

```
[Pipeline]
[JobPipeline]
VerboseLogging = true
```

Expand Down
43 changes: 26 additions & 17 deletions core/services/relay/evm/mercury/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type asyncDeleter interface {
AsyncDelete(req *pb.TransmitRequest)
}

var _ services.Service = (*TransmitQueue)(nil)
var _ services.Service = (*transmitQueue)(nil)

var transmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "mercury_transmit_queue_load",
Expand All @@ -40,7 +40,7 @@ const promInterval = 6500 * time.Millisecond

// TransmitQueue is the high-level type that everything outside of this file should be using.
// It stores pending transmissions, yielding the latest (highest priority) first to the caller.
type TransmitQueue struct {
type transmitQueue struct {
services.StateMachine

cond sync.Cond
Expand All @@ -62,11 +62,20 @@ type Transmission struct {
ReportCtx ocrtypes.ReportContext // contains priority information (latest epoch/round wins)
}

// TransmitQueue is the interface returned by NewTransmitQueue. It embeds
// services.Service and adds the queue operations listed below.
type TransmitQueue interface {
services.Service

// BlockingPop blocks until at least one item is in the queue and then
// returns it. If the queue is closed, it returns nil immediately.
BlockingPop() (t *Transmission)
// Push submits a transmit request together with its report context.
// NOTE(review): the exact meaning of ok is not visible here — confirm
// against the implementation.
Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool)
// Init replaces the queue contents with the given transmissions.
Init(transmissions []*Transmission)
// IsEmpty reports whether the queue currently holds no items.
IsEmpty() bool
}

// maxlen controls how many items will be stored in the queue
// 0 means unlimited - be careful, this can cause memory leaks
func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) *TransmitQueue {
func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) TransmitQueue {
mu := new(sync.RWMutex)
return &TransmitQueue{
return &transmitQueue{
services.StateMachine{},
sync.Cond{L: mu},
lggr.Named("TransmitQueue"),
Expand All @@ -80,13 +89,13 @@ func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int,
}
}

func (tq *TransmitQueue) Init(transmissions []*Transmission) {
// Init replaces the queue's underlying priority queue with one built from
// transmissions and heap-orders it. This function does not take any lock
// itself.
// NOTE(review): presumably intended to be called before the queue is used
// concurrently — confirm against callers.
func (tq *transmitQueue) Init(transmissions []*Transmission) {
pq := priorityQueue(transmissions)
heap.Init(&pq) // ensure the heap is ordered
tq.pq = &pq
}

func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) {
func (tq *transmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) {
tq.cond.L.Lock()
defer tq.cond.L.Unlock()

Expand All @@ -111,7 +120,7 @@ func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.Report

// BlockingPop will block until at least one item is in the heap, and then return it
// If the queue is closed, it will immediately return nil
func (tq *TransmitQueue) BlockingPop() (t *Transmission) {
func (tq *transmitQueue) BlockingPop() (t *Transmission) {
tq.cond.L.Lock()
defer tq.cond.L.Unlock()
if tq.closed {
Expand All @@ -126,13 +135,13 @@ func (tq *TransmitQueue) BlockingPop() (t *Transmission) {
return t
}

func (tq *TransmitQueue) IsEmpty() bool {
// IsEmpty reports whether the priority queue currently holds no items.
// The length is read while holding the read lock.
func (tq *transmitQueue) IsEmpty() bool {
tq.mu.RLock()
defer tq.mu.RUnlock()
return tq.pq.Len() == 0
}

func (tq *TransmitQueue) Start(context.Context) error {
func (tq *transmitQueue) Start(context.Context) error {
return tq.StartOnce("TransmitQueue", func() error {
t := time.NewTicker(utils.WithJitter(promInterval))
wg := new(sync.WaitGroup)
Expand All @@ -148,7 +157,7 @@ func (tq *TransmitQueue) Start(context.Context) error {
})
}

func (tq *TransmitQueue) Close() error {
func (tq *transmitQueue) Close() error {
return tq.StopOnce("TransmitQueue", func() error {
tq.cond.L.Lock()
tq.closed = true
Expand All @@ -159,7 +168,7 @@ func (tq *TransmitQueue) Close() error {
})
}

func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) {
func (tq *transmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()

for {
Expand All @@ -172,25 +181,25 @@ func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{},
}
}

func (tq *TransmitQueue) report() {
// report reads the current queue length under the read lock and publishes it
// to the tq.transmitQueueLoad gauge.
func (tq *transmitQueue) report() {
tq.mu.RLock()
length := tq.pq.Len()
tq.mu.RUnlock()
// The gauge is updated after the lock has been released.
tq.transmitQueueLoad.Set(float64(length))
}

func (tq *TransmitQueue) Ready() error {
// Ready always returns nil.
func (tq *transmitQueue) Ready() error {
return nil
}
func (tq *TransmitQueue) Name() string { return tq.lggr.Name() }
func (tq *TransmitQueue) HealthReport() map[string]error {
// Name returns the name of the queue's logger.
func (tq *transmitQueue) Name() string { return tq.lggr.Name() }
// HealthReport returns a single-entry map keyed by tq.Name() whose value is
// the error produced by tq.status(), passed through errors.Join.
func (tq *transmitQueue) HealthReport() map[string]error {
report := map[string]error{tq.Name(): errors.Join(
tq.status(),
)}
return report
}

func (tq *TransmitQueue) status() (merr error) {
func (tq *transmitQueue) status() (merr error) {
tq.mu.RLock()
length := tq.pq.Len()
closed := tq.closed
Expand All @@ -206,7 +215,7 @@ func (tq *TransmitQueue) status() (merr error) {

// pop latest Transmission from the heap
// Not thread-safe
func (tq *TransmitQueue) pop() *Transmission {
func (tq *transmitQueue) pop() *Transmission {
if tq.pq.Len() == 0 {
return nil
}
Expand Down
7 changes: 5 additions & 2 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ type server struct {

c wsrpc.Client
pm *PersistenceManager
q *TransmitQueue
q TransmitQueue

deleteQueue chan *pb.TransmitRequest

url string

transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
Expand Down Expand Up @@ -259,7 +261,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
s.transmitDuplicateCount.Inc()
s.lggr.Debugw("Transmit report success; duplicate report", "payload", hexutil.Encode(t.Req.Payload), "response", res, "reportCtx", t.ReportCtx)
default:
transmitServerErrorCount.WithLabelValues(feedIDHex, fmt.Sprintf("%d", res.Code)).Inc()
transmitServerErrorCount.WithLabelValues(feedIDHex, s.url, fmt.Sprintf("%d", res.Code)).Inc()
s.lggr.Errorw("Transmit report failed; mercury server returned error", "response", res, "reportCtx", t.ReportCtx, "err", res.Error, "code", res.Code)
}
}
Expand All @@ -284,6 +286,7 @@ func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAcc
pm,
NewTransmitQueue(cLggr, serverURL, feedIDHex, maxTransmitQueueSize, pm),
make(chan *pb.TransmitRequest, maxDeleteQueueSize),
serverURL,
transmitSuccessCount.WithLabelValues(feedIDHex, serverURL),
transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL),
transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL),
Expand Down
Loading
Loading