Skip to content

Commit

Permalink
BCF-2877: support mercury loopp (#11933)
Browse files Browse the repository at this point in the history
* support mercury loopp

* update common

* add tests for refactored NewServices func

* add make and docker support

* bump common

* bump common

* bump common

* linter imports

* address comments

* bump common to merged version

* exlcude mercury/plugin.go from sonar duplicate requirements because the underlying API is inherently duplicated

* sonar exception
  • Loading branch information
krehermann authored Feb 9, 2024
1 parent b0a1d23 commit de15206
Show file tree
Hide file tree
Showing 16 changed files with 676 additions and 72 deletions.
5 changes: 5 additions & 0 deletions GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ chainlink-local-start:
install-medianpoc: ## Build & install the chainlink-medianpoc binary.
go install $(GOFLAGS) ./plugins/cmd/chainlink-medianpoc

.PHONY: install-mercury-loop
install-mercury-loop: ## Build & install the chainlink-medianpoc binary.
go install $(GOFLAGS) ./plugins/cmd/chainlink-mercury


.PHONY: docker ## Build the chainlink docker image
docker:
docker buildx build \
Expand Down
1 change: 1 addition & 0 deletions core/config/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
// LOOPP commands and vars
var (
MedianPlugin = NewPlugin("median")
MercuryPlugin = NewPlugin("mercury")
SolanaPlugin = NewPlugin("solana")
StarknetPlugin = NewPlugin("starknet")
// PrometheusDiscoveryHostName is the externally accessible hostname
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/pelletier/go-toml/v2 v2.1.1
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca
github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240112202000-6359502d2ff1
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1169,8 +1169,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429 h1:xkejUBZhcBpBrTSfxc91Iwzadrb6SXw8ks69bHIQ9Ww=
github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429/go.mod h1:wJmVvDf4XSjsahWtfUq3wvIAYEAuhr7oxmxYnEL/LGQ=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b h1:HQOWQqbmtanx+nqL4fUYxpvZZWyfCkE1gPqmDgF63HY=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b/go.mod h1:Mw/HFKy1ljahnnDgVaeP6s6ZCGEV7XmB5gD3jrSjnjE=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca h1:Vtu+x4788S9stmuioWtfyxCKro7dwnqJFy96IuMRB7k=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca/go.mod h1:pRlQrvcizMmuHAUV4N96oO2e3XbA99JCQELLc6ES160=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240206150430-fbccaa95af62 h1:DuSQLuq+Ilm3Q+2zn5agLrAi9UvFQmOUdKwZQKX0AFA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240206150430-fbccaa95af62/go.mod h1:Ny6kBD8Houh5yZRmGiB0ovsLHdb4qOHHwBno9JZUT+Y=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20231204152908-a6e3fe8ff2a1 h1:xYqRgZO0nMSO8CBCMR0r3WA+LZ4kNL8a6bnbyk/oBtQ=
Expand Down
4 changes: 3 additions & 1 deletion core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,9 @@ func (d *Delegate) newServicesMercury(

chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100)

mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))
mCfg := mercury.NewMercuryConfig(d.cfg.JobPipeline().MaxSuccessfulRuns(), d.cfg.JobPipeline().ResultWriteQueueDepth(), d.cfg)

mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, mCfg, chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))

if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury"))
Expand Down
282 changes: 222 additions & 60 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@ package mercury

