Skip to content

Commit

Permalink
Fix panic
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Nov 13, 2024
1 parent c005634 commit 995f4d1
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 38 deletions.
4 changes: 2 additions & 2 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r *RelayerFactory) NewDummy(config DummyFactoryConfig) (loop.Relayer, erro
type EVMFactoryConfig struct {
legacyevm.ChainOpts
evmrelay.CSAETHKeystore
coreconfig.MercuryTransmitter
MercuryConfig coreconfig.Mercury
}

func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LOOPRelayAdapter, error) {
Expand Down Expand Up @@ -83,7 +83,7 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m
DS: ccOpts.DS,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
TransmitterConfig: config.MercuryTransmitter,
MercuryConfig: config.MercuryConfig,
CapabilitiesRegistry: r.CapabilitiesRegistry,
HTTPClient: r.HTTPClient,
RetirementReportCache: r.RetirementReportCache,
Expand Down
8 changes: 5 additions & 3 deletions core/services/llo/evm/report_codec_premium_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3"
"github.com/smartcontractkit/chainlink-data-streams/llo"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
reportcodecv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec"
reporttypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/types"
Expand All @@ -28,10 +28,12 @@ var (
_ llo.ReportCodec = ReportCodecPremiumLegacy{}
)

type ReportCodecPremiumLegacy struct{ logger.Logger }
type ReportCodecPremiumLegacy struct {
logger.Logger
}

func NewReportCodecPremiumLegacy(lggr logger.Logger) llo.ReportCodec {
return ReportCodecPremiumLegacy{lggr.Named("ReportCodecPremiumLegacy")}
return ReportCodecPremiumLegacy{logger.Sugared(lggr).Named("ReportCodecPremiumLegacy")}
}

type ReportFormatEVMPremiumLegacyOpts struct {
Expand Down
12 changes: 8 additions & 4 deletions core/services/llo/mercurytransmitter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ var (
// A server handles the queue for a given mercury server

type server struct {
lggr logger.SugaredLogger
lggr logger.SugaredLogger
verboseLogging bool

transmitTimeout time.Duration

Expand All @@ -64,6 +65,8 @@ type server struct {

url string

reportCodecPremiumLegacy llo.ReportCodec

transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
Expand All @@ -77,17 +80,19 @@ type QueueConfig interface {
TransmitTimeout() commonconfig.Duration
}

func newServer(lggr logger.Logger, cfg QueueConfig, client wsrpc.Client, orm ORM, serverURL string) *server {
func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client wsrpc.Client, orm ORM, serverURL string) *server {
pm := NewPersistenceManager(lggr, orm, serverURL, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency)
donIDStr := fmt.Sprintf("%d", pm.DonID())
return &server{
logger.Sugared(lggr),
verboseLogging,
cfg.TransmitTimeout().Duration(),
client,
pm,
NewTransmitQueue(lggr, serverURL, int(cfg.TransmitQueueMaxSize()), pm),
make(chan [32]byte, int(cfg.TransmitQueueMaxSize())),
serverURL,
evm.NewReportCodecPremiumLegacy(lggr),
transmitSuccessCount.WithLabelValues(donIDStr, serverURL),
transmitDuplicateCount.WithLabelValues(donIDStr, serverURL),
transmitConnectionErrorCount.WithLabelValues(donIDStr, serverURL),
Expand Down Expand Up @@ -190,7 +195,6 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI
b.Reset()
if res.Error == "" {
s.transmitSuccessCount.Inc()
// TODO: This should be behind a "VerboseLogging" flag
s.lggr.Debugw("Transmit report success", "req", fmt.Sprintf("0x%x", req), "transmission", t, "response", res)
} else {
// We don't need to retry here because the mercury server
Expand Down Expand Up @@ -223,7 +227,7 @@ func (s *server) transmit(ctx context.Context, t *Transmission) (*pb.TransmitReq
case llotypes.ReportFormatJSON:
payload, err = llo.JSONReportCodec{}.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs)
case llotypes.ReportFormatEVMPremiumLegacy:
payload, err = evm.ReportCodecPremiumLegacy{}.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs)
payload, err = s.reportCodecPremiumLegacy.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs)
default:
return nil, nil, fmt.Errorf("Transmit failed; unsupported report format: %q", t.Report.Info.ReportFormat)
}
Expand Down
29 changes: 18 additions & 11 deletions core/services/llo/mercurytransmitter/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ type Config interface {

type transmitter struct {
services.StateMachine
lggr logger.SugaredLogger
cfg Config
lggr logger.SugaredLogger
verboseLogging bool
cfg Config

orm ORM
servers map[string]*server
Expand All @@ -115,12 +116,13 @@ type transmitter struct {
}

type Opts struct {
Lggr logger.Logger
Cfg Config
Clients map[string]wsrpc.Client
FromAccount ed25519.PublicKey
DonID uint32
ORM ORM
Lggr logger.Logger
VerboseLogging bool
Cfg Config
Clients map[string]wsrpc.Client
FromAccount ed25519.PublicKey
DonID uint32
ORM ORM
}

func New(opts Opts) Transmitter {
Expand All @@ -132,11 +134,12 @@ func newTransmitter(opts Opts) *transmitter {
servers := make(map[string]*server, len(opts.Clients))
for serverURL, client := range opts.Clients {
sLggr := sugared.Named(serverURL).With("serverURL", serverURL)
servers[serverURL] = newServer(sLggr, opts.Cfg, client, opts.ORM, serverURL)
servers[serverURL] = newServer(sLggr, opts.VerboseLogging, opts.Cfg, client, opts.ORM, serverURL)
}
return &transmitter{
services.StateMachine{},
sugared.Named("LLOMercuryTransmitter").With("donID", opts.ORM.DonID()),
opts.VerboseLogging,
opts.Cfg,
opts.ORM,
servers,
Expand All @@ -149,7 +152,9 @@ func newTransmitter(opts Opts) *transmitter {

func (mt *transmitter) Start(ctx context.Context) (err error) {
return mt.StartOnce("LLOMercuryTransmitter", func() error {
mt.lggr.Debugw("Loading transmit requests from database")
if mt.verboseLogging {
mt.lggr.Debugw("Loading transmit requests from database")
}

{
var startClosers []services.StartClose
Expand Down Expand Up @@ -234,7 +239,9 @@ func (mt *transmitter) Transmit(
g := new(errgroup.Group)
for i := range transmissions {
t := transmissions[i]
mt.lggr.Debugw("LLOMercuryTransmit", "digest", digest.Hex(), "seqNr", seqNr, "reportFormat", report.Info.ReportFormat, "reportLifeCycleStage", report.Info.LifeCycleStage, "transmissionHash", fmt.Sprintf("%x", t.Hash()))
if mt.verboseLogging {
mt.lggr.Debugw("LLOMercuryTransmit", "digest", digest.Hex(), "seqNr", seqNr, "reportFormat", report.Info.ReportFormat, "reportLifeCycleStage", report.Info.LifeCycleStage, "transmissionHash", fmt.Sprintf("%x", t.Hash()))
}
g.Go(func() error {
s := mt.servers[t.ServerURL]
if ok := s.q.Push(t); !ok {
Expand Down
12 changes: 8 additions & 4 deletions core/services/llo/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,17 @@ type TransmitterRetirementReportCacheWriter interface {

type transmitter struct {
services.StateMachine
lggr logger.Logger
fromAccount string
lggr logger.Logger
verboseLogging bool
fromAccount string

subTransmitters []Transmitter
retirementReportCache TransmitterRetirementReportCacheWriter
}

type TransmitterOpts struct {
Lggr logger.Logger
VerboseLogging bool
FromAccount string
MercuryTransmitterOpts mercurytransmitter.Opts
RetirementReportCache TransmitterRetirementReportCacheWriter
Expand All @@ -69,6 +71,7 @@ func NewTransmitter(opts TransmitterOpts) Transmitter {
return &transmitter{
services.StateMachine{},
opts.Lggr,
opts.VerboseLogging,
opts.FromAccount,
subTransmitters,
opts.RetirementReportCache,
Expand Down Expand Up @@ -114,8 +117,9 @@ func (t *transmitter) Transmit(
report ocr3types.ReportWithInfo[llotypes.ReportInfo],
sigs []types.AttributedOnchainSignature,
) (err error) {
// TODO: This should be behind a "VerboseLogging" flag
t.lggr.Debugw("Transmit report", "digest", digest, "seqNr", seqNr, "report", report, "sigs", sigs)
if t.verboseLogging {
t.lggr.Debugw("Transmit report", "digest", digest, "seqNr", seqNr, "report", report, "sigs", sigs)
}

if report.Info.ReportFormat == llotypes.ReportFormatRetirement {
// Retirement reports don't get transmitted; rather, they are stored in
Expand Down
35 changes: 21 additions & 14 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type Relayer struct {

// Mercury
mercuryORM mercury.ORM
transmitterCfg mercury.TransmitterConfig
mercuryCfg MercuryConfig
triggerCapability *triggers.MercuryTriggerService

// LLO/data streams
Expand All @@ -162,14 +162,19 @@ type CSAETHKeystore interface {
Eth() keystore.Eth
}

type MercuryConfig interface {
Transmitter() mercury.TransmitterConfig
VerboseLogging() bool
}

type RelayerOpts struct {
DS sqlutil.DataSource
CSAETHKeystore
MercuryPool wsrpc.Pool
RetirementReportCache llo.RetirementReportCache
TransmitterConfig mercury.TransmitterConfig
CapabilitiesRegistry coretypes.CapabilitiesRegistry
HTTPClient *http.Client
MercuryConfig
CapabilitiesRegistry coretypes.CapabilitiesRegistry
HTTPClient *http.Client
}

func (c RelayerOpts) Validate() error {
Expand Down Expand Up @@ -213,7 +218,7 @@ func NewRelayer(ctx context.Context, lggr logger.Logger, chain legacyevm.Chain,
cdcFactory: cdcFactory,
retirementReportCache: opts.RetirementReportCache,
mercuryORM: mercuryORM,
transmitterCfg: opts.TransmitterConfig,
mercuryCfg: opts.MercuryConfig,
capabilitiesRegistry: opts.CapabilitiesRegistry,
}

Expand Down Expand Up @@ -484,7 +489,7 @@ func (r *Relayer) NewMercuryProvider(ctx context.Context, rargs commontypes.Rela
return nil, err
}

transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, benchmarkPriceDecoder, r.triggerCapability)
transmitter := mercury.NewTransmitter(lggr, r.mercuryCfg.Transmitter(), clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, benchmarkPriceDecoder, r.triggerCapability)

return NewMercuryProvider(cp, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, reportCodecV4, lggr), nil
}
Expand Down Expand Up @@ -552,15 +557,17 @@ func (r *Relayer) NewLLOProvider(ctx context.Context, rargs commontypes.RelayArg
clients[server.URL] = client
}
transmitter = llo.NewTransmitter(llo.TransmitterOpts{
Lggr: r.lggr,
FromAccount: fmt.Sprintf("%x", privKey.PublicKey), // NOTE: This may need to change if we support e.g. multiple tranmsmitters, to be a composite of all keys
Lggr: r.lggr,
FromAccount: fmt.Sprintf("%x", privKey.PublicKey), // NOTE: This may need to change if we support e.g. multiple tranmsmitters, to be a composite of all keys
VerboseLogging: r.mercuryCfg.VerboseLogging(),
MercuryTransmitterOpts: mercurytransmitter.Opts{
Lggr: r.lggr,
Cfg: r.transmitterCfg,
Clients: clients,
FromAccount: privKey.PublicKey,
DonID: relayConfig.LLODONID,
ORM: mercurytransmitter.NewORM(r.ds, relayConfig.LLODONID),
Lggr: r.lggr,
VerboseLogging: r.mercuryCfg.VerboseLogging(),
Cfg: r.mercuryCfg.Transmitter(),
Clients: clients,
FromAccount: privKey.PublicKey,
DonID: relayConfig.LLODONID,
ORM: mercurytransmitter.NewORM(r.ds, relayConfig.LLODONID),
},
RetirementReportCache: r.retirementReportCache,
})
Expand Down

0 comments on commit 995f4d1

Please sign in to comment.