From b1227857e507376304588f55f1f78cd05c0bfe51 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 14 Nov 2024 11:16:25 -0500 Subject: [PATCH] Logging and misc LLO fixes Fixes: MERC-6626 --- .changeset/mean-dots-move.md | 5 ++ .changeset/shiny-owls-destroy.md | 6 ++ core/cmd/shell.go | 6 +- core/config/docs/core.toml | 4 ++ core/config/mercury_config.go | 1 + core/config/toml/types.go | 4 ++ core/internal/cltest/cltest.go | 4 +- core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 +- core/services/chainlink/config_mercury.go | 4 ++ core/services/chainlink/config_test.go | 2 + core/services/chainlink/relayer_factory.go | 4 +- .../testdata/config-empty-effective.toml | 1 + .../chainlink/testdata/config-full.toml | 1 + .../config-multi-chain-effective.toml | 1 + core/services/llo/codecs.go | 5 +- core/services/llo/codecs_test.go | 3 +- core/services/llo/data_source.go | 61 +++++++++++-------- core/services/llo/delegate.go | 13 +++- core/services/llo/evm/fees_test.go | 10 +++ .../llo/evm/report_codec_premium_legacy.go | 13 ++-- .../evm/report_codec_premium_legacy_test.go | 3 +- core/services/llo/mercurytransmitter/queue.go | 2 +- .../services/llo/mercurytransmitter/server.go | 52 +++++++++++----- .../llo/mercurytransmitter/transmitter.go | 50 ++++++++++----- .../mercurytransmitter/transmitter_test.go | 6 +- core/services/llo/suppressed_logger.go | 51 ++++++++++++++++ core/services/llo/telemetry.go | 6 +- core/services/llo/transmitter.go | 11 +++- .../services/ocr2/plugins/llo/helpers_test.go | 1 + .../ocr2/plugins/llo/integration_test.go | 51 +++++++++------- core/services/ocrcommon/telemetry.go | 1 - core/services/ocrcommon/telemetry_test.go | 14 ++--- core/services/pipeline/runner.go | 26 +++++--- core/services/relay/evm/evm.go | 36 ++++++----- .../testdata/config-empty-effective.toml | 1 + core/web/resolver/testdata/config-full.toml | 1 + .../config-multi-chain-effective.toml | 1 + deployment/go.mod | 2 +- deployment/go.sum | 4 +- docs/CONFIG.md | 9 +++ go.mod | 2 +- go.sum | 4 +- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 +- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 +- .../scripts/config/merge_raw_configs.txtar | 1 + testdata/scripts/node/validate/default.txtar | 1 + .../node/validate/defaults-override.txtar | 1 + .../disk-based-logging-disabled.txtar | 1 + .../validate/disk-based-logging-no-dir.txtar | 1 + .../node/validate/disk-based-logging.txtar | 1 + .../node/validate/invalid-ocr-p2p.txtar | 1 + testdata/scripts/node/validate/invalid.txtar | 1 + testdata/scripts/node/validate/valid.txtar | 1 + testdata/scripts/node/validate/warnings.txtar | 1 + 57 files changed, 365 insertions(+), 145 deletions(-) create mode 100644 .changeset/mean-dots-move.md create mode 100644 .changeset/shiny-owls-destroy.md create mode 100644 core/services/llo/suppressed_logger.go diff --git a/.changeset/mean-dots-move.md b/.changeset/mean-dots-move.md new file mode 100644 index 00000000000..1169d8379e9 --- /dev/null +++ b/.changeset/mean-dots-move.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Add config var Mercury.Transmitter.TransmitConcurrency #added diff --git a/.changeset/shiny-owls-destroy.md b/.changeset/shiny-owls-destroy.md new file mode 100644 index 00000000000..d132d6dbff8 --- /dev/null +++ b/.changeset/shiny-owls-destroy.md @@ -0,0 +1,6 @@ +--- +"chainlink": patch +--- + +Logging improvements for LLO +#internal diff --git a/core/cmd/shell.go b/core/cmd/shell.go index 966fa1a0ff8..e4f4c5bd6e3 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -246,9 +246,9 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G } evmFactoryCfg := chainlink.EVMFactoryConfig{ - CSAETHKeystore: keyStore, - ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds}, - MercuryTransmitter: cfg.Mercury().Transmitter(), + CSAETHKeystore: keyStore, + ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds}, + MercuryConfig: cfg.Mercury(), } // evm always enabled for backward compatibility // TODO BCF-2510 this needs to change in order to clear the path for EVM extraction diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index e0fc76f449c..edd1494e4f0 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -689,6 +689,10 @@ TransmitQueueMaxSize = 10_000 # Default # when sending a message to the mercury server, before aborting and considering # the transmission to be failed. TransmitTimeout = "5s" # Default +# TransmitConcurrency is the max number of concurrent transmits to each server. +# +# Only has effect with LLO jobs. +TransmitConcurrency = 100 # Default # Telemetry holds OTEL settings. # This data includes open telemetry metrics, traces, & logs. diff --git a/core/config/mercury_config.go b/core/config/mercury_config.go index d1b4b142e20..2e58ff0ee9d 100644 --- a/core/config/mercury_config.go +++ b/core/config/mercury_config.go @@ -20,6 +20,7 @@ type MercuryTLS interface { type MercuryTransmitter interface { TransmitQueueMaxSize() uint32 TransmitTimeout() commonconfig.Duration + TransmitConcurrency() uint32 } type Mercury interface { diff --git a/core/config/toml/types.go b/core/config/toml/types.go index d9302b81fb0..610d18b6b4d 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1330,6 +1330,7 @@ func (m *MercuryTLS) ValidateConfig() (err error) { type MercuryTransmitter struct { TransmitQueueMaxSize *uint32 TransmitTimeout *commonconfig.Duration + TransmitConcurrency *uint32 } func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) { @@ -1339,6 +1340,9 @@ func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) { if v := f.TransmitTimeout; v != nil { m.TransmitTimeout = v } + if v := f.TransmitConcurrency; v != nil { + m.TransmitConcurrency = v + } } type Mercury struct { diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 32c63e7944c..29515df7034 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -418,8 +418,8 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn MailMon: mailMon, DS: ds, }, - CSAETHKeystore: keyStore, - MercuryTransmitter: cfg.Mercury().Transmitter(), + CSAETHKeystore: keyStore, + MercuryConfig: cfg.Mercury(), } if cfg.EVMEnabled() { diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 97fcfe46ec5..8d0bd34b6c3 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -408,7 +408,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.29 // indirect github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec // indirect github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 // indirect github.com/smartcontractkit/chainlink-protos/orchestrator v0.3.0 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 0678468c384..02fb1e63a90 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1413,8 +1413,8 @@ github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef06 github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e/go.mod h1:iK3BNHKCLgSgkOyiu3iE7sfZ20Qnuk7xwjV/yO/6gnQ= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 h1:1BMTG66HnCIz+KMBWGvyzELNM6VHGwv2WKFhN7H49Sg= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57/go.mod h1:QPiorgpbLv4+Jn4YO6xxU4ftTu4T3QN8HwX3ImP59DE= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 h1:0ewLMbAz3rZrovdRUCgd028yOXX8KigB4FndAUdI2kM= diff --git a/core/services/chainlink/config_mercury.go b/core/services/chainlink/config_mercury.go index bc4aed6fb07..0e56105406b 100644 --- a/core/services/chainlink/config_mercury.go +++ b/core/services/chainlink/config_mercury.go @@ -50,6 +50,10 @@ func (m *mercuryTransmitterConfig) TransmitTimeout() commonconfig.Duration { return *m.c.TransmitTimeout } +func (m *mercuryTransmitterConfig) TransmitConcurrency() uint32 { + return *m.c.TransmitConcurrency +} + type mercuryConfig struct { c toml.Mercury s toml.MercurySecrets diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 76b80672dbb..e04d6d7e25b 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -838,6 +838,7 @@ func TestConfig_Marshal(t *testing.T) { Transmitter: toml.MercuryTransmitter{ TransmitQueueMaxSize: ptr(uint32(123)), TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second), + TransmitConcurrency: ptr(uint32(456)), }, VerboseLogging: ptr(true), } @@ -1348,6 +1349,7 @@ CertFile = '/path/to/cert.pem' [Mercury.Transmitter] TransmitQueueMaxSize = 123 TransmitTimeout = '3m54s' +TransmitConcurrency = 456 `}, {"full", full, fullTOML}, {"multi-chain", multiChain, multiChainTOML}, diff --git a/core/services/chainlink/relayer_factory.go b/core/services/chainlink/relayer_factory.go index 3740878fd19..cec7e5bb48c 100644 --- a/core/services/chainlink/relayer_factory.go +++ b/core/services/chainlink/relayer_factory.go @@ -54,7 +54,7 @@ func (r *RelayerFactory) NewDummy(config DummyFactoryConfig) (loop.Relayer, erro type EVMFactoryConfig struct { legacyevm.ChainOpts evmrelay.CSAETHKeystore - coreconfig.MercuryTransmitter + MercuryConfig coreconfig.Mercury } func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LOOPRelayAdapter, error) { @@ -83,7 +83,7 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m DS: ccOpts.DS, CSAETHKeystore: config.CSAETHKeystore, MercuryPool: r.MercuryPool, - TransmitterConfig: config.MercuryTransmitter, + MercuryConfig: config.MercuryConfig, CapabilitiesRegistry: r.CapabilitiesRegistry, HTTPClient: r.HTTPClient, RetirementReportCache: r.RetirementReportCache, diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index cd51afac5f8..0f26b02ab6f 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -237,6 +237,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index c6a5302a459..3191ce576ed 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -247,6 +247,7 @@ CertFile = '/path/to/cert.pem' [Mercury.Transmitter] TransmitQueueMaxSize = 123 TransmitTimeout = '3m54s' +TransmitConcurrency = 456 [Capabilities] [Capabilities.Peering] diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index e8da8142181..5f52c06ca1f 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -237,6 +237,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/core/services/llo/codecs.go b/core/services/llo/codecs.go index 7813c8923ea..2ccadfe330b 100644 --- a/core/services/llo/codecs.go +++ b/core/services/llo/codecs.go @@ -1,6 +1,7 @@ package llo import ( + "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" @@ -8,11 +9,11 @@ import ( ) // NOTE: All supported codecs must be specified here -func NewReportCodecs() map[llotypes.ReportFormat]llo.ReportCodec { +func NewReportCodecs(lggr logger.Logger) map[llotypes.ReportFormat]llo.ReportCodec { codecs := make(map[llotypes.ReportFormat]llo.ReportCodec) codecs[llotypes.ReportFormatJSON] = llo.JSONReportCodec{} - codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.ReportCodecPremiumLegacy{} + codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.NewReportCodecPremiumLegacy(lggr) return codecs } diff --git a/core/services/llo/codecs_test.go b/core/services/llo/codecs_test.go index 4a7f3f65571..3af881a1de0 100644 --- a/core/services/llo/codecs_test.go +++ b/core/services/llo/codecs_test.go @@ -6,10 +6,11 @@ import ( "github.com/stretchr/testify/assert" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + "github.com/smartcontractkit/chainlink/v2/core/logger" ) func Test_NewReportCodecs(t *testing.T) { - c := NewReportCodecs() + c := NewReportCodecs(logger.TestLogger(t)) assert.Contains(t, c, llotypes.ReportFormatJSON, "expected JSON to be supported") assert.Contains(t, c, llotypes.ReportFormatEVMPremiumLegacy, "expected EVMPremiumLegacy to be supported") diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go index ef333f821a1..0585dec49dc 100644 --- a/core/services/llo/data_source.go +++ b/core/services/llo/data_source.go @@ -3,8 +3,10 @@ package llo import ( "context" "fmt" + "slices" "sort" "sync" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -85,11 +87,7 @@ func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSour // Observe looks up all streams in the registry and populates a map of stream ID => value func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error { - var wg sync.WaitGroup - wg.Add(len(streamValues)) - var svmu sync.Mutex - var errs []ErrObservationFailed - var errmu sync.Mutex + now := time.Now() if opts.VerboseLogging() { streamIDs := make([]streams.StreamID, 0, len(streamValues)) @@ -100,6 +98,13 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) } + var wg sync.WaitGroup + wg.Add(len(streamValues)) + + var mu sync.Mutex + successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues)) + var errs []ErrObservationFailed + for _, streamID := range maps.Keys(streamValues) { go func(streamID llotypes.StreamID) { defer wg.Done() @@ -108,17 +113,17 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, stream, exists := d.registry.Get(streamID) if !exists { - errmu.Lock() + mu.Lock() errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)}) - errmu.Unlock() + mu.Unlock() promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() return } run, trrs, err := stream.Run(ctx) if err != nil { - errmu.Lock() + mu.Lock() errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"}) - errmu.Unlock() + mu.Unlock() promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() // TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet // https://smartcontract-it.atlassian.net/browse/MERC-6290 @@ -129,44 +134,50 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, // https://smartcontract-it.atlassian.net/browse/MERC-6290 val, err = ExtractStreamValue(trrs) if err != nil { - errmu.Lock() + mu.Lock() errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"}) - errmu.Unlock() + mu.Unlock() return } d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil) + mu.Lock() + defer mu.Unlock() + + successfulStreamIDs = append(successfulStreamIDs, streamID) if val != nil { - svmu.Lock() - defer svmu.Unlock() streamValues[streamID] = val } }(streamID) } wg.Wait() + elapsed := time.Since(now) - // Failed observations are always logged at warn level - var failedStreamIDs []streams.StreamID - if len(errs) > 0 { + // Only log on errors or if VerboseLogging is turned on + if len(errs) > 0 || opts.VerboseLogging() { + slices.Sort(successfulStreamIDs) sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID }) - failedStreamIDs = make([]streams.StreamID, len(errs)) + + failedStreamIDs := make([]streams.StreamID, len(errs)) errStrs := make([]string, len(errs)) for i, e := range errs { errStrs[i] = e.String() failedStreamIDs[i] = e.streamID } - d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) - } - if opts.VerboseLogging() { - successes := make([]streams.StreamID, 0, len(streamValues)) - for strmID := range streamValues { - successes = append(successes, strmID) + lggr := logger.With(d.lggr, "elapsed", elapsed, "nSuccessfulStreams", len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "successfulStreamIDs", successfulStreamIDs, "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) + + if opts.VerboseLogging() { + lggr = logger.With(lggr, "streamValues", streamValues) + } + + if len(errs) == 0 && opts.VerboseLogging() { + lggr.Infow("Observation succeeded for all streams") + } else if len(errs) > 0 { + lggr.Warnw("Observation failed for streams") } - sort.Slice(successes, func(i, j int) bool { return successes[i] < successes[j] }) - d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "configDigest", opts.ConfigDigest(), "values", streamValues, "seqNr", opts.OutCtx().SeqNr) } return nil diff --git a/core/services/llo/delegate.go b/core/services/llo/delegate.go index f5f9b5f05f1..fabc8dc2682 100644 --- a/core/services/llo/delegate.go +++ b/core/services/llo/delegate.go @@ -19,6 +19,7 @@ import ( "github.com/smartcontractkit/chainlink-data-streams/llo" datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo" + corelogger "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/streams" ) @@ -91,7 +92,13 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) { if cfg.ShouldRetireCache == nil { return nil, errors.New("ShouldRetireCache must not be nil") } - reportCodecs := NewReportCodecs() + var codecLggr logger.Logger + if cfg.ReportingPluginConfig.VerboseLogging { + codecLggr = logger.Named(lggr, "ReportCodecs") + } else { + codecLggr = corelogger.NullLogger + } + reportCodecs := NewReportCodecs(codecLggr) var t TelemeterService if cfg.CaptureEATelemetry { @@ -126,7 +133,7 @@ func (d *delegate) Start(ctx context.Context) error { case 1: lggr = logger.With(lggr, "instanceType", "Green") } - ocrLogger := logger.NewOCRWrapper(lggr, d.cfg.TraceLogging, func(msg string) { + ocrLogger := logger.NewOCRWrapper(NewSuppressedLogger(lggr, d.cfg.ReportingPluginConfig.VerboseLogging), d.cfg.TraceLogging, func(msg string) { // TODO: do we actually need to DB-persist errors? // MERC-3524 }) @@ -144,7 +151,7 @@ func (d *delegate) Start(ctx context.Context) error { OffchainKeyring: d.cfg.OffchainKeyring, OnchainKeyring: d.cfg.OnchainKeyring, ReportingPluginFactory: datastreamsllo.NewPluginFactory( - d.cfg.ReportingPluginConfig, psrrc, d.src, d.cfg.RetirementReportCodec, d.cfg.ChannelDefinitionCache, d.ds, logger.Named(lggr, "LLOReportingPlugin"), llo.EVMOnchainConfigCodec{}, d.reportCodecs, + d.cfg.ReportingPluginConfig, psrrc, d.src, d.cfg.RetirementReportCodec, d.cfg.ChannelDefinitionCache, d.ds, logger.Named(lggr, "ReportingPlugin"), llo.EVMOnchainConfigCodec{}, d.reportCodecs, ), MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": d.cfg.JobName.ValueOrZero()}, prometheus.DefaultRegisterer), }) diff --git a/core/services/llo/evm/fees_test.go b/core/services/llo/evm/fees_test.go index 16ee98db7df..33888de14ec 100644 --- a/core/services/llo/evm/fees_test.go +++ b/core/services/llo/evm/fees_test.go @@ -42,4 +42,14 @@ func Test_Fees(t *testing.T) { fee := CalculateFee(tokenPriceInUSD, BaseUSDFee) assert.Equal(t, big.NewInt(0), fee) }) + + t.Run("ridiculously high value rounds down fee to zero", func(t *testing.T) { + // 20dp + tokenPriceInUSD, err := decimal.NewFromString("12984833000000000000") + require.NoError(t, err) + BaseUSDFee, err = decimal.NewFromString("0.1") + require.NoError(t, err) + fee := CalculateFee(tokenPriceInUSD, BaseUSDFee) + assert.Equal(t, big.NewInt(0), fee) + }) } diff --git a/core/services/llo/evm/report_codec_premium_legacy.go b/core/services/llo/evm/report_codec_premium_legacy.go index 9bca9587a0e..700ba6e6533 100644 --- a/core/services/llo/evm/report_codec_premium_legacy.go +++ b/core/services/llo/evm/report_codec_premium_legacy.go @@ -17,8 +17,8 @@ import ( v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" reportcodecv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec" reporttypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/types" @@ -28,10 +28,12 @@ var ( _ llo.ReportCodec = ReportCodecPremiumLegacy{} ) -type ReportCodecPremiumLegacy struct{ logger.Logger } +type ReportCodecPremiumLegacy struct { + logger.Logger +} -func NewReportCodecPremiumLegacy(lggr logger.Logger) llo.ReportCodec { - return ReportCodecPremiumLegacy{lggr.Named("ReportCodecPremiumLegacy")} +func NewReportCodecPremiumLegacy(lggr logger.Logger) ReportCodecPremiumLegacy { + return ReportCodecPremiumLegacy{logger.Sugared(lggr).Named("ReportCodecPremiumLegacy")} } type ReportFormatEVMPremiumLegacyOpts struct { @@ -92,6 +94,9 @@ func (r ReportCodecPremiumLegacy) Encode(ctx context.Context, report llo.Report, Bid: quote.Bid.Mul(multiplier).BigInt(), Ask: quote.Ask.Mul(multiplier).BigInt(), } + + r.Logger.Debugw("Encoding report", "report", report, "opts", opts, "nativePrice", nativePrice, "linkPrice", linkPrice, "quote", quote, "multiplier", multiplier, "rf", rf) + return codec.BuildReport(ctx, rf) } diff --git a/core/services/llo/evm/report_codec_premium_legacy_test.go b/core/services/llo/evm/report_codec_premium_legacy_test.go index 804555d06be..d5d816da1d5 100644 --- a/core/services/llo/evm/report_codec_premium_legacy_test.go +++ b/core/services/llo/evm/report_codec_premium_legacy_test.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink/v2/core/logger" reporttypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/types" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" @@ -32,7 +33,7 @@ func newValidPremiumLegacyReport() llo.Report { } func Test_ReportCodecPremiumLegacy(t *testing.T) { - rc := ReportCodecPremiumLegacy{} + rc := ReportCodecPremiumLegacy{logger.TestLogger(t)} feedID := [32]uint8{0x1, 0x2, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} cd := llotypes.ChannelDefinition{Opts: llotypes.ChannelOpts(fmt.Sprintf(`{"baseUSDFee":"10.50","expirationWindow":60,"feedId":"0x%x","multiplier":10}`, feedID))} diff --git a/core/services/llo/mercurytransmitter/queue.go b/core/services/llo/mercurytransmitter/queue.go index a5a606c5b32..eae9a0b9d0c 100644 --- a/core/services/llo/mercurytransmitter/queue.go +++ b/core/services/llo/mercurytransmitter/queue.go @@ -95,8 +95,8 @@ func (tq *transmitQueue) Push(t *Transmission) (ok bool) { if tq.maxlen != 0 && tq.pq.Len() == tq.maxlen { // evict oldest entry to make room - tq.lggr.Criticalf("Transmit queue is full; dropping oldest transmission (reached max length of %d)", tq.maxlen) removed := heap.PopMax(tq.pq) + tq.lggr.Criticalw(fmt.Sprintf("Transmit queue is full; dropping oldest transmission (reached max length of %d)", tq.maxlen), "transmission", removed) if removed, ok := removed.(*Transmission); ok { tq.asyncDeleter.AsyncDelete(removed.Hash()) } diff --git a/core/services/llo/mercurytransmitter/server.go b/core/services/llo/mercurytransmitter/server.go index 70e76655961..84b2c2889fa 100644 --- a/core/services/llo/mercurytransmitter/server.go +++ b/core/services/llo/mercurytransmitter/server.go @@ -10,12 +10,17 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" + corelogger "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/llo/evm" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" @@ -49,10 +54,15 @@ var ( ) ) +type ReportPacker interface { + Pack(digest types.ConfigDigest, seqNr uint64, report ocr2types.Report, sigs []ocr2types.AttributedOnchainSignature) ([]byte, error) +} + // A server handles the queue for a given mercury server type server struct { - lggr logger.SugaredLogger + lggr logger.SugaredLogger + verboseLogging bool transmitTimeout time.Duration @@ -64,6 +74,9 @@ type server struct { url string + evmPremiumLegacyPacker ReportPacker + jsonPacker ReportPacker + transmitSuccessCount prometheus.Counter transmitDuplicateCount prometheus.Counter transmitConnectionErrorCount prometheus.Counter @@ -77,17 +90,27 @@ type QueueConfig interface { TransmitTimeout() commonconfig.Duration } -func newServer(lggr logger.Logger, cfg QueueConfig, client wsrpc.Client, orm ORM, serverURL string) *server { +func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client wsrpc.Client, orm ORM, serverURL string) *server { pm := NewPersistenceManager(lggr, orm, serverURL, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency) donIDStr := fmt.Sprintf("%d", pm.DonID()) + var codecLggr logger.Logger + if verboseLogging { + codecLggr = lggr + } else { + codecLggr = corelogger.NullLogger + } + return &server{ logger.Sugared(lggr), + verboseLogging, cfg.TransmitTimeout().Duration(), client, pm, NewTransmitQueue(lggr, serverURL, int(cfg.TransmitQueueMaxSize()), pm), make(chan [32]byte, int(cfg.TransmitQueueMaxSize())), serverURL, + evm.NewReportCodecPremiumLegacy(codecLggr), + llo.JSONReportCodec{}, transmitSuccessCount.WithLabelValues(donIDStr, serverURL), transmitDuplicateCount.WithLabelValues(donIDStr, serverURL), transmitConnectionErrorCount.WithLabelValues(donIDStr, serverURL), @@ -162,7 +185,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI // queue was closed return } - res, err := func(ctx context.Context) (*pb.TransmitResponse, error) { + req, res, err := func(ctx context.Context) (*pb.TransmitRequest, *pb.TransmitResponse, error) { ctx, cancelFn := context.WithTimeout(ctx, utils.WithJitter(s.transmitTimeout)) defer cancelFn() return s.transmit(ctx, t) @@ -172,7 +195,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI return } else if err != nil { s.transmitConnectionErrorCount.Inc() - s.lggr.Errorw("Transmit report failed", "err", err, "transmission", t) + s.lggr.Errorw("Transmit report failed", "err", err, "req", req, "transmission", t) if ok := s.q.Push(t); !ok { s.lggr.Error("Failed to push report to transmit queue; queue is closed") return @@ -190,7 +213,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI b.Reset() if res.Error == "" { s.transmitSuccessCount.Inc() - s.lggr.Debugw("Transmit report success", "transmission", t, "response", res) + s.lggr.Debugw("Transmit report success", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "transmission", t, "response", res) } else { // We don't need to retry here because the mercury server // has confirmed it received the report. We only need to retry @@ -199,36 +222,36 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI case DuplicateReport: s.transmitSuccessCount.Inc() s.transmitDuplicateCount.Inc() - s.lggr.Debugw("Transmit report success; duplicate report", "transmission", t, "response", res) + s.lggr.Debugw("Transmit report success; duplicate report", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "transmission", t, "response", res) default: transmitServerErrorCount.WithLabelValues(donIDStr, s.url, fmt.Sprintf("%d", res.Code)).Inc() - s.lggr.Errorw("Transmit report failed; mercury server returned error", "response", res, "transmission", t, "err", res.Error, "code", res.Code) + s.lggr.Errorw("Transmit report failed; mercury server returned error", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "response", res, "transmission", t, "err", res.Error, "code", res.Code) } } select { case s.deleteQueue <- t.Hash(): default: - s.lggr.Criticalw("Delete queue is full", "transmission", t) + s.lggr.Criticalw("Delete queue is full", "transmission", t, "transmissionHash", fmt.Sprintf("%x", t.Hash())) } } } -func (s *server) transmit(ctx context.Context, t *Transmission) (*pb.TransmitResponse, error) { +func (s *server) transmit(ctx context.Context, t *Transmission) (*pb.TransmitRequest, *pb.TransmitResponse, error) { var payload []byte var err error switch t.Report.Info.ReportFormat { case llotypes.ReportFormatJSON: - payload, err = llo.JSONReportCodec{}.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) + payload, err = s.jsonPacker.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) case llotypes.ReportFormatEVMPremiumLegacy: - payload, err = evm.ReportCodecPremiumLegacy{}.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) + payload, err = s.evmPremiumLegacyPacker.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) default: - return nil, fmt.Errorf("Transmit failed; unsupported report format: %q", t.Report.Info.ReportFormat) + return nil, nil, fmt.Errorf("Transmit failed; don't know how to Pack unsupported report format: %q", t.Report.Info.ReportFormat) } if err != nil { - return nil, fmt.Errorf("Transmit: encode failed; %w", err) + return nil, nil, fmt.Errorf("Transmit: encode failed; %w", err) } req := &pb.TransmitRequest{ @@ -236,5 +259,6 @@ func (s *server) transmit(ctx context.Context, t *Transmission) (*pb.TransmitRes ReportFormat: uint32(t.Report.Info.ReportFormat), } - return s.c.Transmit(ctx, req) + resp, err := s.c.Transmit(ctx, req) + return req, resp, err } diff --git a/core/services/llo/mercurytransmitter/transmitter.go b/core/services/llo/mercurytransmitter/transmitter.go index 33090ed9574..024a98174c6 100644 --- a/core/services/llo/mercurytransmitter/transmitter.go +++ b/core/services/llo/mercurytransmitter/transmitter.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "strconv" "sync" "github.com/prometheus/client_golang/prometheus" @@ -97,12 +98,14 @@ var _ Transmitter = (*transmitter)(nil) type Config interface { TransmitQueueMaxSize() uint32 TransmitTimeout() commonconfig.Duration + TransmitConcurrency() uint32 } type transmitter struct { services.StateMachine - lggr logger.SugaredLogger - cfg Config + lggr logger.SugaredLogger + verboseLogging bool + cfg Config orm ORM servers map[string]*server @@ -115,12 +118,13 @@ type transmitter struct { } type Opts struct { - Lggr logger.Logger - Cfg Config - Clients map[string]wsrpc.Client - FromAccount ed25519.PublicKey - DonID uint32 - ORM ORM + Lggr logger.Logger + VerboseLogging bool + Cfg Config + Clients map[string]wsrpc.Client + FromAccount ed25519.PublicKey + DonID uint32 + ORM ORM } func New(opts Opts) Transmitter { @@ -132,11 +136,12 @@ func newTransmitter(opts Opts) *transmitter { servers := make(map[string]*server, len(opts.Clients)) for serverURL, client := range opts.Clients { sLggr := sugared.Named(serverURL).With("serverURL", serverURL) - servers[serverURL] = newServer(sLggr, opts.Cfg, client, opts.ORM, serverURL) + servers[serverURL] = newServer(sLggr, opts.VerboseLogging, opts.Cfg, client, opts.ORM, serverURL) } return &transmitter{ services.StateMachine{}, sugared.Named("LLOMercuryTransmitter").With("donID", opts.ORM.DonID()), + opts.VerboseLogging, opts.Cfg, opts.ORM, servers, @@ -149,7 +154,9 @@ func newTransmitter(opts Opts) *transmitter { func (mt *transmitter) Start(ctx context.Context) (err error) { return mt.StartOnce("LLOMercuryTransmitter", func() error { - mt.lggr.Debugw("Loading transmit requests from database") + if mt.verboseLogging { + mt.lggr.Debugw("Loading transmit requests from database") + } { var startClosers []services.StartClose @@ -159,12 +166,23 @@ func (mt *transmitter) Start(ctx context.Context) (err error) { return err } s.q.Init(transmissions) - // starting pm after loading from it is fine because it simply spawns some garbage collection/prune goroutines + // starting pm after loading from it is fine because it simply + // spawns some garbage collection/prune goroutines startClosers = append(startClosers, s.c, s.q, s.pm) - mt.wg.Add(2) - go s.runDeleteQueueLoop(mt.stopCh, mt.wg) - go s.runQueueLoop(mt.stopCh, mt.wg, fmt.Sprintf("%d", mt.donID)) + // Number of goroutines per server will be roughly + // 2*nServers*TransmitConcurrency because each server has a + // delete queue and a transmit queue. + // + // This could potentially be reduced by implementing transmit batching, + // see: https://smartcontract-it.atlassian.net/browse/MERC-6635 + nThreads := int(mt.cfg.TransmitConcurrency()) + mt.wg.Add(2 * nThreads) + donIDStr := strconv.FormatUint(uint64(mt.donID), 10) + for i := 0; i < nThreads; i++ { + go s.runDeleteQueueLoop(mt.stopCh, mt.wg) + go s.runQueueLoop(mt.stopCh, mt.wg, donIDStr) + } } if err := (&services.MultiStart{}).Start(ctx, startClosers...); err != nil { return err @@ -234,7 +252,9 @@ func (mt *transmitter) Transmit( g := new(errgroup.Group) for i := range transmissions { t := transmissions[i] - mt.lggr.Debugw("LLOMercuryTransmit", "digest", digest.Hex(), "seqNr", seqNr, "reportFormat", report.Info.ReportFormat, "reportLifeCycleStage", report.Info.LifeCycleStage, "transmissionHash", fmt.Sprintf("%x", t.Hash())) + if mt.verboseLogging { + mt.lggr.Debugw("LLOMercuryTransmit", "digest", digest.Hex(), "seqNr", seqNr, "reportFormat", report.Info.ReportFormat, "reportLifeCycleStage", report.Info.LifeCycleStage, "transmissionHash", fmt.Sprintf("%x", t.Hash())) + } g.Go(func() error { s := mt.servers[t.ServerURL] if ok := s.q.Push(t); !ok { diff --git a/core/services/llo/mercurytransmitter/transmitter_test.go b/core/services/llo/mercurytransmitter/transmitter_test.go index db3d0d2e584..7477e848b78 100644 --- a/core/services/llo/mercurytransmitter/transmitter_test.go +++ b/core/services/llo/mercurytransmitter/transmitter_test.go @@ -33,6 +33,10 @@ func (m mockCfg) TransmitTimeout() commonconfig.Duration { return *commonconfig.MustNewDuration(1 * time.Hour) } +func (m mockCfg) TransmitConcurrency() uint32 { + return 5 +} + func Test_Transmitter_Transmit(t *testing.T) { lggr := logger.TestLogger(t) db := pgtest.NewSqlxDB(t) @@ -135,7 +139,7 @@ func Test_Transmitter_runQueueLoop(t *testing.T) { orm := NewORM(db, donID) cfg := mockCfg{} - s := newServer(lggr, cfg, c, orm, sURL) + s := newServer(lggr, true, cfg, c, orm, sURL) t.Run("pulls from queue and transmits successfully", func(t *testing.T) { transmit := make(chan *pb.TransmitRequest, 1) diff --git a/core/services/llo/suppressed_logger.go b/core/services/llo/suppressed_logger.go new file mode 100644 index 00000000000..9fe6e6731e5 --- /dev/null +++ b/core/services/llo/suppressed_logger.go @@ -0,0 +1,51 @@ +package llo + +import "github.com/smartcontractkit/chainlink-common/pkg/logger" + +// Suppressed logger swallows debug/info unless the verbose flag is turned on +// Useful for OCR to calm down its verbosity + +var _ logger.Logger = &SuppressedLogger{} + +func NewSuppressedLogger(lggr logger.Logger, verbose bool) logger.Logger { + return &SuppressedLogger{ + Logger: lggr, + Verbose: verbose, + } +} + +type SuppressedLogger struct { + logger.Logger + Verbose bool +} + +func (s *SuppressedLogger) Debug(args ...interface{}) { + if s.Verbose { + s.Logger.Debug(args...) + } +} +func (s *SuppressedLogger) Info(args ...interface{}) { + if s.Verbose { + s.Logger.Info(args...) + } +} +func (s *SuppressedLogger) Debugf(format string, values ...interface{}) { + if s.Verbose { + s.Logger.Debugf(format, values...) + } +} +func (s *SuppressedLogger) Infof(format string, values ...interface{}) { + if s.Verbose { + s.Logger.Infof(format, values...) + } +} +func (s *SuppressedLogger) Debugw(msg string, keysAndValues ...interface{}) { + if s.Verbose { + s.Logger.Debugw(msg, keysAndValues...) + } +} +func (s *SuppressedLogger) Infow(msg string, keysAndValues ...interface{}) { + if s.Verbose { + s.Logger.Infow(msg, keysAndValues...) + } +} diff --git a/core/services/llo/telemetry.go b/core/services/llo/telemetry.go index 888ee9d5d36..bb86679dc52 100644 --- a/core/services/llo/telemetry.go +++ b/core/services/llo/telemetry.go @@ -39,7 +39,11 @@ func NewTelemeterService(lggr logger.Logger, monitoringEndpoint commontypes.Moni } func newTelemeter(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint, donID uint32) *telemeter { - chTelemetryObservation := make(chan TelemetryObservation, 100) + // NOTE: This channel must take multiple telemetry packets per round (1 per + // feed) so we need to make sure the buffer is large enough. + // + // 2000 feeds * 5s/250ms = 40_000 should hold ~5s of buffer in the worst case. + chTelemetryObservation := make(chan TelemetryObservation, 40_000) t := &telemeter{ chTelemetryObservation: chTelemetryObservation, monitoringEndpoint: monitoringEndpoint, diff --git a/core/services/llo/transmitter.go b/core/services/llo/transmitter.go index 7696c69c291..1ff5c1b36ac 100644 --- a/core/services/llo/transmitter.go +++ b/core/services/llo/transmitter.go @@ -47,8 +47,9 @@ type TransmitterRetirementReportCacheWriter interface { type transmitter struct { services.StateMachine - lggr logger.Logger - fromAccount string + lggr logger.Logger + verboseLogging bool + fromAccount string subTransmitters []Transmitter retirementReportCache TransmitterRetirementReportCacheWriter @@ -56,6 +57,7 @@ type transmitter struct { type TransmitterOpts struct { Lggr logger.Logger + VerboseLogging bool FromAccount string MercuryTransmitterOpts mercurytransmitter.Opts RetirementReportCache TransmitterRetirementReportCacheWriter @@ -69,6 +71,7 @@ func NewTransmitter(opts TransmitterOpts) Transmitter { return &transmitter{ services.StateMachine{}, opts.Lggr, + opts.VerboseLogging, opts.FromAccount, subTransmitters, opts.RetirementReportCache, @@ -114,6 +117,10 @@ func (t *transmitter) Transmit( report ocr3types.ReportWithInfo[llotypes.ReportInfo], sigs []types.AttributedOnchainSignature, ) (err error) { + if t.verboseLogging { + t.lggr.Debugw("Transmit report", "digest", digest, "seqNr", seqNr, "report", report, "sigs", sigs) + } + if report.Info.ReportFormat == llotypes.ReportFormatRetirement { // Retirement reports don't get transmitted; rather, they are stored in // the RetirementReportCache diff --git a/core/services/ocr2/plugins/llo/helpers_test.go b/core/services/ocr2/plugins/llo/helpers_test.go index 0ca6eeb60cb..9cd8742ffa8 100644 --- a/core/services/ocr2/plugins/llo/helpers_test.go +++ b/core/services/ocr2/plugins/llo/helpers_test.go @@ -185,6 +185,7 @@ func setupNode( // [Mercury] c.Mercury.VerboseLogging = ptr(true) + c.Mercury.Transmitter.TransmitConcurrency = ptr(uint32(5)) // Avoid a ridiculous number of goroutines }) lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel) diff --git a/core/services/ocr2/plugins/llo/integration_test.go b/core/services/ocr2/plugins/llo/integration_test.go index bdd773910f4..043ce34c946 100644 --- a/core/services/ocr2/plugins/llo/integration_test.go +++ b/core/services/ocr2/plugins/llo/integration_test.go @@ -509,7 +509,8 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.Equal(t, expectedBid.String(), reportElems["bid"].(*big.Int).String()) assert.Equal(t, expectedAsk.String(), reportElems["ask"].(*big.Int).String()) - t.Run(fmt.Sprintf("emulate mercury server verifying report (local verification) - node %x", req.pk), func(t *testing.T) { + // emulate mercury server verifying report (local verification) + { rv := mercuryverifier.NewVerifier() reportSigners, err := rv.Verify(mercuryverifier.SignedReport{ @@ -522,14 +523,13 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi require.NoError(t, err) assert.GreaterOrEqual(t, len(reportSigners), int(fNodes+1)) assert.Subset(t, signerAddresses, reportSigners) - }) + } - t.Run(fmt.Sprintf("test on-chain verification - node %x", req.pk), func(t *testing.T) { - t.Run("destination verifier", func(t *testing.T) { - _, err = verifierProxy.Verify(steve, req.req.Payload, []byte{}) - require.NoError(t, err) - }) - }) + // test on-chain verification + { + _, err = verifierProxy.Verify(steve, req.req.Payload, []byte{}) + require.NoError(t, err) + } t.Logf("oracle %x reported for 0x%x", req.pk[:], feedID[:]) @@ -597,7 +597,8 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi var greenDigest ocr2types.ConfigDigest allReports := make(map[types.ConfigDigest][]datastreamsllo.Report) - t.Run("start off with blue=production, green=staging (specimen reports)", func(t *testing.T) { + // start off with blue=production, green=staging (specimen reports) + { // Set config on configurator blueDigest = setProductionConfig( t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles, @@ -617,8 +618,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.Equal(t, "2976.39", r.Values[0].(*datastreamsllo.Decimal).String()) break } - }) - t.Run("setStagingConfig does not affect production", func(t *testing.T) { + } + // setStagingConfig does not affect production + { greenDigest = setStagingConfig( t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles, blueDigest, ) @@ -639,8 +641,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi } assert.Equal(t, blueDigest, r.ConfigDigest) } - }) - t.Run("promoteStagingConfig flow has clean and gapless hand off from old production to newly promoted staging instance, leaving old production instance in 'retired' state", func(t *testing.T) { + } + // promoteStagingConfig flow has clean and gapless hand off from old production to newly promoted staging instance, leaving old production instance in 'retired' state + { promoteStagingConfig(t, donID, steve, backend, configurator, configuratorAddress, false) // NOTE: Wait for first non-specimen report for the newly promoted (green) instance @@ -704,8 +707,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.Less(t, finalBlueReport.ValidAfterSeconds, finalBlueReport.ObservationTimestampSeconds) assert.Equal(t, finalBlueReport.ObservationTimestampSeconds, initialPromotedGreenReport.ValidAfterSeconds) assert.Less(t, initialPromotedGreenReport.ValidAfterSeconds, initialPromotedGreenReport.ObservationTimestampSeconds) - }) - t.Run("retired instance does not produce reports", func(t *testing.T) { + } + // retired instance does not produce reports + { // NOTE: Wait for five "green" reports to be produced and assert no "blue" reports i := 0 @@ -721,8 +725,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.False(t, r.Specimen) assert.Equal(t, greenDigest, r.ConfigDigest) } - }) - t.Run("setStagingConfig replaces 'retired' instance with new config and starts producing specimen reports again", func(t *testing.T) { + } + // setStagingConfig replaces 'retired' instance with new config and starts producing specimen reports again + { blueDigest = setStagingConfig( t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles, greenDigest, ) @@ -740,8 +745,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi } assert.Equal(t, greenDigest, r.ConfigDigest) } - }) - t.Run("promoteStagingConfig swaps the instances again", func(t *testing.T) { + } + // promoteStagingConfig swaps the instances again + { // TODO: Check that once an instance enters 'retired' state, it // doesn't produce reports or bother making observations promoteStagingConfig(t, donID, steve, backend, configurator, configuratorAddress, true) @@ -766,8 +772,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.Less(t, finalGreenReport.ValidAfterSeconds, finalGreenReport.ObservationTimestampSeconds) assert.Equal(t, finalGreenReport.ObservationTimestampSeconds, initialPromotedBlueReport.ValidAfterSeconds) assert.Less(t, initialPromotedBlueReport.ValidAfterSeconds, initialPromotedBlueReport.ObservationTimestampSeconds) - }) - t.Run("adding a new channel definition is picked up on the fly", func(t *testing.T) { + } + // adding a new channel definition is picked up on the fly + { channelDefinitions[2] = llotypes.ChannelDefinition{ ReportFormat: llotypes.ReportFormatJSON, Streams: []llotypes.Stream{ @@ -805,7 +812,7 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.Len(t, r.Values, 1) assert.Equal(t, "2976.39", r.Values[0].(*datastreamsllo.Decimal).String()) } - }) + } t.Run("deleting the jobs turns off oracles and cleans up resources", func(t *testing.T) { t.Skip("TODO - MERC-3524") }) diff --git a/core/services/ocrcommon/telemetry.go b/core/services/ocrcommon/telemetry.go index e20b2485d86..58e57950cd3 100644 --- a/core/services/ocrcommon/telemetry.go +++ b/core/services/ocrcommon/telemetry.go @@ -164,7 +164,6 @@ func ParseMercuryEATelemetry(lggr logger.Logger, trrs pipeline.TaskRunResults, f bridgeRawResponse, ok := trr.Result.Value.(string) if !ok { - lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, id=%s, name=%q, expected string got %T", trr.Task.DotID(), bridgeName, trr.Result.Value), "dotID", trr.Task.DotID(), "bridgeName", bridgeName) continue } eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse)) diff --git a/core/services/ocrcommon/telemetry_test.go b/core/services/ocrcommon/telemetry_test.go index 8fac0ab2cbf..ae682524e0f 100644 --- a/core/services/ocrcommon/telemetry_test.go +++ b/core/services/ocrcommon/telemetry_test.go @@ -1,6 +1,7 @@ package ocrcommon import ( + "fmt" "math/big" "sync" "testing" @@ -1024,9 +1025,8 @@ func TestCollectMercuryEnhancedTelemetryV1(t *testing.T) { } wg.Wait() - require.Equal(t, 2, logs.Len()) - require.Contains(t, logs.All()[0].Message, `cannot get bridge response from bridge task, id=ds1, name="test-mercury-bridge-1"`) - require.Contains(t, logs.All()[1].Message, "cannot parse EA telemetry") + require.Equal(t, 1, logs.Len()) + require.Contains(t, logs.All()[0].Message, "cannot parse EA telemetry") chDone <- struct{}{} } @@ -1140,11 +1140,11 @@ func TestCollectMercuryEnhancedTelemetryV2(t *testing.T) { } wg.Wait() - require.Equal(t, 4, logs.Len()) + require.Equal(t, 3, logs.Len()) + fmt.Println(logs.All()) require.Contains(t, logs.All()[0].Message, "cannot parse enhanced EA telemetry bid price") - require.Contains(t, logs.All()[1].Message, "cannot get bridge response from bridge task") - require.Contains(t, logs.All()[2].Message, "cannot parse EA telemetry") - require.Contains(t, logs.All()[3].Message, "cannot parse enhanced EA telemetry bid price") + require.Contains(t, logs.All()[1].Message, "cannot parse EA telemetry") + require.Contains(t, logs.All()[2].Message, "cannot parse enhanced EA telemetry bid price") chDone <- struct{}{} } diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index 1fc2fc46336..2194cb8be46 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -510,15 +510,23 @@ func (r *runner) run(ctx context.Context, pipeline *Pipeline, run *Run, vars Var ) } l = l.With("run.State", run.State, "fatal", run.HasFatalErrors(), "runTime", runTime) - if run.HasFatalErrors() { - // This will also log at error level in OCR if it fails Observe so the - // level is appropriate - l = l.With("run.FatalErrors", run.FatalErrors) - l.Debugw("Completed pipeline run with fatal errors") - } else if run.HasErrors() { - l = l.With("run.AllErrors", run.AllErrors) - l.Debugw("Completed pipeline run with errors") - } else { + if run.HasFatalErrors() || run.HasErrors() { + var errorsWithID []string + for _, taskRun := range run.PipelineTaskRuns { + if taskRun.Error.Valid { + err := fmt.Sprintf("%s(%s); %s", taskRun.DotID, taskRun.Type, taskRun.Error.ValueOrZero()) + errorsWithID = append(errorsWithID, err) + } + } + l = l.With("run.Errors", errorsWithID) + if run.HasFatalErrors() { + l = l.With("run.FatalErrors", run.FatalErrors) + l.Debugw("Completed pipeline run with fatal errors") + } else if run.HasErrors() { + l = l.With("run.AllErrors", run.AllErrors) + l.Debugw("Completed pipeline run with errors") + } + } else if r.config.VerboseLogging() { l.Debugw("Completed pipeline run successfully") } diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index db0fe90796b..8008fc4fd9e 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -39,6 +39,7 @@ import ( txm "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" + coreconfig "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/llo" "github.com/smartcontractkit/chainlink/v2/core/services/llo/bm" @@ -149,7 +150,7 @@ type Relayer struct { // Mercury mercuryORM mercury.ORM - transmitterCfg mercury.TransmitterConfig + mercuryCfg MercuryConfig triggerCapability *triggers.MercuryTriggerService // LLO/data streams @@ -162,14 +163,19 @@ type CSAETHKeystore interface { Eth() keystore.Eth } +type MercuryConfig interface { + Transmitter() coreconfig.MercuryTransmitter + VerboseLogging() bool +} + type RelayerOpts struct { DS sqlutil.DataSource CSAETHKeystore MercuryPool wsrpc.Pool RetirementReportCache llo.RetirementReportCache - TransmitterConfig mercury.TransmitterConfig - CapabilitiesRegistry coretypes.CapabilitiesRegistry - HTTPClient *http.Client + MercuryConfig + CapabilitiesRegistry coretypes.CapabilitiesRegistry + HTTPClient *http.Client } func (c RelayerOpts) Validate() error { @@ -213,7 +219,7 @@ func NewRelayer(ctx context.Context, lggr logger.Logger, chain legacyevm.Chain, cdcFactory: cdcFactory, retirementReportCache: opts.RetirementReportCache, mercuryORM: mercuryORM, - transmitterCfg: opts.TransmitterConfig, + mercuryCfg: opts.MercuryConfig, capabilitiesRegistry: opts.CapabilitiesRegistry, } @@ -484,7 +490,7 @@ func (r *Relayer) NewMercuryProvider(ctx context.Context, rargs commontypes.Rela return nil, err } - transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, benchmarkPriceDecoder, r.triggerCapability) + transmitter := mercury.NewTransmitter(lggr, r.mercuryCfg.Transmitter(), clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, benchmarkPriceDecoder, r.triggerCapability) return NewMercuryProvider(cp, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, reportCodecV4, lggr), nil } @@ -552,15 +558,17 @@ func (r *Relayer) NewLLOProvider(ctx context.Context, rargs commontypes.RelayArg clients[server.URL] = client } transmitter = llo.NewTransmitter(llo.TransmitterOpts{ - Lggr: r.lggr, - FromAccount: fmt.Sprintf("%x", privKey.PublicKey), // NOTE: This may need to change if we support e.g. multiple tranmsmitters, to be a composite of all keys + Lggr: r.lggr, + FromAccount: fmt.Sprintf("%x", privKey.PublicKey), // NOTE: This may need to change if we support e.g. multiple tranmsmitters, to be a composite of all keys + VerboseLogging: r.mercuryCfg.VerboseLogging(), MercuryTransmitterOpts: mercurytransmitter.Opts{ - Lggr: r.lggr, - Cfg: r.transmitterCfg, - Clients: clients, - FromAccount: privKey.PublicKey, - DonID: relayConfig.LLODONID, - ORM: mercurytransmitter.NewORM(r.ds, relayConfig.LLODONID), + Lggr: r.lggr, + VerboseLogging: r.mercuryCfg.VerboseLogging(), + Cfg: r.mercuryCfg.Transmitter(), + Clients: clients, + FromAccount: privKey.PublicKey, + DonID: relayConfig.LLODONID, + ORM: mercurytransmitter.NewORM(r.ds, relayConfig.LLODONID), }, RetirementReportCache: r.retirementReportCache, }) diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index cd51afac5f8..0f26b02ab6f 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -237,6 +237,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index bfb0dcb9961..113d319e3c5 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -247,6 +247,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 123 TransmitTimeout = '3m54s' +TransmitConcurrency = 456 [Capabilities] [Capabilities.Peering] diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 074cb82482b..cb2884fde8f 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -237,6 +237,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/deployment/go.mod b/deployment/go.mod index 2c6bf78b0d1..6320167168b 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -402,7 +402,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/chainlink-automation v0.8.1 // indirect github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-protos/orchestrator v0.3.0 // indirect github.com/smartcontractkit/chainlink-solana v1.1.1-0.20241115191142-8b8369c1f44e // indirect diff --git a/deployment/go.sum b/deployment/go.sum index d265b8935fd..65db9f77800 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1388,8 +1388,8 @@ github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef06 github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e/go.mod h1:iK3BNHKCLgSgkOyiu3iE7sfZ20Qnuk7xwjV/yO/6gnQ= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 h1:1BMTG66HnCIz+KMBWGvyzELNM6VHGwv2WKFhN7H49Sg= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57/go.mod h1:QPiorgpbLv4+Jn4YO6xxU4ftTu4T3QN8HwX3ImP59DE= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 h1:0ewLMbAz3rZrovdRUCgd028yOXX8KigB4FndAUdI2kM= diff --git a/docs/CONFIG.md b/docs/CONFIG.md index ff918468c07..20965d816ec 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1875,6 +1875,7 @@ CertFile is the path to a PEM file of trusted root certificate authority certifi [Mercury.Transmitter] TransmitQueueMaxSize = 10_000 # Default TransmitTimeout = "5s" # Default +TransmitConcurrency = 100 # Default ``` Mercury.Transmitter controls settings for the mercury transmitter @@ -1897,6 +1898,14 @@ TransmitTimeout controls how long the transmitter will wait for a response when sending a message to the mercury server, before aborting and considering the transmission to be failed. +### TransmitConcurrency +```toml +TransmitConcurrency = 100 # Default +``` +TransmitConcurrency is the max number of concurrent transmits to each server. + +Only has effect with LLO jobs. + ## Telemetry ```toml [Telemetry] diff --git a/go.mod b/go.mod index e6039802f93..23db187326b 100644 --- a/go.mod +++ b/go.mod @@ -79,7 +79,7 @@ require ( github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f - github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e + github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 github.com/smartcontractkit/chainlink-feeds v0.1.1 github.com/smartcontractkit/chainlink-protos/orchestrator v0.3.0 github.com/smartcontractkit/chainlink-solana v1.1.1-0.20241115191142-8b8369c1f44e diff --git a/go.sum b/go.sum index 270966b2a91..386ab51a211 100644 --- a/go.sum +++ b/go.sum @@ -1082,8 +1082,8 @@ github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef06 github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e/go.mod h1:iK3BNHKCLgSgkOyiu3iE7sfZ20Qnuk7xwjV/yO/6gnQ= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 h1:1BMTG66HnCIz+KMBWGvyzELNM6VHGwv2WKFhN7H49Sg= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57/go.mod h1:QPiorgpbLv4+Jn4YO6xxU4ftTu4T3QN8HwX3ImP59DE= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-protos/orchestrator v0.3.0 h1:PBUaFfPLm+Efq7H9kdfGBivH+QhJ6vB5EZTR/sCZsxI= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index f3440a4db55..ddcd3ae65fa 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -415,7 +415,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/ccip-owner-contracts v0.0.0-20240926212305-a6deabdfce86 // indirect github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-protos/orchestrator v0.3.0 // indirect github.com/smartcontractkit/chainlink-solana v1.1.1-0.20241115191142-8b8369c1f44e // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index a8ff6db2b20..8e3869244d8 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1409,8 +1409,8 @@ github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef06 github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e/go.mod h1:iK3BNHKCLgSgkOyiu3iE7sfZ20Qnuk7xwjV/yO/6gnQ= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 h1:1BMTG66HnCIz+KMBWGvyzELNM6VHGwv2WKFhN7H49Sg= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57/go.mod h1:QPiorgpbLv4+Jn4YO6xxU4ftTu4T3QN8HwX3ImP59DE= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 h1:0ewLMbAz3rZrovdRUCgd028yOXX8KigB4FndAUdI2kM= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index c04b84488ca..c548b6d9159 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -422,7 +422,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/chain-selectors v1.0.29 // indirect github.com/smartcontractkit/chainlink-automation v0.8.1 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-protos/orchestrator v0.3.0 // indirect github.com/smartcontractkit/chainlink-solana v1.1.1-0.20241115191142-8b8369c1f44e // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 8504aecfe6e..ed0224adc21 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1398,8 +1398,8 @@ github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef06 github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e/go.mod h1:iK3BNHKCLgSgkOyiu3iE7sfZ20Qnuk7xwjV/yO/6gnQ= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 h1:1BMTG66HnCIz+KMBWGvyzELNM6VHGwv2WKFhN7H49Sg= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57/go.mod h1:QPiorgpbLv4+Jn4YO6xxU4ftTu4T3QN8HwX3ImP59DE= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-protos/orchestrator v0.3.0 h1:PBUaFfPLm+Efq7H9kdfGBivH+QhJ6vB5EZTR/sCZsxI= diff --git a/testdata/scripts/config/merge_raw_configs.txtar b/testdata/scripts/config/merge_raw_configs.txtar index b3d50f22b36..bf0da942eea 100644 --- a/testdata/scripts/config/merge_raw_configs.txtar +++ b/testdata/scripts/config/merge_raw_configs.txtar @@ -384,6 +384,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 5e8b847ceda..51edf69d599 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -249,6 +249,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/testdata/scripts/node/validate/defaults-override.txtar b/testdata/scripts/node/validate/defaults-override.txtar index bf8bece28bf..19bae4bec1a 100644 --- a/testdata/scripts/node/validate/defaults-override.txtar +++ b/testdata/scripts/node/validate/defaults-override.txtar @@ -310,6 +310,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index 2e72ed7e9bb..ddd01a4c1e4 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -293,6 +293,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index 7b27328f7a6..0f40ad6a208 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -293,6 +293,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index 83d23546175..dd7455ca3a8 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -293,6 +293,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 3fccffc4e69..1ffe2ab718c 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -278,6 +278,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 5ea0aa289a8..52edd2b8065 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -283,6 +283,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index 26641c0ef76..623459ce253 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -290,6 +290,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering] diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index 51b3e897741..5452c49f122 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -272,6 +272,7 @@ CertFile = '' [Mercury.Transmitter] TransmitQueueMaxSize = 10000 TransmitTimeout = '5s' +TransmitConcurrency = 100 [Capabilities] [Capabilities.Peering]