Skip to content

Commit

Permalink
Fix logging logic
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Nov 12, 2024
1 parent 57da026 commit 8546884
Showing 1 changed file with 23 additions and 29 deletions.
52 changes: 23 additions & 29 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package llo
import (
"context"
"fmt"
"slices"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -86,11 +87,7 @@ func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSour

// Observe looks up all streams in the registry and populates a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error {
var wg sync.WaitGroup
wg.Add(len(streamValues))
var svmu sync.Mutex
var errs []ErrObservationFailed
var errmu sync.Mutex
now := time.Now()

if opts.VerboseLogging() {
streamIDs := make([]streams.StreamID, 0, len(streamValues))
Expand All @@ -101,7 +98,12 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
}

now := time.Now()
var wg sync.WaitGroup
wg.Add(len(streamValues))

var mu sync.Mutex
var successfulStreamIDs []streams.StreamID
var errs []ErrObservationFailed

for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
Expand All @@ -111,17 +113,17 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,

stream, exists := d.registry.Get(streamID)
if !exists {
errmu.Lock()
mu.Lock()
errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
errmu.Unlock()
mu.Unlock()
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
run, trrs, err := stream.Run(ctx)
if err != nil {
errmu.Lock()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
errmu.Unlock()
mu.Unlock()
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
Expand All @@ -132,42 +134,34 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
// https://smartcontract-it.atlassian.net/browse/MERC-6290
val, err = ExtractStreamValue(trrs)
if err != nil {
errmu.Lock()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errmu.Unlock()
mu.Unlock()
return
}

d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil)

mu.Lock()
defer mu.Unlock()

successfulStreamIDs = append(successfulStreamIDs, streamID)
if val != nil {
svmu.Lock()
defer svmu.Unlock()
streamValues[streamID] = val
}
}(streamID)
}

wg.Wait()

elapsed := time.Since(now)

// Failed observations are always logged at warn level
var successfulStreamIDs []streams.StreamID
var failedStreamIDs []streams.StreamID
var errStrs []string

// Only log errors or if VerboseLogging is turned on
// Only log on errors or if VerboseLogging is turned on
if len(errs) > 0 || opts.VerboseLogging() {
successfulStreamIDs = make([]streams.StreamID, 0, len(streamValues))
for strmID := range streamValues {
successfulStreamIDs = append(successfulStreamIDs, strmID)
}
sort.Slice(successfulStreamIDs, func(i, j int) bool { return successfulStreamIDs[i] < successfulStreamIDs[j] })

slices.Sort(successfulStreamIDs)
sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID })
failedStreamIDs = make([]streams.StreamID, len(errs))
errStrs = make([]string, len(errs))

failedStreamIDs := make([]streams.StreamID, len(errs))
errStrs := make([]string, len(errs))
for i, e := range errs {
errStrs[i] = e.String()
failedStreamIDs[i] = e.streamID
Expand Down

0 comments on commit 8546884

Please sign in to comment.