From 995f4d12c16ddf97ea400deaf5673ac2522a9e6b Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Wed, 13 Nov 2024 17:25:46 -0500 Subject: [PATCH] Fix panic --- core/services/chainlink/relayer_factory.go | 4 +-- .../llo/evm/report_codec_premium_legacy.go | 8 +++-- .../services/llo/mercurytransmitter/server.go | 12 ++++--- .../llo/mercurytransmitter/transmitter.go | 29 +++++++++------ core/services/llo/transmitter.go | 12 ++++--- core/services/relay/evm/evm.go | 35 +++++++++++-------- 6 files changed, 62 insertions(+), 38 deletions(-) diff --git a/core/services/chainlink/relayer_factory.go b/core/services/chainlink/relayer_factory.go index 3740878fd19..cec7e5bb48c 100644 --- a/core/services/chainlink/relayer_factory.go +++ b/core/services/chainlink/relayer_factory.go @@ -54,7 +54,7 @@ func (r *RelayerFactory) NewDummy(config DummyFactoryConfig) (loop.Relayer, erro type EVMFactoryConfig struct { legacyevm.ChainOpts evmrelay.CSAETHKeystore - coreconfig.MercuryTransmitter + MercuryConfig coreconfig.Mercury } func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LOOPRelayAdapter, error) { @@ -83,7 +83,7 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m DS: ccOpts.DS, CSAETHKeystore: config.CSAETHKeystore, MercuryPool: r.MercuryPool, - TransmitterConfig: config.MercuryTransmitter, + MercuryConfig: config.MercuryConfig, CapabilitiesRegistry: r.CapabilitiesRegistry, HTTPClient: r.HTTPClient, RetirementReportCache: r.RetirementReportCache, diff --git a/core/services/llo/evm/report_codec_premium_legacy.go b/core/services/llo/evm/report_codec_premium_legacy.go index 527b08b945e..b2236d2a85c 100644 --- a/core/services/llo/evm/report_codec_premium_legacy.go +++ b/core/services/llo/evm/report_codec_premium_legacy.go @@ -17,8 +17,8 @@ import ( v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" reportcodecv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec" reporttypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/types" @@ -28,10 +28,12 @@ var ( _ llo.ReportCodec = ReportCodecPremiumLegacy{} ) -type ReportCodecPremiumLegacy struct{ logger.Logger } +type ReportCodecPremiumLegacy struct { + logger.Logger +} func NewReportCodecPremiumLegacy(lggr logger.Logger) llo.ReportCodec { - return ReportCodecPremiumLegacy{lggr.Named("ReportCodecPremiumLegacy")} + return ReportCodecPremiumLegacy{logger.Sugared(lggr).Named("ReportCodecPremiumLegacy")} } type ReportFormatEVMPremiumLegacyOpts struct { diff --git a/core/services/llo/mercurytransmitter/server.go b/core/services/llo/mercurytransmitter/server.go index 3b7c7479a4b..be9f5cffa22 100644 --- a/core/services/llo/mercurytransmitter/server.go +++ b/core/services/llo/mercurytransmitter/server.go @@ -52,7 +52,8 @@ var ( // A server handles the queue for a given mercury server type server struct { - lggr logger.SugaredLogger + lggr logger.SugaredLogger + verboseLogging bool transmitTimeout time.Duration @@ -64,6 +65,8 @@ type server struct { url string + reportCodecPremiumLegacy llo.ReportCodec + transmitSuccessCount prometheus.Counter transmitDuplicateCount prometheus.Counter transmitConnectionErrorCount prometheus.Counter @@ -77,17 +80,19 @@ type QueueConfig interface { TransmitTimeout() commonconfig.Duration } -func newServer(lggr logger.Logger, cfg QueueConfig, client wsrpc.Client, orm ORM, serverURL string) *server { +func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client wsrpc.Client, orm ORM, serverURL string) *server { pm := NewPersistenceManager(lggr, orm, serverURL, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency) donIDStr := fmt.Sprintf("%d", pm.DonID()) return &server{ logger.Sugared(lggr), + verboseLogging, cfg.TransmitTimeout().Duration(), client, pm, NewTransmitQueue(lggr, serverURL, int(cfg.TransmitQueueMaxSize()), pm), make(chan [32]byte, int(cfg.TransmitQueueMaxSize())), serverURL, + evm.NewReportCodecPremiumLegacy(lggr), transmitSuccessCount.WithLabelValues(donIDStr, serverURL), transmitDuplicateCount.WithLabelValues(donIDStr, serverURL), transmitConnectionErrorCount.WithLabelValues(donIDStr, serverURL), @@ -190,7 +195,6 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI b.Reset() if res.Error == "" { s.transmitSuccessCount.Inc() - // TODO: This should be behind a "VerboseLogging" flag s.lggr.Debugw("Transmit report success", "req", fmt.Sprintf("0x%x", req), "transmission", t, "response", res) } else { // We don't need to retry here because the mercury server @@ -223,7 +227,7 @@ func (s *server) transmit(ctx context.Context, t *Transmission) (*pb.TransmitReq case llotypes.ReportFormatJSON: payload, err = llo.JSONReportCodec{}.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) case llotypes.ReportFormatEVMPremiumLegacy: - payload, err = evm.ReportCodecPremiumLegacy{}.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) + payload, err = s.reportCodecPremiumLegacy.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) default: return nil, nil, fmt.Errorf("Transmit failed; unsupported report format: %q", t.Report.Info.ReportFormat) } diff --git a/core/services/llo/mercurytransmitter/transmitter.go b/core/services/llo/mercurytransmitter/transmitter.go index 33090ed9574..841eacddf96 100644 --- a/core/services/llo/mercurytransmitter/transmitter.go +++ b/core/services/llo/mercurytransmitter/transmitter.go @@ -101,8 +101,9 @@ type Config interface { type transmitter struct { services.StateMachine - lggr logger.SugaredLogger - cfg Config + lggr logger.SugaredLogger + verboseLogging bool + cfg Config orm ORM servers map[string]*server @@ -115,12 +116,13 @@ type transmitter struct { } type Opts struct { - Lggr logger.Logger - Cfg Config - Clients map[string]wsrpc.Client - FromAccount ed25519.PublicKey - DonID uint32 - ORM ORM + Lggr logger.Logger + VerboseLogging bool + Cfg Config + Clients map[string]wsrpc.Client + FromAccount ed25519.PublicKey + DonID uint32 + ORM ORM } func New(opts Opts) Transmitter { @@ -132,11 +134,12 @@ func newTransmitter(opts Opts) *transmitter { servers := make(map[string]*server, len(opts.Clients)) for serverURL, client := range opts.Clients { sLggr := sugared.Named(serverURL).With("serverURL", serverURL) - servers[serverURL] = newServer(sLggr, opts.Cfg, client, opts.ORM, serverURL) + servers[serverURL] = newServer(sLggr, opts.VerboseLogging, opts.Cfg, client, opts.ORM, serverURL) } return &transmitter{ services.StateMachine{}, sugared.Named("LLOMercuryTransmitter").With("donID", opts.ORM.DonID()), + opts.VerboseLogging, opts.Cfg, opts.ORM, servers, @@ -149,7 +152,9 @@ func newTransmitter(opts Opts) *transmitter { func (mt *transmitter) Start(ctx context.Context) (err error) { return mt.StartOnce("LLOMercuryTransmitter", func() error { - mt.lggr.Debugw("Loading transmit requests from database") + if mt.verboseLogging { + mt.lggr.Debugw("Loading transmit requests from database") + } { var startClosers []services.StartClose @@ -234,7 +239,9 @@ func (mt *transmitter) Transmit( g := new(errgroup.Group) for i := range transmissions { t := transmissions[i] - mt.lggr.Debugw("LLOMercuryTransmit", "digest", digest.Hex(), "seqNr", seqNr, "reportFormat", report.Info.ReportFormat, "reportLifeCycleStage", report.Info.LifeCycleStage, "transmissionHash", fmt.Sprintf("%x", t.Hash())) + if mt.verboseLogging { + mt.lggr.Debugw("LLOMercuryTransmit", "digest", digest.Hex(), "seqNr", seqNr, "reportFormat", report.Info.ReportFormat, "reportLifeCycleStage", report.Info.LifeCycleStage, "transmissionHash", fmt.Sprintf("%x", t.Hash())) + } g.Go(func() error { s := mt.servers[t.ServerURL] if ok := s.q.Push(t); !ok { diff --git a/core/services/llo/transmitter.go b/core/services/llo/transmitter.go index 8066ac2df0d..1ff5c1b36ac 100644 --- a/core/services/llo/transmitter.go +++ b/core/services/llo/transmitter.go @@ -47,8 +47,9 @@ type TransmitterRetirementReportCacheWriter interface { type transmitter struct { services.StateMachine - lggr logger.Logger - fromAccount string + lggr logger.Logger + verboseLogging bool + fromAccount string subTransmitters []Transmitter retirementReportCache TransmitterRetirementReportCacheWriter @@ -56,6 +57,7 @@ type transmitter struct { type TransmitterOpts struct { Lggr logger.Logger + VerboseLogging bool FromAccount string MercuryTransmitterOpts mercurytransmitter.Opts RetirementReportCache TransmitterRetirementReportCacheWriter @@ -69,6 +71,7 @@ func NewTransmitter(opts TransmitterOpts) Transmitter { return &transmitter{ services.StateMachine{}, opts.Lggr, + opts.VerboseLogging, opts.FromAccount, subTransmitters, opts.RetirementReportCache, @@ -114,8 +117,9 @@ func (t *transmitter) Transmit( report ocr3types.ReportWithInfo[llotypes.ReportInfo], sigs []types.AttributedOnchainSignature, ) (err error) { - // TODO: This should be behind a "VerboseLogging" flag - t.lggr.Debugw("Transmit report", "digest", digest, "seqNr", seqNr, "report", report, "sigs", sigs) + if t.verboseLogging { + t.lggr.Debugw("Transmit report", "digest", digest, "seqNr", seqNr, "report", report, "sigs", sigs) + } if report.Info.ReportFormat == llotypes.ReportFormatRetirement { // Retirement reports don't get transmitted; rather, they are stored in diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 7c070cc282d..fcf03cd8138 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -149,7 +149,7 @@ type Relayer struct { // Mercury mercuryORM mercury.ORM - transmitterCfg mercury.TransmitterConfig + mercuryCfg MercuryConfig triggerCapability *triggers.MercuryTriggerService // LLO/data streams @@ -162,14 +162,19 @@ type CSAETHKeystore interface { Eth() keystore.Eth } +type MercuryConfig interface { + Transmitter() mercury.TransmitterConfig + VerboseLogging() bool +} + type RelayerOpts struct { DS sqlutil.DataSource CSAETHKeystore MercuryPool wsrpc.Pool RetirementReportCache llo.RetirementReportCache - TransmitterConfig mercury.TransmitterConfig - CapabilitiesRegistry coretypes.CapabilitiesRegistry - HTTPClient *http.Client + MercuryConfig + CapabilitiesRegistry coretypes.CapabilitiesRegistry + HTTPClient *http.Client } func (c RelayerOpts) Validate() error { @@ -213,7 +218,7 @@ func NewRelayer(ctx context.Context, lggr logger.Logger, chain legacyevm.Chain, cdcFactory: cdcFactory, retirementReportCache: opts.RetirementReportCache, mercuryORM: mercuryORM, - transmitterCfg: opts.TransmitterConfig, + mercuryCfg: opts.MercuryConfig, capabilitiesRegistry: opts.CapabilitiesRegistry, } @@ -484,7 +489,7 @@ func (r *Relayer) NewMercuryProvider(ctx context.Context, rargs commontypes.Rela return nil, err } - transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, benchmarkPriceDecoder, r.triggerCapability) + transmitter := mercury.NewTransmitter(lggr, r.mercuryCfg.Transmitter(), clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, benchmarkPriceDecoder, r.triggerCapability) return NewMercuryProvider(cp, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, reportCodecV4, lggr), nil } @@ -552,15 +557,17 @@ func (r *Relayer) NewLLOProvider(ctx context.Context, rargs commontypes.RelayArg clients[server.URL] = client } transmitter = llo.NewTransmitter(llo.TransmitterOpts{ - Lggr: r.lggr, - FromAccount: fmt.Sprintf("%x", privKey.PublicKey), // NOTE: This may need to change if we support e.g. multiple tranmsmitters, to be a composite of all keys + Lggr: r.lggr, + FromAccount: fmt.Sprintf("%x", privKey.PublicKey), // NOTE: This may need to change if we support e.g. multiple tranmsmitters, to be a composite of all keys + VerboseLogging: r.mercuryCfg.VerboseLogging(), MercuryTransmitterOpts: mercurytransmitter.Opts{ - Lggr: r.lggr, - Cfg: r.transmitterCfg, - Clients: clients, - FromAccount: privKey.PublicKey, - DonID: relayConfig.LLODONID, - ORM: mercurytransmitter.NewORM(r.ds, relayConfig.LLODONID), + Lggr: r.lggr, + VerboseLogging: r.mercuryCfg.VerboseLogging(), + Cfg: r.mercuryCfg.Transmitter(), + Clients: clients, + FromAccount: privKey.PublicKey, + DonID: relayConfig.LLODONID, + ORM: mercurytransmitter.NewORM(r.ds, relayConfig.LLODONID), }, RetirementReportCache: r.retirementReportCache, })