import (
"encoding/json"
"fmt"
"os/exec"

"github.com/pkg/errors"

libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
relaymercuryv1 "github.com/smartcontractkit/chainlink-data-streams/mercury/v1"
relaymercuryv2 "github.com/smartcontractkit/chainlink-data-streams/mercury/v2"
relaymercuryv3 "github.com/smartcontractkit/chainlink-data-streams/mercury/v3"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
Expand All @@ -22,11 +29,36 @@ import (
mercuryv1 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v1"
mercuryv2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2"
mercuryv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

type Config interface {
MaxSuccessfulRuns() uint64
ResultWriteQueueDepth() uint64
plugins.RegistrarConfig
}

// concrete implementation of MercuryConfig
type mercuryConfig struct {
jobPipelineMaxSuccessfulRuns uint64
jobPipelineResultWriteQueueDepth uint64
plugins.RegistrarConfig
}

func NewMercuryConfig(jobPipelineMaxSuccessfulRuns uint64, jobPipelineResultWriteQueueDepth uint64, pluginProcessCfg plugins.RegistrarConfig) Config {
return &mercuryConfig{
jobPipelineMaxSuccessfulRuns: jobPipelineMaxSuccessfulRuns,
jobPipelineResultWriteQueueDepth: jobPipelineResultWriteQueueDepth,
RegistrarConfig: pluginProcessCfg,
}
}

func (m *mercuryConfig) MaxSuccessfulRuns() uint64 {
return m.jobPipelineMaxSuccessfulRuns
}

func (m *mercuryConfig) ResultWriteQueueDepth() uint64 {
return m.jobPipelineResultWriteQueueDepth
}

func NewServices(
Expand Down Expand Up @@ -55,76 +87,206 @@ func NewServices(
}
lggr = lggr.Named("MercuryPlugin").With("jobID", jb.ID, "jobName", jb.Name.ValueOrZero())

// encapsulate all the subservices and ensure we close them all if any fail to start
srvs := []job.ServiceCtx{ocr2Provider}
abort := func() {
if cerr := services.MultiCloser(srvs).Close(); err != nil {
lggr.Errorw("Error closing unused services", "err", cerr)
}
}
saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth())
srvs = append(srvs, saver)

// this is the factory that will be used to create the mercury plugin
var (
factory ocr3types.MercuryPluginFactory
factoryServices []job.ServiceCtx
)
fCfg := factoryCfg{
orm: orm,
pipelineRunner: pipelineRunner,
jb: jb,
lggr: lggr,
saver: saver,
chEnhancedTelem: chEnhancedTelem,
ocr2Provider: ocr2Provider,
reportingPluginConfig: pluginConfig,
cfg: cfg,
feedID: feedID,
}
switch feedID.Version() {
case 1:
ds := mercuryv1.NewDataSource(
orm,
pipelineRunner,
jb,
*jb.PipelineSpec,
lggr,
saver,
chEnhancedTelem,
ocr2Provider.MercuryChainReader(),
ocr2Provider.MercuryServerFetcher(),
pluginConfig.InitialBlockNumber.Ptr(),
feedID,
)
argsNoPlugin.MercuryPluginFactory = relaymercuryv1.NewFactory(
ds,
lggr,
ocr2Provider.OnchainConfigCodec(),
ocr2Provider.ReportCodecV1(),
)
factory, factoryServices, err = newv1factory(fCfg)
if err != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err)
}
srvs = append(srvs, factoryServices...)
case 2:
ds := mercuryv2.NewDataSource(
orm,
pipelineRunner,
jb,
*jb.PipelineSpec,
feedID,
lggr,
saver,
chEnhancedTelem,
ocr2Provider.MercuryServerFetcher(),
*pluginConfig.LinkFeedID,
*pluginConfig.NativeFeedID,
)
argsNoPlugin.MercuryPluginFactory = relaymercuryv2.NewFactory(
ds,
lggr,
ocr2Provider.OnchainConfigCodec(),
ocr2Provider.ReportCodecV2(),
)
factory, factoryServices, err = newv2factory(fCfg)
if err != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err)
}
srvs = append(srvs, factoryServices...)
case 3:
ds := mercuryv3.NewDataSource(
orm,
pipelineRunner,
jb,
*jb.PipelineSpec,
feedID,
lggr,
saver,
chEnhancedTelem,
ocr2Provider.MercuryServerFetcher(),
*pluginConfig.LinkFeedID,
*pluginConfig.NativeFeedID,
)
argsNoPlugin.MercuryPluginFactory = relaymercuryv3.NewFactory(
ds,
lggr,
ocr2Provider.OnchainConfigCodec(),
ocr2Provider.ReportCodecV3(),
)
factory, factoryServices, err = newv3factory(fCfg)
if err != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err)
}
srvs = append(srvs, factoryServices...)
default:
return nil, errors.Errorf("unknown Mercury report schema version: %d", feedID.Version())
}

argsNoPlugin.MercuryPluginFactory = factory
oracle, err := libocr2.NewOracle(argsNoPlugin)
if err != nil {
abort()
return nil, errors.WithStack(err)
}
return []job.ServiceCtx{ocr2Provider, saver, job.NewServiceAdapter(oracle)}, nil
srvs = append(srvs, job.NewServiceAdapter(oracle))
return srvs, nil
}

type factoryCfg struct {
orm types.DataSourceORM
pipelineRunner pipeline.Runner
jb job.Job
lggr logger.Logger
saver *ocrcommon.RunResultSaver
chEnhancedTelem chan ocrcommon.EnhancedTelemetryMercuryData
ocr2Provider commontypes.MercuryProvider
reportingPluginConfig config.PluginConfig
cfg Config
feedID utils.FeedID
}

func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) {
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

ds := mercuryv3.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
factoryCfg.jb,
*factoryCfg.jb.PipelineSpec,
factoryCfg.feedID,
factoryCfg.lggr,
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryServerFetcher(),
*factoryCfg.reportingPluginConfig.LinkFeedID,
*factoryCfg.reportingPluginConfig.NativeFeedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv3.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV3())
}
return factory, srvs, nil
}

func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) {
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

ds := mercuryv2.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
factoryCfg.jb,
*factoryCfg.jb.PipelineSpec,
factoryCfg.feedID,
factoryCfg.lggr,
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryServerFetcher(),
*factoryCfg.reportingPluginConfig.LinkFeedID,
*factoryCfg.reportingPluginConfig.NativeFeedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv2.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV2())
}
return factory, srvs, nil
}

func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) {
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

ds := mercuryv1.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
factoryCfg.jb,
*factoryCfg.jb.PipelineSpec,
factoryCfg.lggr,
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryChainReader(),
factoryCfg.ocr2Provider.MercuryServerFetcher(),
factoryCfg.reportingPluginConfig.InitialBlockNumber.Ptr(),
factoryCfg.feedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv1.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV1())
}
return factory, srvs, nil
}

func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) {
lggr.Debugw("Initializing Mercury loop", "command", cmd)
mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String())
envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get())
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
}
cmdFn, opts, err := cfg.RegisterLOOP(plugins.CmdConfig{
ID: mercuryLggr.Name(),
Cmd: cmd,
Env: envVars,
})
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
}
return cmdFn, opts, mercuryLggr, nil
}
Loading

0 comments on commit de15206

Please sign in to comment.