Skip to content

Commit

Permalink
support mercury loopp
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann committed Feb 5, 2024
1 parent e20511d commit 5080c48
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 21 deletions.
39 changes: 39 additions & 0 deletions cmd/datastreams/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"github.com/hashicorp/go-plugin"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
)

const (
loggerName = "PluginMercury"
)

func main() {
s := loop.MustNewStartedServer(loggerName)
defer s.Stop()

p := NewPlugin(s.Logger)
defer s.Logger.ErrorIfFn(p.Close, "Failed to close")

s.MustRegister(p)

stop := make(chan struct{})
defer close(stop)

plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.PluginMercuryHandshakeConfig(),

Check failure on line 26 in cmd/datastreams/main.go

View workflow job for this annotation

GitHub Actions / lint

undefined: loop.PluginMercuryHandshakeConfig
Plugins: map[string]plugin.Plugin{
loop.PluginMercuryName: &loop.GRPCPluginMercury{

Check failure on line 28 in cmd/datastreams/main.go

View workflow job for this annotation

GitHub Actions / lint

undefined: loop.PluginMercuryName
PluginServer: p,
BrokerConfig: loop.BrokerConfig{
StopCh: stop,
Logger: s.Logger,
GRPCOpts: s.GRPCOpts,
},
},
},
GRPCServer: s.GRPCOpts.NewServer,
})
}
88 changes: 88 additions & 0 deletions cmd/datastreams/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"context"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
v1 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1"
v2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2"
v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3"
ds_v1 "github.com/smartcontractkit/chainlink-data-streams/mercury/v1"
ds_v2 "github.com/smartcontractkit/chainlink-data-streams/mercury/v2"
ds_v3 "github.com/smartcontractkit/chainlink-data-streams/mercury/v3"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
)

type Plugin struct {
loop.Plugin
stop services.StopChan
}

func NewPlugin(lggr logger.Logger) *Plugin {
return &Plugin{Plugin: loop.Plugin{Logger: lggr}, stop: make(services.StopChan)}
}

func (p *Plugin) NewMercuryV1Factory(ctx context.Context, provider types.MercuryProvider, dataSource v1.DataSource) (types.MercuryPluginFactory, error) {

Check failure on line 28 in cmd/datastreams/plugin.go

View workflow job for this annotation

GitHub Actions / lint

undefined: v1.DataSource

Check failure on line 28 in cmd/datastreams/plugin.go

View workflow job for this annotation

GitHub Actions / lint

undefined: types.MercuryPluginFactory
var ctxVals loop.ContextValues
ctxVals.SetValues(ctx)
lggr := logger.With(p.Logger, ctxVals.Args()...)

factory := ds_v1.NewFactory(dataSource, lggr, provider.OnchainConfigCodec(), provider.ReportCodecV1())

s := &mercuryPluginFactoryService{lggr: logger.Named(lggr, "MercuryV1PluginFactory"), MercuryPluginFactory: factory}

p.SubService(s)

return s, nil
}

func (p *Plugin) NewMercuryV2Factory(ctx context.Context, provider types.MercuryProvider, dataSource v2.DataSource) (types.MercuryPluginFactory, error) {

Check failure on line 42 in cmd/datastreams/plugin.go

View workflow job for this annotation

GitHub Actions / lint

undefined: v2.DataSource

Check failure on line 42 in cmd/datastreams/plugin.go

View workflow job for this annotation

GitHub Actions / lint

undefined: types.MercuryPluginFactory
var ctxVals loop.ContextValues
ctxVals.SetValues(ctx)
lggr := logger.With(p.Logger, ctxVals.Args()...)

factory := ds_v2.NewFactory(dataSource, lggr, provider.OnchainConfigCodec(), provider.ReportCodecV2())

s := &mercuryPluginFactoryService{lggr: logger.Named(lggr, "MercuryV2PluginFactory"), MercuryPluginFactory: factory}

p.SubService(s)

return s, nil
}

func (p *Plugin) NewMercuryV3Factory(ctx context.Context, provider types.MercuryProvider, dataSource v3.DataSource) (types.MercuryPluginFactory, error) {

Check failure on line 56 in cmd/datastreams/plugin.go

View workflow job for this annotation

GitHub Actions / lint

undefined: v3.DataSource

Check failure on line 56 in cmd/datastreams/plugin.go

View workflow job for this annotation

GitHub Actions / lint

undefined: types.MercuryPluginFactory
var ctxVals loop.ContextValues
ctxVals.SetValues(ctx)
lggr := logger.With(p.Logger, ctxVals.Args()...)

factory := ds_v3.NewFactory(dataSource, lggr, provider.OnchainConfigCodec(), provider.ReportCodecV3())

s := &mercuryPluginFactoryService{lggr: logger.Named(lggr, "MercuryV3PluginFactory"), MercuryPluginFactory: factory}

p.SubService(s)

return s, nil
}

