Skip to content

Commit

Permalink
view function EA telem support in mercury 2.15 release (#14465)
Browse files Browse the repository at this point in the history
* * Adds support for "tags" to Tasks that can be used generically.
* Adds a descendent task search method
* Added support in Mercury EA telemetry to utilize tags for telemetry extraction

* fixing build issue

* set version
  • Loading branch information
akuzni2 authored Sep 18, 2024
1 parent 0334412 commit 8d51ac4
Show file tree
Hide file tree
Showing 20 changed files with 600 additions and 60 deletions.
8 changes: 8 additions & 0 deletions .changeset/shaggy-flowers-call.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": patch
---

#added and internal changes
* Adds support for "tags" to Tasks that can be used generically.
* Adds a descendent task search method
* Added support in Mercury EA telemetry to utilize tags for telemetry extraction
171 changes: 131 additions & 40 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,17 +390,17 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
e.lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, job=%d, id=%s, name=%q, expected string got %T", e.job.ID, trr.Task.DotID(), bridgeName, trr.Result.Value), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
continue
}
eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse))
eaResponse, err := parseEATelemetry([]byte(bridgeRawResponse))
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job=%d, id=%s, name=%q", e.job.ID, trr.Task.DotID(), bridgeName), "err", err, "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
}

assetSymbol := e.getAssetSymbolFromRequestData(bridgeTask.RequestData)

benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, d.TaskRunResults, d.FeedVersion)
benchmarkPrice, bidPrice, askPrice := e.getPricesFromBridgeTask(trr, d.TaskRunResults, d.FeedVersion)

t := &telem.EnhancedEAMercury{
DataSource: eaTelem.DataSource,
DataSource: eaResponse.DataSource,
DpBenchmarkPrice: benchmarkPrice,
DpBid: bidPrice,
DpAsk: askPrice,
Expand All @@ -412,10 +412,10 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
MaxFinalizedTimestamp: mfts,
BridgeTaskRunStartedTimestamp: trr.CreatedAt.UnixMilli(),
BridgeTaskRunEndedTimestamp: trr.FinishedAt.Time.UnixMilli(),
ProviderRequestedTimestamp: eaTelem.ProviderRequestedTimestamp,
ProviderReceivedTimestamp: eaTelem.ProviderReceivedTimestamp,
ProviderDataStreamEstablished: eaTelem.ProviderDataStreamEstablished,
ProviderIndicatedTime: eaTelem.ProviderIndicatedTime,
ProviderRequestedTimestamp: eaResponse.ProviderRequestedTimestamp,
ProviderReceivedTimestamp: eaResponse.ProviderReceivedTimestamp,
ProviderDataStreamEstablished: eaResponse.ProviderDataStreamEstablished,
ProviderIndicatedTime: eaResponse.ProviderIndicatedTime,
Feed: e.job.OCR2OracleSpec.FeedID.Hex(),
ObservationBenchmarkPrice: bp.Int64(),
ObservationBid: bid.Int64(),
Expand All @@ -431,10 +431,11 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(),
Round: int64(d.RepTimestamp.Round),
Epoch: int64(d.RepTimestamp.Epoch),
BridgeRequestData: bridgeTask.RequestData,
AssetSymbol: assetSymbol,
Version: uint32(d.FeedVersion),
}

e.lggr.Debugw(fmt.Sprintf("EA Telemetry = %+v", t), "feedID", e.job.OCR2OracleSpec.FeedID.Hex(), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
bytes, err := proto.Marshal(t)
if err != nil {
e.lggr.Warnf("protobuf marshal failed %v", err.Error())
Expand All @@ -445,11 +446,25 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
}
}

type telemetryAttributes struct {
PriceType *string `json:"priceType"`
}

func (e *EnhancedTelemetryService[T]) parseTelemetryAttributes(a string) (telemetryAttributes, error) {
attrs := &telemetryAttributes{}
err := json.Unmarshal([]byte(a), attrs)
if err != nil {
return telemetryAttributes{}, err
}
return *attrs, nil
}

// getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair
func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData string) string {
type reqDataPayload struct {
To string `json:"to"`
From string `json:"from"`
To *string `json:"to"`
From *string `json:"from"`
Address *string `json:"address"` // used for view function ea only
}
type reqData struct {
Data reqDataPayload `json:"data"`
Expand All @@ -461,7 +476,15 @@ func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData
return ""
}

return rd.Data.From + "/" + rd.Data.To
if rd.Data.From != nil && rd.Data.To != nil {
return *rd.Data.From + "/" + *rd.Data.To
}

if rd.Data.Address != nil {
return *rd.Data.Address
}

return ""
}

// ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent
Expand All @@ -472,26 +495,108 @@ func ShouldCollectEnhancedTelemetryMercury(jb job.Job) bool {
return false
}

// getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
const (
bid = "bid"
ask = "ask"
benchmark = "benchmark"
exchangeRate = "exchangeRate"
)

