Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/2.6.1 mercury 20231115 with cherry-pick changes #11315

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ func (d *Delegate) newServicesMercury(

mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))

if ocrcommon.ShouldCollectEnhancedTelemetryMercury(&jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury"))
if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.EnhancedEAMercury, rid.Network, rid.ChainID), lggr.Named("EnhancedTelemetryMercury"))
mercuryServices = append(mercuryServices, enhancedTelemService)
}

Expand Down
211 changes: 153 additions & 58 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@ import (
"context"
"encoding/json"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/libocr/commontypes"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"
"github.com/smartcontractkit/chainlink/v2/core/utils"

relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1"
relaymercuryv2 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v2"
relaymercuryv3 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v3"
)

type eaTelemetry struct {
Expand All @@ -35,9 +38,15 @@ type EnhancedTelemetryData struct {
}

type EnhancedTelemetryMercuryData struct {
TaskRunResults pipeline.TaskRunResults
Observation relaymercuryv1.Observation
RepTimestamp ocrtypes.ReportTimestamp
V1Observation *relaymercuryv1.Observation
V2Observation *relaymercuryv2.Observation
V3Observation *relaymercuryv3.Observation
TaskRunResults pipeline.TaskRunResults
RepTimestamp ocrtypes.ReportTimestamp
FeedVersion mercuryutils.FeedVersion
FetchMaxFinalizedTimestamp bool
IsLinkFeed bool
IsNativeFeed bool
}

type EnhancedTelemetryService[T EnhancedTelemetryData | EnhancedTelemetryMercuryData] struct {
Expand Down Expand Up @@ -68,13 +77,13 @@ func (e *EnhancedTelemetryService[T]) Start(context.Context) error {
for {
select {
case t := <-e.chTelem:
switch any(t).(type) {
switch v := any(t).(type) {
case EnhancedTelemetryData:
s := any(t).(EnhancedTelemetryData)
e.collectEATelemetry(s.TaskRunResults, s.FinalResults, s.RepTimestamp)
e.collectEATelemetry(v.TaskRunResults, v.FinalResults, v.RepTimestamp)
case EnhancedTelemetryMercuryData:
s := any(t).(EnhancedTelemetryMercuryData)
e.collectMercuryEnhancedTelemetry(s.Observation, s.TaskRunResults, s.RepTimestamp)
e.collectMercuryEnhancedTelemetry(v)
default:
e.lggr.Errorf("unrecognised telemetry data type: %T", t)
}
case <-e.chDone:
return
Expand Down Expand Up @@ -223,14 +232,19 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul
continue
}

if trr.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, job %d, id %s", e.job.ID, trr.Task.DotID()), "err", trr.Result.Error)
continue
}
bridgeRawResponse, ok := trr.Result.Value.(string)
if !ok {
e.lggr.Warnf("cannot get bridge response from bridge task, job %d, id %s", e.job.ID, trr.Task.DotID())
e.lggr.Warnf("cannot parse bridge response from bridge task, job %d, id %s: expected string, got: %v (type %T)", e.job.ID, trr.Task.DotID(), trr.Result.Value, trr.Result.Value)
continue
}
eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse))
if err != nil {
e.lggr.Warnf("cannot parse EA telemetry, job %d, id %s", e.job.ID, trr.Task.DotID())
e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job %d, id %s", e.job.ID, trr.Task.DotID()), "err", err)
continue
}
value := e.getParsedValue(trrs, trr)

Expand All @@ -253,7 +267,7 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul

bytes, err := proto.Marshal(t)
if err != nil {
e.lggr.Warnf("protobuf marshal failed %v", err.Error())
e.lggr.Warnw("protobuf marshal failed", "err", err)
continue
}

Expand All @@ -263,14 +277,81 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul

// collectMercuryEnhancedTelemetry checks if enhanced telemetry should be collected, fetches the information needed and
// sends the telemetry
func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(obs relaymercuryv1.Observation, trrs pipeline.TaskRunResults, repts ocrtypes.ReportTimestamp) {
func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d EnhancedTelemetryMercuryData) {
if e.monitoringEndpoint == nil {
return
}

obsBenchmarkPrice, obsBid, obsAsk, obsBlockNum, obsBlockHash, obsBlockTimestamp := e.getFinalValues(obs)
// v1 fields
var bn int64
var bh string
var bt uint64
// v1+v2+v3 fields
bp := big.NewInt(0)
//v1+v3 fields
bid := big.NewInt(0)
ask := big.NewInt(0)
// v2+v3 fields
var mfts, lp, np int64

switch {
case d.V1Observation != nil:
obs := *d.V1Observation
if obs.CurrentBlockNum.Err == nil {
bn = obs.CurrentBlockNum.Val
}
if obs.CurrentBlockHash.Err == nil {
bh = common.BytesToHash(obs.CurrentBlockHash.Val).Hex()
}
if obs.CurrentBlockTimestamp.Err == nil {
bt = obs.CurrentBlockTimestamp.Val
}
if obs.BenchmarkPrice.Err == nil && obs.BenchmarkPrice.Val != nil {
bp = obs.BenchmarkPrice.Val
}
if obs.Bid.Err == nil && obs.Bid.Val != nil {
bid = obs.Bid.Val
}
if obs.Ask.Err == nil && obs.Ask.Val != nil {
ask = obs.Ask.Val
}
case d.V2Observation != nil:
obs := *d.V2Observation
if obs.MaxFinalizedTimestamp.Err == nil {
mfts = obs.MaxFinalizedTimestamp.Val
}
if obs.LinkPrice.Err == nil && obs.LinkPrice.Val != nil {
lp = obs.LinkPrice.Val.Int64()
}
if obs.NativePrice.Err == nil && obs.NativePrice.Val != nil {
np = obs.NativePrice.Val.Int64()
}
if obs.BenchmarkPrice.Err == nil && obs.BenchmarkPrice.Val != nil {
bp = obs.BenchmarkPrice.Val
}
case d.V3Observation != nil:
obs := *d.V3Observation
if obs.MaxFinalizedTimestamp.Err == nil {
mfts = obs.MaxFinalizedTimestamp.Val
}
if obs.LinkPrice.Err == nil && obs.LinkPrice.Val != nil {
lp = obs.LinkPrice.Val.Int64()
}
if obs.NativePrice.Err == nil && obs.NativePrice.Val != nil {
np = obs.NativePrice.Val.Int64()
}
if obs.BenchmarkPrice.Err == nil && obs.BenchmarkPrice.Val != nil {
bp = obs.BenchmarkPrice.Val
}
if obs.Bid.Err == nil && obs.Bid.Val != nil {
bid = obs.Bid.Val
}
if obs.Ask.Err == nil && obs.Ask.Val != nil {
ask = obs.Ask.Val
}
}

for _, trr := range trrs {
for _, trr := range d.TaskRunResults {
if trr.Task.Type() != pipeline.TaskTypeBridge {
continue
}
Expand All @@ -287,30 +368,41 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(obs relaym
}

assetSymbol := e.getAssetSymbolFromRequestData(bridgeTask.RequestData)
benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, &trrs)

benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, d.TaskRunResults, d.FeedVersion)

t := &telem.EnhancedEAMercury{
DataSource: eaTelem.DataSource,
DpBenchmarkPrice: benchmarkPrice,
DpBid: bidPrice,
DpAsk: askPrice,
CurrentBlockNumber: obsBlockNum,
CurrentBlockHash: common.BytesToHash(obsBlockHash).String(),
CurrentBlockTimestamp: obsBlockTimestamp,
BridgeTaskRunStartedTimestamp: trr.CreatedAt.UnixMilli(),
BridgeTaskRunEndedTimestamp: trr.FinishedAt.Time.UnixMilli(),
ProviderRequestedTimestamp: eaTelem.ProviderRequestedTimestamp,
ProviderReceivedTimestamp: eaTelem.ProviderReceivedTimestamp,
ProviderDataStreamEstablished: eaTelem.ProviderDataStreamEstablished,
ProviderIndicatedTime: eaTelem.ProviderIndicatedTime,
Feed: e.job.OCR2OracleSpec.FeedID.Hex(),
ObservationBenchmarkPrice: obsBenchmarkPrice,
ObservationBid: obsBid,
ObservationAsk: obsAsk,
ConfigDigest: repts.ConfigDigest.Hex(),
Round: int64(repts.Round),
Epoch: int64(repts.Epoch),
AssetSymbol: assetSymbol,
DataSource: eaTelem.DataSource,
DpBenchmarkPrice: benchmarkPrice,
DpBid: bidPrice,
DpAsk: askPrice,
CurrentBlockNumber: bn,
CurrentBlockHash: bh,
CurrentBlockTimestamp: bt,
FetchMaxFinalizedTimestamp: d.FetchMaxFinalizedTimestamp,
MaxFinalizedTimestamp: mfts,
BridgeTaskRunStartedTimestamp: trr.CreatedAt.UnixMilli(),
BridgeTaskRunEndedTimestamp: trr.FinishedAt.Time.UnixMilli(),
ProviderRequestedTimestamp: eaTelem.ProviderRequestedTimestamp,
ProviderReceivedTimestamp: eaTelem.ProviderReceivedTimestamp,
ProviderDataStreamEstablished: eaTelem.ProviderDataStreamEstablished,
ProviderIndicatedTime: eaTelem.ProviderIndicatedTime,
Feed: e.job.OCR2OracleSpec.FeedID.Hex(),
ObservationBenchmarkPrice: bp.Int64(),
ObservationBid: bid.Int64(),
ObservationAsk: ask.Int64(),
ObservationBenchmarkPriceString: stringOrEmpty(bp),
ObservationBidString: stringOrEmpty(bid),
ObservationAskString: stringOrEmpty(ask),
IsLinkFeed: d.IsLinkFeed,
LinkPrice: lp,
IsNativeFeed: d.IsNativeFeed,
NativePrice: np,
ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(),
Round: int64(d.RepTimestamp.Round),
Epoch: int64(d.RepTimestamp.Epoch),
AssetSymbol: assetSymbol,
Version: uint32(d.FeedVersion),
}

bytes, err := proto.Marshal(t)
Expand Down Expand Up @@ -343,19 +435,19 @@ func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData
}

// ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent
func ShouldCollectEnhancedTelemetryMercury(job *job.Job) bool {
if job.Type.String() == pipeline.OffchainReporting2JobType && job.OCR2OracleSpec != nil {
return job.OCR2OracleSpec.CaptureEATelemetry
func ShouldCollectEnhancedTelemetryMercury(jb job.Job) bool {
if jb.Type.String() == pipeline.OffchainReporting2JobType && jb.OCR2OracleSpec != nil {
return jb.OCR2OracleSpec.CaptureEATelemetry
}
return false
}

// getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
// bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered
func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks *pipeline.TaskRunResults) (float64, float64, float64) {
func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, askPrice, bidPrice float64
var err error
//We rely on task results to be sorted in the correct order
// We rely on task results to be sorted in the correct order
benchmarkPriceTask := allTasks.GetNextTaskOf(startTask)
if benchmarkPriceTask == nil {
e.lggr.Warnf("cannot parse enhanced EA telemetry benchmark price, task is nil, job %d, id %s", e.job.ID)
Expand All @@ -372,12 +464,18 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta
}
}

// mercury version 2 only supports benchmarkPrice
if mercuryVersion == 2 {
return benchmarkPrice, 0, 0
}

bidTask := allTasks.GetNextTaskOf(*benchmarkPriceTask)
if bidTask == nil {
e.lggr.Warnf("cannot parse enhanced EA telemetry bid price, task is nil, job %d, id %s", e.job.ID)
return benchmarkPrice, 0, 0
}
if bidTask.Task.Type() == pipeline.TaskTypeJSONParse {

if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, job %d, id %s: %s", e.job.ID, bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error)
} else {
Expand All @@ -393,7 +491,7 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta
e.lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, job %d, id %s", e.job.ID)
return benchmarkPrice, bidPrice, 0
}
if askTask.Task.Type() == pipeline.TaskTypeJSONParse {
if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, job %d, id %s: %s", e.job.ID, askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error)
} else {
Expand All @@ -407,21 +505,11 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta
return benchmarkPrice, bidPrice, askPrice
}

// getFinalValues runs a parse on the pipeline.TaskRunResults and returns the values
func (e *EnhancedTelemetryService[T]) getFinalValues(obs relaymercuryv1.Observation) (int64, int64, int64, int64, []byte, uint64) {
var benchmarkPrice, bid, ask int64

if obs.BenchmarkPrice.Val != nil {
benchmarkPrice = obs.BenchmarkPrice.Val.Int64()
}
if obs.Bid.Val != nil {
bid = obs.Bid.Val.Int64()
}
if obs.Ask.Val != nil {
ask = obs.Ask.Val.Int64()
// MaybeEnqueueEnhancedTelem sends data to the telemetry channel for processing
func MaybeEnqueueEnhancedTelem(jb job.Job, ch chan<- EnhancedTelemetryMercuryData, data EnhancedTelemetryMercuryData) {
if ShouldCollectEnhancedTelemetryMercury(jb) {
EnqueueEnhancedTelem[EnhancedTelemetryMercuryData](ch, data)
}

return benchmarkPrice, bid, ask, obs.CurrentBlockNum.Val, obs.CurrentBlockHash.Val, obs.CurrentBlockTimestamp.Val
}

// EnqueueEnhancedTelem sends data to the telemetry channel for processing
Expand All @@ -441,3 +529,10 @@ func getResultFloat64(task *pipeline.TaskRunResult) (float64, error) {
resultFloat64, _ := result.Float64()
return resultFloat64, nil
}

func stringOrEmpty(n *big.Int) string {
if n.Cmp(big.NewInt(0)) == 0 {
return ""
}
return n.String()
}
Loading
Loading