Skip to content

Commit

Permalink
Multistream specs
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Dec 10, 2024
1 parent 54938d4 commit 358a742
Show file tree
Hide file tree
Showing 10 changed files with 561 additions and 221 deletions.
94 changes: 12 additions & 82 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package llo

import (
"context"
"errors"
"fmt"
"slices"
"sort"
"strconv"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/shopspring/decimal"
"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand All @@ -19,7 +20,6 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
Expand All @@ -42,7 +42,7 @@ var (
)

type Registry interface {
Get(streamID streams.StreamID) (strm streams.Stream, exists bool)
Get(streamID streams.StreamID) (p streams.Pipeline, exists bool)
}

type ErrObservationFailed struct {
Expand Down Expand Up @@ -109,43 +109,24 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues))
var errs []ErrObservationFailed

// oc only lives for the duration of this Observe call
oc := NewObservationContext(d.registry, d.t)

for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
defer wg.Done()

var val llo.StreamValue

stream, exists := d.registry.Get(streamID)
if !exists {
mu.Lock()
errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
mu.Unlock()
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
run, trrs, err := stream.Run(ctx)
if err != nil {
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
mu.Unlock()
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, nil, err)
return
}
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
val, err = ExtractStreamValue(trrs)
val, err := oc.Observe(ctx, streamID, opts)
if err != nil {
if errors.As(err, &ErrMissingStream{}) {
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
}
promObservationErrorCount.WithLabelValues(strconv.FormatUint(uint64(streamID), 10)).Inc()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"})
mu.Unlock()
return
}

d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil)

mu.Lock()
defer mu.Unlock()

Expand Down Expand Up @@ -186,54 +167,3 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,

return nil
}

// ExtractStreamValue converts the terminal task results of a pipeline run
// into a single llo.StreamValue.
//
// One terminal result is interpreted as a plain Decimal; exactly three are
// interpreted as a Quote in Benchmark/Bid/Ask order. Any other count is an
// error.
func ExtractStreamValue(trrs pipeline.TaskRunResults) (llo.StreamValue, error) {
	// Terminal results arrive ordered ascending by index; the pipeline
	// executor guarantees this.
	terminals := trrs.Terminals()

	// HACK: The output count is currently the only signal distinguishing a
	// Decimal from a Quote. This isn't very robust or future-proof but is
	// sufficient to support v0.3 compat. Alternative approaches are tracked
	// in: https://smartcontract-it.atlassian.net/browse/MERC-5934
	if n := len(terminals); n != 1 && n != 3 {
		return nil, fmt.Errorf("invalid number of results, expected: 1 or 3, got: %d", n)
	}

	if len(terminals) == 1 {
		result := terminals[0].Result
		if result.Error != nil {
			return nil, result.Error
		}
		d, err := toDecimal(result.Value)
		if err != nil {
			return nil, fmt.Errorf("failed to parse BenchmarkPrice: %w", err)
		}
		return llo.ToDecimal(d), nil
	}

	// Three results: expected ordering is Benchmark, Bid, Ask.
	var quote [3]decimal.Decimal
	for i, trr := range terminals {
		result := trr.Result
		if result.Error != nil {
			return nil, fmt.Errorf("failed to parse stream output into Quote (task index: %d): %w", i, result.Error)
		}
		d, err := toDecimal(result.Value)
		if err != nil {
			return nil, fmt.Errorf("failed to parse decimal: %w", err)
		}
		quote[i] = d
	}
	return &llo.Quote{
		Benchmark: quote[0],
		Bid:       quote[1],
		Ask:       quote[2],
	}, nil
}

// toDecimal coerces an arbitrary pipeline output value into a
// decimal.Decimal by delegating to the shared utils helper.
func toDecimal(val interface{}) (decimal.Decimal, error) {
	return utils.ToDecimal(val)
}
75 changes: 58 additions & 17 deletions core/services/llo/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,36 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
)

type mockStream struct {
type mockPipeline struct {
run *pipeline.Run
trrs pipeline.TaskRunResults
err error

streamIDs []streams.StreamID

runCount int
}

func (m *mockStream) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
// Run simulates a pipeline execution for tests: it increments the
// invocation counter and returns the canned run, task results, and error
// configured on the mock.
func (m *mockPipeline) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
	m.runCount++
	return m.run, m.trrs, m.err
}

