Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multistream specs #15603

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/rotten-books-cross.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Support multiple streamIDs in stream specs #added
94 changes: 12 additions & 82 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package llo

import (
"context"
"errors"
"fmt"
"slices"
"sort"
"strconv"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/shopspring/decimal"
"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand All @@ -19,7 +20,6 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
Expand All @@ -42,7 +42,7 @@ var (
)

type Registry interface {
Get(streamID streams.StreamID) (strm streams.Stream, exists bool)
Get(streamID streams.StreamID) (p streams.Pipeline, exists bool)
}

type ErrObservationFailed struct {
Expand Down Expand Up @@ -109,43 +109,24 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues))
var errs []ErrObservationFailed

// oc only lives for the duration of this Observe call
oc := NewObservationContext(d.registry, d.t)

for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
defer wg.Done()

var val llo.StreamValue

stream, exists := d.registry.Get(streamID)
if !exists {
mu.Lock()
errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
mu.Unlock()
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
run, trrs, err := stream.Run(ctx)
if err != nil {
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
mu.Unlock()
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, nil, err)
return
}
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
val, err = ExtractStreamValue(trrs)
val, err := oc.Observe(ctx, streamID, opts)
if err != nil {
if errors.As(err, &ErrMissingStream{}) {
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
}
promObservationErrorCount.WithLabelValues(strconv.FormatUint(uint64(streamID), 10)).Inc()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"})
mu.Unlock()
return
}

d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil)

mu.Lock()
defer mu.Unlock()

Expand Down Expand Up @@ -186,54 +167,3 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,

return nil
}

// ExtractStreamValue extracts a StreamValue from a TaskRunResults
func ExtractStreamValue(trrs pipeline.TaskRunResults) (llo.StreamValue, error) {
// pipeline.TaskRunResults comes ordered asc by index, this is guaranteed
// by the pipeline executor
finaltrrs := trrs.Terminals()

// HACK: Right now we rely on the number of outputs to determine whether
// its a Decimal or a Quote.
// This isn't very robust or future-proof but is sufficient to support v0.3
// compat.
// There are a number of different possible ways to solve this in future.
// See: https://smartcontract-it.atlassian.net/browse/MERC-5934
switch len(finaltrrs) {
case 1:
res := finaltrrs[0].Result
if res.Error != nil {
return nil, res.Error
}
val, err := toDecimal(res.Value)
if err != nil {
return nil, fmt.Errorf("failed to parse BenchmarkPrice: %w", err)
}
return llo.ToDecimal(val), nil
case 3:
// Expect ordering of Benchmark, Bid, Ask
results := make([]decimal.Decimal, 3)
for i, trr := range finaltrrs {
res := trr.Result
if res.Error != nil {
return nil, fmt.Errorf("failed to parse stream output into Quote (task index: %d): %w", i, res.Error)
}
val, err := toDecimal(res.Value)
if err != nil {
return nil, fmt.Errorf("failed to parse decimal: %w", err)
}
results[i] = val
}
return &llo.Quote{
Benchmark: results[0],
Bid: results[1],
Ask: results[2],
}, nil
default:
return nil, fmt.Errorf("invalid number of results, expected: 1 or 3, got: %d", len(finaltrrs))
}
}

func toDecimal(val interface{}) (decimal.Decimal, error) {
return utils.ToDecimal(val)
}
75 changes: 58 additions & 17 deletions core/services/llo/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,36 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
)

type mockStream struct {
type mockPipeline struct {
run *pipeline.Run
trrs pipeline.TaskRunResults
err error

streamIDs []streams.StreamID

runCount int
}

func (m *mockStream) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
func (m *mockPipeline) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
m.runCount++
return m.run, m.trrs, m.err
}

func (m *mockPipeline) StreamIDs() []streams.StreamID {
return m.streamIDs
}

type mockRegistry struct {
streams map[streams.StreamID]*mockStream
pipelines map[streams.StreamID]*mockPipeline
}

