Skip to content

Commit

Permalink
Implement v0.3-compatible telemetry for LLO (#14440)
Browse files Browse the repository at this point in the history
* Implement v0.3-compatible telemetry for LLO

* Remove debugging

* update go mod

* Fix test race

* Fix linter

* Correct epoch/round numbres

* nolint

* Fix naming
  • Loading branch information
samsondav authored Sep 17, 2024
1 parent 52b480f commit ab5a2c6
Show file tree
Hide file tree
Showing 18 changed files with 619 additions and 117 deletions.
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.23 // indirect
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240916150615-85b8aa5fa7e6 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 // indirect
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f // indirect
github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240911182932-3c609a6ac664 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240911194142-506bc469d8ae // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1087,8 +1087,8 @@ github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907c
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce/go.mod h1:sjiiPwd4KsYOCf68MwL86EKphdXeT66EY7j53WH5DCc=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc h1:tRmTlaoAt+7FakMXXgeCuRPmzzBo5jsGpeCVvcU6KMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc/go.mod h1:PwPcmQNAzVmU8r8JWKrDRgvXesDwxnqbMD6DvYt/Z7M=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2/go.mod h1:rNhNSrrRMvkgAm5SA6bNTdh2340bTQQZdUVNtZ2o2bk=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f h1:p4p3jBT91EQyLuAMvHD+zNJsuAYI/QjJbzuGUJ7wIgg=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f/go.mod h1:FLlWBt2hwiMVgt9AcSo6wBJYIRd/nsc8ENbV1Wir1bw=
github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240911182932-3c609a6ac664 h1:JPs35oSO07PK3Qv7Kyv0GJHVLacIE1IkrvefaPyBjKs=
Expand Down
41 changes: 27 additions & 14 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,24 @@ var _ llo.DataSource = &dataSource{}
type dataSource struct {
lggr logger.Logger
registry Registry

t Telemeter
}

func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter) llo.DataSource {
return newDataSource(lggr, registry, t)
}

func newDataSource(lggr logger.Logger, registry Registry) llo.DataSource {
return &dataSource{lggr.Named("DataSource"), registry}
func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSource {
return &dataSource{lggr.Named("DataSource"), registry, t}
}

// Observe looks up all streams in the registry and populates a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error {
var wg sync.WaitGroup
wg.Add(len(streamValues))
var svmu sync.Mutex
var errors []ErrObservationFailed
var errs []ErrObservationFailed
var errmu sync.Mutex

if opts.VerboseLogging() {
Expand All @@ -91,7 +97,7 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
streamIDs = append(streamIDs, streamID)
}
sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] })
d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "seqNr", opts.SeqNr())
d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
}

