diff --git a/.changeset/twenty-boxes-thank.md b/.changeset/twenty-boxes-thank.md new file mode 100644 index 00000000000..b90ec1d9fdf --- /dev/null +++ b/.changeset/twenty-boxes-thank.md @@ -0,0 +1,7 @@ +--- +"chainlink": patch +--- +#added +* Adds support for "tags" to Tasks that can be used generically. +* Adds a descendent task search method +* Added support in Mercury EA telemetry to utilize tags for telemetry extraction diff --git a/core/services/ocrcommon/telemetry.go b/core/services/ocrcommon/telemetry.go index 2be3e2228c4..f146bacc181 100644 --- a/core/services/ocrcommon/telemetry.go +++ b/core/services/ocrcommon/telemetry.go @@ -38,6 +38,7 @@ type EATelemetry struct { BridgeTaskRunStartedTimestamp int64 BridgeTaskRunEndedTimestamp int64 AssetSymbol string + BridgeRequestData string } type EnhancedTelemetryData struct { @@ -168,8 +169,8 @@ func ParseMercuryEATelemetry(lggr logger.Logger, trrs pipeline.TaskRunResults, f if err != nil { lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, id=%s, name=%q", trr.Task.DotID(), bridgeName), "err", err, "dotID", trr.Task.DotID(), "bridgeName", bridgeName) } - - eaTelem.DpBenchmarkPrice, eaTelem.DpBid, eaTelem.DpAsk = getPricesFromResults(lggr, trr, trrs, feedVersion) + eaTelem.BridgeRequestData = bridgeTask.RequestData + eaTelem.DpBenchmarkPrice, eaTelem.DpBid, eaTelem.DpAsk = getPricesFromBridgeTask(lggr, trr, trrs, feedVersion) eaTelem.BridgeTaskRunStartedTimestamp = trr.CreatedAt.UnixMilli() eaTelem.BridgeTaskRunEndedTimestamp = trr.FinishedAt.Time.UnixMilli() @@ -448,10 +449,11 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(), Round: int64(d.RepTimestamp.Round), Epoch: int64(d.RepTimestamp.Epoch), + BridgeRequestData: eaTelem.BridgeRequestData, AssetSymbol: eaTelem.AssetSymbol, Version: uint32(d.FeedVersion), } - + e.lggr.Debugw(fmt.Sprintf("EA Telemetry = %+v", t), "feedID", e.job.OCR2OracleSpec.FeedID.Hex(), "jobID", e.job.ID, "datasource", eaTelem.DataSource) bytes, err := proto.Marshal(t) if err != nil { e.lggr.Warnf("protobuf marshal failed %v", err.Error()) @@ -462,11 +464,25 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced } } +type telemetryAttributes struct { + PriceType *string `json:"priceType"` +} + +func parseTelemetryAttributes(a string) (telemetryAttributes, error) { + attrs := &telemetryAttributes{} + err := json.Unmarshal([]byte(a), attrs) + if err != nil { + return telemetryAttributes{}, err + } + return *attrs, nil +} + // getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair func getAssetSymbolFromRequestData(requestData string) string { type reqDataPayload struct { - To string `json:"to"` - From string `json:"from"` + To *string `json:"to"` + From *string `json:"from"` + Address *string `json:"address"` // used for view function ea only } type reqData struct { Data reqDataPayload `json:"data"` @@ -478,7 +494,15 @@ func getAssetSymbolFromRequestData(requestData string) string { return "" } - return rd.Data.From + "/" + rd.Data.To + if rd.Data.From != nil && rd.Data.To != nil { + return *rd.Data.From + "/" + *rd.Data.To + } + + if rd.Data.Address != nil { + return *rd.Data.Address + } + + return "" } // ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent @@ -489,11 +513,99 @@ func ShouldCollectEnhancedTelemetryMercury(jb job.Job) bool { return false } -// getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice, +const ( + bid = "bid" + ask = "ask" + benchmark = "benchmark" + exchangeRate = "exchangeRate" +) + +func getPricesFromBridgeTask(lggr logger.Logger, bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) { + var benchmarkPrice, bidPrice, askPrice float64 + + // This will assume that all fields we care about are tagged with the correct priceType + benchmarkPrice, bidPrice, askPrice = getPricesFromBridgeTaskByTelemetryField(lggr, bridgeTask, allTasks) + + // If prices weren't parsed by telemetry fields - attempt to get prices using the legacy method + // This is for backwards compatibility with job specs that don't have the telemetry attributes set + if benchmarkPrice == 0 && bidPrice == 0 && askPrice == 0 { + benchmarkP, bidP, askP := getPricesFromResultsByOrder(lggr, bridgeTask, allTasks, mercuryVersion) + bidPrice = bidP + askPrice = askP + benchmarkPrice = benchmarkP + } + + return benchmarkPrice, bidPrice, askPrice +} + +// CollectTaskRunResultsWithTags collects TaskRunResults for descendent tasks with non-empty TaskTags. +func collectTaskRunResultsWithTags(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) []pipeline.TaskRunResult { + startTask := bridgeTask.Task + descendants := startTask.GetDescendantTasks() + var taskRunResultsWithTags []pipeline.TaskRunResult + for _, task := range descendants { + trr := allTasks.GetTaskRunResultOf(task) + if trr != nil { + if trr.Task.TaskTags() != "" { + taskRunResultsWithTags = append(taskRunResultsWithTags, *trr) + } + } + } + return taskRunResultsWithTags +} + +// getPricesFromBridgeTaskByTelemetryField attempts to parse prices from via telemetry fields in the TaskTags +func getPricesFromBridgeTaskByTelemetryField(lggr logger.Logger, bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) (float64, float64, float64) { + var benchmarkPrice, bidPrice, askPrice float64 + + // Outputs are the mapped tasks from this task. + var tasksWithTags = collectTaskRunResultsWithTags(bridgeTask, allTasks) + + for _, trr := range tasksWithTags { + attributes, err := parseTelemetryAttributes(trr.Task.TaskTags()) + if err != nil { + lggr.Warnw(fmt.Sprintf("found telemetry attributes but cannot them, taskTags=%s", trr.Task.TaskTags()), "err", err) + continue + } + + if attributes.PriceType != nil { + switch *attributes.PriceType { + case bid: + bidPrice = parsePriceFromTask(lggr, trr) + case ask: + askPrice = parsePriceFromTask(lggr, trr) + case benchmark: + benchmarkPrice = parsePriceFromTask(lggr, trr) + case exchangeRate: + price := parsePriceFromTask(lggr, trr) + benchmarkPrice, bidPrice, askPrice = price, price, price + case "": + lggr.Warnw(fmt.Sprintf("no priceType found in attributes, parsedAttributes=%+v, id %s", attributes, trr.Task.DotID())) + } + } + } + + return benchmarkPrice, bidPrice, askPrice +} + +func parsePriceFromTask(lggr logger.Logger, trr pipeline.TaskRunResult) float64 { + var val float64 + if trr.Result.Error != nil { + lggr.Warnw(fmt.Sprintf("got error on EA telemetry price task, id %s: %s", trr.Task.DotID(), trr.Result.Error), "err", trr.Result.Error) + return 0 + } + val, err := getResultFloat64(&trr) + if err != nil { + lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry price to float64, DOT id %s", trr.Task.DotID()), "task_type", trr.Task.Type(), "task_tags", trr.Task.TaskTags(), "err", err) + } + return val +} + +// getPricesFromResultsByOrder parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice, // bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered -func getPricesFromResults(lggr logger.Logger, startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) { +func getPricesFromResultsByOrder(lggr logger.Logger, startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) { var benchmarkPrice, askPrice, bidPrice float64 - var err error + // We rely on task results to be sorted in the correct order benchmarkPriceTask := allTasks.GetNextTaskOf(startTask) if benchmarkPriceTask == nil { @@ -501,14 +613,7 @@ func getPricesFromResults(lggr logger.Logger, startTask pipeline.TaskRunResult, return 0, 0, 0 } if benchmarkPriceTask.Task.Type() == pipeline.TaskTypeJSONParse { - if benchmarkPriceTask.Result.Error != nil { - lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry benchmark price, id %s: %s", benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Error), "err", benchmarkPriceTask.Result.Error) - } else { - benchmarkPrice, err = getResultFloat64(benchmarkPriceTask) - if err != nil { - lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry benchmark price, id %s", benchmarkPriceTask.Task.DotID()), "err", err) - } - } + benchmarkPrice = parsePriceFromTask(lggr, *benchmarkPriceTask) } // mercury version 2 only supports benchmarkPrice @@ -522,15 +627,8 @@ func getPricesFromResults(lggr logger.Logger, startTask pipeline.TaskRunResult, return benchmarkPrice, 0, 0 } - if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse { - if bidTask.Result.Error != nil { - lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, id %s: %s", bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error) - } else { - bidPrice, err = getResultFloat64(bidTask) - if err != nil { - lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry bid price, id %s", bidTask.Task.DotID()), "err", err) - } - } + if bidTask.Task.Type() == pipeline.TaskTypeJSONParse { + bidPrice = parsePriceFromTask(lggr, *bidTask) } askTask := allTasks.GetNextTaskOf(*bidTask) @@ -538,15 +636,8 @@ func getPricesFromResults(lggr logger.Logger, startTask pipeline.TaskRunResult, lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, id %s", benchmarkPriceTask.Task.DotID()) return benchmarkPrice, bidPrice, 0 } - if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse { - if bidTask.Result.Error != nil { - lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, id %s: %s", askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error) - } else { - askPrice, err = getResultFloat64(askTask) - if err != nil { - lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry ask price, id %s", askTask.Task.DotID()), "err", err) - } - } + if askTask.Task.Type() == pipeline.TaskTypeJSONParse { + askPrice = parsePriceFromTask(lggr, *askTask) } return benchmarkPrice, bidPrice, askPrice diff --git a/core/services/ocrcommon/telemetry_test.go b/core/services/ocrcommon/telemetry_test.go index e257554803c..631cc32ed07 100644 --- a/core/services/ocrcommon/telemetry_test.go +++ b/core/services/ocrcommon/telemetry_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -445,15 +446,84 @@ var trrsMercuryV2 = pipeline.TaskRunResults{ }, } -func TestGetPricesFromResults(t *testing.T) { +func TestGetPricesFromBridgeByTelemetryField(t *testing.T) { + lggr, _ := logger.TestLoggerObserved(t, zap.WarnLevel) + // These are intentionally out of order from the "legacy" method which expects order of `benchmark, bid, ask` + jsonParseTaskBid := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "json_parse_2", nil, nil, 2), + } + jsonParseTaskBid.BaseTask.Tags = `{"priceType": "bid"}` + jsonParseTaskAsk := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(2, "json_parse_3", nil, nil, 3), + } + jsonParseTaskAsk.BaseTask.Tags = `{"priceType": "ask"}` + jsonParseTaskBenchmark := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(3, "json_parse_1", nil, nil, 1), + } + jsonParseTaskBenchmark.BaseTask.Tags = `{"priceType": "benchmark"}` + + bridgeOutputs := []pipeline.Task{&jsonParseTaskAsk, &jsonParseTaskBid, &jsonParseTaskBenchmark} + + bridgeTask := pipeline.BridgeTask{ + Name: "bridge-task", + BaseTask: pipeline.NewBaseTask(0, "bridge", nil, bridgeOutputs, 0), + } + + // Create task run results + taskRunResults := pipeline.TaskRunResults{ + pipeline.TaskRunResult{ + Task: &bridgeTask, + Result: pipeline.Result{ + Value: bridgeResponse, + }, + }, + pipeline.TaskRunResult{ + Task: &jsonParseTaskBenchmark, + Result: pipeline.Result{ + Value: "123456.123456", + }, + }, + pipeline.TaskRunResult{ + Task: &jsonParseTaskBid, + Result: pipeline.Result{ + Value: "1234567.1234567", + }, + }, + pipeline.TaskRunResult{ + Task: &jsonParseTaskAsk, + Result: pipeline.Result{ + Value: "321123", + }, + }, + } + + benchmarkPrice, bidPrice, askPrice := getPricesFromBridgeTask(lggr, taskRunResults[0], taskRunResults, 1) + + require.Equal(t, 123456.123456, benchmarkPrice) + require.Equal(t, 1234567.1234567, bidPrice) + require.Equal(t, 321123.0, askPrice) + + // now removing the TaskTags will throw off the parsed order - and we'll be parsing the "incorrect" prices + // according to the legacy ordering approach + jsonParseTaskAsk.BaseTask.Tags = "" + jsonParseTaskBid.BaseTask.Tags = "" + jsonParseTaskBenchmark.BaseTask.Tags = "" + + wrongBenchmarkPrice, wrongBidPrice, wrongAskPrice := getPricesFromBridgeTask(lggr, taskRunResults[0], taskRunResults, 1) + require.Equal(t, 1234567.1234567, wrongBenchmarkPrice) + require.Equal(t, 321123.0, wrongBidPrice) + require.Equal(t, 123456.123456, wrongAskPrice) +} + +func TestGetPricesFromBridgeTaskByOrder(t *testing.T) { lggr, logs := logger.TestLoggerObserved(t, zap.WarnLevel) - benchmarkPrice, bid, ask := getPricesFromResults(lggr, trrsMercuryV1[0], trrsMercuryV1, 1) + benchmarkPrice, bid, ask := getPricesFromBridgeTask(lggr, trrsMercuryV1[0], trrsMercuryV1, 1) require.Equal(t, 123456.123456, benchmarkPrice) require.Equal(t, 1234567.1234567, bid) require.Equal(t, float64(321123), ask) - benchmarkPrice, bid, ask = getPricesFromResults(lggr, trrsMercuryV1[0], pipeline.TaskRunResults{}, 1) + benchmarkPrice, bid, ask = getPricesFromBridgeTask(lggr, trrsMercuryV1[0], pipeline.TaskRunResults{}, 1) require.Equal(t, float64(0), benchmarkPrice) require.Equal(t, float64(0), bid) require.Equal(t, float64(0), ask) @@ -461,12 +531,12 @@ func TestGetPricesFromResults(t *testing.T) { require.Contains(t, logs.All()[0].Message, "cannot parse enhanced EA telemetry") tt := trrsMercuryV1[:2] - getPricesFromResults(lggr, trrsMercuryV1[0], tt, 1) + getPricesFromBridgeTask(lggr, trrsMercuryV1[0], tt, 1) require.Equal(t, 2, logs.Len()) require.Contains(t, logs.All()[1].Message, "cannot parse enhanced EA telemetry bid price, task is nil") tt = trrsMercuryV1[:3] - getPricesFromResults(lggr, trrsMercuryV1[0], tt, 1) + getPricesFromBridgeTask(lggr, trrsMercuryV1[0], tt, 1) require.Equal(t, 3, logs.Len()) require.Contains(t, logs.All()[2].Message, "cannot parse enhanced EA telemetry ask price, task is nil") @@ -504,16 +574,16 @@ func TestGetPricesFromResults(t *testing.T) { Value: nil, }, }} - benchmarkPrice, bid, ask = getPricesFromResults(lggr, trrsMercuryV1[0], trrs2, 3) + benchmarkPrice, bid, ask = getPricesFromBridgeTask(lggr, trrsMercuryV1[0], trrs2, 3) require.Equal(t, benchmarkPrice, float64(0)) require.Equal(t, bid, float64(0)) require.Equal(t, ask, float64(0)) require.Equal(t, logs.Len(), 6) - require.Contains(t, logs.All()[3].Message, "cannot parse enhanced EA telemetry benchmark price") - require.Contains(t, logs.All()[4].Message, "cannot parse enhanced EA telemetry bid price") - require.Contains(t, logs.All()[5].Message, "cannot parse enhanced EA telemetry ask price") + require.Contains(t, logs.All()[3].Message, "cannot parse EA telemetry price to float64, DOT id ds1_benchmark") + require.Contains(t, logs.All()[4].Message, "cannot parse EA telemetry price to float64, DOT id ds2_bid") + require.Contains(t, logs.All()[5].Message, "cannot parse EA telemetry price to float64, DOT id ds3_ask") - benchmarkPrice, bid, ask = getPricesFromResults(lggr, trrsMercuryV1[0], trrsMercuryV2, 2) + benchmarkPrice, bid, ask = getPricesFromBridgeTask(lggr, trrsMercuryV1[0], trrsMercuryV2, 2) require.Equal(t, 123456.123456, benchmarkPrice) require.Equal(t, float64(0), bid) require.Equal(t, float64(0), ask) @@ -539,6 +609,165 @@ func TestGetAssetSymbolFromRequestData(t *testing.T) { require.Equal(t, getAssetSymbolFromRequestData(""), "") reqData := `{"data":{"to":"LINK","from":"USD"}}` require.Equal(t, getAssetSymbolFromRequestData(reqData), "USD/LINK") + viewFunctionReqData := `{"data":{"address":"0x12345678", "signature": "function stEthPerToken() view returns (int256)"}}` + require.Equal(t, "0x12345678", getAssetSymbolFromRequestData(viewFunctionReqData)) +} + +func getViewFunctionTaskRunResults() pipeline.TaskRunResults { + var taskViewFunctionParseValue = func() pipeline.MultiplyTask { + task := pipeline.MultiplyTask{ + BaseTask: pipeline.NewBaseTask(3, "ds1_parse", nil, nil, 3), + Times: "1", + } + task.BaseTask.Tags = `{"priceType": "exchangeRate"}` + return task + }() + + var taskViewFunctionDecode = pipeline.ETHABIDecodeTask{ + ABI: "uint256 data", + BaseTask: pipeline.NewBaseTask(2, "ds1_decode", nil, []pipeline.Task{&taskViewFunctionParseValue}, 2), + } + + var taskViewFunctionJSONParse = pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "ds1_parse", nil, []pipeline.Task{&taskViewFunctionDecode}, 1), + } + + const viewFunctionBridgeResponse = `{ + "data": { + "result": "0x000000000000000000000000000000000000000000000000105ba6a589b23a81" + }, + "statusCode": 200, + "result": "0x000000000000000000000000000000000000000000000000105ba6a589b23a81", + "timestamps": { + "providerDataRequestedUnixMs": 1726243598046, + "providerDataReceivedUnixMs": 1726243598341 + }, + "meta": { + "adapterName": "VIEW_FUNCTION" + } + }` + + var taskViewFunctionBridgeRequest = pipeline.BridgeTask{ + Name: "bridge-view-function", + BaseTask: pipeline.NewBaseTask(0, "ds1", nil, []pipeline.Task{&taskViewFunctionJSONParse}, 0), + RequestData: `{"data":{"address":"0x1234","signature":"function stEthPerToken() external view returns (uint256)"}}`, + } + + return pipeline.TaskRunResults{ + pipeline.TaskRunResult{ + Task: &taskViewFunctionBridgeRequest, + Result: pipeline.Result{ + Value: viewFunctionBridgeResponse, + }, + }, + pipeline.TaskRunResult{ + Task: &taskViewFunctionJSONParse, + Result: pipeline.Result{ + Value: `0x000000000000000000000000000000000000000000000000105ba6a589b23a81`, + }, + }, + pipeline.TaskRunResult{ + Task: &taskViewFunctionDecode, + Result: pipeline.Result{ + Value: map[string]interface{}{ + "data": big.NewInt(1178718957397490305), + }, + }, + }, + pipeline.TaskRunResult{ + Task: &taskViewFunctionParseValue, + Result: pipeline.Result{ + Value: decimal.NewFromInt(1178718957397490305), + }, + }, + } +} + +func TestCollectMercuryEnhancedTelemetryV1ViewFunction(t *testing.T) { + wg := sync.WaitGroup{} + ingressClient := mocks.NewTelemetryService(t) + ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient) + monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.EnhancedEAMercury) + + var sentMessage []byte + ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + sentMessage = args[1].([]byte) + wg.Done() + }) + + lggr, _ := logger.TestLoggerObserved(t, zap.WarnLevel) + chTelem := make(chan EnhancedTelemetryMercuryData, 100) + chDone := make(chan struct{}) + feedID := common.HexToHash("0x111") + e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{ + chDone: chDone, + chTelem: chTelem, + job: &job.Job{ + Type: job.Type(pipeline.OffchainReporting2JobType), + OCR2OracleSpec: &job.OCR2OracleSpec{ + CaptureEATelemetry: true, + FeedID: &feedID, + }, + }, + lggr: lggr, + monitoringEndpoint: monitoringEndpoint, + } + servicetest.Run(t, &e) + + wg.Add(1) + + taskRunResults := getViewFunctionTaskRunResults() + + chTelem <- EnhancedTelemetryMercuryData{ + TaskRunResults: taskRunResults, + V1Observation: &mercuryv1.Observation{ + BenchmarkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(111111)}, + Bid: mercury.ObsResult[*big.Int]{Val: big.NewInt(222222)}, + Ask: mercury.ObsResult[*big.Int]{Val: big.NewInt(333333)}, + CurrentBlockNum: mercury.ObsResult[int64]{Val: 123456789}, + CurrentBlockHash: mercury.ObsResult[[]byte]{Val: common.HexToHash("0x123321").Bytes()}, + CurrentBlockTimestamp: mercury.ObsResult[uint64]{Val: 987654321}, + }, + RepTimestamp: types.ReportTimestamp{ + ConfigDigest: types.ConfigDigest{2}, + Epoch: 11, + Round: 22, + }, + } + + expectedTelemetry := telem.EnhancedEAMercury{ + DataSource: "VIEW_FUNCTION", + DpBenchmarkPrice: 1178718957397490400, + DpBid: 1178718957397490400, + DpAsk: 1178718957397490400, + CurrentBlockNumber: 123456789, + CurrentBlockHash: common.HexToHash("0x123321").String(), + CurrentBlockTimestamp: 987654321, + BridgeTaskRunStartedTimestamp: taskRunResults[0].CreatedAt.UnixMilli(), + BridgeTaskRunEndedTimestamp: taskRunResults[0].FinishedAt.Time.UnixMilli(), + ProviderRequestedTimestamp: 1726243598046, + ProviderReceivedTimestamp: 1726243598341, + ProviderDataStreamEstablished: 0, + ProviderIndicatedTime: 0, + Feed: common.HexToHash("0x111").String(), + ObservationBenchmarkPrice: 111111, + ObservationBid: 222222, + ObservationAsk: 333333, + ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000", + Round: 22, + Epoch: 11, + BridgeRequestData: `{"data":{"address":"0x1234","signature":"function stEthPerToken() external view returns (uint256)"}}`, + AssetSymbol: "0x1234", + ObservationBenchmarkPriceString: "111111", + ObservationBidString: "222222", + ObservationAskString: "333333", + } + + expectedMessage, _ := proto.Marshal(&expectedTelemetry) + wg.Wait() + require.Equal(t, expectedMessage, sentMessage) + + chDone <- struct{}{} } func TestCollectMercuryEnhancedTelemetryV1(t *testing.T) { @@ -612,6 +841,7 @@ func TestCollectMercuryEnhancedTelemetryV1(t *testing.T) { ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000", Round: 22, Epoch: 11, + BridgeRequestData: `{"data":{"to":"LINK","from":"USD"}}`, AssetSymbol: "USD/LINK", ObservationBenchmarkPriceString: "111111", ObservationBidString: "222222", @@ -725,6 +955,7 @@ func TestCollectMercuryEnhancedTelemetryV2(t *testing.T) { ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000", Round: 22, Epoch: 11, + BridgeRequestData: `{"data":{"to":"LINK","from":"USD"}}`, AssetSymbol: "USD/LINK", ObservationBenchmarkPriceString: "111111", MaxFinalizedTimestamp: 321, diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index 4e7c5b9cefa..50611ee32a4 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -60,6 +60,8 @@ type ( TaskRetries() uint32 TaskMinBackoff() time.Duration TaskMaxBackoff() time.Duration + TaskTags() string + GetDescendantTasks() []Task } Config interface { @@ -263,6 +265,16 @@ func (trrs TaskRunResults) Terminals() (terminals []TaskRunResult) { return } +// GetNextTaskOf returns the task with the next id or nil if it does not exist +func (trrs *TaskRunResults) GetTaskRunResultOf(task Task) *TaskRunResult { + for _, trr := range *trrs { + if trr.Task.Base().id == task.Base().id { + return &trr + } + } + return nil +} + // GetNextTaskOf returns the task with the next id or nil if it does not exist func (trrs *TaskRunResults) GetNextTaskOf(task TaskRunResult) *TaskRunResult { nextID := task.Task.Base().id + 1 diff --git a/core/services/pipeline/common_test.go b/core/services/pipeline/common_test.go index ce545ec14a0..ed7998b79e3 100644 --- a/core/services/pipeline/common_test.go +++ b/core/services/pipeline/common_test.go @@ -17,6 +17,14 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) +func TestAtrributesAttribute(t *testing.T) { + a := `ds1 [type=http method=GET tags=<{"attribute1":"value1", "attribute2":42}>];` + p, err := pipeline.Parse(a) + require.NoError(t, err) + task := p.Tasks[0] + assert.Equal(t, "{\"attribute1\":\"value1\", \"attribute2\":42}", task.TaskTags()) +} + func TestTimeoutAttribute(t *testing.T) { t.Parallel() @@ -320,3 +328,69 @@ func TestGetNextTaskOf(t *testing.T) { nextTask = trrs.GetNextTaskOf(*nextTask) assert.Empty(t, nextTask) } + +func TestGetDescendantTasks(t *testing.T) { + t.Parallel() + + t.Run("GetDescendantTasks with multiple levels of tasks", func(t *testing.T) { + l3T2 := pipeline.AnyTask{ + BaseTask: pipeline.NewBaseTask(6, "l3T2", nil, nil, 1), + } + l3T1 := pipeline.MedianTask{ + BaseTask: pipeline.NewBaseTask(5, "l3T1", nil, nil, 1), + } + l2T1 := pipeline.MultiplyTask{ + BaseTask: pipeline.NewBaseTask(4, "l2T1", nil, []pipeline.Task{&l3T1, &l3T2}, 1), + } + l1T1 := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(3, "l1T1", nil, []pipeline.Task{&l2T1}, 2), + } + l1T2 := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(2, "l1T2", nil, nil, 3), + } + l1T3 := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "l1T3", nil, nil, 4), + } + + baseTask := pipeline.BridgeTask{ + Name: "bridge-task", + BaseTask: pipeline.NewBaseTask(0, "baseTask", nil, []pipeline.Task{&l1T1, &l1T2, &l1T3}, 0), + } + + descendents := baseTask.GetDescendantTasks() + assert.Len(t, descendents, 6) + }) + + t.Run("GetDescendantTasks with duplicate tasks defined", func(t *testing.T) { + l2T1 := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(2, "l1T2", nil, nil, 3), + } + l1T1 := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "l1T2", nil, []pipeline.Task{&l2T1, &l2T1, &l2T1}, 3), + } + taskWithRepeats := pipeline.BridgeTask{ + Name: "bridge-task", + BaseTask: pipeline.NewBaseTask(0, "taskWithRepeats", nil, []pipeline.Task{&l1T1, &l1T1, &l1T1}, 0), + } + descendents := taskWithRepeats.GetDescendantTasks() + assert.Len(t, descendents, 2) + }) + + t.Run("GetDescendantTasks with nil output tasks", func(t *testing.T) { + taskWithRepeats := pipeline.BridgeTask{ + Name: "bridge-task", + BaseTask: pipeline.NewBaseTask(0, "taskWithRepeats", nil, nil, 0), + } + descendents := taskWithRepeats.GetDescendantTasks() + assert.Len(t, descendents, 0) + }) + + t.Run("GetDescendantTasks with empty list of output tasks", func(t *testing.T) { + taskWithRepeats := pipeline.BridgeTask{ + Name: "bridge-task", + BaseTask: pipeline.NewBaseTask(0, "taskWithRepeats", nil, []pipeline.Task{}, 0), + } + descendents := taskWithRepeats.GetDescendantTasks() + assert.Len(t, descendents, 0) + }) +} diff --git a/core/services/pipeline/runner_test.go b/core/services/pipeline/runner_test.go index 022a77c9471..ea30b3ff086 100644 --- a/core/services/pipeline/runner_test.go +++ b/core/services/pipeline/runner_test.go @@ -131,6 +131,50 @@ ds5 [type=http method="GET" url="%s" index=2] require.Len(t, errorResults, 3) } +func Test_PipelineRunner_ExecuteEthAbiDecode(t *testing.T) { + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewTestGeneralConfig(t) + + mockResult := `{"data":{"result":"0x000000000000000000000000000000000000000000000000105ba6a589b23a81"}}` + s1 := httptest.NewServer(NewMockHandler(mockResult)) + defer s1.Close() + + bridgeFeedURL, err := url.ParseRequestURI(s1.URL) + require.NoError(t, err) + + _, bt := cltest.MustCreateBridge(t, db, cltest.BridgeOpts{URL: bridgeFeedURL.String()}) + + btORM := bridgesMocks.NewORM(t) + btORM.On("FindBridge", mock.Anything, bt.Name).Return(*bt, nil).Once() + + r, _ := newRunner(t, db, btORM, cfg) + + s := fmt.Sprintf(` + ds1 [type=bridge name="%s" timeout=0 requestData=<{"data": {"address": "0x1234"}}>] + ds1_parse [type=jsonparse path="data,result"] + ds1_decode [type=ethabidecode abi="int256 data" data="$(ds1_parse)"]; + ds1_value [type="multiply" input="$(ds1_decode.data)" times=1] + + ds1->ds1_parse->ds1_decode->ds1_value + +`, bt.Name.String()) + d, err := pipeline.Parse(s) + require.NoError(t, err) + + spec := pipeline.Spec{DotDagSource: s} + vars := pipeline.NewVarsFrom(nil) + + _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars) + require.NoError(t, err) + require.Len(t, trrs, len(d.Tasks)) + + finalResults := trrs.FinalResult() + + val := finalResults.Values[0].(decimal.Decimal) + + assert.Equal(t, decimal.NewFromInt(1178718957397490305), val) +} + type taskRunWithVars struct { bridgeName string ds2URL, ds4URL string diff --git a/core/services/pipeline/task.base.go b/core/services/pipeline/task.base.go index 7a62f4e7ff8..3e1db5fcdb5 100644 --- a/core/services/pipeline/task.base.go +++ b/core/services/pipeline/task.base.go @@ -22,6 +22,8 @@ type BaseTask struct { MinBackoff time.Duration `mapstructure:"minBackoff"` MaxBackoff time.Duration `mapstructure:"maxBackoff"` + Tags string `mapstructure:"tags" json:"-"` + uuid uuid.UUID } @@ -77,3 +79,32 @@ func (t BaseTask) TaskMaxBackoff() time.Duration { } return time.Minute } + +func (t BaseTask) TaskTags() string { + return t.Tags +} + +// GetDescendantTasks retrieves all descendant tasks of a given task +func (t BaseTask) GetDescendantTasks() []Task { + if len(t.outputs) == 0 { + return []Task{} + } + var descendants []Task + queue := append([]Task{}, t.outputs...) + visited := make(map[int]bool) + + for len(queue) > 0 { + currentTask := queue[0] + queue = queue[1:] + + taskID := currentTask.ID() + if visited[taskID] { + continue + } + visited[taskID] = true + descendants = append(descendants, currentTask) + queue = append(queue, currentTask.Outputs()...) + } + + return descendants +} diff --git a/core/services/pipeline/task.bridge_test.go b/core/services/pipeline/task.bridge_test.go index d7519232eb5..cd81f8656fd 100644 --- a/core/services/pipeline/task.bridge_test.go +++ b/core/services/pipeline/task.bridge_test.go @@ -117,6 +117,18 @@ func mustReadFile(t testing.TB, file string) string { return string(content) } +// NewMockHandler returns an http.HandlerFunc that responds with the given payload for any request +func NewMockHandler(payload string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(payload)) + if err != nil { + http.Error(w, "Failed to write response", http.StatusInternalServerError) + } + } +} + func fakePriceResponder(t *testing.T, requestData map[string]interface{}, result decimal.Decimal, inputKey string, expectedInput interface{}) http.Handler { t.Helper() diff --git a/core/services/pipeline/task.eth_abi_decode_test.go b/core/services/pipeline/task.eth_abi_decode_test.go index 3c7f5b4776b..565e8b485dd 100644 --- a/core/services/pipeline/task.eth_abi_decode_test.go +++ b/core/services/pipeline/task.eth_abi_decode_test.go @@ -25,6 +25,20 @@ var testsABIDecode = []struct { expectedErrorCause error expectedErrorContains string }{ + { + "uint256", + "uint256 data", + "$(data)", + NewVarsFrom(map[string]interface{}{ + "data": "0x000000000000000000000000000000000000000000000000105ba6a589b23a81", + }), + nil, + map[string]interface{}{ + "data": big.NewInt(1178718957397490305), + }, + nil, + "", + }, { "uint256, bool, int256, string", "uint256 u, bool b, int256 i, string s", diff --git a/core/services/relay/evm/mercury/mocks/pipeline.go b/core/services/relay/evm/mercury/mocks/pipeline.go index 44be1377aeb..a7183c9a037 100644 --- a/core/services/relay/evm/mercury/mocks/pipeline.go +++ b/core/services/relay/evm/mercury/mocks/pipeline.go @@ -23,6 +23,10 @@ type MockTask struct { result pipeline.Result } +func (m *MockTask) GetDescendantTasks() []pipeline.Task { return nil } + +func (m *MockTask) TaskTags() string { return "{\"anything\": \"here\"}" } + func (m *MockTask) Type() pipeline.TaskType { return "MockTask" } func (m *MockTask) ID() int { return 0 } func (m *MockTask) DotID() string { return "" } diff --git a/core/services/synchronization/telem/telem.pb.go b/core/services/synchronization/telem/telem.pb.go index d51b9628e22..8ededa8ce1c 100644 --- a/core/services/synchronization/telem/telem.pb.go +++ b/core/services/synchronization/telem/telem.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v4.25.1 +// protoc v5.28.0 // source: core/services/synchronization/telem/telem.proto package telem diff --git a/core/services/synchronization/telem/telem_automation_custom.pb.go b/core/services/synchronization/telem/telem_automation_custom.pb.go index 30ddce6f790..7c498be9fde 100644 --- a/core/services/synchronization/telem/telem_automation_custom.pb.go +++ b/core/services/synchronization/telem/telem_automation_custom.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v4.25.1 +// protoc v5.28.0 // source: core/services/synchronization/telem/telem_automation_custom.proto package telem diff --git a/core/services/synchronization/telem/telem_enhanced_ea.pb.go b/core/services/synchronization/telem/telem_enhanced_ea.pb.go index c8983a06fea..687ff7ab4e9 100644 --- a/core/services/synchronization/telem/telem_enhanced_ea.pb.go +++ b/core/services/synchronization/telem/telem_enhanced_ea.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v4.25.1 +// protoc v5.28.0 // source: core/services/synchronization/telem/telem_enhanced_ea.proto package telem diff --git a/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go b/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go index 856619e1931..7be7ad8d706 100644 --- a/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go +++ b/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v4.25.1 +// protoc v5.28.0 // source: core/services/synchronization/telem/telem_enhanced_ea_mercury.proto package telem @@ -81,6 +81,7 @@ type EnhancedEAMercury struct { DpBid float64 `protobuf:"fixed64,3,opt,name=dp_bid,json=dpBid,proto3" json:"dp_bid,omitempty"` DpAsk float64 `protobuf:"fixed64,4,opt,name=dp_ask,json=dpAsk,proto3" json:"dp_ask,omitempty"` DpInvariantViolationDetected bool `protobuf:"varint,33,opt,name=dp_invariant_violation_detected,json=dpInvariantViolationDetected,proto3" json:"dp_invariant_violation_detected,omitempty"` + BridgeRequestData string `protobuf:"bytes,35,opt,name=bridge_request_data,json=bridgeRequestData,proto3" json:"bridge_request_data,omitempty"` // v1 fields (block range) CurrentBlockNumber int64 `protobuf:"varint,5,opt,name=current_block_number,json=currentBlockNumber,proto3" json:"current_block_number,omitempty"` CurrentBlockHash string `protobuf:"bytes,6,opt,name=current_block_hash,json=currentBlockHash,proto3" json:"current_block_hash,omitempty"` @@ -190,6 +191,13 @@ func (x *EnhancedEAMercury) GetDpInvariantViolationDetected() bool { return false } +func (x *EnhancedEAMercury) GetBridgeRequestData() string { + if x != nil { + return x.BridgeRequestData + } + return "" +} + func (x *EnhancedEAMercury) GetCurrentBlockNumber() int64 { if x != nil { return x.CurrentBlockNumber @@ -393,7 +401,7 @@ var file_core_services_synchronization_telem_telem_enhanced_ea_mercury_proto_raw 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x5f, 0x65, 0x6e, 0x68, 0x61, 0x6e, 0x63, 0x65, 0x64, 0x5f, 0x65, 0x61, 0x5f, 0x6d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x22, 0xfa, 0x0c, 0x0a, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x22, 0xaa, 0x0d, 0x0a, 0x11, 0x45, 0x6e, 0x68, 0x61, 0x6e, 0x63, 0x65, 0x64, 0x45, 0x41, 0x4d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x20, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, @@ -409,6 +417,9 @@ var file_core_services_synchronization_telem_telem_enhanced_ea_mercury_proto_raw 0x69, 0x6f, 0x6e, 0x5f, 0x64, 0x65, 0x74, 0x65, 0x63, 0x74, 0x65, 0x64, 0x18, 0x21, 0x20, 0x01, 0x28, 0x08, 0x52, 0x1c, 0x64, 0x70, 0x49, 0x6e, 0x76, 0x61, 0x72, 0x69, 0x61, 0x6e, 0x74, 0x56, 0x69, 0x6f, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x74, 0x65, 0x63, 0x74, 0x65, 0x64, + 0x12, 0x2e, 0x0a, 0x13, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x23, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x62, + 0x72, 0x69, 0x64, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44, 0x61, 0x74, 0x61, 0x12, 0x30, 0x0a, 0x14, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x12, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, diff --git a/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto b/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto index bb41ff86ee3..c96c58f9ea3 100644 --- a/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto +++ b/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto @@ -19,6 +19,7 @@ message EnhancedEAMercury { double dp_bid=3; double dp_ask=4; bool dp_invariant_violation_detected=33; + string bridge_request_data = 35; // v1 fields (block range) int64 current_block_number=5; diff --git a/core/services/synchronization/telem/telem_functions_request.pb.go b/core/services/synchronization/telem/telem_functions_request.pb.go index 89aa9e3fe37..1a67d1223a8 100644 --- a/core/services/synchronization/telem/telem_functions_request.pb.go +++ b/core/services/synchronization/telem/telem_functions_request.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v4.25.1 +// protoc v5.28.0 // source: core/services/synchronization/telem/telem_functions_request.proto package telem diff --git a/core/services/synchronization/telem/telem_head_report.pb.go b/core/services/synchronization/telem/telem_head_report.pb.go index 87fa42a57ee..c4101d06560 100644 --- a/core/services/synchronization/telem/telem_head_report.pb.go +++ b/core/services/synchronization/telem/telem_head_report.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v4.25.1 +// protoc v5.28.0 // source: core/services/synchronization/telem/telem_head_report.proto package telem diff --git a/core/services/synchronization/telem/telem_wsrpc.pb.go b/core/services/synchronization/telem/telem_wsrpc.pb.go index e4028b4de49..e7df2090e4f 100644 --- a/core/services/synchronization/telem/telem_wsrpc.pb.go +++ b/core/services/synchronization/telem/telem_wsrpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-wsrpc. DO NOT EDIT. // versions: // - protoc-gen-go-wsrpc v0.0.1 -// - protoc v4.25.1 +// - protoc v5.28.0 package telem