Attempt fix for duplicate metrics collector error #15663

Merged 2 commits on Dec 13, 2024
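What this PR changes: previously the two concurrency gauges were built as `prometheus.NewGaugeFunc` collectors inside `transmitter.Start` and registered against `mt.registerer` each time a transmitter started, so starting a transmitter more than once against the same registerer (or running two transmitters that share a DON ID and server URL) could attempt to register the same metric descriptor twice and fail with Prometheus's duplicate-collector error. The fix moves the gauges to package-level `promauto` GaugeVecs that are registered exactly once, fetches the per-(donID, serverURL) children in `newServer`, and mirrors the existing atomic busy counters into them via small helper methods.

Below is a minimal, standalone sketch of the failure mode and of why the GaugeVec pattern avoids it. It is not code from this PR; it assumes the standard `prometheus/client_golang` API, and the `demoGauge` name, label values, and `llo_demo_*` metric name are purely illustrative.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Package-level vector: promauto registers it once, at init, with the default registerer.
var demoGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "llo",
	Subsystem: "demo",
	Name:      "concurrent_transmit_gauge",
	Help:      "Illustrative gauge.",
}, []string{"donID", "serverURL"})

// newGaugeFunc mimics the old pattern: a fresh collector with fixed const labels.
func newGaugeFunc() prometheus.Collector {
	return prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name:        "concurrent_transmit_gauge",
		ConstLabels: prometheus.Labels{"donID": "1", "serverURL": "example.invalid"},
	}, func() float64 { return 0 })
}

func main() {
	reg := prometheus.NewRegistry()

	// Old pattern: building and registering a GaugeFunc on every Start means the
	// second registration of an identical descriptor is rejected.
	_ = reg.Register(newGaugeFunc())
	err := reg.Register(newGaugeFunc())
	var are prometheus.AlreadyRegisteredError
	fmt.Println(errors.As(err, &are)) // true: "duplicate metrics collector registration attempted"

	// New pattern: the vector is registered exactly once; asking for the same
	// child any number of times just returns the existing gauge.
	demoGauge.WithLabelValues("1", "example.invalid").Set(3)
	demoGauge.WithLabelValues("1", "example.invalid").Set(5) // no re-registration involved
}
```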
core/services/llo/mercurytransmitter/server.go (48 additions, 11 deletions)
@@ -62,6 +62,22 @@ var (
},
[]string{"donID", "serverURL", "code"},
)
+ promTransmitConcurrentTransmitGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "llo",
+ Subsystem: "mercurytransmitter",
+ Name: "concurrent_transmit_gauge",
+ Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
+ },
+ []string{"donID", "serverURL"},
+ )
+ promTransmitConcurrentDeleteGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "llo",
+ Subsystem: "mercurytransmitter",
+ Name: "concurrent_delete_gauge",
+ Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
+ },
+ []string{"donID", "serverURL"},
+ )
)

type ReportPacker interface {
@@ -87,12 +103,14 @@ type server struct {
evmPremiumLegacyPacker ReportPacker
jsonPacker ReportPacker

- transmitSuccessCount          prometheus.Counter
- transmitDuplicateCount        prometheus.Counter
- transmitConnectionErrorCount  prometheus.Counter
- transmitQueueDeleteErrorCount prometheus.Counter
- transmitQueueInsertErrorCount prometheus.Counter
- transmitQueuePushErrorCount   prometheus.Counter
+ transmitSuccessCount            prometheus.Counter
+ transmitDuplicateCount          prometheus.Counter
+ transmitConnectionErrorCount    prometheus.Counter
+ transmitQueueDeleteErrorCount   prometheus.Counter
+ transmitQueueInsertErrorCount   prometheus.Counter
+ transmitQueuePushErrorCount     prometheus.Counter
+ transmitConcurrentTransmitGauge prometheus.Gauge
+ transmitConcurrentDeleteGauge   prometheus.Gauge

transmitThreadBusyCount atomic.Int32
deleteThreadBusyCount atomic.Int32
@@ -130,6 +148,8 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client
promTransmitQueueDeleteErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitQueueInsertErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitQueuePushErrorCount.WithLabelValues(donIDStr, serverURL),
+ promTransmitConcurrentTransmitGauge.WithLabelValues(donIDStr, serverURL),
+ promTransmitConcurrentDeleteGauge.WithLabelValues(donIDStr, serverURL),
atomic.Int32{},
atomic.Int32{},
}
@@ -161,7 +181,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup
select {
case hash := <-s.deleteQueue:
for {
- s.deleteThreadBusyCount.Add(1)
+ s.deleteThreadBusyCountInc()
if err := s.pm.orm.Delete(ctx, [][32]byte{hash}); err != nil {
s.lggr.Errorw("Failed to delete transmission record", "err", err, "transmissionHash", hash)
s.transmitQueueDeleteErrorCount.Inc()
@@ -170,7 +190,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup
// Wait a backoff duration before trying to delete again
continue
case <-stopCh:
- s.deleteThreadBusyCount.Add(-1)
+ s.deleteThreadBusyCountDec()
// abort and return immediately on stop even if items remain in queue
return
}
@@ -179,14 +199,31 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup
}
// success
b.Reset()
- s.deleteThreadBusyCount.Add(-1)
+ s.deleteThreadBusyCountDec()
case <-stopCh:
// abort and return immediately on stop even if items remain in queue
return
}
}
}

+ func (s *server) transmitThreadBusyCountInc() {
+ val := s.transmitThreadBusyCount.Add(1)
+ s.transmitConcurrentTransmitGauge.Set(float64(val))
+ }
+ func (s *server) transmitThreadBusyCountDec() {
+ val := s.transmitThreadBusyCount.Add(-1)
+ s.transmitConcurrentTransmitGauge.Set(float64(val))
+ }
+ func (s *server) deleteThreadBusyCountInc() {
+ val := s.deleteThreadBusyCount.Add(1)
+ s.transmitConcurrentDeleteGauge.Set(float64(val))
+ }
+ func (s *server) deleteThreadBusyCountDec() {
+ val := s.deleteThreadBusyCount.Add(-1)
+ s.transmitConcurrentDeleteGauge.Set(float64(val))
+ }

func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donIDStr string) {
defer wg.Done()
// Exponential backoff with very short retry interval (since latency is a priority)
@@ -208,8 +245,8 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI
return false
}

- s.transmitThreadBusyCount.Add(1)
- defer s.transmitThreadBusyCount.Add(-1)
+ s.transmitThreadBusyCountInc()
+ defer s.transmitThreadBusyCountDec()

req, res, err := func(ctx context.Context) (*pb.TransmitRequest, *pb.TransmitResponse, error) {
ctx, cancelFn := context.WithTimeout(ctx, utils.WithJitter(s.transmitTimeout))
core/services/llo/mercurytransmitter/transmitter.go (1 addition, 33 deletions)
@@ -116,7 +116,6 @@ type transmitter struct {
orm ORM
servers map[string]*server
registerer prometheus.Registerer
- collectors []prometheus.Collector

donID uint32
fromAccount string
@@ -155,7 +154,6 @@ func newTransmitter(opts Opts) *transmitter {
opts.ORM,
servers,
opts.Registerer,
- nil,
opts.DonID,
fmt.Sprintf("%x", opts.FromAccount),
make(services.StopChan),
@@ -194,31 +192,6 @@ func (mt *transmitter) Start(ctx context.Context) (err error) {
go s.runDeleteQueueLoop(mt.stopCh, mt.wg)
go s.runQueueLoop(mt.stopCh, mt.wg, donIDStr)
}
- mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "llo",
- Subsystem: "mercurytransmitter",
- Name: "concurrent_transmit_gauge",
- Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
- ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentTransmits": strconv.FormatInt(int64(nThreads), 10)},
- }, func() float64 {
- return float64(s.transmitThreadBusyCount.Load())
- }))
- mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "llo",
- Subsystem: "mercurytransmitter",
- Name: "concurrent_delete_gauge",
- Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
- ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentDeletes": strconv.FormatInt(int64(nThreads), 10)},
- }, func() float64 {
- return float64(s.deleteThreadBusyCount.Load())
- }))
- for _, c := range mt.collectors {
- if err := mt.registerer.Register(c); err != nil {
- return err
- }
- }
}
if err := (&services.MultiStart{}).Start(ctx, startClosers...); err != nil {
return err
@@ -250,12 +223,7 @@ func (mt *transmitter) Close() error {
closers = append(closers, s.pm)
closers = append(closers, s.c)
}
- err := services.CloseAll(closers...)
- // Unregister all the gauge funcs
- for _, c := range mt.collectors {
- mt.registerer.Unregister(c)
- }
- return err
+ return services.CloseAll(closers...)
})
}
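A note on the design change: promauto registers the new GaugeVecs with the Prometheus default registerer at package initialization, so these gauges no longer flow through opts.Registerer and Close no longer has anything to unregister. The maxConcurrentTransmits/maxConcurrentDeletes constant labels from the old GaugeFunc collectors are also not carried over; the new vectors are labelled only by donID and serverURL.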
