Skip to content

Commit

Permalink
MERC 1388 update telemetry module to address the multi report structu…
Browse files Browse the repository at this point in the history
…re (#10827)

* w

* Don't warn uselessly on bid/ask missing

* - Update enhancedEAmercury proto file
- Fix tests

* Genereate

* fix foundry deps

---------

Co-authored-by: george-dorin <george.dorin@smartcontract.com>
Co-authored-by: Rens Rooimans <github@rensrooimans.nl>
  • Loading branch information
3 people authored Oct 30, 2023
1 parent 5e1c3a3 commit c942403
Show file tree
Hide file tree
Showing 9 changed files with 698 additions and 287 deletions.
2 changes: 1 addition & 1 deletion core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (d *Delegate) newServicesMercury(

mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))

if ocrcommon.ShouldCollectEnhancedTelemetryMercury(&jb) {
if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.EnhancedEAMercury, rid.Network, rid.ChainID), lggr.Named("EnhancedTelemetryMercury"))
mercuryServices = append(mercuryServices, enhancedTelemService)
}
Expand Down
189 changes: 139 additions & 50 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/libocr/commontypes"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"
"github.com/smartcontractkit/chainlink/v2/core/utils"

relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1"
relaymercuryv2 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v2"
relaymercuryv3 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v3"
)

