Skip to content

Commit

Permalink
handling connection fault on wsrpc LatestReport (#13626)
Browse files Browse the repository at this point in the history
* changeset

* handle conn fault

* handle conn fault

* update test

* update test

* refactoring of client to centralize handling of handling errors with raw client usage

* fix lint

* remove unnecessary handling of bug scenario

* remove unnecessary handling of bug scenario

* cleanup

* cleanup

* fixup changeset

* use existing unit test from Transmit
  • Loading branch information
akuzni2 authored Jun 25, 2024
1 parent ac5b39d commit 0dd8b13
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 7 deletions.
5 changes: 5 additions & 0 deletions .changeset/fast-dolphins-cry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

handle connection timeout on cache path for ws client LatestReport #bugfix
7 changes: 3 additions & 4 deletions core/services/relay/evm/mercury/wsrpc/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ type Fetcher interface {
}

type Client interface {
Fetcher
ServerURL() string
RawClient() pb.MercuryClient
RawLatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error)
}

// Cache is scoped to one particular mercury server
Expand Down Expand Up @@ -194,7 +193,7 @@ func (m *memCache) LatestReport(ctx context.Context, req *pb.LatestReportRequest
}
feedIDHex := mercuryutils.BytesToFeedID(req.FeedId).String()
if m.cfg.LatestReportTTL <= 0 {
return m.client.RawClient().LatestReport(ctx, req)
return m.client.RawLatestReport(ctx, req)
}
vi, loaded := m.cache.LoadOrStore(feedIDHex, &cacheVal{
sync.RWMutex{},
Expand Down Expand Up @@ -311,7 +310,7 @@ func (m *memCache) fetch(req *pb.LatestReportRequest, v *cacheVal) {
// NOTE: must drop down to RawClient here otherwise we enter an
// infinite loop of calling a client that calls back to this same cache
// and on and on
val, err = m.client.RawClient().LatestReport(ctx, req)
val, err = m.client.RawLatestReport(ctx, req)
cancel()
v.setError(err)
if memcacheCtx.Err() != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type mockClient struct {
err error
}

func (m *mockClient) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) {
func (m *mockClient) RawLatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) {
return m.resp, m.err
}

Expand Down
10 changes: 8 additions & 2 deletions core/services/relay/evm/mercury/wsrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ func (w *client) handleTimeout(err error) {
} else {
w.consecutiveTimeoutCnt.Store(0)
}

}

func (w *client) RawLatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) {
resp, err = w.rawClient.LatestReport(ctx, req)
w.handleTimeout(err)
return
}

func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) {
Expand All @@ -312,8 +319,7 @@ func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest)
}
var cached bool
if w.cache == nil {
resp, err = w.rawClient.LatestReport(ctx, req)
w.handleTimeout(err)
resp, err = w.RawLatestReport(ctx, req)
} else {
cached = true
resp, err = w.cache.LatestReport(ctx, req)
Expand Down
62 changes: 62 additions & 0 deletions core/services/relay/evm/mercury/wsrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,65 @@ func Test_Client_LatestReport(t *testing.T) {
})
}
}

func Test_Client_RawLatestReport(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)

t.Run("sends on reset channel after MaxConsecutiveRequestFailures timed out transmits", func(t *testing.T) {
noopCacheSet := newNoopCacheSet()
req := &pb.LatestReportRequest{}
calls := 0
timeoutErr := context.DeadlineExceeded
wsrpcClient := &mocks.MockWSRPCClient{
LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) {
calls++
return nil, timeoutErr
},
}
conn := &mocks.MockConn{
Ready: true,
}

c := newClient(lggr, csakey.KeyV2{}, nil, "", noopCacheSet)
c.conn = conn
c.rawClient = wsrpcClient
require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil }))
for i := 1; i < MaxConsecutiveRequestFailures; i++ {
_, err := c.RawLatestReport(ctx, req)
require.EqualError(t, err, "context deadline exceeded")
}
assert.Equal(t, MaxConsecutiveRequestFailures-1, calls)
select {
case <-c.chResetTransport:
t.Fatal("unexpected send on chResetTransport")
default:
}
_, err := c.RawLatestReport(ctx, req)
require.EqualError(t, err, "context deadline exceeded")
assert.Equal(t, MaxConsecutiveRequestFailures, calls)
select {
case <-c.chResetTransport:
default:
t.Fatal("expected send on chResetTransport")
}

t.Run("successful LatestReport resets the counter", func(t *testing.T) {
timeoutErr = nil
// working LatestReport to reset counter
_, err := c.RawLatestReport(ctx, req)
require.NoError(t, err)
assert.Equal(t, MaxConsecutiveRequestFailures+1, calls)
assert.Equal(t, 0, int(c.consecutiveTimeoutCnt.Load()))
})

t.Run("doesn't block in case channel is full", func(t *testing.T) {
timeoutErr = context.DeadlineExceeded
c.chResetTransport = nil // simulate full channel
for i := 0; i < MaxConsecutiveRequestFailures; i++ {
_, err := c.RawLatestReport(ctx, req)
require.EqualError(t, err, "context deadline exceeded")
}
})
})
}

0 comments on commit 0dd8b13

Please sign in to comment.