Skip to content

Commit

Permalink
Remove Mercury plugin dependency on raw evm chain. (#11201)
Browse files Browse the repository at this point in the history
* mercury: remove dead code

* mercury: Add ChainReader

* mercury: services init

* fix sqlx import

* mercury: error handling

* mod tidy

* mercury: log error from reader

* mercury: ensure a failed observation when reading from chain return an error

* mercury: add test for setLatestBlocks error

* make a happy linter

* Update core/services/relay/evm/mercury_provider.go

Co-authored-by: Sam <samsondav@protonmail.com>

---------

Co-authored-by: Sam <samsondav@protonmail.com>
  • Loading branch information
brunotm and samsondav authored Nov 13, 2023
1 parent de50273 commit 1206283
Show file tree
Hide file tree
Showing 14 changed files with 113 additions and 133 deletions.
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1464,8 +1464,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 h1:Pt6c7bJU9wIN6PQQnmN8UmYYH6lpfiQ6U/B8yEC2s5s=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255/go.mod h1:EHppaccd/LTlTMI2o4dmBHe4BknEgEFFDjDGMNuGb3k=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78 h1:ZBsxdB/5iIpl/tWhXe/RHrOwBG7pbKOMeppy5Zt2BVc=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742 h1:28XkPE6YfJ4uabTX9/7sueRV6IKtY4hcm1nIt1e6b20=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=
Expand Down
6 changes: 1 addition & 5 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,10 +653,6 @@ func (d *Delegate) newServicesMercury(
if err != nil {
return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: "mercury"}
}
chain, err := d.legacyChains.Get(rid.ChainID)
if err != nil {
return nil, fmt.Errorf("mercury services: failed to get chain %s: %w", rid.ChainID, err)
}

provider, err2 := relayer.NewPluginProvider(ctx,
types.RelayArgs{
Expand Down Expand Up @@ -695,7 +691,7 @@ func (d *Delegate) newServicesMercury(

chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100)

mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))
mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))

