Skip to content

Commit

Permalink
Merc 6304 view function ea telem support (#14467)
Browse files Browse the repository at this point in the history
* * Adds support for "tags" to Tasks that can be used generically.
* Adds a descendent task search method
* Added support in Mercury EA telemetry to utilize tags for telemetry extraction

* * Adds support for "tags" to Tasks that can be used generically.
* Adds a descendent task search method
* Added support in Mercury EA telemetry to utilize tags for telemetry extraction

* changeset

* remove changeset file

* linting
  • Loading branch information
akuzni2 authored Sep 18, 2024
1 parent 767aed2 commit 358fc17
Show file tree
Hide file tree
Showing 18 changed files with 585 additions and 53 deletions.
7 changes: 7 additions & 0 deletions .changeset/twenty-boxes-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"chainlink": patch
---
#added
* Adds support for "tags" to Tasks that can be used generically.
* Adds a descendent task search method
* Added support in Mercury EA telemetry to utilize tags for telemetry extraction
161 changes: 126 additions & 35 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type EATelemetry struct {
BridgeTaskRunStartedTimestamp int64
BridgeTaskRunEndedTimestamp int64
AssetSymbol string
BridgeRequestData string
}

type EnhancedTelemetryData struct {
Expand Down Expand Up @@ -168,8 +169,8 @@ func ParseMercuryEATelemetry(lggr logger.Logger, trrs pipeline.TaskRunResults, f
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, id=%s, name=%q", trr.Task.DotID(), bridgeName), "err", err, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
}

eaTelem.DpBenchmarkPrice, eaTelem.DpBid, eaTelem.DpAsk = getPricesFromResults(lggr, trr, trrs, feedVersion)
eaTelem.BridgeRequestData = bridgeTask.RequestData
eaTelem.DpBenchmarkPrice, eaTelem.DpBid, eaTelem.DpAsk = getPricesFromBridgeTask(lggr, trr, trrs, feedVersion)

eaTelem.BridgeTaskRunStartedTimestamp = trr.CreatedAt.UnixMilli()
eaTelem.BridgeTaskRunEndedTimestamp = trr.FinishedAt.Time.UnixMilli()
Expand Down Expand Up @@ -448,10 +449,11 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(),
Round: int64(d.RepTimestamp.Round),
Epoch: int64(d.RepTimestamp.Epoch),
BridgeRequestData: eaTelem.BridgeRequestData,
AssetSymbol: eaTelem.AssetSymbol,
Version: uint32(d.FeedVersion),
}

e.lggr.Debugw(fmt.Sprintf("EA Telemetry = %+v", t), "feedID", e.job.OCR2OracleSpec.FeedID.Hex(), "jobID", e.job.ID, "datasource", eaTelem.DataSource)
bytes, err := proto.Marshal(t)
if err != nil {
e.lggr.Warnf("protobuf marshal failed %v", err.Error())
Expand All @@ -462,11 +464,25 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
}
}

type telemetryAttributes struct {
PriceType *string `json:"priceType"`
}

func parseTelemetryAttributes(a string) (telemetryAttributes, error) {
attrs := &telemetryAttributes{}
err := json.Unmarshal([]byte(a), attrs)
if err != nil {
return telemetryAttributes{}, err
}
return *attrs, nil
}

// getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair
func getAssetSymbolFromRequestData(requestData string) string {
type reqDataPayload struct {
To string `json:"to"`
From string `json:"from"`
To *string `json:"to"`
From *string `json:"from"`
Address *string `json:"address"` // used for view function ea only
}
type reqData struct {
Data reqDataPayload `json:"data"`
Expand All @@ -478,7 +494,15 @@ func getAssetSymbolFromRequestData(requestData string) string {
return ""
}

return rd.Data.From + "/" + rd.Data.To
if rd.Data.From != nil && rd.Data.To != nil {
return *rd.Data.From + "/" + *rd.Data.To
}

if rd.Data.Address != nil {
return *rd.Data.Address
}

return ""
}

// ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent
Expand All @@ -489,26 +513,107 @@ func ShouldCollectEnhancedTelemetryMercury(jb job.Job) bool {
return false
}

// getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
const (
bid = "bid"
ask = "ask"
benchmark = "benchmark"
exchangeRate = "exchangeRate"
)

func getPricesFromBridgeTask(lggr logger.Logger, bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, bidPrice, askPrice float64

// This will assume that all fields we care about are tagged with the correct priceType
benchmarkPrice, bidPrice, askPrice = getPricesFromBridgeTaskByTelemetryField(lggr, bridgeTask, allTasks)

// If prices weren't parsed by telemetry fields - attempt to get prices using the legacy method
// This is for backwards compatibility with job specs that don't have the telemetry attributes set
if benchmarkPrice == 0 && bidPrice == 0 && askPrice == 0 {
benchmarkP, bidP, askP := getPricesFromResultsByOrder(lggr, bridgeTask, allTasks, mercuryVersion)
bidPrice = bidP
askPrice = askP
benchmarkPrice = benchmarkP
}

return benchmarkPrice, bidPrice, askPrice
}

// CollectTaskRunResultsWithTags collects TaskRunResults for descendent tasks with non-empty TaskTags.
func collectTaskRunResultsWithTags(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) []pipeline.TaskRunResult {
startTask := bridgeTask.Task
descendants := startTask.GetDescendantTasks()
var taskRunResultsWithTags []pipeline.TaskRunResult
for _, task := range descendants {
trr := allTasks.GetTaskRunResultOf(task)
if trr != nil {
if trr.Task.TaskTags() != "" {
taskRunResultsWithTags = append(taskRunResultsWithTags, *trr)
}
}
}
return taskRunResultsWithTags
}

// getPricesFromBridgeTaskByTelemetryField attempts to parse prices from via telemetry fields in the TaskTags
func getPricesFromBridgeTaskByTelemetryField(lggr logger.Logger, bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) (float64, float64, float64) {
var benchmarkPrice, bidPrice, askPrice float64

// Outputs are the mapped tasks from this task.
var tasksWithTags = collectTaskRunResultsWithTags(bridgeTask, allTasks)

for _, trr := range tasksWithTags {
attributes, err := parseTelemetryAttributes(trr.Task.TaskTags())
if err != nil {
lggr.Warnw(fmt.Sprintf("found telemetry attributes but cannot them, taskTags=%s", trr.Task.TaskTags()), "err", err)
continue
}

if attributes.PriceType != nil {
switch *attributes.PriceType {
case bid:
bidPrice = parsePriceFromTask(lggr, trr)
case ask:
askPrice = parsePriceFromTask(lggr, trr)
case benchmark:
benchmarkPrice = parsePriceFromTask(lggr, trr)
case exchangeRate:
price := parsePriceFromTask(lggr, trr)
benchmarkPrice, bidPrice, askPrice = price, price, price
case "":
lggr.Warnw(fmt.Sprintf("no priceType found in attributes, parsedAttributes=%+v, id %s", attributes, trr.Task.DotID()))
}
}
}

return benchmarkPrice, bidPrice, askPrice
}

func parsePriceFromTask(lggr logger.Logger, trr pipeline.TaskRunResult) float64 {
var val float64
if trr.Result.Error != nil {
lggr.Warnw(fmt.Sprintf("got error on EA telemetry price task, id %s: %s", trr.Task.DotID(), trr.Result.Error), "err", trr.Result.Error)
return 0
}
val, err := getResultFloat64(&trr)
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry price to float64, DOT id %s", trr.Task.DotID()), "task_type", trr.Task.Type(), "task_tags", trr.Task.TaskTags(), "err", err)
}
return val
}

// getPricesFromResultsByOrder parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
// bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered
func getPricesFromResults(lggr logger.Logger, startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
func getPricesFromResultsByOrder(lggr logger.Logger, startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, askPrice, bidPrice float64
var err error

// We rely on task results to be sorted in the correct order
benchmarkPriceTask := allTasks.GetNextTaskOf(startTask)
if benchmarkPriceTask == nil {
lggr.Warn("cannot parse enhanced EA telemetry benchmark price, task is nil")
return 0, 0, 0
}
if benchmarkPriceTask.Task.Type() == pipeline.TaskTypeJSONParse {
if benchmarkPriceTask.Result.Error != nil {
lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry benchmark price, id %s: %s", benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Error), "err", benchmarkPriceTask.Result.Error)
} else {
benchmarkPrice, err = getResultFloat64(benchmarkPriceTask)
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry benchmark price, id %s", benchmarkPriceTask.Task.DotID()), "err", err)
}
}
benchmarkPrice = parsePriceFromTask(lggr, *benchmarkPriceTask)
}

// mercury version 2 only supports benchmarkPrice
Expand All @@ -522,31 +627,17 @@ func getPricesFromResults(lggr logger.Logger, startTask pipeline.TaskRunResult,
return benchmarkPrice, 0, 0
}

if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, id %s: %s", bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error)
} else {
bidPrice, err = getResultFloat64(bidTask)
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry bid price, id %s", bidTask.Task.DotID()), "err", err)
}
}
if bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
bidPrice = parsePriceFromTask(lggr, *bidTask)
}

askTask := allTasks.GetNextTaskOf(*bidTask)
if askTask == nil {
lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, id %s", benchmarkPriceTask.Task.DotID())
return benchmarkPrice, bidPrice, 0
}
if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, id %s: %s", askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error)
} else {
askPrice, err = getResultFloat64(askTask)
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry ask price, id %s", askTask.Task.DotID()), "err", err)
}
}
if askTask.Task.Type() == pipeline.TaskTypeJSONParse {
askPrice = parsePriceFromTask(lggr, *askTask)
}

return benchmarkPrice, bidPrice, askPrice
Expand Down
Loading

0 comments on commit 358fc17

Please sign in to comment.