type mercuryPluginFactoryService struct {
services.StateMachine
lggr logger.Logger
ocr3types.MercuryPluginFactory
}

func (r *mercuryPluginFactoryService) Name() string { return r.lggr.Name() }

func (r *mercuryPluginFactoryService) Start(ctx context.Context) error {
return r.StartOnce("ReportingPluginFactory", func() error { return nil })
}

func (r *mercuryPluginFactoryService) Close() error {
return r.StopOnce("ReportingPluginFactory", func() error { return nil })
}

func (r *mercuryPluginFactoryService) HealthReport() map[string]error {
return map[string]error{r.Name(): r.Healthy()}
}
1 change: 1 addition & 0 deletions core/config/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
// LOOPP commands and vars
var (
MedianPlugin = NewPlugin("median")
MercuryPlugin = NewPlugin("mercury")
SolanaPlugin = NewPlugin("solana")
StarknetPlugin = NewPlugin("starknet")
// PrometheusDiscoveryHostName is the externally accessible hostname
Expand Down
4 changes: 3 additions & 1 deletion core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,9 @@ func (d *Delegate) newServicesMercury(

chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100)

mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))
mCfg := mercury.NewMercuryConfig(d.cfg.JobPipeline().MaxSuccessfulRuns(), d.cfg.JobPipeline().ResultWriteQueueDepth(), d.cfg)

mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, mCfg, chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))

if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury"))
Expand Down
141 changes: 121 additions & 20 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@ package mercury