type eaTelemetry struct {
Expand All @@ -36,9 +38,15 @@ type EnhancedTelemetryData struct {
}

type EnhancedTelemetryMercuryData struct {
TaskRunResults pipeline.TaskRunResults
Observation relaymercuryv1.Observation
RepTimestamp ocrtypes.ReportTimestamp
V1Observation *relaymercuryv1.Observation
V2Observation *relaymercuryv2.Observation
V3Observation *relaymercuryv3.Observation
TaskRunResults pipeline.TaskRunResults
RepTimestamp ocrtypes.ReportTimestamp
FeedVersion mercuryutils.FeedVersion
FetchMaxFinalizedTimestamp bool
IsLinkFeed bool
IsNativeFeed bool
}

type EnhancedTelemetryService[T EnhancedTelemetryData | EnhancedTelemetryMercuryData] struct {
Expand Down Expand Up @@ -69,13 +77,13 @@ func (e *EnhancedTelemetryService[T]) Start(context.Context) error {
for {
select {
case t := <-e.chTelem:
switch any(t).(type) {
switch v := any(t).(type) {
case EnhancedTelemetryData:
s := any(t).(EnhancedTelemetryData)
e.collectEATelemetry(s.TaskRunResults, s.FinalResults, s.RepTimestamp)
e.collectEATelemetry(v.TaskRunResults, v.FinalResults, v.RepTimestamp)
case EnhancedTelemetryMercuryData:
s := any(t).(EnhancedTelemetryMercuryData)
e.collectMercuryEnhancedTelemetry(s.Observation, s.TaskRunResults, s.RepTimestamp)
e.collectMercuryEnhancedTelemetry(v)
default:
e.lggr.Errorf("unrecognised telemetry data type: %T", t)
}
case <-e.chDone:
return
Expand Down Expand Up @@ -224,14 +232,19 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul
continue
}

if trr.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, job %d, id %s", e.job.ID, trr.Task.DotID()), "err", trr.Result.Error)
continue
}
bridgeRawResponse, ok := trr.Result.Value.(string)
if !ok {
e.lggr.Warnf("cannot get bridge response from bridge task, job %d, id %s", e.job.ID, trr.Task.DotID())
e.lggr.Warnf("cannot parse bridge response from bridge task, job %d, id %s: expected string, got: %v (type %T)", e.job.ID, trr.Task.DotID(), trr.Result.Value, trr.Result.Value)
continue
}
eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse))
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job %d, id %s", e.job.ID, trr.Task.DotID()), "err", err)
continue
}
value := e.getParsedValue(trrs, trr)

Expand All @@ -254,7 +267,7 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul

bytes, err := proto.Marshal(t)
if err != nil {
e.lggr.Warnf("protobuf marshal failed %v", err.Error())
e.lggr.Warnw("protobuf marshal failed", "err", err)
continue
}

Expand All @@ -264,14 +277,81 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul

// collectMercuryEnhancedTelemetry checks if enhanced telemetry should be collected, fetches the information needed and
// sends the telemetry
func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(obs relaymercuryv1.Observation, trrs pipeline.TaskRunResults, repts ocrtypes.ReportTimestamp) {
func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d EnhancedTelemetryMercuryData) {
if e.monitoringEndpoint == nil {
return
}

obsBenchmarkPrice, obsBid, obsAsk, obsBlockNum, obsBlockHash, obsBlockTimestamp := e.getFinalValues(obs)
// v1 fields
var bn int64
var bh string
var bt uint64
// v1+v2+v3 fields
bp := big.NewInt(0)
//v1+v3 fields
bid := big.NewInt(0)
ask := big.NewInt(0)
// v2+v3 fields
var mfts, lp, np int64

switch {
case d.V1Observation != nil:
obs := *d.V1Observation
if obs.CurrentBlockNum.Err == nil {
bn = obs.CurrentBlockNum.Val
}
if obs.CurrentBlockHash.Err == nil {
bh = common.BytesToHash(obs.CurrentBlockHash.Val).Hex()
}
if obs.CurrentBlockTimestamp.Err == nil {
bt = obs.CurrentBlockTimestamp.Val
}
if obs.BenchmarkPrice.Err == nil && obs.BenchmarkPrice.Val != nil {
bp = obs.BenchmarkPrice.Val
}
if obs.Bid.Err == nil && obs.Bid.Val != nil {
bid = obs.Bid.Val
}
if obs.Ask.Err == nil && obs.Ask.Val != nil {
ask = obs.Ask.Val
}
case d.V2Observation != nil:
obs := *d.V2Observation
if obs.MaxFinalizedTimestamp.Err == nil {
mfts = obs.MaxFinalizedTimestamp.Val
}
if obs.LinkPrice.Err == nil && obs.LinkPrice.Val != nil {
lp = obs.LinkPrice.Val.Int64()
}
if obs.NativePrice.Err == nil && obs.NativePrice.Val != nil {
np = obs.NativePrice.Val.Int64()
}
if obs.BenchmarkPrice.Err == nil && obs.BenchmarkPrice.Val != nil {
bp = obs.BenchmarkPrice.Val
}
case d.V3Observation != nil:
obs := *d.V3Observation
if obs.MaxFinalizedTimestamp.Err == nil {
mfts = obs.MaxFinalizedTimestamp.Val
}
if obs.LinkPrice.Err == nil && obs.LinkPrice.Val != nil {
lp = obs.LinkPrice.Val.Int64()
}
if obs.NativePrice.Err == nil && obs.NativePrice.Val != nil {
np = obs.NativePrice.Val.Int64()
}
if obs.BenchmarkPrice.Err == nil && obs.BenchmarkPrice.Val != nil {
bp = obs.BenchmarkPrice.Val
}
if obs.Bid.Err == nil && obs.Bid.Val != nil {
bid = obs.Bid.Val
}
if obs.Ask.Err == nil && obs.Ask.Val != nil {
ask = obs.Ask.Val
}
}

for _, trr := range trrs {
for _, trr := range d.TaskRunResults {
if trr.Task.Type() != pipeline.TaskTypeBridge {
continue
}
Expand All @@ -288,33 +368,41 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(obs relaym
}

assetSymbol := e.getAssetSymbolFromRequestData(bridgeTask.RequestData)
benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, &trrs)

benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, d.TaskRunResults, d.FeedVersion)

t := &telem.EnhancedEAMercury{
DataSource: eaTelem.DataSource,
DpBenchmarkPrice: benchmarkPrice,
DpBid: bidPrice,
DpAsk: askPrice,
CurrentBlockNumber: obsBlockNum,
CurrentBlockHash: common.BytesToHash(obsBlockHash).String(),
CurrentBlockTimestamp: obsBlockTimestamp,
CurrentBlockNumber: bn,
CurrentBlockHash: bh,
CurrentBlockTimestamp: bt,
FetchMaxFinalizedTimestamp: d.FetchMaxFinalizedTimestamp,
MaxFinalizedTimestamp: mfts,
BridgeTaskRunStartedTimestamp: trr.CreatedAt.UnixMilli(),
BridgeTaskRunEndedTimestamp: trr.FinishedAt.Time.UnixMilli(),
ProviderRequestedTimestamp: eaTelem.ProviderRequestedTimestamp,
ProviderReceivedTimestamp: eaTelem.ProviderReceivedTimestamp,
ProviderDataStreamEstablished: eaTelem.ProviderDataStreamEstablished,
ProviderIndicatedTime: eaTelem.ProviderIndicatedTime,
Feed: e.job.OCR2OracleSpec.FeedID.Hex(),
ObservationBenchmarkPrice: obsBenchmarkPrice.Int64(), //Deprecated: observation value will not fit in int64, we will use the string equivalent field ObservationBenchmarkPriceString
ObservationBid: obsBid.Int64(), //Deprecated: observation value will not fit in int64, we will use the string equivalent field ObservationBidString
ObservationAsk: obsAsk.Int64(), //Deprecated: observation value will not fit in int64, we will use the string equivalent field ObservationAskString
ConfigDigest: repts.ConfigDigest.Hex(),
Round: int64(repts.Round),
Epoch: int64(repts.Epoch),
ObservationBenchmarkPrice: bp.Int64(),
ObservationBid: bid.Int64(),
ObservationAsk: ask.Int64(),
ObservationBenchmarkPriceString: stringOrEmpty(bp),
ObservationBidString: stringOrEmpty(bid),
ObservationAskString: stringOrEmpty(ask),
IsLinkFeed: d.IsLinkFeed,
LinkPrice: lp,
IsNativeFeed: d.IsNativeFeed,
NativePrice: np,
ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(),
Round: int64(d.RepTimestamp.Round),
Epoch: int64(d.RepTimestamp.Epoch),
AssetSymbol: assetSymbol,
ObservationBenchmarkPriceString: obsBenchmarkPrice.String(),
ObservationBidString: obsBid.String(),
ObservationAskString: obsAsk.String(),
Version: uint32(d.FeedVersion),
}

bytes, err := proto.Marshal(t)
Expand Down Expand Up @@ -347,19 +435,19 @@ func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData
}

// ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent
func ShouldCollectEnhancedTelemetryMercury(job *job.Job) bool {
if job.Type.String() == pipeline.OffchainReporting2JobType && job.OCR2OracleSpec != nil {
return job.OCR2OracleSpec.CaptureEATelemetry
func ShouldCollectEnhancedTelemetryMercury(jb job.Job) bool {
if jb.Type.String() == pipeline.OffchainReporting2JobType && jb.OCR2OracleSpec != nil {
return jb.OCR2OracleSpec.CaptureEATelemetry
}
return false
}

// getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
// bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered
func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks *pipeline.TaskRunResults) (float64, float64, float64) {
func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, askPrice, bidPrice float64
var err error
//We rely on task results to be sorted in the correct order
// We rely on task results to be sorted in the correct order
benchmarkPriceTask := allTasks.GetNextTaskOf(startTask)
if benchmarkPriceTask == nil {
e.lggr.Warnf("cannot parse enhanced EA telemetry benchmark price, task is nil, job %d, id %s", e.job.ID)
Expand All @@ -376,12 +464,18 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta
}
}

// mercury version 2 only supports benchmarkPrice
if mercuryVersion == 2 {
return benchmarkPrice, 0, 0
}

bidTask := allTasks.GetNextTaskOf(*benchmarkPriceTask)
if bidTask == nil {
e.lggr.Warnf("cannot parse enhanced EA telemetry bid price, task is nil, job %d, id %s", e.job.ID)
return benchmarkPrice, 0, 0
}
if bidTask.Task.Type() == pipeline.TaskTypeJSONParse {

if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, job %d, id %s: %s", e.job.ID, bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error)
} else {
Expand All @@ -397,7 +491,7 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta
e.lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, job %d, id %s", e.job.ID)
return benchmarkPrice, bidPrice, 0
}
if askTask.Task.Type() == pipeline.TaskTypeJSONParse {
if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, job %d, id %s: %s", e.job.ID, askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error)
} else {
Expand All @@ -411,23 +505,11 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta
return benchmarkPrice, bidPrice, askPrice
}

// getFinalValues runs a parse on the pipeline.TaskRunResults and returns the values
func (e *EnhancedTelemetryService[T]) getFinalValues(obs relaymercuryv1.Observation) (*big.Int, *big.Int, *big.Int, int64, []byte, uint64) {
benchmarkPrice := big.NewInt(0)
bid := big.NewInt(0)
ask := big.NewInt(0)

if obs.BenchmarkPrice.Val != nil {
benchmarkPrice = obs.BenchmarkPrice.Val
}
if obs.Bid.Val != nil {
bid = obs.Bid.Val
}
if obs.Ask.Val != nil {
ask = obs.Ask.Val
// MaybeEnqueueEnhancedTelem sends data to the telemetry channel for processing
func MaybeEnqueueEnhancedTelem(jb job.Job, ch chan<- EnhancedTelemetryMercuryData, data EnhancedTelemetryMercuryData) {
if ShouldCollectEnhancedTelemetryMercury(jb) {
EnqueueEnhancedTelem[EnhancedTelemetryMercuryData](ch, data)
}

return benchmarkPrice, bid, ask, obs.CurrentBlockNum.Val, obs.CurrentBlockHash.Val, obs.CurrentBlockTimestamp.Val
}

// EnqueueEnhancedTelem sends data to the telemetry channel for processing
Expand All @@ -447,3 +529,10 @@ func getResultFloat64(task *pipeline.TaskRunResult) (float64, error) {
resultFloat64, _ := result.Float64()
return resultFloat64, nil
}

func stringOrEmpty(n *big.Int) string {
if n.Cmp(big.NewInt(0)) == 0 {
return ""
}
return n.String()
}
Loading

0 comments on commit c942403

Please sign in to comment.