diff --git a/cmd/datastreams/main.go b/cmd/datastreams/main.go new file mode 100644 index 00000000000..d80aa8ef41c --- /dev/null +++ b/cmd/datastreams/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "github.com/hashicorp/go-plugin" + + "github.com/smartcontractkit/chainlink-common/pkg/loop" +) + +const ( + loggerName = "PluginMercury" +) + +func main() { + s := loop.MustNewStartedServer(loggerName) + defer s.Stop() + + p := NewPlugin(s.Logger) + defer s.Logger.ErrorIfFn(p.Close, "Failed to close") + + s.MustRegister(p) + + stop := make(chan struct{}) + defer close(stop) + + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: loop.PluginMercuryHandshakeConfig(), + Plugins: map[string]plugin.Plugin{ + loop.PluginMercuryName: &loop.GRPCPluginMercury{ + PluginServer: p, + BrokerConfig: loop.BrokerConfig{ + StopCh: stop, + Logger: s.Logger, + GRPCOpts: s.GRPCOpts, + }, + }, + }, + GRPCServer: s.GRPCOpts.NewServer, + }) +} diff --git a/cmd/datastreams/plugin.go b/cmd/datastreams/plugin.go new file mode 100644 index 00000000000..6e40882a847 --- /dev/null +++ b/cmd/datastreams/plugin.go @@ -0,0 +1,88 @@ +package main + +import ( + "context" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types" + v1 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1" + v2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2" + v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" + ds_v1 "github.com/smartcontractkit/chainlink-data-streams/mercury/v1" + ds_v2 "github.com/smartcontractkit/chainlink-data-streams/mercury/v2" + ds_v3 "github.com/smartcontractkit/chainlink-data-streams/mercury/v3" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" +) + +type Plugin struct { + loop.Plugin + stop services.StopChan +} + +func NewPlugin(lggr logger.Logger) *Plugin { + return &Plugin{Plugin: loop.Plugin{Logger: lggr}, stop: make(services.StopChan)} +} + +func (p *Plugin) NewMercuryV1Factory(ctx context.Context, provider types.MercuryProvider, dataSource v1.DataSource) (types.MercuryPluginFactory, error) { + var ctxVals loop.ContextValues + ctxVals.SetValues(ctx) + lggr := logger.With(p.Logger, ctxVals.Args()...) + + factory := ds_v1.NewFactory(dataSource, lggr, provider.OnchainConfigCodec(), provider.ReportCodecV1()) + + s := &mercuryPluginFactoryService{lggr: logger.Named(lggr, "MercuryV1PluginFactory"), MercuryPluginFactory: factory} + + p.SubService(s) + + return s, nil +} + +func (p *Plugin) NewMercuryV2Factory(ctx context.Context, provider types.MercuryProvider, dataSource v2.DataSource) (types.MercuryPluginFactory, error) { + var ctxVals loop.ContextValues + ctxVals.SetValues(ctx) + lggr := logger.With(p.Logger, ctxVals.Args()...) + + factory := ds_v2.NewFactory(dataSource, lggr, provider.OnchainConfigCodec(), provider.ReportCodecV2()) + + s := &mercuryPluginFactoryService{lggr: logger.Named(lggr, "MercuryV2PluginFactory"), MercuryPluginFactory: factory} + + p.SubService(s) + + return s, nil +} + +func (p *Plugin) NewMercuryV3Factory(ctx context.Context, provider types.MercuryProvider, dataSource v3.DataSource) (types.MercuryPluginFactory, error) { + var ctxVals loop.ContextValues + ctxVals.SetValues(ctx) + lggr := logger.With(p.Logger, ctxVals.Args()...) + + factory := ds_v3.NewFactory(dataSource, lggr, provider.OnchainConfigCodec(), provider.ReportCodecV3()) + + s := &mercuryPluginFactoryService{lggr: logger.Named(lggr, "MercuryV3PluginFactory"), MercuryPluginFactory: factory} + + p.SubService(s) + + return s, nil +} + +type mercuryPluginFactoryService struct { + services.StateMachine + lggr logger.Logger + ocr3types.MercuryPluginFactory +} + +func (r *mercuryPluginFactoryService) Name() string { return r.lggr.Name() } + +func (r *mercuryPluginFactoryService) Start(ctx context.Context) error { + return r.StartOnce("ReportingPluginFactory", func() error { return nil }) +} + +func (r *mercuryPluginFactoryService) Close() error { + return r.StopOnce("ReportingPluginFactory", func() error { return nil }) +} + +func (r *mercuryPluginFactoryService) HealthReport() map[string]error { + return map[string]error{r.Name(): r.Healthy()} +} diff --git a/core/config/env/env.go b/core/config/env/env.go index f22310a6cf8..0ebfc357bf3 100644 --- a/core/config/env/env.go +++ b/core/config/env/env.go @@ -25,6 +25,7 @@ var ( // LOOPP commands and vars var ( MedianPlugin = NewPlugin("median") + MercuryPlugin = NewPlugin("mercury") SolanaPlugin = NewPlugin("solana") StarknetPlugin = NewPlugin("starknet") // PrometheusDiscoveryHostName is the externally accessible hostname diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index c838316b1cc..b20f95d129f 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -727,7 +727,9 @@ func (d *Delegate) newServicesMercury( chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100) - mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) + mCfg := mercury.NewMercuryConfig(d.cfg.JobPipeline().MaxSuccessfulRuns(), d.cfg.JobPipeline().ResultWriteQueueDepth(), d.cfg) + + mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, mCfg, chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) { enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury")) diff --git a/core/services/ocr2/plugins/mercury/plugin.go b/core/services/ocr2/plugins/mercury/plugin.go index b2767d6bcf5..09337da6ca2 100644 --- a/core/services/ocr2/plugins/mercury/plugin.go +++ b/core/services/ocr2/plugins/mercury/plugin.go @@ -2,17 +2,22 @@ package mercury import ( "encoding/json" + "fmt" "github.com/pkg/errors" libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + "github.com/smartcontractkit/chainlink-common/pkg/loop" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" relaymercuryv1 "github.com/smartcontractkit/chainlink-data-streams/mercury/v1" relaymercuryv2 "github.com/smartcontractkit/chainlink-data-streams/mercury/v2" relaymercuryv3 "github.com/smartcontractkit/chainlink-data-streams/mercury/v3" + "github.com/smartcontractkit/chainlink/v2/core/config/env" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" @@ -22,11 +27,36 @@ import ( mercuryv1 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v1" mercuryv2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2" mercuryv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3" + "github.com/smartcontractkit/chainlink/v2/plugins" ) type Config interface { MaxSuccessfulRuns() uint64 ResultWriteQueueDepth() uint64 + plugins.RegistrarConfig +} + +// concrete implementation of MercuryConfig +type mercuryConfig struct { + jobPipelineMaxSuccessfulRuns uint64 + jobPipelineResultWriteQueueDepth uint64 + plugins.RegistrarConfig +} + +func NewMercuryConfig(jobPipelineMaxSuccessfulRuns uint64, jobPipelineResultWriteQueueDepth uint64, pluginProcessCfg plugins.RegistrarConfig) Config { + return &mercuryConfig{ + jobPipelineMaxSuccessfulRuns: jobPipelineMaxSuccessfulRuns, + jobPipelineResultWriteQueueDepth: jobPipelineResultWriteQueueDepth, + RegistrarConfig: pluginProcessCfg, + } +} + +func (m *mercuryConfig) MaxSuccessfulRuns() uint64 { + return m.jobPipelineMaxSuccessfulRuns +} + +func (m *mercuryConfig) ResultWriteQueueDepth() uint64 { + return m.jobPipelineResultWriteQueueDepth } func NewServices( @@ -55,8 +85,19 @@ func NewServices( } lggr = lggr.Named("MercuryPlugin").With("jobID", jb.ID, "jobName", jb.Name.ValueOrZero()) + // encapsulate all the subservices and ensure we close them all if any fail to start + srvs := []job.ServiceCtx{ocr2Provider} + abort := func() { + if cerr := services.MultiCloser(srvs).Close(); err != nil { + lggr.Errorw("Error closing unused services", "err", cerr) + } + } saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth()) + srvs = append(srvs, saver) + loopEnabled, loopCmd := env.MercuryPlugin.Cmd.Get() != "", env.MercuryPlugin.Env.Get() + // this is the factory that will be used to create the mercury plugin + var factory ocr3types.MercuryPluginFactory switch feedID.Version() { case 1: ds := mercuryv1.NewDataSource( @@ -72,12 +113,31 @@ func NewServices( pluginConfig.InitialBlockNumber.Ptr(), feedID, ) - argsNoPlugin.MercuryPluginFactory = relaymercuryv1.NewFactory( - ds, - lggr, - ocr2Provider.OnchainConfigCodec(), - ocr2Provider.ReportCodecV1(), - ) + + if loopEnabled { + mercuryLggr := lggr.Named("MercuryV1").Named(feedID.String()) + envVars, err2 := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get()) + if err2 != nil { + abort() + return nil, fmt.Errorf("failed to parse mercury env file: %w", err2) + } + cmdFn, opts, err2 := cfg.RegisterLOOP(plugins.CmdConfig{ + ID: mercuryLggr.Name(), + Cmd: loopCmd, + Env: envVars, + }) + if err2 != nil { + abort() + return nil, fmt.Errorf("failed to register loop: %w", err2) + } + // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + factoryServer := loop.NewMercuryV1Service(lggr, opts, cmdFn, ocr2Provider, ds) + srvs = append(srvs, factoryServer) + // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle + factory = factoryServer + } else { + factory = relaymercuryv1.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV1()) + } case 2: ds := mercuryv2.NewDataSource( orm, @@ -92,12 +152,31 @@ func NewServices( *pluginConfig.LinkFeedID, *pluginConfig.NativeFeedID, ) - argsNoPlugin.MercuryPluginFactory = relaymercuryv2.NewFactory( - ds, - lggr, - ocr2Provider.OnchainConfigCodec(), - ocr2Provider.ReportCodecV2(), - ) + + if loopEnabled { + mercuryLggr := lggr.Named("MercuryV2").Named(feedID.String()) + envVars, err2 := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get()) + if err2 != nil { + abort() + return nil, fmt.Errorf("failed to parse mercury env file: %w", err2) + } + cmdFn, opts, err2 := cfg.RegisterLOOP(plugins.CmdConfig{ + ID: mercuryLggr.Name(), + Cmd: loopCmd, + Env: envVars, + }) + if err2 != nil { + abort() + return nil, fmt.Errorf("failed to register loop: %w", err2) + } + // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + factoryServer := loop.NewMercuryV2Service(lggr, opts, cmdFn, ocr2Provider, ds) + srvs = append(srvs, factoryServer) + // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle + factory = factoryServer + } else { + factory = relaymercuryv2.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV2()) + } case 3: ds := mercuryv3.NewDataSource( orm, @@ -112,19 +191,41 @@ func NewServices( *pluginConfig.LinkFeedID, *pluginConfig.NativeFeedID, ) - argsNoPlugin.MercuryPluginFactory = relaymercuryv3.NewFactory( - ds, - lggr, - ocr2Provider.OnchainConfigCodec(), - ocr2Provider.ReportCodecV3(), - ) + + if loopEnabled { + mercuryLggr := lggr.Named("MercuryV3").Named(feedID.String()) + envVars, err2 := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get()) + if err2 != nil { + abort() + return nil, fmt.Errorf("failed to parse mercury env file: %w", err2) + } + cmdFn, opts, err2 := cfg.RegisterLOOP(plugins.CmdConfig{ + ID: mercuryLggr.Name(), + Cmd: loopCmd, + Env: envVars, + }) + if err2 != nil { + abort() + return nil, fmt.Errorf("failed to register loop: %w", err2) + } + // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + factoryServer := loop.NewMercuryV3Service(lggr, opts, cmdFn, ocr2Provider, ds) + srvs = append(srvs, factoryServer) + // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle + factory = factoryServer + } else { + factory = relaymercuryv3.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV3()) + } + default: return nil, errors.Errorf("unknown Mercury report schema version: %d", feedID.Version()) } - + argsNoPlugin.MercuryPluginFactory = factory oracle, err := libocr2.NewOracle(argsNoPlugin) if err != nil { + abort() return nil, errors.WithStack(err) } - return []job.ServiceCtx{ocr2Provider, saver, job.NewServiceAdapter(oracle)}, nil + srvs = append(srvs, job.NewServiceAdapter(oracle)) + return srvs, nil }