diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 4b11f323a57..88c58caa0b7 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -273,7 +273,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.23 // indirect github.com/smartcontractkit/chainlink-ccip v0.0.0-20240916150615-85b8aa5fa7e6 // indirect github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc // indirect + github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 // indirect github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f // indirect github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240911182932-3c609a6ac664 // indirect github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240911194142-506bc469d8ae // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 5a945009635..ed669bbbe57 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1087,8 +1087,8 @@ github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907c github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce/go.mod h1:sjiiPwd4KsYOCf68MwL86EKphdXeT66EY7j53WH5DCc= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= -github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc h1:tRmTlaoAt+7FakMXXgeCuRPmzzBo5jsGpeCVvcU6KMc= -github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc/go.mod h1:PwPcmQNAzVmU8r8JWKrDRgvXesDwxnqbMD6DvYt/Z7M= +github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= +github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2/go.mod h1:rNhNSrrRMvkgAm5SA6bNTdh2340bTQQZdUVNtZ2o2bk= github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f h1:p4p3jBT91EQyLuAMvHD+zNJsuAYI/QjJbzuGUJ7wIgg= github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f/go.mod h1:FLlWBt2hwiMVgt9AcSo6wBJYIRd/nsc8ENbV1Wir1bw= github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240911182932-3c609a6ac664 h1:JPs35oSO07PK3Qv7Kyv0GJHVLacIE1IkrvefaPyBjKs= diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go index 3d265de2618..bad1bf0896a 100644 --- a/core/services/llo/data_source.go +++ b/core/services/llo/data_source.go @@ -71,10 +71,16 @@ var _ llo.DataSource = &dataSource{} type dataSource struct { lggr logger.Logger registry Registry + + t Telemeter +} + +func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter) llo.DataSource { + return newDataSource(lggr, registry, t) } -func newDataSource(lggr logger.Logger, registry Registry) llo.DataSource { - return &dataSource{lggr.Named("DataSource"), registry} +func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSource { + return &dataSource{lggr.Named("DataSource"), registry, t} } // Observe looks up all streams in the registry and populates a map of stream ID => value @@ -82,7 +88,7 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, var wg sync.WaitGroup wg.Add(len(streamValues)) var svmu sync.Mutex - var errors []ErrObservationFailed + var errs []ErrObservationFailed var errmu sync.Mutex if opts.VerboseLogging() { @@ -91,7 +97,7 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, streamIDs = append(streamIDs, streamID) } sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] }) - d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "seqNr", opts.SeqNr()) + d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) } for _, streamID := range maps.Keys(streamValues) { @@ -103,7 +109,7 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, stream, exists := d.registry.Get(streamID) if !exists { errmu.Lock() - errors = append(errors, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)}) + errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)}) errmu.Unlock() promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() return @@ -111,19 +117,26 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, run, trrs, err := stream.Run(ctx) if err != nil { errmu.Lock() - errors = append(errors, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"}) + errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"}) errmu.Unlock() promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() + // TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet + // https://smartcontract-it.atlassian.net/browse/MERC-6290 + d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, nil, err) return } + // TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet + // https://smartcontract-it.atlassian.net/browse/MERC-6290 val, err = ExtractStreamValue(trrs) if err != nil { errmu.Lock() - errors = append(errors, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"}) + errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"}) errmu.Unlock() return } + d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil) + if val != nil { svmu.Lock() defer svmu.Unlock() @@ -136,15 +149,15 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, // Failed observations are always logged at warn level var failedStreamIDs []streams.StreamID - if len(errors) > 0 { - sort.Slice(errors, func(i, j int) bool { return errors[i].streamID < errors[j].streamID }) - failedStreamIDs = make([]streams.StreamID, len(errors)) - errStrs := make([]string, len(errors)) - for i, e := range errors { + if len(errs) > 0 { + sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID }) + failedStreamIDs = make([]streams.StreamID, len(errs)) + errStrs := make([]string, len(errs)) + for i, e := range errs { errStrs[i] = e.String() failedStreamIDs[i] = e.streamID } - d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errors", errStrs, "seqNr", opts.SeqNr()) + d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) } if opts.VerboseLogging() { @@ -153,7 +166,7 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, successes = append(successes, strmID) } sort.Slice(successes, func(i, j int) bool { return successes[i] < successes[j] }) - d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "values", streamValues, "seqNr", opts.SeqNr()) + d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "configDigest", opts.ConfigDigest(), "values", streamValues, "seqNr", opts.OutCtx().SeqNr) } return nil diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go index 59cd870bc91..932c4c0c73a 100644 --- a/core/services/llo/data_source_test.go +++ b/core/services/llo/data_source_test.go @@ -4,10 +4,15 @@ import ( "context" "errors" "math/big" + "sync" "testing" "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-data-streams/llo" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -35,8 +40,9 @@ func (m *mockRegistry) Get(streamID streams.StreamID) (strm streams.Stream, exis return } -func makeStreamWithSingleResult[T any](res T, err error) *mockStream { +func makeStreamWithSingleResult[T any](runID int64, res T, err error) *mockStream { return &mockStream{ + run: &pipeline.Run{ID: runID}, trrs: []pipeline.TaskRunResult{pipeline.TaskRunResult{Task: &pipeline.MemoTask{}, Result: pipeline.Result{Value: res}}}, err: err, } @@ -52,30 +58,59 @@ func makeStreamValues() llo.StreamValues { type mockOpts struct{} -func (m mockOpts) VerboseLogging() bool { return true } -func (m mockOpts) SeqNr() uint64 { return 42 } +func (m *mockOpts) VerboseLogging() bool { return true } +func (m *mockOpts) SeqNr() uint64 { return 1042 } +func (m *mockOpts) OutCtx() ocr3types.OutcomeContext { + return ocr3types.OutcomeContext{SeqNr: 1042, PreviousOutcome: ocr3types.Outcome([]byte("foo"))} +} +func (m *mockOpts) ConfigDigest() ocr2types.ConfigDigest { + return ocr2types.ConfigDigest{6, 5, 4} +} + +type mockTelemeter struct { + mu sync.Mutex + v3PremiumLegacyPackets []v3PremiumLegacyPacket +} + +type v3PremiumLegacyPacket struct { + run *pipeline.Run + trrs pipeline.TaskRunResults + streamID uint32 + opts llo.DSOpts + val llo.StreamValue + err error +} + +var _ Telemeter = &mockTelemeter{} + +func (m *mockTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) { + m.mu.Lock() + defer m.mu.Unlock() + m.v3PremiumLegacyPackets = append(m.v3PremiumLegacyPackets, v3PremiumLegacyPacket{run, trrs, streamID, opts, val, err}) +} func Test_DataSource(t *testing.T) { lggr := logger.TestLogger(t) reg := &mockRegistry{make(map[streams.StreamID]*mockStream)} - ds := newDataSource(lggr, reg) + ds := newDataSource(lggr, reg, NullTelemeter) ctx := testutils.Context(t) + opts := &mockOpts{} t.Run("Observe", func(t *testing.T) { t.Run("doesn't set any values if no streams are defined", func(t *testing.T) { vals := makeStreamValues() - err := ds.Observe(ctx, vals, mockOpts{}) + err := ds.Observe(ctx, vals, opts) assert.NoError(t, err) assert.Equal(t, makeStreamValues(), vals) }) t.Run("observes each stream with success and returns values matching map argument", func(t *testing.T) { - reg.streams[1] = makeStreamWithSingleResult[*big.Int](big.NewInt(2181), nil) - reg.streams[2] = makeStreamWithSingleResult[*big.Int](big.NewInt(40602), nil) - reg.streams[3] = makeStreamWithSingleResult[*big.Int](big.NewInt(15), nil) + reg.streams[1] = makeStreamWithSingleResult[*big.Int](1, big.NewInt(2181), nil) + reg.streams[2] = makeStreamWithSingleResult[*big.Int](2, big.NewInt(40602), nil) + reg.streams[3] = makeStreamWithSingleResult[*big.Int](3, big.NewInt(15), nil) vals := makeStreamValues() - err := ds.Observe(ctx, vals, mockOpts{}) + err := ds.Observe(ctx, vals, opts) assert.NoError(t, err) assert.Equal(t, llo.StreamValues{ @@ -85,12 +120,12 @@ func Test_DataSource(t *testing.T) { }, vals) }) t.Run("observes each stream and returns success/errors", func(t *testing.T) { - reg.streams[1] = makeStreamWithSingleResult[*big.Int](big.NewInt(2181), errors.New("something exploded")) - reg.streams[2] = makeStreamWithSingleResult[*big.Int](big.NewInt(40602), nil) - reg.streams[3] = makeStreamWithSingleResult[*big.Int](nil, errors.New("something exploded 2")) + reg.streams[1] = makeStreamWithSingleResult[*big.Int](1, big.NewInt(2181), errors.New("something exploded")) + reg.streams[2] = makeStreamWithSingleResult[*big.Int](2, big.NewInt(40602), nil) + reg.streams[3] = makeStreamWithSingleResult[*big.Int](3, nil, errors.New("something exploded 2")) vals := makeStreamValues() - err := ds.Observe(ctx, vals, mockOpts{}) + err := ds.Observe(ctx, vals, opts) assert.NoError(t, err) assert.Equal(t, llo.StreamValues{ @@ -99,5 +134,37 @@ func Test_DataSource(t *testing.T) { 3: nil, }, vals) }) + + t.Run("records telemetry", func(t *testing.T) { + tm := &mockTelemeter{} + ds.t = tm + + reg.streams[1] = makeStreamWithSingleResult[*big.Int](100, big.NewInt(2181), nil) + reg.streams[2] = makeStreamWithSingleResult[*big.Int](101, big.NewInt(40602), nil) + reg.streams[3] = makeStreamWithSingleResult[*big.Int](102, big.NewInt(15), nil) + + vals := makeStreamValues() + err := ds.Observe(ctx, vals, opts) + assert.NoError(t, err) + + assert.Equal(t, llo.StreamValues{ + 2: llo.ToDecimal(decimal.NewFromInt(40602)), + 1: llo.ToDecimal(decimal.NewFromInt(2181)), + 3: llo.ToDecimal(decimal.NewFromInt(15)), + }, vals) + + require.Len(t, tm.v3PremiumLegacyPackets, 3) + m := make(map[int]v3PremiumLegacyPacket) + for _, pkt := range tm.v3PremiumLegacyPackets { + m[int(pkt.run.ID)] = pkt + } + pkt := m[100] + assert.Equal(t, 100, int(pkt.run.ID)) + assert.Len(t, pkt.trrs, 1) + assert.Equal(t, 1, int(pkt.streamID)) + assert.Equal(t, opts, pkt.opts) + assert.Equal(t, "2181", pkt.val.(*llo.Decimal).String()) + assert.Nil(t, pkt.err) + }) }) } diff --git a/core/services/llo/delegate.go b/core/services/llo/delegate.go index aff670e807d..ab55c6b2a95 100644 --- a/core/services/llo/delegate.go +++ b/core/services/llo/delegate.go @@ -37,16 +37,18 @@ type delegate struct { prrc llo.PredecessorRetirementReportCache src llo.ShouldRetireCache ds llo.DataSource + t services.Service oracle Closer } type DelegateConfig struct { - Logger logger.Logger - DataSource sqlutil.DataSource - Runner streams.Runner - Registry Registry - JobName null.String + Logger logger.Logger + DataSource sqlutil.DataSource + Runner streams.Runner + Registry Registry + JobName null.String + CaptureEATelemetry bool // LLO ChannelDefinitionCache llotypes.ChannelDefinitionCache @@ -67,6 +69,7 @@ type DelegateConfig struct { } func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) { + lggr := cfg.Logger.With("jobName", cfg.JobName.ValueOrZero()) if cfg.DataSource == nil { return nil, errors.New("DataSource must not be nil") } @@ -82,9 +85,15 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) { // https://smartcontract-it.atlassian.net/browse/MERC-3386 prrc := llo.NewPredecessorRetirementReportCache() src := llo.NewShouldRetireCache() - ds := newDataSource(cfg.Logger.Named("DataSource"), cfg.Registry) + var t TelemeterService + if cfg.CaptureEATelemetry { + t = NewTelemeterService(lggr, cfg.MonitoringEndpoint) + } else { + t = NullTelemeter + } + ds := newDataSource(lggr.Named("DataSource"), cfg.Registry, t) - return &delegate{services.StateMachine{}, cfg, codecs, prrc, src, ds, nil}, nil + return &delegate{services.StateMachine{}, cfg, codecs, prrc, src, ds, t, nil}, nil } func (d *delegate) Start(ctx context.Context) error { diff --git a/core/services/llo/evm/report_codec_premium_legacy.go b/core/services/llo/evm/report_codec_premium_legacy.go index da14aceab69..f8d72619ea1 100644 --- a/core/services/llo/evm/report_codec_premium_legacy.go +++ b/core/services/llo/evm/report_codec_premium_legacy.go @@ -155,10 +155,15 @@ func ExtractReportValues(report llo.Report) (nativePrice, linkPrice *llo.Decimal // MERC-3524 var LLOExtraHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001") -func LegacyReportContext(cd ocr2types.ConfigDigest, seqNr uint64) ocr2types.ReportContext { +func SeqNrToEpochAndRound(seqNr uint64) (epoch uint32, round uint8) { // Simulate 256 rounds/epoch - epoch := seqNr / 256 - round := seqNr % 256 + epoch = uint32(seqNr / 256) // nolint + round = uint8(seqNr % 256) // nolint + return +} + +func LegacyReportContext(cd ocr2types.ConfigDigest, seqNr uint64) ocr2types.ReportContext { + epoch, round := SeqNrToEpochAndRound(seqNr) return ocr2types.ReportContext{ ReportTimestamp: ocr2types.ReportTimestamp{ ConfigDigest: cd, diff --git a/core/services/llo/telemetry.go b/core/services/llo/telemetry.go new file mode 100644 index 00000000000..62b586f5cc8 --- /dev/null +++ b/core/services/llo/telemetry.go @@ -0,0 +1,184 @@ +package llo + +import ( + "context" + "errors" + "fmt" + + "github.com/smartcontractkit/libocr/commontypes" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-data-streams/llo" + + "github.com/smartcontractkit/chainlink/v2/core/services/llo/evm" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/eautils" + mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" +) + +const adapterLWBAErrorName = "AdapterLWBAError" + +type Telemeter interface { + EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) +} + +type TelemeterService interface { + Telemeter + services.Service +} + +func NewTelemeterService(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint) TelemeterService { + if monitoringEndpoint == nil { + return NullTelemeter + } + return newTelemeter(lggr, monitoringEndpoint) +} + +func newTelemeter(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint) *telemeter { + chTelemetryObservation := make(chan TelemetryObservation, 100) + t := &telemeter{ + chTelemetryObservation: chTelemetryObservation, + monitoringEndpoint: monitoringEndpoint, + } + t.Service, t.eng = services.Config{ + Name: "LLOTelemeterService", + Start: t.start, + }.NewServiceEngine(lggr) + + return t +} + +type telemeter struct { + services.Service + eng *services.Engine + + monitoringEndpoint commontypes.MonitoringEndpoint + chTelemetryObservation chan TelemetryObservation +} + +func (t *telemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) { + var adapterError *eautils.AdapterError + var dpInvariantViolationDetected bool + if errors.As(err, &adapterError) && adapterError.Name == adapterLWBAErrorName { + dpInvariantViolationDetected = true + } else if err != nil { + // ignore errors + return + } + tObs := TelemetryObservation{run, trrs, streamID, opts, val, dpInvariantViolationDetected} + select { + case t.chTelemetryObservation <- tObs: + default: + } +} + +func (t *telemeter) start(_ context.Context) error { + t.eng.Go(func(ctx context.Context) { + for { + select { + case tObs := <-t.chTelemetryObservation: + t.collectV3PremiumLegacyTelemetry(tObs) + case <-ctx.Done(): + return + } + } + }) + return nil +} + +func (t *telemeter) collectV3PremiumLegacyTelemetry(d TelemetryObservation) { + eaTelemetryValues := ocrcommon.ParseMercuryEATelemetry(t.eng.SugaredLogger, d.trrs, mercuryutils.REPORT_V3) + for _, eaTelem := range eaTelemetryValues { + var benchmarkPrice, bidPrice, askPrice int64 + var bp, bid, ask string + switch v := d.val.(type) { + case *llo.Decimal: + benchmarkPrice = v.Decimal().IntPart() + bp = v.Decimal().String() + case *llo.Quote: + benchmarkPrice = v.Benchmark.IntPart() + bp = v.Benchmark.String() + bidPrice = v.Bid.IntPart() + bid = v.Bid.String() + askPrice = v.Ask.IntPart() + ask = v.Ask.String() + } + epoch, round := evm.SeqNrToEpochAndRound(d.opts.OutCtx().SeqNr) + tea := &telem.EnhancedEAMercury{ + DataSource: eaTelem.DataSource, + DpBenchmarkPrice: eaTelem.DpBenchmarkPrice, + DpBid: eaTelem.DpBid, + DpAsk: eaTelem.DpAsk, + DpInvariantViolationDetected: d.dpInvariantViolationDetected, + BridgeTaskRunStartedTimestamp: eaTelem.BridgeTaskRunStartedTimestamp, + BridgeTaskRunEndedTimestamp: eaTelem.BridgeTaskRunEndedTimestamp, + ProviderRequestedTimestamp: eaTelem.ProviderRequestedTimestamp, + ProviderReceivedTimestamp: eaTelem.ProviderReceivedTimestamp, + ProviderDataStreamEstablished: eaTelem.ProviderDataStreamEstablished, + ProviderIndicatedTime: eaTelem.ProviderIndicatedTime, + Feed: fmt.Sprintf("streamID:%d", d.streamID), + ObservationBenchmarkPrice: benchmarkPrice, + ObservationBid: bidPrice, + ObservationAsk: askPrice, + ObservationBenchmarkPriceString: bp, + ObservationBidString: bid, + ObservationAskString: ask, + IsLinkFeed: false, + IsNativeFeed: false, + ConfigDigest: d.opts.ConfigDigest().Hex(), + Round: int64(round), + Epoch: int64(epoch), + AssetSymbol: eaTelem.AssetSymbol, + Version: uint32(1000 + mercuryutils.REPORT_V3), // add 1000 to distinguish between legacy feeds, this can be changed if necessary + } + + bytes, err := proto.Marshal(tea) + if err != nil { + t.eng.SugaredLogger.Warnf("protobuf marshal failed %v", err.Error()) + continue + } + + t.monitoringEndpoint.SendLog(bytes) + } +} + +type TelemetryObservation struct { + run *pipeline.Run + trrs pipeline.TaskRunResults + streamID uint32 + opts llo.DSOpts + val llo.StreamValue + dpInvariantViolationDetected bool +} + +var NullTelemeter TelemeterService = &nullTelemeter{} + +type nullTelemeter struct{} + +func (t *nullTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) { +} +func (t *nullTelemeter) Start(context.Context) error { + return nil +} +func (t *nullTelemeter) Close() error { + return nil +} +func (t *nullTelemeter) Healthy() error { + return nil +} +func (t *nullTelemeter) Unhealthy() error { + return nil +} +func (t *nullTelemeter) HealthReport() map[string]error { + return nil +} +func (t *nullTelemeter) Name() string { + return "NullTelemeter" +} +func (t *nullTelemeter) Ready() error { + return nil +} diff --git a/core/services/llo/telemetry_test.go b/core/services/llo/telemetry_test.go new file mode 100644 index 00000000000..ec77e959d24 --- /dev/null +++ b/core/services/llo/telemetry_test.go @@ -0,0 +1,216 @@ +package llo + +import ( + "errors" + "testing" + "time" + + "github.com/shopspring/decimal" + "github.com/smartcontractkit/libocr/commontypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "gopkg.in/guregu/null.v4" + + "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/eautils" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" +) + +var _ commontypes.MonitoringEndpoint = &mockMonitoringEndpoint{} + +type mockMonitoringEndpoint struct { + chLogs chan []byte +} + +func (m *mockMonitoringEndpoint) SendLog(log []byte) { + m.chLogs <- log +} + +const bridgeResponse = `{ + "meta":{ + "adapterName":"data-source-name" + }, + "timestamps":{ + "providerDataRequestedUnixMs":92233720368547760, + "providerDataReceivedUnixMs":-92233720368547760, + "providerDataStreamEstablishedUnixMs":1, + "providerIndicatedTimeUnixMs":-123456789 + } + }` + +var trrs = pipeline.TaskRunResults{ + pipeline.TaskRunResult{ + Task: &pipeline.BridgeTask{ + Name: "test-bridge-1", + BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0), + RequestData: `{"data":{"from":"eth", "to":"usd"}}`, + }, + Result: pipeline.Result{ + Value: bridgeResponse, + }, + CreatedAt: time.Unix(0, 0), + FinishedAt: null.TimeFrom(time.Unix(0, 0)), + }, + pipeline.TaskRunResult{ + Task: &pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "ds1_parse", nil, nil, 1), + }, + Result: pipeline.Result{ + Value: "123456.123456789", + }, + }, + pipeline.TaskRunResult{ + Task: &pipeline.BridgeTask{ + Name: "test-bridge-2", + BaseTask: pipeline.NewBaseTask(0, "ds2", nil, nil, 0), + RequestData: `{"data":{"from":"eth", "to":"usd"}}`, + }, + Result: pipeline.Result{ + Value: bridgeResponse, + }, + CreatedAt: time.Unix(1, 0), + FinishedAt: null.TimeFrom(time.Unix(10, 0)), + }, + pipeline.TaskRunResult{ + Task: &pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "ds2_parse", nil, nil, 1), + }, + Result: pipeline.Result{ + Value: "12345678", + }, + }, + pipeline.TaskRunResult{ + Task: &pipeline.BridgeTask{ + Name: "test-bridge-3", + BaseTask: pipeline.NewBaseTask(0, "ds3", nil, nil, 0), + RequestData: `{"data":{"from":"eth", "to":"usd"}}`, + }, + Result: pipeline.Result{ + Value: bridgeResponse, + }, + CreatedAt: time.Unix(2, 0), + FinishedAt: null.TimeFrom(time.Unix(20, 0)), + }, + pipeline.TaskRunResult{ + Task: &pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "ds3_parse", nil, nil, 1), + }, + Result: pipeline.Result{ + Value: "1234567890", + }, + }, +} + +func Test_Telemeter(t *testing.T) { + lggr := logger.Test(t) + m := &mockMonitoringEndpoint{} + + run := &pipeline.Run{ID: 42} + streamID := uint32(135) + opts := &mockOpts{} + + t.Run("with error", func(t *testing.T) { + tm := newTelemeter(lggr, m) + servicetest.Run(t, tm) + + t.Run("if error is some random failure returns immediately", func(t *testing.T) { + // should return immediately and not even send on the channel + m.chLogs = nil + tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, nil, errors.New("test error")) + }) + t.Run("if error is dp invariant violation, sets this flag", func(t *testing.T) { + m.chLogs = make(chan []byte, 100) + adapterError := new(eautils.AdapterError) + adapterError.Name = adapterLWBAErrorName + tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, nil, adapterError) + + var i int + for log := range m.chLogs { + decoded := &telem.EnhancedEAMercury{} + require.NoError(t, proto.Unmarshal(log, decoded)) + assert.True(t, decoded.DpInvariantViolationDetected) + if i == 2 { + return + } + i++ + } + }) + }) + t.Run("with decimal value, sets all values correctly", func(t *testing.T) { + tm := newTelemeter(lggr, m) + val := llo.ToDecimal(decimal.NewFromFloat32(102.12)) + servicetest.Run(t, tm) + tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil) + + var i int + for log := range m.chLogs { + decoded := &telem.EnhancedEAMercury{} + require.NoError(t, proto.Unmarshal(log, decoded)) + assert.Equal(t, int(1003), int(decoded.Version)) + assert.Equal(t, float64(123456.123456789), decoded.DpBenchmarkPrice) + assert.Zero(t, decoded.DpBid) + assert.Zero(t, decoded.DpAsk) + assert.False(t, decoded.DpInvariantViolationDetected) + assert.Zero(t, decoded.CurrentBlockNumber) + assert.Zero(t, decoded.CurrentBlockHash) + assert.Zero(t, decoded.CurrentBlockTimestamp) + assert.Zero(t, decoded.FetchMaxFinalizedTimestamp) + assert.Zero(t, decoded.MaxFinalizedTimestamp) + assert.Zero(t, decoded.ObservationTimestamp) + assert.False(t, decoded.IsLinkFeed) + assert.Zero(t, decoded.LinkPrice) + assert.False(t, decoded.IsNativeFeed) + assert.Zero(t, decoded.NativePrice) + assert.Equal(t, int64(i*1000), decoded.BridgeTaskRunStartedTimestamp) + assert.Equal(t, int64(i*10000), decoded.BridgeTaskRunEndedTimestamp) + assert.Equal(t, int64(92233720368547760), decoded.ProviderRequestedTimestamp) + assert.Equal(t, int64(-92233720368547760), decoded.ProviderReceivedTimestamp) + assert.Equal(t, int64(1), decoded.ProviderDataStreamEstablished) + assert.Equal(t, int64(-123456789), decoded.ProviderIndicatedTime) + assert.Equal(t, "streamID:135", decoded.Feed) + assert.Equal(t, int64(102), decoded.ObservationBenchmarkPrice) + assert.Equal(t, "102.12", decoded.ObservationBenchmarkPriceString) + assert.Zero(t, decoded.ObservationBid) + assert.Zero(t, decoded.ObservationBidString) + assert.Zero(t, decoded.ObservationAsk) + assert.Zero(t, decoded.ObservationAskString) + assert.Zero(t, decoded.ObservationMarketStatus) + assert.Equal(t, "0605040000000000000000000000000000000000000000000000000000000000", decoded.ConfigDigest) + assert.Equal(t, int64(18), decoded.Round) + assert.Equal(t, int64(4), decoded.Epoch) + assert.Equal(t, "eth/usd", decoded.AssetSymbol) + if i == 2 { + return + } + i++ + } + }) + t.Run("with quote value", func(t *testing.T) { + tm := newTelemeter(lggr, m) + val := &llo.Quote{Bid: decimal.NewFromFloat32(102.12), Benchmark: decimal.NewFromFloat32(103.32), Ask: decimal.NewFromFloat32(104.25)} + servicetest.Run(t, tm) + tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil) + + var i int + for log := range m.chLogs { + decoded := &telem.EnhancedEAMercury{} + require.NoError(t, proto.Unmarshal(log, decoded)) + assert.Equal(t, int64(103), decoded.ObservationBenchmarkPrice) + assert.Equal(t, "103.32", decoded.ObservationBenchmarkPriceString) + assert.Equal(t, int64(102), decoded.ObservationBid) + assert.Equal(t, "102.12", decoded.ObservationBidString) + assert.Equal(t, int64(104), decoded.ObservationAsk) + assert.Equal(t, "104.25", decoded.ObservationAskString) + assert.Zero(t, decoded.ObservationMarketStatus) + if i == 2 { + return + } + i++ + } + }) +} diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index a6bb63b6c8e..18f4d6224e7 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -1015,7 +1015,8 @@ func (d *Delegate) newServicesLLO( Runner: d.pipelineRunner, Registry: d.streamRegistry, - JobName: jb.Name, + JobName: jb.Name, + CaptureEATelemetry: jb.OCR2OracleSpec.CaptureEATelemetry, ChannelDefinitionCache: provider.ChannelDefinitionCache(), @@ -1025,13 +1026,11 @@ func (d *Delegate) newServicesLLO( ContractConfigTracker: provider.ContractConfigTracker(), Database: ocrDB, LocalConfig: lc, - // TODO: Telemetry for llo - // https://smartcontract-it.atlassian.net/browse/MERC-3603 - MonitoringEndpoint: nil, - OffchainConfigDigester: provider.OffchainConfigDigester(), - OffchainKeyring: kb, - OnchainKeyring: kr, - OCRLogger: ocrLogger, + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, fmt.Sprintf("%d", pluginCfg.DonID), synchronization.EnhancedEAMercury), + OffchainConfigDigester: provider.OffchainConfigDigester(), + OffchainKeyring: kb, + OnchainKeyring: kr, + OCRLogger: ocrLogger, // Enable verbose logging if either Mercury.VerboseLogging is on or OCR2.TraceLogging is on ReportingPluginConfig: datastreamsllo.Config{VerboseLogging: d.cfg.Mercury().VerboseLogging() || d.cfg.OCR2().TraceLogging()}, diff --git a/core/services/ocr2/plugins/llo/integration_test.go b/core/services/ocr2/plugins/llo/integration_test.go index 62c43eaf4da..2e49e989462 100644 --- a/core/services/ocr2/plugins/llo/integration_test.go +++ b/core/services/ocr2/plugins/llo/integration_test.go @@ -210,7 +210,6 @@ func TestIntegration_LLO(t *testing.T) { } serverURL := startMercuryServer(t, srv, clientPubKeys) - // TODO: all args? steve, backend, configurator, configuratorAddress, verifier, _, verifierProxy, _, configStore, configStoreAddress := setupBlockchain(t) fromBlock := 1 diff --git a/core/services/ocrcommon/telemetry.go b/core/services/ocrcommon/telemetry.go index 7db29aaacba..2be3e2228c4 100644 --- a/core/services/ocrcommon/telemetry.go +++ b/core/services/ocrcommon/telemetry.go @@ -11,13 +11,13 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "google.golang.org/protobuf/proto" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" v1types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1" v2types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2" v3types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" v4types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v4" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" @@ -25,12 +25,19 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" ) -type eaTelemetry struct { +type EATelemetry struct { DataSource string ProviderRequestedTimestamp int64 ProviderReceivedTimestamp int64 ProviderDataStreamEstablished int64 ProviderIndicatedTime int64 + + DpBenchmarkPrice float64 + DpBid float64 + DpAsk float64 + BridgeTaskRunStartedTimestamp int64 + BridgeTaskRunEndedTimestamp int64 + AssetSymbol string } type EnhancedTelemetryData struct { @@ -144,8 +151,37 @@ func (e *EnhancedTelemetryService[T]) getChainID() string { } } +func ParseMercuryEATelemetry(lggr logger.Logger, trrs pipeline.TaskRunResults, feedVersion mercuryutils.FeedVersion) (eaTelemetryValues []EATelemetry) { + for _, trr := range trrs { + if trr.Task.Type() != pipeline.TaskTypeBridge { + continue + } + bridgeTask := trr.Task.(*pipeline.BridgeTask) + bridgeName := bridgeTask.Name + + bridgeRawResponse, ok := trr.Result.Value.(string) + if !ok { + lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, id=%s, name=%q, expected string got %T", trr.Task.DotID(), bridgeName, trr.Result.Value), "dotID", trr.Task.DotID(), "bridgeName", bridgeName) + continue + } + eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse)) + if err != nil { + lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, id=%s, name=%q", trr.Task.DotID(), bridgeName), "err", err, "dotID", trr.Task.DotID(), "bridgeName", bridgeName) + } + + eaTelem.DpBenchmarkPrice, eaTelem.DpBid, eaTelem.DpAsk = getPricesFromResults(lggr, trr, trrs, feedVersion) + + eaTelem.BridgeTaskRunStartedTimestamp = trr.CreatedAt.UnixMilli() + eaTelem.BridgeTaskRunEndedTimestamp = trr.FinishedAt.Time.UnixMilli() + eaTelem.AssetSymbol = getAssetSymbolFromRequestData(bridgeTask.RequestData) + + eaTelemetryValues = append(eaTelemetryValues, eaTelem) + } + return +} + // parseEATelemetry attempts to parse the bridge telemetry -func parseEATelemetry(b []byte) (eaTelemetry, error) { +func parseEATelemetry(b []byte) (EATelemetry, error) { type eaTimestamps struct { ProviderRequestedTimestamp int64 `json:"providerDataRequestedUnixMs"` ProviderReceivedTimestamp int64 `json:"providerDataReceivedUnixMs"` @@ -163,10 +199,10 @@ func parseEATelemetry(b []byte) (eaTelemetry, error) { t := eaTelem{} if err := json.Unmarshal(b, &t); err != nil { - return eaTelemetry{}, err + return EATelemetry{}, err } - return eaTelemetry{ + return EATelemetry{ DataSource: t.TelemMeta.AdapterName, ProviderRequestedTimestamp: t.TelemTimestamps.ProviderRequestedTimestamp, ProviderReceivedTimestamp: t.TelemTimestamps.ProviderReceivedTimestamp, @@ -378,40 +414,21 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced } } - for _, trr := range d.TaskRunResults { - if trr.Task.Type() != pipeline.TaskTypeBridge { - continue - } - bridgeTask := trr.Task.(*pipeline.BridgeTask) - bridgeName := bridgeTask.Name - - bridgeRawResponse, ok := trr.Result.Value.(string) - if !ok { - e.lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, job=%d, id=%s, name=%q, expected string got %T", e.job.ID, trr.Task.DotID(), bridgeName, trr.Result.Value), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName) - continue - } - eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse)) - if err != nil { - e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job=%d, id=%s, name=%q", e.job.ID, trr.Task.DotID(), bridgeName), "err", err, "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName) - } - - assetSymbol := e.getAssetSymbolFromRequestData(bridgeTask.RequestData) - - benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, d.TaskRunResults, d.FeedVersion) - + eaTelemetryValues := ParseMercuryEATelemetry(logger.Sugared(e.lggr).With("jobID", e.job.ID), d.TaskRunResults, d.FeedVersion) + for _, eaTelem := range eaTelemetryValues { t := &telem.EnhancedEAMercury{ DataSource: eaTelem.DataSource, - DpBenchmarkPrice: benchmarkPrice, - DpBid: bidPrice, - DpAsk: askPrice, + DpBenchmarkPrice: eaTelem.DpBenchmarkPrice, + DpBid: eaTelem.DpBid, + DpAsk: eaTelem.DpAsk, DpInvariantViolationDetected: d.DpInvariantViolationDetected, CurrentBlockNumber: bn, CurrentBlockHash: bh, CurrentBlockTimestamp: bt, FetchMaxFinalizedTimestamp: d.FetchMaxFinalizedTimestamp, MaxFinalizedTimestamp: mfts, - BridgeTaskRunStartedTimestamp: trr.CreatedAt.UnixMilli(), - BridgeTaskRunEndedTimestamp: trr.FinishedAt.Time.UnixMilli(), + BridgeTaskRunStartedTimestamp: eaTelem.BridgeTaskRunStartedTimestamp, + BridgeTaskRunEndedTimestamp: eaTelem.BridgeTaskRunEndedTimestamp, ProviderRequestedTimestamp: eaTelem.ProviderRequestedTimestamp, ProviderReceivedTimestamp: eaTelem.ProviderReceivedTimestamp, ProviderDataStreamEstablished: eaTelem.ProviderDataStreamEstablished, @@ -431,7 +448,7 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(), Round: int64(d.RepTimestamp.Round), Epoch: int64(d.RepTimestamp.Epoch), - AssetSymbol: assetSymbol, + AssetSymbol: eaTelem.AssetSymbol, Version: uint32(d.FeedVersion), } @@ -446,7 +463,7 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced } // getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair -func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData string) string { +func getAssetSymbolFromRequestData(requestData string) string { type reqDataPayload struct { To string `json:"to"` From string `json:"from"` @@ -474,22 +491,22 @@ func ShouldCollectEnhancedTelemetryMercury(jb job.Job) bool { // getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice, // bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered -func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) { +func getPricesFromResults(lggr logger.Logger, startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) { var benchmarkPrice, askPrice, bidPrice float64 var err error // We rely on task results to be sorted in the correct order benchmarkPriceTask := allTasks.GetNextTaskOf(startTask) if benchmarkPriceTask == nil { - e.lggr.Warnf("cannot parse enhanced EA telemetry benchmark price, task is nil, job %d", e.job.ID) + lggr.Warn("cannot parse enhanced EA telemetry benchmark price, task is nil") return 0, 0, 0 } if benchmarkPriceTask.Task.Type() == pipeline.TaskTypeJSONParse { if benchmarkPriceTask.Result.Error != nil { - e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry benchmark price, job %d, id %s: %s", e.job.ID, benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Error), "err", benchmarkPriceTask.Result.Error) + lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry benchmark price, id %s: %s", benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Error), "err", benchmarkPriceTask.Result.Error) } else { benchmarkPrice, err = getResultFloat64(benchmarkPriceTask) if err != nil { - e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry benchmark price, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID()), "err", err) + lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry benchmark price, id %s", benchmarkPriceTask.Task.DotID()), "err", err) } } } @@ -501,33 +518,33 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta bidTask := allTasks.GetNextTaskOf(*benchmarkPriceTask) if bidTask == nil { - e.lggr.Warnf("cannot parse enhanced EA telemetry bid price, task is nil, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID()) + lggr.Warnf("cannot parse enhanced EA telemetry bid price, task is nil, id %s", benchmarkPriceTask.Task.DotID()) return benchmarkPrice, 0, 0 } if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse { if bidTask.Result.Error != nil { - e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, job %d, id %s: %s", e.job.ID, bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error) + lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, id %s: %s", bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error) } else { bidPrice, err = getResultFloat64(bidTask) if err != nil { - e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry bid price, job %d, id %s", e.job.ID, bidTask.Task.DotID()), "err", err) + lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry bid price, id %s", bidTask.Task.DotID()), "err", err) } } } askTask := allTasks.GetNextTaskOf(*bidTask) if askTask == nil { - e.lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID()) + lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, id %s", benchmarkPriceTask.Task.DotID()) return benchmarkPrice, bidPrice, 0 } if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse { if bidTask.Result.Error != nil { - e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, job %d, id %s: %s", e.job.ID, askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error) + lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, id %s: %s", askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error) } else { askPrice, err = getResultFloat64(askTask) if err != nil { - e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry ask price, job %d, id %s", e.job.ID, askTask.Task.DotID()), "err", err) + lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry ask price, id %s", askTask.Task.DotID()), "err", err) } } } diff --git a/core/services/ocrcommon/telemetry_test.go b/core/services/ocrcommon/telemetry_test.go index f764e7380f8..e257554803c 100644 --- a/core/services/ocrcommon/telemetry_test.go +++ b/core/services/ocrcommon/telemetry_test.go @@ -447,19 +447,13 @@ var trrsMercuryV2 = pipeline.TaskRunResults{ func TestGetPricesFromResults(t *testing.T) { lggr, logs := logger.TestLoggerObserved(t, zap.WarnLevel) - e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{ - lggr: lggr, - job: &job.Job{ - ID: 0, - }, - } - benchmarkPrice, bid, ask := e.getPricesFromResults(trrsMercuryV1[0], trrsMercuryV1, 1) + benchmarkPrice, bid, ask := getPricesFromResults(lggr, trrsMercuryV1[0], trrsMercuryV1, 1) require.Equal(t, 123456.123456, benchmarkPrice) require.Equal(t, 1234567.1234567, bid) require.Equal(t, float64(321123), ask) - benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercuryV1[0], pipeline.TaskRunResults{}, 1) + benchmarkPrice, bid, ask = getPricesFromResults(lggr, trrsMercuryV1[0], pipeline.TaskRunResults{}, 1) require.Equal(t, float64(0), benchmarkPrice) require.Equal(t, float64(0), bid) require.Equal(t, float64(0), ask) @@ -467,12 +461,12 @@ func TestGetPricesFromResults(t *testing.T) { require.Contains(t, logs.All()[0].Message, "cannot parse enhanced EA telemetry") tt := trrsMercuryV1[:2] - e.getPricesFromResults(trrsMercuryV1[0], tt, 1) + getPricesFromResults(lggr, trrsMercuryV1[0], tt, 1) require.Equal(t, 2, logs.Len()) require.Contains(t, logs.All()[1].Message, "cannot parse enhanced EA telemetry bid price, task is nil") tt = trrsMercuryV1[:3] - e.getPricesFromResults(trrsMercuryV1[0], tt, 1) + getPricesFromResults(lggr, trrsMercuryV1[0], tt, 1) require.Equal(t, 3, logs.Len()) require.Contains(t, logs.All()[2].Message, "cannot parse enhanced EA telemetry ask price, task is nil") @@ -510,7 +504,7 @@ func TestGetPricesFromResults(t *testing.T) { Value: nil, }, }} - benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercuryV1[0], trrs2, 3) + benchmarkPrice, bid, ask = getPricesFromResults(lggr, trrsMercuryV1[0], trrs2, 3) require.Equal(t, benchmarkPrice, float64(0)) require.Equal(t, bid, float64(0)) require.Equal(t, ask, float64(0)) @@ -519,7 +513,7 @@ func TestGetPricesFromResults(t *testing.T) { require.Contains(t, logs.All()[4].Message, "cannot parse enhanced EA telemetry bid price") require.Contains(t, logs.All()[5].Message, "cannot parse enhanced EA telemetry ask price") - benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercuryV1[0], trrsMercuryV2, 2) + benchmarkPrice, bid, ask = getPricesFromResults(lggr, trrsMercuryV1[0], trrsMercuryV2, 2) require.Equal(t, 123456.123456, benchmarkPrice) require.Equal(t, float64(0), bid) require.Equal(t, float64(0), ask) @@ -542,10 +536,9 @@ func TestShouldCollectEnhancedTelemetryMercury(t *testing.T) { } func TestGetAssetSymbolFromRequestData(t *testing.T) { - e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{} - require.Equal(t, e.getAssetSymbolFromRequestData(""), "") + require.Equal(t, getAssetSymbolFromRequestData(""), "") reqData := `{"data":{"to":"LINK","from":"USD"}}` - require.Equal(t, e.getAssetSymbolFromRequestData(reqData), "USD/LINK") + require.Equal(t, getAssetSymbolFromRequestData(reqData), "USD/LINK") } func TestCollectMercuryEnhancedTelemetryV1(t *testing.T) { @@ -660,7 +653,7 @@ func TestCollectMercuryEnhancedTelemetryV1(t *testing.T) { wg.Wait() require.Equal(t, 2, logs.Len()) - require.Contains(t, logs.All()[0].Message, `cannot get bridge response from bridge task, job=0, id=ds1, name="test-mercury-bridge-1"`) + require.Contains(t, logs.All()[0].Message, `cannot get bridge response from bridge task, id=ds1, name="test-mercury-bridge-1"`) require.Contains(t, logs.All()[1].Message, "cannot parse EA telemetry") chDone <- struct{}{} } diff --git a/go.mod b/go.mod index e900c0ebcc7..6f9d925858f 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( github.com/smartcontractkit/chainlink-ccip v0.0.0-20240916150615-85b8aa5fa7e6 github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 - github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc + github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240911182932-3c609a6ac664 github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240911194142-506bc469d8ae diff --git a/go.sum b/go.sum index 1414f564a8b..6edbb94eb94 100644 --- a/go.sum +++ b/go.sum @@ -1048,8 +1048,8 @@ github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907c github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce/go.mod h1:sjiiPwd4KsYOCf68MwL86EKphdXeT66EY7j53WH5DCc= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= -github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc h1:tRmTlaoAt+7FakMXXgeCuRPmzzBo5jsGpeCVvcU6KMc= -github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc/go.mod h1:PwPcmQNAzVmU8r8JWKrDRgvXesDwxnqbMD6DvYt/Z7M= +github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= +github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2/go.mod h1:rNhNSrrRMvkgAm5SA6bNTdh2340bTQQZdUVNtZ2o2bk= github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f h1:p4p3jBT91EQyLuAMvHD+zNJsuAYI/QjJbzuGUJ7wIgg= github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f/go.mod h1:FLlWBt2hwiMVgt9AcSo6wBJYIRd/nsc8ENbV1Wir1bw= github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240911182932-3c609a6ac664 h1:JPs35oSO07PK3Qv7Kyv0GJHVLacIE1IkrvefaPyBjKs= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 40c8964c031..a12e8c18dcd 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -406,7 +406,7 @@ require ( github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc // indirect + github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 // indirect github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f // indirect github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240911182932-3c609a6ac664 // indirect github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240911194142-506bc469d8ae // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index db4c6ed040d..a559c099014 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1429,8 +1429,8 @@ github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907c github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce/go.mod h1:sjiiPwd4KsYOCf68MwL86EKphdXeT66EY7j53WH5DCc= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= -github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc h1:tRmTlaoAt+7FakMXXgeCuRPmzzBo5jsGpeCVvcU6KMc= -github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc/go.mod h1:PwPcmQNAzVmU8r8JWKrDRgvXesDwxnqbMD6DvYt/Z7M= +github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= +github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2/go.mod h1:rNhNSrrRMvkgAm5SA6bNTdh2340bTQQZdUVNtZ2o2bk= github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f h1:p4p3jBT91EQyLuAMvHD+zNJsuAYI/QjJbzuGUJ7wIgg= github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f/go.mod h1:FLlWBt2hwiMVgt9AcSo6wBJYIRd/nsc8ENbV1Wir1bw= github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240911182932-3c609a6ac664 h1:JPs35oSO07PK3Qv7Kyv0GJHVLacIE1IkrvefaPyBjKs= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 00c6453814f..f2aff6511e9 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -386,7 +386,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.23 // indirect github.com/smartcontractkit/chainlink-ccip v0.0.0-20240916150615-85b8aa5fa7e6 // indirect github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc // indirect + github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 // indirect github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f // indirect github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240911182932-3c609a6ac664 // indirect github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240911194142-506bc469d8ae // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index bf4ca8c6ba6..3efe39cca8c 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1403,8 +1403,8 @@ github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907c github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce/go.mod h1:sjiiPwd4KsYOCf68MwL86EKphdXeT66EY7j53WH5DCc= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= -github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc h1:tRmTlaoAt+7FakMXXgeCuRPmzzBo5jsGpeCVvcU6KMc= -github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc/go.mod h1:PwPcmQNAzVmU8r8JWKrDRgvXesDwxnqbMD6DvYt/Z7M= +github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= +github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2/go.mod h1:rNhNSrrRMvkgAm5SA6bNTdh2340bTQQZdUVNtZ2o2bk= github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f h1:p4p3jBT91EQyLuAMvHD+zNJsuAYI/QjJbzuGUJ7wIgg= github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f/go.mod h1:FLlWBt2hwiMVgt9AcSo6wBJYIRd/nsc8ENbV1Wir1bw= github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240911182932-3c609a6ac664 h1:JPs35oSO07PK3Qv7Kyv0GJHVLacIE1IkrvefaPyBjKs=