Skip to content

Commit

Permalink
Merge branch 'develop' into ng/batch-price
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnogo authored Dec 13, 2024
2 parents 20335ca + 8f1b956 commit 11a45db
Show file tree
Hide file tree
Showing 86 changed files with 3,193 additions and 1,422 deletions.
5 changes: 5 additions & 0 deletions .changeset/big-camels-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix fix non-idempotent loopp registry.Register
5 changes: 5 additions & 0 deletions .changeset/giant-eels-jump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add error handling for Arbitrum RPC server timeouts. #added
9 changes: 5 additions & 4 deletions .github/workflows/ci-core-partial.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ jobs:
permissions:
id-token: write
contents: write
actions: write
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -86,7 +87,7 @@ jobs:
go-mod-download-directory: ${{ matrix.type.test-suite == 'ccip-deployment' && matrix.type.module-directory || '' }}

- name: Build Tests
uses: smartcontractkit/.github/apps/go-conditional-tests@37882e110590e636627a26371bdbd56ddfcce821 # go-conditional-tests@0.1.0
uses: smartcontractkit/.github/apps/go-conditional-tests@57f99fbea73056c490c766d50ef582a13ec4f3bb # go-conditional-tests@0.2.0
timeout-minutes: 10
with:
pipeline-step: "build"
Expand All @@ -98,7 +99,7 @@ jobs:
build-flags: ${{ matrix.type.build-flags }}

- name: Run Tests
uses: smartcontractkit/.github/apps/go-conditional-tests@37882e110590e636627a26371bdbd56ddfcce821 # go-conditional-tests@0.1.0
uses: smartcontractkit/.github/apps/go-conditional-tests@57f99fbea73056c490c766d50ef582a13ec4f3bb # go-conditional-tests@0.2.0
timeout-minutes: 15
env:
CL_DATABASE_URL: ${{ env.DB_URL }}
Expand All @@ -112,7 +113,7 @@ jobs:
github-token: ${{ secrets.GITHUB_TOKEN }}

- name: Update Test Index
uses: smartcontractkit/.github/apps/go-conditional-tests@37882e110590e636627a26371bdbd56ddfcce821 # go-conditional-tests@0.1.0
uses: smartcontractkit/.github/apps/go-conditional-tests@57f99fbea73056c490c766d50ef582a13ec4f3bb # go-conditional-tests@0.2.0
with:
pipeline-step: "update"
collect-coverage: ${{ needs.filter.outputs.should-collect-coverage }}
Expand All @@ -130,7 +131,7 @@ jobs:
if: ${{ needs.filter.outputs.should-collect-coverage == 'true' }}
runs-on: ubuntu-latest
steps:
- name: Checkout the repo
- name: Checkout the repo
uses: actions/checkout@v4.2.1
with:
# fetches all history for all tags and branches to provide more metadata for sonar reports
Expand Down
6 changes: 0 additions & 6 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -583,12 +583,6 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
Expand Down
2 changes: 2 additions & 0 deletions core/capabilities/remote/executable/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
)