for _, streamID := range maps.Keys(streamValues) {
Expand All @@ -103,27 +109,34 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
stream, exists := d.registry.Get(streamID)
if !exists {
errmu.Lock()
errors = append(errors, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
errmu.Unlock()
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
run, trrs, err := stream.Run(ctx)
if err != nil {
errmu.Lock()
errors = append(errors, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
errmu.Unlock()
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, nil, err)
return
}
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
val, err = ExtractStreamValue(trrs)
if err != nil {
errmu.Lock()
errors = append(errors, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errmu.Unlock()
return
}

d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil)

if val != nil {
svmu.Lock()
defer svmu.Unlock()
Expand All @@ -136,15 +149,15 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,

// Failed observations are always logged at warn level
var failedStreamIDs []streams.StreamID
if len(errors) > 0 {
sort.Slice(errors, func(i, j int) bool { return errors[i].streamID < errors[j].streamID })
failedStreamIDs = make([]streams.StreamID, len(errors))
errStrs := make([]string, len(errors))
for i, e := range errors {
if len(errs) > 0 {
sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID })
failedStreamIDs = make([]streams.StreamID, len(errs))
errStrs := make([]string, len(errs))
for i, e := range errs {
errStrs[i] = e.String()
failedStreamIDs[i] = e.streamID
}
d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errors", errStrs, "seqNr", opts.SeqNr())
d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
}

if opts.VerboseLogging() {
Expand All @@ -153,7 +166,7 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
successes = append(successes, strmID)
}
sort.Slice(successes, func(i, j int) bool { return successes[i] < successes[j] })
d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "values", streamValues, "seqNr", opts.SeqNr())
d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "configDigest", opts.ConfigDigest(), "values", streamValues, "seqNr", opts.OutCtx().SeqNr)
}

return nil
Expand Down
93 changes: 80 additions & 13 deletions core/services/llo/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ import (
"context"
"errors"
"math/big"
"sync"
"testing"

"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-data-streams/llo"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -35,8 +40,9 @@ func (m *mockRegistry) Get(streamID streams.StreamID) (strm streams.Stream, exis
return
}

func makeStreamWithSingleResult[T any](res T, err error) *mockStream {
func makeStreamWithSingleResult[T any](runID int64, res T, err error) *mockStream {
return &mockStream{
run: &pipeline.Run{ID: runID},
trrs: []pipeline.TaskRunResult{pipeline.TaskRunResult{Task: &pipeline.MemoTask{}, Result: pipeline.Result{Value: res}}},
err: err,
}
Expand All @@ -52,30 +58,59 @@ func makeStreamValues() llo.StreamValues {

type mockOpts struct{}

func (m mockOpts) VerboseLogging() bool { return true }
func (m mockOpts) SeqNr() uint64 { return 42 }
func (m *mockOpts) VerboseLogging() bool { return true }
func (m *mockOpts) SeqNr() uint64 { return 1042 }
func (m *mockOpts) OutCtx() ocr3types.OutcomeContext {
return ocr3types.OutcomeContext{SeqNr: 1042, PreviousOutcome: ocr3types.Outcome([]byte("foo"))}
}
func (m *mockOpts) ConfigDigest() ocr2types.ConfigDigest {
return ocr2types.ConfigDigest{6, 5, 4}
}

type mockTelemeter struct {
mu sync.Mutex
v3PremiumLegacyPackets []v3PremiumLegacyPacket
}

type v3PremiumLegacyPacket struct {
run *pipeline.Run
trrs pipeline.TaskRunResults
streamID uint32
opts llo.DSOpts
val llo.StreamValue
err error
}

var _ Telemeter = &mockTelemeter{}

func (m *mockTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) {
m.mu.Lock()
defer m.mu.Unlock()
m.v3PremiumLegacyPackets = append(m.v3PremiumLegacyPackets, v3PremiumLegacyPacket{run, trrs, streamID, opts, val, err})
}

func Test_DataSource(t *testing.T) {
lggr := logger.TestLogger(t)
reg := &mockRegistry{make(map[streams.StreamID]*mockStream)}
ds := newDataSource(lggr, reg)
ds := newDataSource(lggr, reg, NullTelemeter)
ctx := testutils.Context(t)
opts := &mockOpts{}

t.Run("Observe", func(t *testing.T) {
t.Run("doesn't set any values if no streams are defined", func(t *testing.T) {
vals := makeStreamValues()
err := ds.Observe(ctx, vals, mockOpts{})
err := ds.Observe(ctx, vals, opts)
assert.NoError(t, err)

assert.Equal(t, makeStreamValues(), vals)
})
t.Run("observes each stream with success and returns values matching map argument", func(t *testing.T) {
reg.streams[1] = makeStreamWithSingleResult[*big.Int](big.NewInt(2181), nil)
reg.streams[2] = makeStreamWithSingleResult[*big.Int](big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](big.NewInt(15), nil)
reg.streams[1] = makeStreamWithSingleResult[*big.Int](1, big.NewInt(2181), nil)
reg.streams[2] = makeStreamWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](3, big.NewInt(15), nil)

vals := makeStreamValues()
err := ds.Observe(ctx, vals, mockOpts{})
err := ds.Observe(ctx, vals, opts)
assert.NoError(t, err)

assert.Equal(t, llo.StreamValues{
Expand All @@ -85,12 +120,12 @@ func Test_DataSource(t *testing.T) {
}, vals)
})
t.Run("observes each stream and returns success/errors", func(t *testing.T) {
reg.streams[1] = makeStreamWithSingleResult[*big.Int](big.NewInt(2181), errors.New("something exploded"))
reg.streams[2] = makeStreamWithSingleResult[*big.Int](big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](nil, errors.New("something exploded 2"))
reg.streams[1] = makeStreamWithSingleResult[*big.Int](1, big.NewInt(2181), errors.New("something exploded"))
reg.streams[2] = makeStreamWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](3, nil, errors.New("something exploded 2"))

vals := makeStreamValues()
err := ds.Observe(ctx, vals, mockOpts{})
err := ds.Observe(ctx, vals, opts)
assert.NoError(t, err)

assert.Equal(t, llo.StreamValues{
Expand All @@ -99,5 +134,37 @@ func Test_DataSource(t *testing.T) {
3: nil,
}, vals)
})

t.Run("records telemetry", func(t *testing.T) {
tm := &mockTelemeter{}
ds.t = tm

reg.streams[1] = makeStreamWithSingleResult[*big.Int](100, big.NewInt(2181), nil)
reg.streams[2] = makeStreamWithSingleResult[*big.Int](101, big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](102, big.NewInt(15), nil)

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
assert.NoError(t, err)

assert.Equal(t, llo.StreamValues{
2: llo.ToDecimal(decimal.NewFromInt(40602)),
1: llo.ToDecimal(decimal.NewFromInt(2181)),
3: llo.ToDecimal(decimal.NewFromInt(15)),
}, vals)

require.Len(t, tm.v3PremiumLegacyPackets, 3)
m := make(map[int]v3PremiumLegacyPacket)
for _, pkt := range tm.v3PremiumLegacyPackets {
m[int(pkt.run.ID)] = pkt
}
pkt := m[100]
assert.Equal(t, 100, int(pkt.run.ID))
assert.Len(t, pkt.trrs, 1)
assert.Equal(t, 1, int(pkt.streamID))
assert.Equal(t, opts, pkt.opts)
assert.Equal(t, "2181", pkt.val.(*llo.Decimal).String())
assert.Nil(t, pkt.err)
})
})
}
23 changes: 16 additions & 7 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ type delegate struct {
prrc llo.PredecessorRetirementReportCache
src llo.ShouldRetireCache
ds llo.DataSource
t services.Service

oracle Closer
}

type DelegateConfig struct {
Logger logger.Logger
DataSource sqlutil.DataSource
Runner streams.Runner
Registry Registry
JobName null.String
Logger logger.Logger
DataSource sqlutil.DataSource
Runner streams.Runner
Registry Registry
JobName null.String
CaptureEATelemetry bool

// LLO
ChannelDefinitionCache llotypes.ChannelDefinitionCache
Expand All @@ -67,6 +69,7 @@ type DelegateConfig struct {
}

func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
lggr := cfg.Logger.With("jobName", cfg.JobName.ValueOrZero())
if cfg.DataSource == nil {
return nil, errors.New("DataSource must not be nil")
}
Expand All @@ -82,9 +85,15 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
// https://smartcontract-it.atlassian.net/browse/MERC-3386
prrc := llo.NewPredecessorRetirementReportCache()
src := llo.NewShouldRetireCache()
ds := newDataSource(cfg.Logger.Named("DataSource"), cfg.Registry)
var t TelemeterService
if cfg.CaptureEATelemetry {
t = NewTelemeterService(lggr, cfg.MonitoringEndpoint)
} else {
t = NullTelemeter
}
ds := newDataSource(lggr.Named("DataSource"), cfg.Registry, t)

return &delegate{services.StateMachine{}, cfg, codecs, prrc, src, ds, nil}, nil
return &delegate{services.StateMachine{}, cfg, codecs, prrc, src, ds, t, nil}, nil
}

func (d *delegate) Start(ctx context.Context) error {
Expand Down
11 changes: 8 additions & 3 deletions core/services/llo/evm/report_codec_premium_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,15 @@ func ExtractReportValues(report llo.Report) (nativePrice, linkPrice *llo.Decimal
// MERC-3524
var LLOExtraHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001")

func LegacyReportContext(cd ocr2types.ConfigDigest, seqNr uint64) ocr2types.ReportContext {
func SeqNrToEpochAndRound(seqNr uint64) (epoch uint32, round uint8) {
// Simulate 256 rounds/epoch
epoch := seqNr / 256
round := seqNr % 256
epoch = uint32(seqNr / 256) // nolint
round = uint8(seqNr % 256) // nolint
return
}

func LegacyReportContext(cd ocr2types.ConfigDigest, seqNr uint64) ocr2types.ReportContext {
epoch, round := SeqNrToEpochAndRound(seqNr)
return ocr2types.ReportContext{
ReportTimestamp: ocr2types.ReportTimestamp{
ConfigDigest: cd,
Expand Down
Loading

0 comments on commit ab5a2c6

Please sign in to comment.