From c5aa49bf0e2a19550886f0dfcc06a92b07058518 Mon Sep 17 00:00:00 2001
From: Sam
Date: Thu, 21 Dec 2023 09:51:26 -0500
Subject: [PATCH] Fully disable cache if LatestReportTTL=0 (#11636)

* Fully disable cache if LatestReportTTL=0

* Also reset transport on consecutive LatestReport request failures

* Optimize case where caching is disabled

* Fix comment

* Fix tests

---
 .../evm/mercury/wsrpc/cache/cache_set.go      |  6 ++-
 .../evm/mercury/wsrpc/cache/cache_set_test.go | 13 +++++-
 .../relay/evm/mercury/wsrpc/client.go         | 45 ++++++++++---------
 .../relay/evm/mercury/wsrpc/client_test.go    | 15 +++----
 4 files changed, 48 insertions(+), 31 deletions(-)

diff --git a/core/services/relay/evm/mercury/wsrpc/cache/cache_set.go b/core/services/relay/evm/mercury/wsrpc/cache/cache_set.go
index 98388442d9a..01d47743950 100644
--- a/core/services/relay/evm/mercury/wsrpc/cache/cache_set.go
+++ b/core/services/relay/evm/mercury/wsrpc/cache/cache_set.go
@@ -46,7 +46,7 @@ func newCacheSet(lggr logger.Logger, cfg Config) *cacheSet {
 
 func (cs *cacheSet) Start(context.Context) error {
 	return cs.StartOnce("CacheSet", func() error {
-		cs.lggr.Debugw("CacheSet starting", "config", cs.cfg)
+		cs.lggr.Debugw("CacheSet starting", "config", cs.cfg, "cachingEnabled", cs.cfg.LatestReportTTL > 0)
 		return nil
 	})
 }
@@ -65,6 +65,10 @@ func (cs *cacheSet) Close() error {
 }
 
 func (cs *cacheSet) Get(ctx context.Context, client Client) (f Fetcher, err error) {
+	if cs.cfg.LatestReportTTL == 0 {
+		// caching disabled
+		return client, nil
+	}
 	ok := cs.IfStarted(func() {
 		f, err = cs.get(ctx, client)
 	})
diff --git a/core/services/relay/evm/mercury/wsrpc/cache/cache_set_test.go b/core/services/relay/evm/mercury/wsrpc/cache/cache_set_test.go
index 7d754b8326e..59be76ed265 100644
--- a/core/services/relay/evm/mercury/wsrpc/cache/cache_set_test.go
+++ b/core/services/relay/evm/mercury/wsrpc/cache/cache_set_test.go
@@ -13,7 +13,8 @@ import (
 
 func Test_CacheSet(t *testing.T) {
 	lggr := logger.TestLogger(t)
-	cs := newCacheSet(lggr, Config{})
+	cs := newCacheSet(lggr, Config{LatestReportTTL: 1})
+	disabledCs := newCacheSet(lggr, Config{LatestReportTTL: 0})
 	ctx := testutils.Context(t)
 	servicetest.Run(t, cs)
 
@@ -22,6 +23,16 @@ func Test_CacheSet(t *testing.T) {
 	var err error
 	var f Fetcher
 
+	t.Run("with caching disabled, returns the passed client", func(t *testing.T) {
+		assert.Len(t, disabledCs.caches, 0)
+
+		f, err = disabledCs.Get(ctx, c)
+		require.NoError(t, err)
+
+		assert.Same(t, c, f)
+		assert.Len(t, disabledCs.caches, 0)
+	})
+
 	t.Run("with virgin cacheset, makes new entry and returns it", func(t *testing.T) {
 		assert.Len(t, cs.caches, 0)
 
diff --git a/core/services/relay/evm/mercury/wsrpc/client.go b/core/services/relay/evm/mercury/wsrpc/client.go
index 5b6bfa1a9b0..c9533717757 100644
--- a/core/services/relay/evm/mercury/wsrpc/client.go
+++ b/core/services/relay/evm/mercury/wsrpc/client.go
@@ -24,9 +24,9 @@ import (
 	"github.com/smartcontractkit/chainlink/v2/core/utils"
 )
 
-// MaxConsecutiveTransmitFailures controls how many consecutive requests are
+// MaxConsecutiveRequestFailures controls how many consecutive requests are
 // allowed to time out before we reset the connection
-const MaxConsecutiveTransmitFailures = 5
+const MaxConsecutiveRequestFailures = 10
 
 var (
 	timeoutCount = promauto.NewCounterVec(prometheus.CounterOpts{
@@ -55,7 +55,7 @@ var (
 	)
 	connectionResetCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Name: "mercury_connection_reset_count",
-		Help: fmt.Sprintf("Running count of times connection to mercury server has been reset (connection reset happens automatically after %d consecutive transmit failures)", MaxConsecutiveTransmitFailures),
+		Help: fmt.Sprintf("Running count of times connection to mercury server has been reset (connection reset happens automatically after %d consecutive request failures)", MaxConsecutiveRequestFailures),
 	},
 		[]string{"serverURL"},
 	)
@@ -256,13 +256,26 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p
 		return nil, errors.Wrap(err, "Transmit call failed")
 	}
 	resp, err = w.rawClient.Transmit(ctx, req)
+	w.handleTimeout(err)
+	if err != nil {
+		w.logger.Warnw("Transmit call failed due to networking error", "err", err, "resp", resp)
+		incRequestStatusMetric(statusFailed)
+	} else {
+		w.logger.Tracew("Transmit call succeeded", "resp", resp)
+		incRequestStatusMetric(statusSuccess)
+		setRequestLatencyMetric(float64(time.Since(start).Milliseconds()))
+	}
+	return
+}
+
+func (w *client) handleTimeout(err error) {
 	if errors.Is(err, context.DeadlineExceeded) {
 		w.timeoutCountMetric.Inc()
 		cnt := w.consecutiveTimeoutCnt.Add(1)
-		if cnt == MaxConsecutiveTransmitFailures {
+		if cnt == MaxConsecutiveRequestFailures {
 			w.logger.Errorf("Timed out on %d consecutive transmits, resetting transport", cnt)
-			// NOTE: If we get 5+ request timeouts in a row, close and re-open
-			// the websocket connection.
+			// NOTE: If we get at least MaxConsecutiveRequestFailures request
+			// timeouts in a row, close and re-open the websocket connection.
 			//
 			// This *shouldn't* be necessary in theory (ideally, wsrpc would
 			// handle it for us) but it acts as a "belts and braces" approach
@@ -271,11 +284,11 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p
 			select {
 			case w.chResetTransport <- struct{}{}:
 			default:
-				// This can happen if we had 5 consecutive timeouts, already
-				// sent a reset signal, then the connection started working
-				// again (resetting the count) then we got 5 additional
-				// failures before the runloop was able to close the bad
-				// connection.
+				// This can happen if we had MaxConsecutiveRequestFailures
+				// consecutive timeouts, already sent a reset signal, then the
+				// connection started working again (resetting the count) then
+				// we got MaxConsecutiveRequestFailures additional failures
+				// before the runloop was able to close the bad connection.
 				//
 				// It should be safe to just ignore in this case.
// @@ -286,15 +299,6 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p } else { w.consecutiveTimeoutCnt.Store(0) } - if err != nil { - w.logger.Warnw("Transmit call failed due to networking error", "err", err, "resp", resp) - incRequestStatusMetric(statusFailed) - } else { - w.logger.Tracew("Transmit call succeeded", "resp", resp) - incRequestStatusMetric(statusSuccess) - setRequestLatencyMetric(float64(time.Since(start).Milliseconds())) - } - return } func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) { @@ -306,6 +310,7 @@ func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) var cached bool if w.cache == nil { resp, err = w.rawClient.LatestReport(ctx, req) + w.handleTimeout(err) } else { cached = true resp, err = w.cache.LatestReport(ctx, req) diff --git a/core/services/relay/evm/mercury/wsrpc/client_test.go b/core/services/relay/evm/mercury/wsrpc/client_test.go index f265d54879c..10712461ae1 100644 --- a/core/services/relay/evm/mercury/wsrpc/client_test.go +++ b/core/services/relay/evm/mercury/wsrpc/client_test.go @@ -54,7 +54,7 @@ func Test_Client_Transmit(t *testing.T) { noopCacheSet := newNoopCacheSet() - t.Run("sends on reset channel after MaxConsecutiveTransmitFailures timed out transmits", func(t *testing.T) { + t.Run("sends on reset channel after MaxConsecutiveRequestFailures timed out transmits", func(t *testing.T) { calls := 0 transmitErr := context.DeadlineExceeded wsrpcClient := &mocks.MockWSRPCClient{ @@ -70,11 +70,11 @@ func Test_Client_Transmit(t *testing.T) { c.conn = conn c.rawClient = wsrpcClient require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil })) - for i := 1; i < MaxConsecutiveTransmitFailures; i++ { + for i := 1; i < MaxConsecutiveRequestFailures; i++ { _, err := c.Transmit(ctx, req) require.EqualError(t, err, "context deadline exceeded") } - assert.Equal(t, 4, calls) + assert.Equal(t, MaxConsecutiveRequestFailures-1, calls) select { case <-c.chResetTransport: t.Fatal("unexpected send on chResetTransport") @@ -82,7 +82,7 @@ func Test_Client_Transmit(t *testing.T) { } _, err := c.Transmit(ctx, req) require.EqualError(t, err, "context deadline exceeded") - assert.Equal(t, 5, calls) + assert.Equal(t, MaxConsecutiveRequestFailures, calls) select { case <-c.chResetTransport: default: @@ -94,14 +94,14 @@ func Test_Client_Transmit(t *testing.T) { // working transmit to reset counter _, err = c.Transmit(ctx, req) require.NoError(t, err) - assert.Equal(t, 6, calls) + assert.Equal(t, MaxConsecutiveRequestFailures+1, calls) assert.Equal(t, 0, int(c.consecutiveTimeoutCnt.Load())) }) t.Run("doesn't block in case channel is full", func(t *testing.T) { transmitErr = context.DeadlineExceeded c.chResetTransport = nil // simulate full channel - for i := 0; i < MaxConsecutiveTransmitFailures; i++ { + for i := 0; i < MaxConsecutiveRequestFailures; i++ { _, err := c.Transmit(ctx, req) require.EqualError(t, err, "context deadline exceeded") } @@ -162,10 +162,7 @@ func Test_Client_LatestReport(t *testing.T) { // simulate start without dialling require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil })) - var err error servicetest.Run(t, cacheSet) - c.cache, err = cacheSet.Get(ctx, c) - require.NoError(t, err) for i := 0; i < 5; i++ { r, err := c.LatestReport(ctx, req)
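
For readers skimming the diff, the heart of the cache change is the early return in cacheSet.Get: with LatestReportTTL=0 the passed client is handed back directly, so no cache is allocated, started, or consulted. The following is a minimal, self-contained Go sketch of that short-circuit, not the real package code; the Fetcher, client, Config, and cacheSet types below are simplified stand-ins for the wsrpc package's actual definitions.

// cache_disabled_sketch.go: illustrative only, simplified types.
package main

import (
	"context"
	"fmt"
	"time"
)

// Fetcher is a stand-in for the package's report-fetching interface.
type Fetcher interface {
	LatestReport(ctx context.Context) (string, error)
}

// client is a stand-in for the raw wsrpc client, which itself satisfies Fetcher.
type client struct{}

func (c *client) LatestReport(ctx context.Context) (string, error) {
	return "fresh report from server", nil
}

type Config struct {
	LatestReportTTL time.Duration // 0 disables caching entirely
}

type cacheSet struct {
	cfg Config
}

func (cs *cacheSet) Get(ctx context.Context, c Fetcher) (Fetcher, error) {
	if cs.cfg.LatestReportTTL == 0 {
		// caching disabled: return the raw client, bypassing the cache set
		return c, nil
	}
	// ...the real implementation returns a started per-server cache here...
	return c, nil
}

func main() {
	c := &client{}
	disabled := &cacheSet{cfg: Config{LatestReportTTL: 0}}
	f, err := disabled.Get(context.Background(), c)
	if err != nil {
		panic(err)
	}
	// The fetcher is the client itself, mirroring assert.Same in the new test.
	fmt.Println(f == Fetcher(c)) // true
}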
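The other half of the patch extracts the consecutive-timeout accounting into handleTimeout so that Transmit and the uncached LatestReport path share one counter and one reset trigger. Below is a rough standalone sketch of that pattern under the same caveat: the names mirror the diff, but the metrics, logging, wsrpc connection, and the runloop that actually performs the reconnect are omitted, and the buffered channel here is an illustrative stand-in for the real client's reset channel.

// handle_timeout_sketch.go: illustrative only, surrounding client omitted.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

const MaxConsecutiveRequestFailures = 10

type client struct {
	consecutiveTimeoutCnt atomic.Int64
	chResetTransport      chan struct{}
}

func (w *client) handleTimeout(err error) {
	if errors.Is(err, context.DeadlineExceeded) {
		cnt := w.consecutiveTimeoutCnt.Add(1)
		if cnt == MaxConsecutiveRequestFailures {
			// Non-blocking send: if a reset is already pending, dropping the
			// signal is safe because the runloop will reconnect anyway.
			select {
			case w.chResetTransport <- struct{}{}:
			default:
			}
		}
		return
	}
	// Any non-timeout outcome (success or another error) resets the streak,
	// matching the Store(0) in the else branch of the diff.
	w.consecutiveTimeoutCnt.Store(0)
}

func main() {
	w := &client{chResetTransport: make(chan struct{}, 1)}
	for i := 0; i < MaxConsecutiveRequestFailures; i++ {
		w.handleTimeout(context.DeadlineExceeded)
	}
	select {
	case <-w.chResetTransport:
		fmt.Printf("reset requested after %d consecutive timeouts\n", MaxConsecutiveRequestFailures)
	default:
		fmt.Println("no reset requested")
	}
}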