func Test_Client_DonTopologies(t *testing.T) {
testutils.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/CAPPL-363")
ctx := testutils.Context(t)

transmissionSchedule, err := values.NewMap(map[string]any{
Expand Down Expand Up @@ -87,6 +88,7 @@ func Test_Client_DonTopologies(t *testing.T) {
}

func Test_Client_TransmissionSchedules(t *testing.T) {
testutils.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/CAPPL-363")
ctx := testutils.Context(t)

responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
Expand Down
15 changes: 13 additions & 2 deletions core/chains/evm/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (
ServiceUnavailable
TerminallyStuck
TooManyResults
ServiceTimeout
)

type ClientErrors map[int]*regexp.Regexp
Expand Down Expand Up @@ -160,7 +161,8 @@ var arbitrum = ClientErrors{
Fatal: arbitrumFatal,
L2FeeTooLow: regexp.MustCompile(`(: |^)max fee per gas less than block base fee(:|$)`),
L2Full: regexp.MustCompile(`(: |^)(queue full|sequencer pending tx pool full, please try again)(:|$)`),
ServiceUnavailable: regexp.MustCompile(`(: |^)502 Bad Gateway: [\s\S]*$|network is unreachable|i/o timeout`),
ServiceUnavailable: regexp.MustCompile(`(: |^)502 Bad Gateway: [\s\S]*$|network is unreachable|i/o timeout|(: |^)503 Service Temporarily Unavailable(:|$)`),
ServiceTimeout: regexp.MustCompile(`(: |^)408 Request Timeout(:|$)`),
}

// Treasure
Expand Down Expand Up @@ -398,6 +400,11 @@ func (s *SendError) IsServiceUnavailable(configErrors *ClientErrors) bool {
return s.is(ServiceUnavailable, configErrors) || pkgerrors.Is(s.err, commonclient.ErroringNodeError)
}

// IsServiceTimeout reports whether this send error matches the ServiceTimeout
// pattern, i.e. the RPC service timed out while handling the request.
func (s *SendError) IsServiceTimeout(configErrors *ClientErrors) bool {
	timedOut := s.is(ServiceTimeout, configErrors)
	return timedOut
}

// IsTerminallyStuckConfigError indicates if a transaction was stuck without any chance of inclusion
func (s *SendError) IsTerminallyStuckConfigError(configErrors *ClientErrors) bool {
return s.is(TerminallyStuck, configErrors)
Expand Down Expand Up @@ -619,6 +626,10 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger.
lggr.Errorw(fmt.Sprintf("service unavailable while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx)
return commonclient.Retryable
}
if sendError.IsServiceTimeout(configErrors) {
lggr.Errorw(fmt.Sprintf("service timed out while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx)
return commonclient.Retryable
}
if sendError.IsTimeout() {
lggr.Errorw(fmt.Sprintf("timeout while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx)
return commonclient.Retryable
Expand Down Expand Up @@ -666,7 +677,7 @@ var drpc = ClientErrors{

// Linkpool, Blockdaemon, and Chainstack all return "request timed out" if the log results are too large for them to process
// The TooManyResults patterns are unanchored, so they match these phrases anywhere in the error message.
// NOTE(review): two TooManyResults entries appear below; Go rejects duplicate constant keys in a map literal, so only
// one of them (presumably the later, broader pattern) can exist in the real file — confirm against the repository.
// NOTE(review): the Arbitrum ServiceTimeout pattern in this file matches "408 Request Timeout", whereas this one expects
// "408 Request Timed Out" — confirm which wording providers actually send.
var defaultClient = ClientErrors{
TooManyResults: regexp.MustCompile(`request timed out`),
TooManyResults: regexp.MustCompile(`request timed out|408 Request Timed Out`),
}

// JSON-RPC error codes which can indicate a refusal of the server to process an eth_getLogs request because the result set is too large
Expand Down
15 changes: 15 additions & 0 deletions core/chains/evm/client/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func Test_Eth_Errors(t *testing.T) {
{"network is unreachable", true, "Arbitrum"},
{"client error service unavailable", true, "tomlConfig"},
{"[Request ID: 825608a8-fd8a-4b5b-aea7-92999509306d] Error invoking RPC: [Request ID: 825608a8-fd8a-4b5b-aea7-92999509306d] Transaction execution returns a null value for transaction", true, "hedera"},
{"call failed: 503 Service Temporarily Unavailable: <html>\r\n<head><title>503 Service Temporarily Unavailable</title></head>\r\n<body>\r\n<center><h1>503 Service Temporarily Unavailable</h1></center>\r\n</body>\r\n</html>\r\n", true, "Arbitrum"},
}
for _, test := range tests {
err = evmclient.NewSendErrorS(test.message)
Expand All @@ -260,6 +261,20 @@ func Test_Eth_Errors(t *testing.T) {
}
})

// IsServiceTimeout subtest: checks which error messages are classified as a service timeout.
t.Run("IsServiceTimeout", func(t *testing.T) {
tests := []errorCase{
{"call failed: 408 Request Timeout: {", true, "Arbitrum"},
{"408 Request Timeout: {\"id\":303,\"jsonrpc\":\"2.0\",\"error\":{\"code\\\":-32009,\\\"message\\\":\\\"request timeout\\\"}}\",\"errVerbose\":\"408 Request Timeout:\n", true, "Arbitrum"},
// A bare "request timeout" without the "408 Request Timeout" status text is expected NOT to match.
{"request timeout", false, "tomlConfig"},
}
for _, test := range tests {
// Each message is checked twice: once as a plain send error and once via newSendErrorWrapped.
err = evmclient.NewSendErrorS(test.message)
assert.Equal(t, err.IsServiceTimeout(clientErrors), test.expect)
err = newSendErrorWrapped(test.message)
assert.Equal(t, err.IsServiceTimeout(clientErrors), test.expect)
}
})

t.Run("IsTxFeeExceedsCap", func(t *testing.T) {
tests := []errorCase{
{"tx fee (1.10 ether) exceeds the configured cap (1.00 ether)", true, "geth"},
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
Expand Down
59 changes: 48 additions & 11 deletions core/services/llo/mercurytransmitter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ var (
},
[]string{"donID", "serverURL", "code"},
)
// promTransmitConcurrentTransmitGauge is the gauge vector, labelled by donID and serverURL, that
// reports how many transmit threads are currently waiting on a remote transmit call.
// It is created through promauto, which registers it with the default Prometheus registerer.
promTransmitConcurrentTransmitGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "llo",
Subsystem: "mercurytransmitter",
Name: "concurrent_transmit_gauge",
Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
},
[]string{"donID", "serverURL"},
)
// promTransmitConcurrentDeleteGauge is the gauge vector, labelled by donID and serverURL, that
// reports how many delete threads are currently waiting on a delete call to the DB.
//
// It must be created through promauto (like its transmit counterpart) so that it is registered
// with the default Prometheus registerer; a vector built with plain prometheus.NewGaugeVec is
// never registered anywhere and its values would silently never be exported.
promTransmitConcurrentDeleteGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "llo",
	Subsystem: "mercurytransmitter",
	Name:      "concurrent_delete_gauge",
	Help:      "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
},
	[]string{"donID", "serverURL"},
)
)

type ReportPacker interface {
Expand All @@ -87,12 +103,14 @@ type server struct {
evmPremiumLegacyPacker ReportPacker
jsonPacker ReportPacker

transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
transmitQueueDeleteErrorCount prometheus.Counter
transmitQueueInsertErrorCount prometheus.Counter
transmitQueuePushErrorCount prometheus.Counter
transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
transmitQueueDeleteErrorCount prometheus.Counter
transmitQueueInsertErrorCount prometheus.Counter
transmitQueuePushErrorCount prometheus.Counter
transmitConcurrentTransmitGauge prometheus.Gauge
transmitConcurrentDeleteGauge prometheus.Gauge

transmitThreadBusyCount atomic.Int32
deleteThreadBusyCount atomic.Int32
Expand Down Expand Up @@ -130,6 +148,8 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client
promTransmitQueueDeleteErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitQueueInsertErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitQueuePushErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitConcurrentTransmitGauge.WithLabelValues(donIDStr, serverURL),
promTransmitConcurrentDeleteGauge.WithLabelValues(donIDStr, serverURL),
atomic.Int32{},
atomic.Int32{},
}
Expand Down Expand Up @@ -161,7 +181,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup
select {
case hash := <-s.deleteQueue:
for {
s.deleteThreadBusyCount.Add(1)
s.deleteThreadBusyCountInc()
if err := s.pm.orm.Delete(ctx, [][32]byte{hash}); err != nil {
s.lggr.Errorw("Failed to delete transmission record", "err", err, "transmissionHash", hash)
s.transmitQueueDeleteErrorCount.Inc()
Expand All @@ -170,7 +190,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup
// Wait a backoff duration before trying to delete again
continue
case <-stopCh:
s.deleteThreadBusyCount.Add(-1)
s.deleteThreadBusyCountDec()
// abort and return immediately on stop even if items remain in queue
return
}
Expand All @@ -179,14 +199,31 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup
}
// success
b.Reset()
s.deleteThreadBusyCount.Add(-1)
s.deleteThreadBusyCountDec()
case <-stopCh:
// abort and return immediately on stop even if items remain in queue
return
}
}
}

// transmitThreadBusyCountInc records that one more transmit thread is busy,
// bumping both the internal atomic counter and the exported gauge.
//
// The gauge is adjusted with Inc rather than Set(<counter value>): with Set,
// two goroutines can interleave between the atomic Add and the Set and publish
// their values out of order, leaving the gauge stale until the next change.
func (s *server) transmitThreadBusyCountInc() {
	s.transmitThreadBusyCount.Add(1)
	s.transmitConcurrentTransmitGauge.Inc()
}

// transmitThreadBusyCountDec records that a transmit thread is no longer busy.
// It mirrors transmitThreadBusyCountInc: the counter and the gauge each receive
// an order-independent -1 delta, so they converge to the same value.
func (s *server) transmitThreadBusyCountDec() {
	s.transmitThreadBusyCount.Add(-1)
	s.transmitConcurrentTransmitGauge.Dec()
}
// deleteThreadBusyCountInc records that one more delete thread is busy,
// bumping both the internal atomic counter and the exported gauge.
//
// The gauge is adjusted with Inc rather than Set(<counter value>): with Set,
// two goroutines can interleave between the atomic Add and the Set and publish
// their values out of order, leaving the gauge stale until the next change.
func (s *server) deleteThreadBusyCountInc() {
	s.deleteThreadBusyCount.Add(1)
	s.transmitConcurrentDeleteGauge.Inc()
}

// deleteThreadBusyCountDec records that a delete thread is no longer busy.
// It mirrors deleteThreadBusyCountInc: the counter and the gauge each receive
// an order-independent -1 delta, so they converge to the same value.
func (s *server) deleteThreadBusyCountDec() {
	s.deleteThreadBusyCount.Add(-1)
	s.transmitConcurrentDeleteGauge.Dec()
}

func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donIDStr string) {
defer wg.Done()
// Exponential backoff with very short retry interval (since latency is a priority)
Expand All @@ -208,8 +245,8 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI
return false
}

s.transmitThreadBusyCount.Add(1)
defer s.transmitThreadBusyCount.Add(-1)
s.transmitThreadBusyCountInc()
defer s.transmitThreadBusyCountDec()

req, res, err := func(ctx context.Context) (*pb.TransmitRequest, *pb.TransmitResponse, error) {
ctx, cancelFn := context.WithTimeout(ctx, utils.WithJitter(s.transmitTimeout))
Expand Down
34 changes: 1 addition & 33 deletions core/services/llo/mercurytransmitter/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ type transmitter struct {
orm ORM
servers map[string]*server
registerer prometheus.Registerer
collectors []prometheus.Collector

donID uint32
fromAccount string
Expand Down Expand Up @@ -155,7 +154,6 @@ func newTransmitter(opts Opts) *transmitter {
opts.ORM,
servers,
opts.Registerer,
nil,
opts.DonID,
fmt.Sprintf("%x", opts.FromAccount),
make(services.StopChan),
Expand Down Expand Up @@ -194,31 +192,6 @@ func (mt *transmitter) Start(ctx context.Context) (err error) {
go s.runDeleteQueueLoop(mt.stopCh, mt.wg)
go s.runQueueLoop(mt.stopCh, mt.wg, donIDStr)
}
mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "llo",
Subsystem: "mercurytransmitter",
Name: "concurrent_transmit_gauge",
Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentTransmits": strconv.FormatInt(int64(nThreads), 10)},
}, func() float64 {
return float64(s.transmitThreadBusyCount.Load())
}))
mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "llo",
Subsystem: "mercurytransmitter",
Name: "concurrent_delete_gauge",
Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentDeletes": strconv.FormatInt(int64(nThreads), 10)},
}, func() float64 {
return float64(s.deleteThreadBusyCount.Load())
}))
for _, c := range mt.collectors {
if err := mt.registerer.Register(c); err != nil {
return err
}
}
}
if err := (&services.MultiStart{}).Start(ctx, startClosers...); err != nil {
return err
Expand Down Expand Up @@ -250,12 +223,7 @@ func (mt *transmitter) Close() error {
closers = append(closers, s.pm)
closers = append(closers, s.c)
}
err := services.CloseAll(closers...)
// Unregister all the gauge funcs
for _, c := range mt.collectors {
mt.registerer.Unregister(c)
}
return err
return services.CloseAll(closers...)
})
}

Expand Down
Loading

0 comments on commit 11a45db

Please sign in to comment.