func (e *EnhancedTelemetryService[T]) getPricesFromBridgeTask(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, bidPrice, askPrice float64

// This will assume that all fields we care about are tagged with the correct priceType
benchmarkPrice, bidPrice, askPrice = e.getPricesFromBridgeTaskByTelemetryField(bridgeTask, allTasks)

// If prices weren't parsed by telemetry fields - attempt to get prices using the legacy method
// This is for backwards compatibility with job specs that don't have the telemetry attributes set
if benchmarkPrice == 0 && bidPrice == 0 && askPrice == 0 {
benchmarkP, bidP, askP := e.getPricesFromResultsByOrder(bridgeTask, allTasks, mercuryVersion)
bidPrice = bidP
askPrice = askP
benchmarkPrice = benchmarkP
}

return benchmarkPrice, bidPrice, askPrice
}

// CollectTaskRunResultsWithTags collects TaskRunResults for descendent tasks with non-empty TaskTags.
func (e *EnhancedTelemetryService[T]) collectTaskRunResultsWithTags(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) []pipeline.TaskRunResult {
startTask := bridgeTask.Task
descendants := startTask.GetDescendantTasks()
var taskRunResultsWithTags []pipeline.TaskRunResult
for _, task := range descendants {
trr := allTasks.GetTaskRunResultOf(task)
if trr != nil {
if trr.Task.TaskTags() != "" {
taskRunResultsWithTags = append(taskRunResultsWithTags, *trr)
}
}
}
return taskRunResultsWithTags
}

// getPricesFromBridgeTaskByTelemetryField attempts to parse prices from via telemetry fields in the TaskTags
func (e *EnhancedTelemetryService[T]) getPricesFromBridgeTaskByTelemetryField(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) (float64, float64, float64) {
var benchmarkPrice, bidPrice, askPrice float64

// Outputs are the mapped tasks from this task.
var tasksWithTags = e.collectTaskRunResultsWithTags(bridgeTask, allTasks)

for _, trr := range tasksWithTags {

attributes, err := e.parseTelemetryAttributes(trr.Task.TaskTags())
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse telemetry attributes, feed=%s, taskTags=%s", e.job.OCR2OracleSpec.FeedID.Hex(), trr.Task.TaskTags()), "err", err)
continue
}

if attributes.PriceType != nil {
switch *attributes.PriceType {
case bid:
bidPrice = e.parsePriceFromTask(trr)
case ask:
askPrice = e.parsePriceFromTask(trr)
case benchmark:
benchmarkPrice = e.parsePriceFromTask(trr)
case exchangeRate:
price := e.parsePriceFromTask(trr)
benchmarkPrice, bidPrice, askPrice = price, price, price
case "":
e.lggr.Warnw(fmt.Sprintf("no priceType found in attributes, parsedAttributes=%+v, job %d, id %s", attributes, e.job.ID, trr.Task.DotID()))
}
}
}

return benchmarkPrice, bidPrice, askPrice
}

func (e *EnhancedTelemetryService[T]) parsePriceFromTask(trr pipeline.TaskRunResult) float64 {
var val float64
if trr.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error on EA telemetry price task, job %d, id %s: %s", e.job.ID, trr.Task.DotID(), trr.Result.Error), "err", trr.Result.Error)
return 0
}
val, err := getResultFloat64(&trr)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry price to float64, DOT id %s", trr.Task.DotID()), "job", e.job.ID, "task_type", trr.Task.Type(), "task_tags", trr.Task.TaskTags(), "err", err)
}
return val
}

// getPricesFromResultsByOrder parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
// bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered
func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
func (e *EnhancedTelemetryService[T]) getPricesFromResultsByOrder(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, askPrice, bidPrice float64
var err error

// We rely on task results to be sorted in the correct order
benchmarkPriceTask := allTasks.GetNextTaskOf(startTask)
if benchmarkPriceTask == nil {
e.lggr.Warnf("cannot parse enhanced EA telemetry benchmark price, task is nil, job %d", e.job.ID)
return 0, 0, 0
}
if benchmarkPriceTask.Task.Type() == pipeline.TaskTypeJSONParse {
if benchmarkPriceTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry benchmark price, job %d, id %s: %s", e.job.ID, benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Error), "err", benchmarkPriceTask.Result.Error)
} else {
benchmarkPrice, err = getResultFloat64(benchmarkPriceTask)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry benchmark price, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID()), "err", err)
}
}
benchmarkPrice = e.parsePriceFromTask(*benchmarkPriceTask)
}

// mercury version 2 only supports benchmarkPrice
Expand All @@ -505,31 +610,17 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta
return benchmarkPrice, 0, 0
}

if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, job %d, id %s: %s", e.job.ID, bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error)
} else {
bidPrice, err = getResultFloat64(bidTask)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry bid price, job %d, id %s", e.job.ID, bidTask.Task.DotID()), "err", err)
}
}
if bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
bidPrice = e.parsePriceFromTask(*bidTask)
}

askTask := allTasks.GetNextTaskOf(*bidTask)
if askTask == nil {
e.lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID())
return benchmarkPrice, bidPrice, 0
}
if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, job %d, id %s: %s", e.job.ID, askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error)
} else {
askPrice, err = getResultFloat64(askTask)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry ask price, job %d, id %s", e.job.ID, askTask.Task.DotID()), "err", err)
}
}
if askTask.Task.Type() == pipeline.TaskTypeJSONParse {
askPrice = e.parsePriceFromTask(*askTask)
}

return benchmarkPrice, bidPrice, askPrice
Expand Down
Loading

0 comments on commit 8d51ac4

Please sign in to comment.