diff --git a/GNUmakefile b/GNUmakefile index 549a7222a82..1a5fe5f24aa 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -62,6 +62,11 @@ chainlink-local-start: install-medianpoc: ## Build & install the chainlink-medianpoc binary. go install $(GOFLAGS) ./plugins/cmd/chainlink-medianpoc +.PHONY: install-mercury-loop +install-mercury-loop: ## Build & install the chainlink-medianpoc binary. + go install $(GOFLAGS) ./plugins/cmd/chainlink-mercury + + .PHONY: docker ## Build the chainlink docker image docker: docker buildx build \ diff --git a/core/config/env/env.go b/core/config/env/env.go index f22310a6cf8..0ebfc357bf3 100644 --- a/core/config/env/env.go +++ b/core/config/env/env.go @@ -25,6 +25,7 @@ var ( // LOOPP commands and vars var ( MedianPlugin = NewPlugin("median") + MercuryPlugin = NewPlugin("mercury") SolanaPlugin = NewPlugin("solana") StarknetPlugin = NewPlugin("starknet") // PrometheusDiscoveryHostName is the externally accessible hostname diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 0f8c04307f3..1cecd5699cf 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -19,7 +19,7 @@ require ( github.com/pelletier/go-toml/v2 v2.1.1 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240112202000-6359502d2ff1 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 9d0bd7294c9..f9ae572c858 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1169,8 +1169,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M= github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429 h1:xkejUBZhcBpBrTSfxc91Iwzadrb6SXw8ks69bHIQ9Ww= github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429/go.mod h1:wJmVvDf4XSjsahWtfUq3wvIAYEAuhr7oxmxYnEL/LGQ= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b h1:HQOWQqbmtanx+nqL4fUYxpvZZWyfCkE1gPqmDgF63HY= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b/go.mod h1:Mw/HFKy1ljahnnDgVaeP6s6ZCGEV7XmB5gD3jrSjnjE= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca h1:Vtu+x4788S9stmuioWtfyxCKro7dwnqJFy96IuMRB7k= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca/go.mod h1:pRlQrvcizMmuHAUV4N96oO2e3XbA99JCQELLc6ES160= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240206150430-fbccaa95af62 h1:DuSQLuq+Ilm3Q+2zn5agLrAi9UvFQmOUdKwZQKX0AFA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240206150430-fbccaa95af62/go.mod h1:Ny6kBD8Houh5yZRmGiB0ovsLHdb4qOHHwBno9JZUT+Y= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20231204152908-a6e3fe8ff2a1 h1:xYqRgZO0nMSO8CBCMR0r3WA+LZ4kNL8a6bnbyk/oBtQ= diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index c838316b1cc..b20f95d129f 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -727,7 +727,9 @@ func (d *Delegate) newServicesMercury( chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100) - mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) + mCfg := mercury.NewMercuryConfig(d.cfg.JobPipeline().MaxSuccessfulRuns(), d.cfg.JobPipeline().ResultWriteQueueDepth(), d.cfg) + + mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, mCfg, chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) { enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury")) diff --git a/core/services/ocr2/plugins/mercury/plugin.go b/core/services/ocr2/plugins/mercury/plugin.go index b2767d6bcf5..c5eba78b0d8 100644 --- a/core/services/ocr2/plugins/mercury/plugin.go +++ b/core/services/ocr2/plugins/mercury/plugin.go @@ -2,17 +2,24 @@ package mercury import ( "encoding/json" + "fmt" + "os/exec" "github.com/pkg/errors" libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" - commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" relaymercuryv1 "github.com/smartcontractkit/chainlink-data-streams/mercury/v1" relaymercuryv2 "github.com/smartcontractkit/chainlink-data-streams/mercury/v2" relaymercuryv3 "github.com/smartcontractkit/chainlink-data-streams/mercury/v3" + "github.com/smartcontractkit/chainlink-common/pkg/loop" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + + "github.com/smartcontractkit/chainlink/v2/core/config/env" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" @@ -22,11 +29,36 @@ import ( mercuryv1 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v1" mercuryv2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2" mercuryv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3" + "github.com/smartcontractkit/chainlink/v2/plugins" ) type Config interface { MaxSuccessfulRuns() uint64 ResultWriteQueueDepth() uint64 + plugins.RegistrarConfig +} + +// concrete implementation of MercuryConfig +type mercuryConfig struct { + jobPipelineMaxSuccessfulRuns uint64 + jobPipelineResultWriteQueueDepth uint64 + plugins.RegistrarConfig +} + +func NewMercuryConfig(jobPipelineMaxSuccessfulRuns uint64, jobPipelineResultWriteQueueDepth uint64, pluginProcessCfg plugins.RegistrarConfig) Config { + return &mercuryConfig{ + jobPipelineMaxSuccessfulRuns: jobPipelineMaxSuccessfulRuns, + jobPipelineResultWriteQueueDepth: jobPipelineResultWriteQueueDepth, + RegistrarConfig: pluginProcessCfg, + } +} + +func (m *mercuryConfig) MaxSuccessfulRuns() uint64 { + return m.jobPipelineMaxSuccessfulRuns +} + +func (m *mercuryConfig) ResultWriteQueueDepth() uint64 { + return m.jobPipelineResultWriteQueueDepth } func NewServices( @@ -55,76 +87,206 @@ func NewServices( } lggr = lggr.Named("MercuryPlugin").With("jobID", jb.ID, "jobName", jb.Name.ValueOrZero()) + // encapsulate all the subservices and ensure we close them all if any fail to start + srvs := []job.ServiceCtx{ocr2Provider} + abort := func() { + if cerr := services.MultiCloser(srvs).Close(); err != nil { + lggr.Errorw("Error closing unused services", "err", cerr) + } + } saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth()) + srvs = append(srvs, saver) + // this is the factory that will be used to create the mercury plugin + var ( + factory ocr3types.MercuryPluginFactory + factoryServices []job.ServiceCtx + ) + fCfg := factoryCfg{ + orm: orm, + pipelineRunner: pipelineRunner, + jb: jb, + lggr: lggr, + saver: saver, + chEnhancedTelem: chEnhancedTelem, + ocr2Provider: ocr2Provider, + reportingPluginConfig: pluginConfig, + cfg: cfg, + feedID: feedID, + } switch feedID.Version() { case 1: - ds := mercuryv1.NewDataSource( - orm, - pipelineRunner, - jb, - *jb.PipelineSpec, - lggr, - saver, - chEnhancedTelem, - ocr2Provider.MercuryChainReader(), - ocr2Provider.MercuryServerFetcher(), - pluginConfig.InitialBlockNumber.Ptr(), - feedID, - ) - argsNoPlugin.MercuryPluginFactory = relaymercuryv1.NewFactory( - ds, - lggr, - ocr2Provider.OnchainConfigCodec(), - ocr2Provider.ReportCodecV1(), - ) + factory, factoryServices, err = newv1factory(fCfg) + if err != nil { + abort() + return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err) + } + srvs = append(srvs, factoryServices...) case 2: - ds := mercuryv2.NewDataSource( - orm, - pipelineRunner, - jb, - *jb.PipelineSpec, - feedID, - lggr, - saver, - chEnhancedTelem, - ocr2Provider.MercuryServerFetcher(), - *pluginConfig.LinkFeedID, - *pluginConfig.NativeFeedID, - ) - argsNoPlugin.MercuryPluginFactory = relaymercuryv2.NewFactory( - ds, - lggr, - ocr2Provider.OnchainConfigCodec(), - ocr2Provider.ReportCodecV2(), - ) + factory, factoryServices, err = newv2factory(fCfg) + if err != nil { + abort() + return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err) + } + srvs = append(srvs, factoryServices...) case 3: - ds := mercuryv3.NewDataSource( - orm, - pipelineRunner, - jb, - *jb.PipelineSpec, - feedID, - lggr, - saver, - chEnhancedTelem, - ocr2Provider.MercuryServerFetcher(), - *pluginConfig.LinkFeedID, - *pluginConfig.NativeFeedID, - ) - argsNoPlugin.MercuryPluginFactory = relaymercuryv3.NewFactory( - ds, - lggr, - ocr2Provider.OnchainConfigCodec(), - ocr2Provider.ReportCodecV3(), - ) + factory, factoryServices, err = newv3factory(fCfg) + if err != nil { + abort() + return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err) + } + srvs = append(srvs, factoryServices...) default: return nil, errors.Errorf("unknown Mercury report schema version: %d", feedID.Version()) } - + argsNoPlugin.MercuryPluginFactory = factory oracle, err := libocr2.NewOracle(argsNoPlugin) if err != nil { + abort() return nil, errors.WithStack(err) } - return []job.ServiceCtx{ocr2Provider, saver, job.NewServiceAdapter(oracle)}, nil + srvs = append(srvs, job.NewServiceAdapter(oracle)) + return srvs, nil +} + +type factoryCfg struct { + orm types.DataSourceORM + pipelineRunner pipeline.Runner + jb job.Job + lggr logger.Logger + saver *ocrcommon.RunResultSaver + chEnhancedTelem chan ocrcommon.EnhancedTelemetryMercuryData + ocr2Provider commontypes.MercuryProvider + reportingPluginConfig config.PluginConfig + cfg Config + feedID utils.FeedID +} + +func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) { + var factory ocr3types.MercuryPluginFactory + srvs := make([]job.ServiceCtx, 0) + + ds := mercuryv3.NewDataSource( + factoryCfg.orm, + factoryCfg.pipelineRunner, + factoryCfg.jb, + *factoryCfg.jb.PipelineSpec, + factoryCfg.feedID, + factoryCfg.lggr, + factoryCfg.saver, + factoryCfg.chEnhancedTelem, + factoryCfg.ocr2Provider.MercuryServerFetcher(), + *factoryCfg.reportingPluginConfig.LinkFeedID, + *factoryCfg.reportingPluginConfig.NativeFeedID, + ) + + loopCmd := env.MercuryPlugin.Cmd.Get() + loopEnabled := loopCmd != "" + + if loopEnabled { + cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + if err != nil { + return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) + } + // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) + srvs = append(srvs, factoryServer) + // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle + factory = factoryServer + } else { + factory = relaymercuryv3.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV3()) + } + return factory, srvs, nil +} + +func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) { + var factory ocr3types.MercuryPluginFactory + srvs := make([]job.ServiceCtx, 0) + + ds := mercuryv2.NewDataSource( + factoryCfg.orm, + factoryCfg.pipelineRunner, + factoryCfg.jb, + *factoryCfg.jb.PipelineSpec, + factoryCfg.feedID, + factoryCfg.lggr, + factoryCfg.saver, + factoryCfg.chEnhancedTelem, + factoryCfg.ocr2Provider.MercuryServerFetcher(), + *factoryCfg.reportingPluginConfig.LinkFeedID, + *factoryCfg.reportingPluginConfig.NativeFeedID, + ) + + loopCmd := env.MercuryPlugin.Cmd.Get() + loopEnabled := loopCmd != "" + + if loopEnabled { + cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + if err != nil { + return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) + } + // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) + srvs = append(srvs, factoryServer) + // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle + factory = factoryServer + } else { + factory = relaymercuryv2.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV2()) + } + return factory, srvs, nil +} + +func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) { + var factory ocr3types.MercuryPluginFactory + srvs := make([]job.ServiceCtx, 0) + + ds := mercuryv1.NewDataSource( + factoryCfg.orm, + factoryCfg.pipelineRunner, + factoryCfg.jb, + *factoryCfg.jb.PipelineSpec, + factoryCfg.lggr, + factoryCfg.saver, + factoryCfg.chEnhancedTelem, + factoryCfg.ocr2Provider.MercuryChainReader(), + factoryCfg.ocr2Provider.MercuryServerFetcher(), + factoryCfg.reportingPluginConfig.InitialBlockNumber.Ptr(), + factoryCfg.feedID, + ) + + loopCmd := env.MercuryPlugin.Cmd.Get() + loopEnabled := loopCmd != "" + + if loopEnabled { + cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + if err != nil { + return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) + } + // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) + srvs = append(srvs, factoryServer) + // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle + factory = factoryServer + } else { + factory = relaymercuryv1.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV1()) + } + return factory, srvs, nil +} + +func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) { + lggr.Debugw("Initializing Mercury loop", "command", cmd) + mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String()) + envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get()) + if err != nil { + return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err) + } + cmdFn, opts, err := cfg.RegisterLOOP(plugins.CmdConfig{ + ID: mercuryLggr.Name(), + Cmd: cmd, + Env: envVars, + }) + if err != nil { + return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err) + } + return cmdFn, opts, mercuryLggr, nil } diff --git a/core/services/ocr2/plugins/mercury/plugin_test.go b/core/services/ocr2/plugins/mercury/plugin_test.go new file mode 100644 index 00000000000..4e6d4d82a7e --- /dev/null +++ b/core/services/ocr2/plugins/mercury/plugin_test.go @@ -0,0 +1,284 @@ +package mercury_test + +import ( + "context" + "os/exec" + "reflect" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + + "github.com/smartcontractkit/chainlink/v2/core/config/env" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + + "github.com/smartcontractkit/chainlink-common/pkg/loop" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/mercury" + v1 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1" + v2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2" + v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" + + mercuryocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury" + + libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" + libocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink/v2/core/services/pg" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/relay" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" + "github.com/smartcontractkit/chainlink/v2/plugins" +) + +var ( + v1FeedId = [32]uint8{00, 01, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114} + v2FeedId = [32]uint8{00, 02, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114} + v3FeedId = [32]uint8{00, 03, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114} + + testArgsNoPlugin = libocr2.MercuryOracleArgs{ + LocalConfig: libocr2types.LocalConfig{ + DevelopmentMode: libocr2types.EnableDangerousDevelopmentMode, + }, + } + + testCfg = mercuryocr2.NewMercuryConfig(1, 1, &testRegistrarConfig{}) + + v1jsonCfg = job.JSONConfig{ + "serverURL": "example.com:80", + "serverPubKey": "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", + "initialBlockNumber": 1234, + } + + v2jsonCfg = job.JSONConfig{ + "serverURL": "example.com:80", + "serverPubKey": "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", + "linkFeedID": "0x00026b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472", + "nativeFeedID": "0x00036b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472", + } + + v3jsonCfg = job.JSONConfig{ + "serverURL": "example.com:80", + "serverPubKey": "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", + "linkFeedID": "0x00026b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472", + "nativeFeedID": "0x00036b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472", + } + + testJob = job.Job{ + ID: 1, + ExternalJobID: uuid.Must(uuid.NewRandom()), + OCR2OracleSpecID: ptr(int32(7)), + OCR2OracleSpec: &job.OCR2OracleSpec{ + ID: 7, + ContractID: "phony", + FeedID: ptr(common.BytesToHash([]byte{1, 2, 3})), + Relay: relay.EVM, + ChainID: "1", + }, + PipelineSpec: &pipeline.Spec{}, + PipelineSpecID: int32(1), + } + + // this is kind of gross, but it's the best way to test return values of the services + expectedEmbeddedServiceCnt = 3 + expectedLoopServiceCnt = expectedEmbeddedServiceCnt + 1 +) + +func TestNewServices(t *testing.T) { + type args struct { + pluginConfig job.JSONConfig + feedID utils.FeedID + } + tests := []struct { + name string + args args + loopMode bool + wantLoopFactory any + wantServiceCnt int + wantErr bool + }{ + { + name: "no plugin config error ", + args: args{ + feedID: v1FeedId, + }, + wantServiceCnt: 0, + wantErr: true, + }, + + { + name: "v1 legacy", + args: args{ + pluginConfig: v1jsonCfg, + feedID: v1FeedId, + }, + wantServiceCnt: expectedEmbeddedServiceCnt, + wantErr: false, + }, + { + name: "v2 legacy", + args: args{ + pluginConfig: v2jsonCfg, + feedID: v2FeedId, + }, + wantServiceCnt: expectedEmbeddedServiceCnt, + wantErr: false, + }, + { + name: "v3 legacy", + args: args{ + pluginConfig: v3jsonCfg, + feedID: v3FeedId, + }, + wantServiceCnt: expectedEmbeddedServiceCnt, + wantErr: false, + }, + { + name: "v1 loop", + loopMode: true, + args: args{ + pluginConfig: v1jsonCfg, + feedID: v1FeedId, + }, + wantServiceCnt: expectedLoopServiceCnt, + wantErr: false, + wantLoopFactory: &loop.MercuryV1Service{}, + }, + { + name: "v2 loop", + loopMode: true, + args: args{ + pluginConfig: v2jsonCfg, + feedID: v2FeedId, + }, + wantServiceCnt: expectedLoopServiceCnt, + wantErr: false, + wantLoopFactory: &loop.MercuryV2Service{}, + }, + { + name: "v3 loop", + loopMode: true, + args: args{ + pluginConfig: v3jsonCfg, + feedID: v3FeedId, + }, + wantServiceCnt: expectedLoopServiceCnt, + wantErr: false, + wantLoopFactory: &loop.MercuryV3Service{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.loopMode { + t.Setenv(string(env.MercuryPlugin.Cmd), "fake_cmd") + assert.NotEmpty(t, env.MercuryPlugin.Cmd.Get()) + } + got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID) + if (err != nil) != tt.wantErr { + t.Errorf("NewServices() error = %v, wantErr %v", err, tt.wantErr) + return + } + assert.Len(t, got, tt.wantServiceCnt) + if tt.loopMode { + foundLoopFactory := false + for i := 0; i < len(got); i++ { + if reflect.TypeOf(got[i]) == reflect.TypeOf(tt.wantLoopFactory) { + foundLoopFactory = true + break + } + } + assert.True(t, foundLoopFactory) + } + }) + } +} + +// we are only varying the version via feedID (and the plugin config) +// this wrapper supplies dummy values for the rest of the arguments +func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID utils.FeedID) ([]job.ServiceCtx, error) { + t.Helper() + jb := testJob + jb.OCR2OracleSpec.PluginConfig = pluginConfig + return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, testCfg, nil, &testDataSourceORM{}, feedID) +} + +type testProvider struct{} + +// ChainReader implements types.MercuryProvider. +func (*testProvider) ChainReader() commontypes.ChainReader { panic("unimplemented") } + +// Close implements types.MercuryProvider. +func (*testProvider) Close() error { return nil } + +// Codec implements types.MercuryProvider. +func (*testProvider) Codec() commontypes.Codec { panic("unimplemented") } + +// ContractConfigTracker implements types.MercuryProvider. +func (*testProvider) ContractConfigTracker() libocr2types.ContractConfigTracker { + panic("unimplemented") +} + +// ContractTransmitter implements types.MercuryProvider. +func (*testProvider) ContractTransmitter() libocr2types.ContractTransmitter { + panic("unimplemented") +} + +// HealthReport implements types.MercuryProvider. +func (*testProvider) HealthReport() map[string]error { panic("unimplemented") } + +// MercuryChainReader implements types.MercuryProvider. +func (*testProvider) MercuryChainReader() mercury.ChainReader { return nil } + +// MercuryServerFetcher implements types.MercuryProvider. +func (*testProvider) MercuryServerFetcher() mercury.ServerFetcher { return nil } + +// Name implements types.MercuryProvider. +func (*testProvider) Name() string { panic("unimplemented") } + +// OffchainConfigDigester implements types.MercuryProvider. +func (*testProvider) OffchainConfigDigester() libocr2types.OffchainConfigDigester { + panic("unimplemented") +} + +// OnchainConfigCodec implements types.MercuryProvider. +func (*testProvider) OnchainConfigCodec() mercury.OnchainConfigCodec { + return nil +} + +// Ready implements types.MercuryProvider. +func (*testProvider) Ready() error { panic("unimplemented") } + +// ReportCodecV1 implements types.MercuryProvider. +func (*testProvider) ReportCodecV1() v1.ReportCodec { return nil } + +// ReportCodecV2 implements types.MercuryProvider. +func (*testProvider) ReportCodecV2() v2.ReportCodec { return nil } + +// ReportCodecV3 implements types.MercuryProvider. +func (*testProvider) ReportCodecV3() v3.ReportCodec { return nil } + +// Start implements types.MercuryProvider. +func (*testProvider) Start(context.Context) error { panic("unimplemented") } + +var _ commontypes.MercuryProvider = (*testProvider)(nil) + +type testRegistrarConfig struct{} + +// RegisterLOOP implements plugins.RegistrarConfig. +func (*testRegistrarConfig) RegisterLOOP(config plugins.CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) { + return nil, loop.GRPCOpts{}, nil +} + +var _ plugins.RegistrarConfig = (*testRegistrarConfig)(nil) + +type testDataSourceORM struct{} + +// LatestReport implements types.DataSourceORM. +func (*testDataSourceORM) LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) { + return []byte{1, 2, 3}, nil +} + +var _ types.DataSourceORM = (*testDataSourceORM)(nil) diff --git a/go.mod b/go.mod index 7b4c824bbd0..5a85983fb48 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240206150430-fbccaa95af62 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20231204152908-a6e3fe8ff2a1 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240119021347-3c541a78cdb8 diff --git a/go.sum b/go.sum index 2dacaa51c1c..0610b422deb 100644 --- a/go.sum +++ b/go.sum @@ -1164,8 +1164,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M= github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429 h1:xkejUBZhcBpBrTSfxc91Iwzadrb6SXw8ks69bHIQ9Ww= github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429/go.mod h1:wJmVvDf4XSjsahWtfUq3wvIAYEAuhr7oxmxYnEL/LGQ= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b h1:HQOWQqbmtanx+nqL4fUYxpvZZWyfCkE1gPqmDgF63HY= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b/go.mod h1:Mw/HFKy1ljahnnDgVaeP6s6ZCGEV7XmB5gD3jrSjnjE= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca h1:Vtu+x4788S9stmuioWtfyxCKro7dwnqJFy96IuMRB7k= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca/go.mod h1:pRlQrvcizMmuHAUV4N96oO2e3XbA99JCQELLc6ES160= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240206150430-fbccaa95af62 h1:DuSQLuq+Ilm3Q+2zn5agLrAi9UvFQmOUdKwZQKX0AFA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240206150430-fbccaa95af62/go.mod h1:Ny6kBD8Houh5yZRmGiB0ovsLHdb4qOHHwBno9JZUT+Y= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20231204152908-a6e3fe8ff2a1 h1:xYqRgZO0nMSO8CBCMR0r3WA+LZ4kNL8a6bnbyk/oBtQ= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index f737f8681c5..726e70c260f 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -24,7 +24,7 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca github.com/smartcontractkit/chainlink-testing-framework v1.23.2 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 235002b35a2..16aa3f2014d 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1501,8 +1501,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M= github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429 h1:xkejUBZhcBpBrTSfxc91Iwzadrb6SXw8ks69bHIQ9Ww= github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429/go.mod h1:wJmVvDf4XSjsahWtfUq3wvIAYEAuhr7oxmxYnEL/LGQ= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b h1:HQOWQqbmtanx+nqL4fUYxpvZZWyfCkE1gPqmDgF63HY= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240208162729-5f42bd4a2f8b/go.mod h1:Mw/HFKy1ljahnnDgVaeP6s6ZCGEV7XmB5gD3jrSjnjE= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca h1:Vtu+x4788S9stmuioWtfyxCKro7dwnqJFy96IuMRB7k= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240209032254-f9b58810d8ca/go.mod h1:pRlQrvcizMmuHAUV4N96oO2e3XbA99JCQELLc6ES160= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240206150430-fbccaa95af62 h1:DuSQLuq+Ilm3Q+2zn5agLrAi9UvFQmOUdKwZQKX0AFA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240206150430-fbccaa95af62/go.mod h1:Ny6kBD8Houh5yZRmGiB0ovsLHdb4qOHHwBno9JZUT+Y= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20231204152908-a6e3fe8ff2a1 h1:xYqRgZO0nMSO8CBCMR0r3WA+LZ4kNL8a6bnbyk/oBtQ= diff --git a/plugins/chainlink.Dockerfile b/plugins/chainlink.Dockerfile index 2f3e46e9ce1..cdaa2876d79 100644 --- a/plugins/chainlink.Dockerfile +++ b/plugins/chainlink.Dockerfile @@ -20,6 +20,9 @@ RUN make install-chainlink # Install medianpoc binary RUN make install-medianpoc +# Install the mercury binary +RUN make install-mercury-loop + # Link LOOP Plugin source dirs with simple names RUN go list -m -f "{{.Dir}}" github.com/smartcontractkit/chainlink-feeds | xargs -I % ln -s % /chainlink-feeds RUN go list -m -f "{{.Dir}}" github.com/smartcontractkit/chainlink-solana | xargs -I % ln -s % /chainlink-solana diff --git a/plugins/cmd/chainlink-mercury/README.md b/plugins/cmd/chainlink-mercury/README.md new file mode 100644 index 00000000000..89775cfe911 --- /dev/null +++ b/plugins/cmd/chainlink-mercury/README.md @@ -0,0 +1,19 @@ +This directory houses the Mercury LOOPP + +# Running Integration Tests Locally + +Running the tests is as simple as +- building this binary +- setting the CL_MERCURY_CMD env var to the *fully resolved* binary path +- running the test(s) + + +The interesting tests are `TestIntegration_MercuryV*` in ` github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury` + +In detail: +``` +sh + +make install-mercury-loop # builds `mercury` binary in this dir +CL_MERCURY_CMD=/plugins/cmd/mercury/mercury go test -v -timeout 120s -run ^TestIntegration_MercuryV github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury 2>&1 | tee /tmp/mercury_loop.log +``` \ No newline at end of file diff --git a/plugins/cmd/chainlink-mercury/main.go b/plugins/cmd/chainlink-mercury/main.go new file mode 100644 index 00000000000..d80aa8ef41c --- /dev/null +++ b/plugins/cmd/chainlink-mercury/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "github.com/hashicorp/go-plugin" + + "github.com/smartcontractkit/chainlink-common/pkg/loop" +) + +const ( + loggerName = "PluginMercury" +) + +func main() { + s := loop.MustNewStartedServer(loggerName) + defer s.Stop() + + p := NewPlugin(s.Logger) + defer s.Logger.ErrorIfFn(p.Close, "Failed to close") + + s.MustRegister(p) + + stop := make(chan struct{}) + defer close(stop) + + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: loop.PluginMercuryHandshakeConfig(), + Plugins: map[string]plugin.Plugin{ + loop.PluginMercuryName: &loop.GRPCPluginMercury{ + PluginServer: p, + BrokerConfig: loop.BrokerConfig{ + StopCh: stop, + Logger: s.Logger, + GRPCOpts: s.GRPCOpts, + }, + }, + }, + GRPCServer: s.GRPCOpts.NewServer, + }) +} diff --git a/plugins/cmd/chainlink-mercury/plugin.go b/plugins/cmd/chainlink-mercury/plugin.go new file mode 100644 index 00000000000..07531662ba9 --- /dev/null +++ b/plugins/cmd/chainlink-mercury/plugin.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types" + v1 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1" + v2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2" + v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" + ds_v1 "github.com/smartcontractkit/chainlink-data-streams/mercury/v1" + ds_v2 "github.com/smartcontractkit/chainlink-data-streams/mercury/v2" + ds_v3 "github.com/smartcontractkit/chainlink-data-streams/mercury/v3" +) + +type Plugin struct { + loop.Plugin + stop services.StopChan +} + +func NewPlugin(lggr logger.Logger) *Plugin { + return &Plugin{Plugin: loop.Plugin{Logger: lggr}, stop: make(services.StopChan)} +} + +func (p *Plugin) NewMercuryV1Factory(ctx context.Context, provider types.MercuryProvider, dataSource v1.DataSource) (types.MercuryPluginFactory, error) { + var ctxVals loop.ContextValues + ctxVals.SetValues(ctx) + lggr := logger.With(p.Logger, ctxVals.Args()...) + + factory := ds_v1.NewFactory(dataSource, lggr, provider.OnchainConfigCodec(), provider.ReportCodecV1()) + + s := &mercuryPluginFactoryService{lggr: logger.Named(lggr, "MercuryV1PluginFactory"), MercuryPluginFactory: factory} + + p.SubService(s) + + return s, nil +} + +func (p *Plugin) NewMercuryV2Factory(ctx context.Context, provider types.MercuryProvider, dataSource v2.DataSource) (types.MercuryPluginFactory, error) { + var ctxVals loop.ContextValues + ctxVals.SetValues(ctx) + lggr := logger.With(p.Logger, ctxVals.Args()...) + + factory := ds_v2.NewFactory(dataSource, lggr, provider.OnchainConfigCodec(), provider.ReportCodecV2()) + + s := &mercuryPluginFactoryService{lggr: logger.Named(lggr, "MercuryV2PluginFactory"), MercuryPluginFactory: factory} + + p.SubService(s) + + return s, nil +} + +func (p *Plugin) NewMercuryV3Factory(ctx context.Context, provider types.MercuryProvider, dataSource v3.DataSource) (types.MercuryPluginFactory, error) { + var ctxVals loop.ContextValues + ctxVals.SetValues(ctx) + lggr := logger.With(p.Logger, ctxVals.Args()...) + + factory := ds_v3.NewFactory(dataSource, lggr, provider.OnchainConfigCodec(), provider.ReportCodecV3()) + + s := &mercuryPluginFactoryService{lggr: logger.Named(lggr, "MercuryV3PluginFactory"), MercuryPluginFactory: factory} + + p.SubService(s) + + return s, nil +} + +type mercuryPluginFactoryService struct { + services.StateMachine + lggr logger.Logger + ocr3types.MercuryPluginFactory +} + +func (r *mercuryPluginFactoryService) Name() string { return r.lggr.Name() } + +func (r *mercuryPluginFactoryService) Start(ctx context.Context) error { + return r.StartOnce("ReportingPluginFactory", func() error { return nil }) +} + +func (r *mercuryPluginFactoryService) Close() error { + return r.StopOnce("ReportingPluginFactory", func() error { return nil }) +} + +func (r *mercuryPluginFactoryService) HealthReport() map[string]error { + return map[string]error{r.Name(): r.Healthy()} +} diff --git a/sonar-project.properties b/sonar-project.properties index db2a6be1f3b..546f68abb32 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -6,8 +6,8 @@ sonar.python.version=3.8 sonar.exclusions=**/node_modules/**/*,**/mocks/**/*, **/testdata/**/*, **/contracts/typechain/**/*, **/contracts/artifacts/**/*, **/contracts/cache/**/*, **/contracts/scripts/**/*, **/generated/**/*, **/fixtures/**/*, **/docs/**/*, **/tools/**/*, **/*.pb.go, **/*report.xml, **/*.config.ts, **/*.txt, **/*.abi, **/*.bin, **/*_codecgen.go, core/services/relay/evm/types/*_gen.go, core/services/relay/evm/types/gen/main.go, core/services/relay/evm/testfiles/*, **/core/web/assets**, core/scripts/chaincli/handler/debug.go # Coverage exclusions sonar.coverage.exclusions=**/*.test.ts, **/*_test.go, **/contracts/test/**/*, **/contracts/**/tests/**/*, **/core/**/testutils/**/*, **/core/**/mocks/**/*, **/core/**/cltest/**/*, **/integration-tests/**/*, **/generated/**/*, **/core/scripts**/* , **/*.pb.go, ./plugins/**/*, **/main.go, **/0195_add_not_null_to_evm_chain_id_in_job_specs.go, **/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go -# Duplication exclusions -sonar.cpd.exclusions=**/contracts/**/*.sol, **/config.go, /core/services/ocr2/plugins/ocr2keeper/evm*/*,**/integration-tests/testconfig/**/* +# Duplication exclusions: mercury excluded because current MercuryProvider and Factory APIs are inherently duplicated due to embedded versioning +sonar.cpd.exclusions=**/contracts/**/*.sol, **/config.go, /core/services/ocr2/plugins/ocr2keeper/evm*/*, **/integration-tests/testconfig/**/*, /core/services/ocr2/plugins/mercury/plugin.go # Tests' root folder, inclusions (tests to check and count) and exclusions sonar.tests=.