if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury"))
Expand Down
3 changes: 1 addition & 2 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func NewServices(
argsNoPlugin libocr2.MercuryOracleArgs,
cfg Config,
chEnhancedTelem chan ocrcommon.EnhancedTelemetryMercuryData,
chainHeadTracker types.ChainHeadTracker,
orm types.DataSourceORM,
feedID utils.FeedID,
) ([]job.ServiceCtx, error) {
Expand Down Expand Up @@ -66,7 +65,7 @@ func NewServices(
lggr,
runResults,
chEnhancedTelem,
chainHeadTracker,
ocr2Provider.ChainReader(),
ocr2Provider.MercuryServerFetcher(),
pluginConfig.InitialBlockNumber.Ptr(),
feedID,
Expand Down
5 changes: 3 additions & 2 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
pkgerrors "github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/jmoiron/sqlx"
"github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator"
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median"
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median/evmreportcodec"
Expand Down Expand Up @@ -189,7 +189,8 @@ func (r *Relayer) NewMercuryProvider(rargs relaytypes.RelayArgs, pargs relaytype
}
transmitter := mercury.NewTransmitter(lggr, cw.ContractConfigTracker(), client, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.db, r.pgCfg, transmitterCodec)

return NewMercuryProvider(cw, transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil
chainReader := NewChainReader(r.chain.HeadTracker())
return NewMercuryProvider(cw, transmitter, reportCodecV1, reportCodecV2, reportCodecV3, chainReader, lggr), nil
}

func (r *Relayer) NewFunctionsProvider(rargs relaytypes.RelayArgs, pargs relaytypes.PluginArgs) (relaytypes.FunctionsProvider, error) {
Expand Down
47 changes: 0 additions & 47 deletions core/services/relay/evm/mercury/mocks/chain_head_tracker.go

This file was deleted.

6 changes: 0 additions & 6 deletions core/services/relay/evm/mercury/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,9 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

//go:generate mockery --quiet --name ChainHeadTracker --output ../mocks/ --case=underscore
type ChainHeadTracker interface {
HeadTracker() httypes.HeadTracker
}

type DataSourceORM interface {
LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error)
}
Expand Down
53 changes: 26 additions & 27 deletions core/services/relay/evm/mercury/v1/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
relaymercury "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury"
relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
Expand Down Expand Up @@ -67,7 +66,7 @@ type datasource struct {
mu sync.RWMutex

chEnhancedTelem chan<- ocrcommon.EnhancedTelemetryMercuryData
chainHeadTracker types.ChainHeadTracker
chainReader relaymercury.ChainReader
fetcher Fetcher
initialBlockNumber *int64

Expand All @@ -77,8 +76,8 @@ type datasource struct {

var _ relaymercuryv1.DataSource = &datasource{}

func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker types.ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64, feedID mercuryutils.FeedID) *datasource {
return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber, insufficientBlocksCount.WithLabelValues(feedID.String()), zeroBlocksCount.WithLabelValues(feedID.String())}
func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainReader relaymercury.ChainReader, fetcher Fetcher, initialBlockNumber *int64, feedID mercuryutils.FeedID) *datasource {
return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainReader, fetcher, initialBlockNumber, insufficientBlocksCount.WithLabelValues(feedID.String()), zeroBlocksCount.WithLabelValues(feedID.String())}
}

type ErrEmptyLatestReport struct {
Expand All @@ -94,7 +93,11 @@ func (e ErrEmptyLatestReport) Error() string {
func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedBlockNum bool) (obs relaymercuryv1.Observation, pipelineExecutionErr error) {
// setLatestBlocks must come chronologically before observations, along
// with observationTimestamp, to avoid front-running
ds.setLatestBlocks(ctx, &obs)

// Errors are not expected when reading from the underlying ChainReader
if err := ds.setLatestBlocks(ctx, &obs); err != nil {
return obs, err
}

var wg sync.WaitGroup
if fetchMaxFinalizedBlockNum {
Expand Down Expand Up @@ -290,40 +293,36 @@ func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.T
return run, trrs, err
}

func (ds *datasource) setLatestBlocks(ctx context.Context, obs *relaymercuryv1.Observation) {
latestBlocks := ds.getLatestBlocks(ctx, nBlocksObservation)
func (ds *datasource) setLatestBlocks(ctx context.Context, obs *relaymercuryv1.Observation) error {
latestBlocks, err := ds.chainReader.LatestHeads(ctx, nBlocksObservation)
if err != nil {
ds.lggr.Errorw("failed to read latest blocks", "error", err)
return err
}

if len(latestBlocks) < nBlocksObservation {
ds.insufficientBlocksCounter.Inc()
ds.lggr.Warnw("Insufficient blocks", "latestBlocks", latestBlocks, "lenLatestBlocks", len(latestBlocks), "nBlocksObservation", nBlocksObservation)
}

// TODO: remove with https://smartcontract-it.atlassian.net/browse/BCF-2209
if len(latestBlocks) == 0 {
obsErr := fmt.Errorf("no blocks available")
ds.zeroBlocksCounter.Inc()
err := errors.New("no blocks available")
obs.CurrentBlockNum.Err = err
obs.CurrentBlockHash.Err = err
obs.CurrentBlockTimestamp.Err = err
obs.CurrentBlockNum.Err = obsErr
obs.CurrentBlockHash.Err = obsErr
obs.CurrentBlockTimestamp.Err = obsErr
} else {
obs.CurrentBlockNum.Val = latestBlocks[0].Number
obs.CurrentBlockHash.Val = latestBlocks[0].Hash.Bytes()
if latestBlocks[0].Timestamp.IsZero() {
obs.CurrentBlockTimestamp.Val = 0
} else {
obs.CurrentBlockTimestamp.Val = uint64(latestBlocks[0].Timestamp.Unix())
}
obs.CurrentBlockNum.Val = int64(latestBlocks[0].Number)
obs.CurrentBlockHash.Val = latestBlocks[0].Hash
obs.CurrentBlockTimestamp.Val = latestBlocks[0].Timestamp
}

for _, block := range latestBlocks {
obs.LatestBlocks = append(obs.LatestBlocks, relaymercuryv1.NewBlock(block.Number, block.Hash.Bytes(), uint64(block.Timestamp.Unix())))
obs.LatestBlocks = append(
obs.LatestBlocks,
relaymercuryv1.NewBlock(int64(block.Number), block.Hash, block.Timestamp))
}
}

func (ds *datasource) getLatestBlocks(ctx context.Context, k int) (blocks []*evmtypes.Head) {
// Use the headtracker's view of the chain, this is very fast since
// it doesn't make any external network requests, and it is the
// headtracker's job to ensure it has an up-to-date view of the chain based
// on responses from all available RPC nodes
latestHead := ds.chainHeadTracker.HeadTracker().LatestChain()
return latestHead.AsSlice(k)
return nil
}
Loading

0 comments on commit 1206283

Please sign in to comment.