func (m *mockRegistry) Get(streamID streams.StreamID) (strm streams.Stream, exists bool) {
strm, exists = m.streams[streamID]
func (m *mockRegistry) Get(streamID streams.StreamID) (p streams.Pipeline, exists bool) {
p, exists = m.pipelines[streamID]
return
}

func makeStreamWithSingleResult[T any](runID int64, res T, err error) *mockStream {
return &mockStream{
func makePipelineWithSingleResult[T any](runID int64, res T, err error) *mockPipeline {
return &mockPipeline{
run: &pipeline.Run{ID: runID},
trrs: []pipeline.TaskRunResult{pipeline.TaskRunResult{Task: &pipeline.MemoTask{}, Result: pipeline.Result{Value: res}}},
err: err,
Expand Down Expand Up @@ -91,7 +100,7 @@ func (m *mockTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.

func Test_DataSource(t *testing.T) {
lggr := logger.TestLogger(t)
reg := &mockRegistry{make(map[streams.StreamID]*mockStream)}
reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)}
ds := newDataSource(lggr, reg, NullTelemeter)
ctx := testutils.Context(t)
opts := &mockOpts{}
Expand All @@ -105,9 +114,9 @@ func Test_DataSource(t *testing.T) {
assert.Equal(t, makeStreamValues(), vals)
})
t.Run("observes each stream with success and returns values matching map argument", func(t *testing.T) {
reg.streams[1] = makeStreamWithSingleResult[*big.Int](1, big.NewInt(2181), nil)
reg.streams[2] = makeStreamWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](3, big.NewInt(15), nil)
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil)
reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, big.NewInt(15), nil)

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
Expand All @@ -120,9 +129,9 @@ func Test_DataSource(t *testing.T) {
}, vals)
})
t.Run("observes each stream and returns success/errors", func(t *testing.T) {
reg.streams[1] = makeStreamWithSingleResult[*big.Int](1, big.NewInt(2181), errors.New("something exploded"))
reg.streams[2] = makeStreamWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](3, nil, errors.New("something exploded 2"))
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), errors.New("something exploded"))
reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, nil, errors.New("something exploded 2"))

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
Expand All @@ -139,9 +148,9 @@ func Test_DataSource(t *testing.T) {
tm := &mockTelemeter{}
ds.t = tm

reg.streams[1] = makeStreamWithSingleResult[*big.Int](100, big.NewInt(2181), nil)
reg.streams[2] = makeStreamWithSingleResult[*big.Int](101, big.NewInt(40602), nil)
reg.streams[3] = makeStreamWithSingleResult[*big.Int](102, big.NewInt(15), nil)
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), nil)
reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil)
reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](102, big.NewInt(15), nil)

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
Expand All @@ -166,5 +175,37 @@ func Test_DataSource(t *testing.T) {
assert.Equal(t, "2181", pkt.val.(*llo.Decimal).String())
assert.Nil(t, pkt.err)
})

t.Run("records telemetry for errors", func(t *testing.T) {
tm := &mockTelemeter{}
ds.t = tm

reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), errors.New("something exploded"))
reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil)
reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](102, nil, errors.New("something exploded 2"))

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
assert.NoError(t, err)

assert.Equal(t, llo.StreamValues{
2: llo.ToDecimal(decimal.NewFromInt(40602)),
1: nil,
3: nil,
}, vals)

require.Len(t, tm.v3PremiumLegacyPackets, 3)
m := make(map[int]v3PremiumLegacyPacket)
for _, pkt := range tm.v3PremiumLegacyPackets {
m[int(pkt.run.ID)] = pkt
}
pkt := m[100]
assert.Equal(t, 100, int(pkt.run.ID))
assert.Len(t, pkt.trrs, 1)
assert.Equal(t, 1, int(pkt.streamID))
assert.Equal(t, opts, pkt.opts)
assert.Nil(t, pkt.val)
assert.NotNil(t, pkt.err)
})
})
}
Loading
Loading