// StreamIDs returns the stream IDs configured on the mock pipeline.
func (m *mockPipeline) StreamIDs() []streams.StreamID {
	return m.streamIDs
}

type mockRegistry struct {
streams map[streams.StreamID]*mockStream
pipelines map[streams.StreamID]*mockPipeline
}

func (m *mockRegistry) Get(streamID streams.StreamID) (strm streams.Stream, exists bool) {
strm, exists = m.streams[streamID]
// Get looks up the mock pipeline registered for streamID, reporting via
// exists whether one was found.
func (m *mockRegistry) Get(streamID streams.StreamID) (p streams.Pipeline, exists bool) {
	p, exists = m.pipelines[streamID]
	return
}

func makeStreamWithSingleResult[T any](runID int64, res T, err error) *mockStream {
return &mockStream{
func makePipelineWithSingleResult[T any](runID int64, res T, err error) *mockPipeline {
return &mockPipeline{
run: &pipeline.Run{ID: runID},
trrs: []pipeline.TaskRunResult{pipeline.TaskRunResult{Task: &pipeline.MemoTask{}, Result: pipeline.Result{Value: res}}},
err: err,
Expand Down Expand Up @@ -91,7 +100,7 @@ func (m *mockTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.

func Test_DataSource(t *testing.T) {
lggr := logger.TestLogger(t)
reg := &mockRegistry{make(map[streams.StreamID]*mockStream)}
reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)}
ds := newDataSource(lggr, reg, NullTelemeter)
ctx := testutils.Context(t)
opts := &mockOpts{}
Expand All @@ -105,9 +114,9 @@ func Test_DataSource(t *testing.T) {
assert.Equal(t, makeStreamValues(), vals)
})
t.Run("observes each stream with success and returns values matching map argument", func(t *testing.T) {
reg.streams[1] = makeStreamWithSingleResult[*big.Int](1, big.NewInt(2181), nil)
reg.streams[2] = makeStreamWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](3, big.NewInt(15), nil)
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil)
reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, big.NewInt(15), nil)

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
Expand All @@ -120,9 +129,9 @@ func Test_DataSource(t *testing.T) {
}, vals)
})
t.Run("observes each stream and returns success/errors", func(t *testing.T) {
reg.streams[1] = makeStreamWithSingleResult[*big.Int](1, big.NewInt(2181), errors.New("something exploded"))
reg.streams[2] = makeStreamWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](3, nil, errors.New("something exploded 2"))
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), errors.New("something exploded"))
reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, nil, errors.New("something exploded 2"))

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
Expand All @@ -139,9 +148,9 @@ func Test_DataSource(t *testing.T) {
tm := &mockTelemeter{}
ds.t = tm

reg.streams[1] = makeStreamWithSingleResult[*big.Int](100, big.NewInt(2181), nil)
reg.streams[2] = makeStreamWithSingleResult[*big.Int](101, big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](102, big.NewInt(15), nil)
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), nil)
reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil)
reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](102, big.NewInt(15), nil)

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
Expand All @@ -166,5 +175,37 @@ func Test_DataSource(t *testing.T) {
assert.Equal(t, "2181", pkt.val.(*llo.Decimal).String())
assert.Nil(t, pkt.err)
})

t.Run("records telemetry for errors", func(t *testing.T) {
tm := &mockTelemeter{}
ds.t = tm

reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), errors.New("something exploded"))
reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil)
reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](102, nil, errors.New("something exploded 2"))

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
assert.NoError(t, err)

assert.Equal(t, llo.StreamValues{
2: llo.ToDecimal(decimal.NewFromInt(40602)),
1: nil,
3: nil,
}, vals)

require.Len(t, tm.v3PremiumLegacyPackets, 3)
m := make(map[int]v3PremiumLegacyPacket)
for _, pkt := range tm.v3PremiumLegacyPackets {
m[int(pkt.run.ID)] = pkt
}
pkt := m[100]
assert.Equal(t, 100, int(pkt.run.ID))
assert.Len(t, pkt.trrs, 1)
assert.Equal(t, 1, int(pkt.streamID))
assert.Equal(t, opts, pkt.opts)
assert.Nil(t, pkt.val)
assert.NotNil(t, pkt.err)
})
})
}
Loading

0 comments on commit 358a742

Please sign in to comment.