From 3090b59ea47f5866bbee524b8c0253bd3f1794bf Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 12 Dec 2024 09:35:08 -0500 Subject: [PATCH 1/2] Attempt fix for duplicate metrics collector error --- .../llo/mercurytransmitter/transmitter.go | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/core/services/llo/mercurytransmitter/transmitter.go b/core/services/llo/mercurytransmitter/transmitter.go index 8e60bf938a5..92f17534782 100644 --- a/core/services/llo/mercurytransmitter/transmitter.go +++ b/core/services/llo/mercurytransmitter/transmitter.go @@ -194,31 +194,31 @@ func (mt *transmitter) Start(ctx context.Context) (err error) { go s.runDeleteQueueLoop(mt.stopCh, mt.wg) go s.runQueueLoop(mt.stopCh, mt.wg, donIDStr) } - mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: "llo", - Subsystem: "mercurytransmitter", - Name: "concurrent_transmit_gauge", - Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.", - ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentTransmits": strconv.FormatInt(int64(nThreads), 10)}, - }, func() float64 { - return float64(s.transmitThreadBusyCount.Load()) - })) - mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: "llo", - Subsystem: "mercurytransmitter", - Name: "concurrent_delete_gauge", - Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.", - ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentDeletes": strconv.FormatInt(int64(nThreads), 10)}, - }, func() float64 { - return float64(s.deleteThreadBusyCount.Load()) - })) - for _, c := range mt.collectors { - if err := mt.registerer.Register(c); err != nil { - return err - } - } + // mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc( + // prometheus.GaugeOpts{ + // Namespace: "llo", + // Subsystem: "mercurytransmitter", + // Name: "concurrent_transmit_gauge", + // Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.", + // ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentTransmits": strconv.FormatInt(int64(nThreads), 10)}, + // }, func() float64 { + // return float64(s.transmitThreadBusyCount.Load()) + // })) + // mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc( + // prometheus.GaugeOpts{ + // Namespace: "llo", + // Subsystem: "mercurytransmitter", + // Name: "concurrent_delete_gauge", + // Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.", + // ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentDeletes": strconv.FormatInt(int64(nThreads), 10)}, + // }, func() float64 { + // return float64(s.deleteThreadBusyCount.Load()) + // })) + // for i, c := range mt.collectors { + // if err := mt.registerer.Register(c); err != nil { + // return fmt.Errorf("failed to register prometheus collector %d: %w", i, err) + // } + // } } if err := (&services.MultiStart{}).Start(ctx, startClosers...); err != nil { return err From 893a34c1c3b221f5791e0202578e413d82dd1899 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 12 Dec 2024 09:56:20 -0500 Subject: [PATCH 2/2] Fix thread busy metrics in llo mercury transmitter --- .../services/llo/mercurytransmitter/server.go | 59 +++++++++++++++---- .../llo/mercurytransmitter/transmitter.go | 34 +---------- 2 files changed, 49 insertions(+), 44 deletions(-) diff --git a/core/services/llo/mercurytransmitter/server.go b/core/services/llo/mercurytransmitter/server.go index 4e97c0483b3..3ce2b0a4b4a 100644 --- a/core/services/llo/mercurytransmitter/server.go +++ b/core/services/llo/mercurytransmitter/server.go @@ -62,6 +62,22 @@ var ( }, []string{"donID", "serverURL", "code"}, ) + promTransmitConcurrentTransmitGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "concurrent_transmit_gauge", + Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.", + }, + []string{"donID", "serverURL"}, + ) + promTransmitConcurrentDeleteGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "concurrent_delete_gauge", + Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.", + }, + []string{"donID", "serverURL"}, + ) ) type ReportPacker interface { @@ -87,12 +103,14 @@ type server struct { evmPremiumLegacyPacker ReportPacker jsonPacker ReportPacker - transmitSuccessCount prometheus.Counter - transmitDuplicateCount prometheus.Counter - transmitConnectionErrorCount prometheus.Counter - transmitQueueDeleteErrorCount prometheus.Counter - transmitQueueInsertErrorCount prometheus.Counter - transmitQueuePushErrorCount prometheus.Counter + transmitSuccessCount prometheus.Counter + transmitDuplicateCount prometheus.Counter + transmitConnectionErrorCount prometheus.Counter + transmitQueueDeleteErrorCount prometheus.Counter + transmitQueueInsertErrorCount prometheus.Counter + transmitQueuePushErrorCount prometheus.Counter + transmitConcurrentTransmitGauge prometheus.Gauge + transmitConcurrentDeleteGauge prometheus.Gauge transmitThreadBusyCount atomic.Int32 deleteThreadBusyCount atomic.Int32 @@ -130,6 +148,8 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client promTransmitQueueDeleteErrorCount.WithLabelValues(donIDStr, serverURL), promTransmitQueueInsertErrorCount.WithLabelValues(donIDStr, serverURL), promTransmitQueuePushErrorCount.WithLabelValues(donIDStr, serverURL), + promTransmitConcurrentTransmitGauge.WithLabelValues(donIDStr, serverURL), + promTransmitConcurrentDeleteGauge.WithLabelValues(donIDStr, serverURL), atomic.Int32{}, atomic.Int32{}, } @@ -161,7 +181,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup select { case hash := <-s.deleteQueue: for { - s.deleteThreadBusyCount.Add(1) + s.deleteThreadBusyCountInc() if err := s.pm.orm.Delete(ctx, [][32]byte{hash}); err != nil { s.lggr.Errorw("Failed to delete transmission record", "err", err, "transmissionHash", hash) s.transmitQueueDeleteErrorCount.Inc() @@ -170,7 +190,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup // Wait a backoff duration before trying to delete again continue case <-stopCh: - s.deleteThreadBusyCount.Add(-1) + s.deleteThreadBusyCountDec() // abort and return immediately on stop even if items remain in queue return } @@ -179,7 +199,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup } // success b.Reset() - s.deleteThreadBusyCount.Add(-1) + s.deleteThreadBusyCountDec() case <-stopCh: // abort and return immediately on stop even if items remain in queue return @@ -187,6 +207,23 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup } } +func (s *server) transmitThreadBusyCountInc() { + val := s.transmitThreadBusyCount.Add(1) + s.transmitConcurrentTransmitGauge.Set(float64(val)) +} +func (s *server) transmitThreadBusyCountDec() { + val := s.transmitThreadBusyCount.Add(-1) + s.transmitConcurrentTransmitGauge.Set(float64(val)) +} +func (s *server) deleteThreadBusyCountInc() { + val := s.deleteThreadBusyCount.Add(1) + s.transmitConcurrentDeleteGauge.Set(float64(val)) +} +func (s *server) deleteThreadBusyCountDec() { + val := s.deleteThreadBusyCount.Add(-1) + s.transmitConcurrentDeleteGauge.Set(float64(val)) +} + func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donIDStr string) { defer wg.Done() // Exponential backoff with very short retry interval (since latency is a priority) @@ -208,8 +245,8 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI return false } - s.transmitThreadBusyCount.Add(1) - defer s.transmitThreadBusyCount.Add(-1) + s.transmitThreadBusyCountInc() + defer s.transmitThreadBusyCountDec() req, res, err := func(ctx context.Context) (*pb.TransmitRequest, *pb.TransmitResponse, error) { ctx, cancelFn := context.WithTimeout(ctx, utils.WithJitter(s.transmitTimeout)) diff --git a/core/services/llo/mercurytransmitter/transmitter.go b/core/services/llo/mercurytransmitter/transmitter.go index 92f17534782..23aa4b79e58 100644 --- a/core/services/llo/mercurytransmitter/transmitter.go +++ b/core/services/llo/mercurytransmitter/transmitter.go @@ -116,7 +116,6 @@ type transmitter struct { orm ORM servers map[string]*server registerer prometheus.Registerer - collectors []prometheus.Collector donID uint32 fromAccount string @@ -155,7 +154,6 @@ func newTransmitter(opts Opts) *transmitter { opts.ORM, servers, opts.Registerer, - nil, opts.DonID, fmt.Sprintf("%x", opts.FromAccount), make(services.StopChan), @@ -194,31 +192,6 @@ func (mt *transmitter) Start(ctx context.Context) (err error) { go s.runDeleteQueueLoop(mt.stopCh, mt.wg) go s.runQueueLoop(mt.stopCh, mt.wg, donIDStr) } - // mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc( - // prometheus.GaugeOpts{ - // Namespace: "llo", - // Subsystem: "mercurytransmitter", - // Name: "concurrent_transmit_gauge", - // Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.", - // ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentTransmits": strconv.FormatInt(int64(nThreads), 10)}, - // }, func() float64 { - // return float64(s.transmitThreadBusyCount.Load()) - // })) - // mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc( - // prometheus.GaugeOpts{ - // Namespace: "llo", - // Subsystem: "mercurytransmitter", - // Name: "concurrent_delete_gauge", - // Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.", - // ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentDeletes": strconv.FormatInt(int64(nThreads), 10)}, - // }, func() float64 { - // return float64(s.deleteThreadBusyCount.Load()) - // })) - // for i, c := range mt.collectors { - // if err := mt.registerer.Register(c); err != nil { - // return fmt.Errorf("failed to register prometheus collector %d: %w", i, err) - // } - // } } if err := (&services.MultiStart{}).Start(ctx, startClosers...); err != nil { return err @@ -250,12 +223,7 @@ func (mt *transmitter) Close() error { closers = append(closers, s.pm) closers = append(closers, s.c) } - err := services.CloseAll(closers...) - // Unregister all the gauge funcs - for _, c := range mt.collectors { - mt.registerer.Unregister(c) - } - return err + return services.CloseAll(closers...) }) }