Skip to content

Commit

Permalink
Make TransmitConcurrency a config var
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Nov 18, 2024
1 parent c58b163 commit 89e51d7
Show file tree
Hide file tree
Showing 26 changed files with 60 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .changeset/mean-dots-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add config var Mercury.Transmitter.TransmitConcurrency #added
4 changes: 4 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ TransmitQueueMaxSize = 10_000 # Default
# when sending a message to the mercury server, before aborting and considering
# the transmission to be failed.
TransmitTimeout = "5s" # Default
# TransmitConcurrency is the max number of concurrent transmits to each server.
#
# Only has effect with LLO jobs.
TransmitConcurrency = 100 # Default

# Telemetry holds OTEL settings.
# This data includes open telemetry metrics, traces, & logs.
Expand Down
1 change: 1 addition & 0 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type MercuryTLS interface {
// MercuryTransmitter exposes the settings that control the mercury
// transmitter.
type MercuryTransmitter interface {
// TransmitQueueMaxSize returns the configured TransmitQueueMaxSize setting.
TransmitQueueMaxSize() uint32
// TransmitTimeout returns how long the transmitter will wait for a response
// when sending a message to the mercury server, before aborting and
// considering the transmission to be failed.
TransmitTimeout() commonconfig.Duration
// TransmitConcurrency returns the max number of concurrent transmits to
// each server. Only has effect with LLO jobs.
TransmitConcurrency() uint32
}

type Mercury interface {
Expand Down
4 changes: 4 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,7 @@ func (m *MercuryTLS) ValidateConfig() (err error) {
// MercuryTransmitter holds the TOML settings of the [Mercury.Transmitter]
// section. Every field is a pointer; setFrom only copies fields that are
// non-nil in the source value.
type MercuryTransmitter struct {
// TransmitQueueMaxSize is the TransmitQueueMaxSize setting.
TransmitQueueMaxSize *uint32
// TransmitTimeout controls how long the transmitter will wait for a response
// when sending a message to the mercury server, before aborting and
// considering the transmission to be failed.
TransmitTimeout *commonconfig.Duration
// TransmitConcurrency is the max number of concurrent transmits to each
// server. Only has effect with LLO jobs.
TransmitConcurrency *uint32
}

func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
Expand All @@ -1339,6 +1340,9 @@ func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
if v := f.TransmitTimeout; v != nil {
m.TransmitTimeout = v
}
if v := f.TransmitConcurrency; v != nil {
m.TransmitConcurrency = v
}
}

type Mercury struct {
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (m *mercuryTransmitterConfig) TransmitTimeout() commonconfig.Duration {
return *m.c.TransmitTimeout
}

// TransmitConcurrency returns the configured
// Mercury.Transmitter.TransmitConcurrency value, i.e. the max number of
// concurrent transmits to each server.
//
// The underlying pointer is dereferenced without a nil check.
func (m *mercuryTransmitterConfig) TransmitConcurrency() uint32 {
	concurrency := m.c.TransmitConcurrency
	return *concurrency
}

type mercuryConfig struct {
c toml.Mercury
s toml.MercurySecrets
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,7 @@ func TestConfig_Marshal(t *testing.T) {
Transmitter: toml.MercuryTransmitter{
TransmitQueueMaxSize: ptr(uint32(123)),
TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second),
TransmitConcurrency: ptr(uint32(456)),
},
VerboseLogging: ptr(true),
}
Expand Down Expand Up @@ -1348,6 +1349,7 @@ CertFile = '/path/to/cert.pem'
[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ CertFile = '/path/to/cert.pem'
[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456

[Capabilities]
[Capabilities.Peering]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
20 changes: 10 additions & 10 deletions core/services/llo/mercurytransmitter/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ import (
const (
// Mercury server error codes
DuplicateReport = 2

// TransmitConcurrency is the number of concurrent transmits to each
// server. Number of goroutines per server will be roughly 2x this because
// each server has a delete queue and a transmit queue.
//
// This could be reduced by implementing transmit batching, see: https://smartcontract-it.atlassian.net/browse/MERC-6635
// TODO: Should this be a configuration variable (e.g. set to 1 or 2 for testing?)
TransmitConcurrency = 100
)

var (
Expand Down Expand Up @@ -105,6 +97,7 @@ var _ Transmitter = (*transmitter)(nil)
// Config declares the transmitter settings consumed by this package.
type Config interface {
// TransmitQueueMaxSize returns the configured TransmitQueueMaxSize setting.
TransmitQueueMaxSize() uint32
// TransmitTimeout returns the configured TransmitTimeout setting.
TransmitTimeout() commonconfig.Duration
// TransmitConcurrency returns the number of concurrent transmits to each
// server; Start uses it to decide how many queue-loop goroutines to spawn.
TransmitConcurrency() uint32
}

type transmitter struct {
Expand Down Expand Up @@ -172,10 +165,17 @@ func (mt *transmitter) Start(ctx context.Context) (err error) {
return err
}
s.q.Init(transmissions)
// starting pm after loading from it is fine because it simply spawns some garbage collection/prune goroutines
// starting pm after loading from it is fine because it simply
// spawns some garbage collection/prune goroutines
startClosers = append(startClosers, s.c, s.q, s.pm)

nThreads := int(TransmitConcurrency)
// Number of goroutines per server will be roughly
// 2*TransmitConcurrency (so roughly 2*nServers*TransmitConcurrency
// in total) because each server has a delete queue and a transmit
// queue.
//
// This could potentially be reduced by implementing transmit batching,
// see: https://smartcontract-it.atlassian.net/browse/MERC-6635
nThreads := int(mt.cfg.TransmitConcurrency())
mt.wg.Add(2 * nThreads)
for i := 0; i < nThreads; i++ {
go s.runDeleteQueueLoop(mt.stopCh, mt.wg)
Expand Down
4 changes: 4 additions & 0 deletions core/services/llo/mercurytransmitter/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (m mockCfg) TransmitTimeout() commonconfig.Duration {
return *commonconfig.MustNewDuration(1 * time.Hour)
}

// TransmitConcurrency returns a fixed concurrency of 5 for the mock config.
func (m mockCfg) TransmitConcurrency() uint32 {
	const concurrency uint32 = 5
	return concurrency
}

func Test_Transmitter_Transmit(t *testing.T) {
lggr := logger.TestLogger(t)
db := pgtest.NewSqlxDB(t)
Expand Down
1 change: 1 addition & 0 deletions core/services/ocr2/plugins/llo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func setupNode(

// [Mercury]
c.Mercury.VerboseLogging = ptr(true)
c.Mercury.Transmitter.TransmitConcurrency = ptr(uint32(5)) // Avoid a ridiculous number of goroutines
})

lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel)
Expand Down
1 change: 1 addition & 0 deletions core/web/resolver/testdata/config-empty-effective.toml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions core/web/resolver/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456

[Capabilities]
[Capabilities.Peering]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
9 changes: 9 additions & 0 deletions docs/CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -1875,6 +1875,7 @@ CertFile is the path to a PEM file of trusted root certificate authority certifi
[Mercury.Transmitter]
TransmitQueueMaxSize = 10_000 # Default
TransmitTimeout = "5s" # Default
TransmitConcurrency = 100 # Default
```
Mercury.Transmitter controls settings for the mercury transmitter

Expand All @@ -1897,6 +1898,14 @@ TransmitTimeout controls how long the transmitter will wait for a response
when sending a message to the mercury server, before aborting and considering
the transmission to be failed.

### TransmitConcurrency
```toml
TransmitConcurrency = 100 # Default
```
TransmitConcurrency is the max number of concurrent transmits to each server.

Only has effect with LLO jobs.

## Telemetry
```toml
[Telemetry]
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/config/merge_raw_configs.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/default.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/defaults-override.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/disk-based-logging.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/invalid-ocr-p2p.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/invalid.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/valid.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/warnings.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down

0 comments on commit 89e51d7

Please sign in to comment.