import (
"encoding/json"
"fmt"

"github.com/pkg/errors"

libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
relaymercuryv1 "github.com/smartcontractkit/chainlink-data-streams/mercury/v1"
relaymercuryv2 "github.com/smartcontractkit/chainlink-data-streams/mercury/v2"
relaymercuryv3 "github.com/smartcontractkit/chainlink-data-streams/mercury/v3"

"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
Expand All @@ -22,11 +27,36 @@ import (
mercuryv1 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v1"
mercuryv2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2"
mercuryv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

type Config interface {
MaxSuccessfulRuns() uint64
ResultWriteQueueDepth() uint64
plugins.RegistrarConfig
}

// concrete implementation of MercuryConfig
type mercuryConfig struct {
jobPipelineMaxSuccessfulRuns uint64
jobPipelineResultWriteQueueDepth uint64
plugins.RegistrarConfig
}

func NewMercuryConfig(jobPipelineMaxSuccessfulRuns uint64, jobPipelineResultWriteQueueDepth uint64, pluginProcessCfg plugins.RegistrarConfig) Config {
return &mercuryConfig{
jobPipelineMaxSuccessfulRuns: jobPipelineMaxSuccessfulRuns,
jobPipelineResultWriteQueueDepth: jobPipelineResultWriteQueueDepth,
RegistrarConfig: pluginProcessCfg,
}
}

func (m *mercuryConfig) MaxSuccessfulRuns() uint64 {
return m.jobPipelineMaxSuccessfulRuns
}

func (m *mercuryConfig) ResultWriteQueueDepth() uint64 {
return m.jobPipelineResultWriteQueueDepth
}

func NewServices(
Expand Down Expand Up @@ -55,8 +85,19 @@ func NewServices(
}
lggr = lggr.Named("MercuryPlugin").With("jobID", jb.ID, "jobName", jb.Name.ValueOrZero())

// encapsulate all the subservices and ensure we close them all if any fail to start
srvs := []job.ServiceCtx{ocr2Provider}
abort := func() {
if cerr := services.MultiCloser(srvs).Close(); err != nil {
lggr.Errorw("Error closing unused services", "err", cerr)
}
}
saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth())
srvs = append(srvs, saver)

loopEnabled, loopCmd := env.MercuryPlugin.Cmd.Get() != "", env.MercuryPlugin.Env.Get()
// this is the factory that will be used to create the mercury plugin
var factory ocr3types.MercuryPluginFactory
switch feedID.Version() {
case 1:
ds := mercuryv1.NewDataSource(
Expand All @@ -72,12 +113,31 @@ func NewServices(
pluginConfig.InitialBlockNumber.Ptr(),
feedID,
)
argsNoPlugin.MercuryPluginFactory = relaymercuryv1.NewFactory(
ds,
lggr,
ocr2Provider.OnchainConfigCodec(),
ocr2Provider.ReportCodecV1(),
)

if loopEnabled {
mercuryLggr := lggr.Named("MercuryV1").Named(feedID.String())
envVars, err2 := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get())
if err2 != nil {
abort()
return nil, fmt.Errorf("failed to parse mercury env file: %w", err2)
}
cmdFn, opts, err2 := cfg.RegisterLOOP(plugins.CmdConfig{
ID: mercuryLggr.Name(),
Cmd: loopCmd,
Env: envVars,
})
if err2 != nil {
abort()
return nil, fmt.Errorf("failed to register loop: %w", err2)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV1Service(lggr, opts, cmdFn, ocr2Provider, ds)

Check failure on line 134 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / lint

undefined: loop.NewMercuryV1Service

Check failure on line 134 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

undefined: loop.NewMercuryV1Service

Check failure on line 134 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

undefined: loop.NewMercuryV1Service

Check failure on line 134 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

undefined: loop.NewMercuryV1Service

Check failure on line 134 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Flakey Test Detection

undefined: loop.NewMercuryV1Service

Check failure on line 134 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Analyze go

undefined: loop.NewMercuryV1Service
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv1.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV1())
}
case 2:
ds := mercuryv2.NewDataSource(
orm,
Expand All @@ -92,12 +152,31 @@ func NewServices(
*pluginConfig.LinkFeedID,
*pluginConfig.NativeFeedID,
)
argsNoPlugin.MercuryPluginFactory = relaymercuryv2.NewFactory(
ds,
lggr,
ocr2Provider.OnchainConfigCodec(),
ocr2Provider.ReportCodecV2(),
)

if loopEnabled {
mercuryLggr := lggr.Named("MercuryV2").Named(feedID.String())
envVars, err2 := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get())
if err2 != nil {
abort()
return nil, fmt.Errorf("failed to parse mercury env file: %w", err2)
}
cmdFn, opts, err2 := cfg.RegisterLOOP(plugins.CmdConfig{
ID: mercuryLggr.Name(),
Cmd: loopCmd,
Env: envVars,
})
if err2 != nil {
abort()
return nil, fmt.Errorf("failed to register loop: %w", err2)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV2Service(lggr, opts, cmdFn, ocr2Provider, ds)

Check failure on line 173 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / lint

undefined: loop.NewMercuryV2Service

Check failure on line 173 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

undefined: loop.NewMercuryV2Service

Check failure on line 173 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

undefined: loop.NewMercuryV2Service

Check failure on line 173 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

undefined: loop.NewMercuryV2Service

Check failure on line 173 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Flakey Test Detection

undefined: loop.NewMercuryV2Service

Check failure on line 173 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Analyze go

undefined: loop.NewMercuryV2Service
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv2.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV2())
}
case 3:
ds := mercuryv3.NewDataSource(
orm,
Expand All @@ -112,19 +191,41 @@ func NewServices(
*pluginConfig.LinkFeedID,
*pluginConfig.NativeFeedID,
)
argsNoPlugin.MercuryPluginFactory = relaymercuryv3.NewFactory(
ds,
lggr,
ocr2Provider.OnchainConfigCodec(),
ocr2Provider.ReportCodecV3(),
)

if loopEnabled {
mercuryLggr := lggr.Named("MercuryV3").Named(feedID.String())
envVars, err2 := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get())
if err2 != nil {
abort()
return nil, fmt.Errorf("failed to parse mercury env file: %w", err2)
}
cmdFn, opts, err2 := cfg.RegisterLOOP(plugins.CmdConfig{
ID: mercuryLggr.Name(),
Cmd: loopCmd,
Env: envVars,
})
if err2 != nil {
abort()
return nil, fmt.Errorf("failed to register loop: %w", err2)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV3Service(lggr, opts, cmdFn, ocr2Provider, ds)

Check failure on line 212 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

undefined: loop.NewMercuryV3Service

Check failure on line 212 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

undefined: loop.NewMercuryV3Service

Check failure on line 212 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

undefined: loop.NewMercuryV3Service

Check failure on line 212 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Flakey Test Detection

undefined: loop.NewMercuryV3Service

Check failure on line 212 in core/services/ocr2/plugins/mercury/plugin.go

View workflow job for this annotation

GitHub Actions / Analyze go

undefined: loop.NewMercuryV3Service
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv3.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV3())
}

default:
return nil, errors.Errorf("unknown Mercury report schema version: %d", feedID.Version())
}

argsNoPlugin.MercuryPluginFactory = factory
oracle, err := libocr2.NewOracle(argsNoPlugin)
if err != nil {
abort()
return nil, errors.WithStack(err)
}
return []job.ServiceCtx{ocr2Provider, saver, job.NewServiceAdapter(oracle)}, nil
srvs = append(srvs, job.NewServiceAdapter(oracle))
return srvs, nil
}

0 comments on commit 5080c48

Please sign in to comment.