diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go index a4d8652660e..0585dec49dc 100644 --- a/core/services/llo/data_source.go +++ b/core/services/llo/data_source.go @@ -3,6 +3,7 @@ package llo import ( "context" "fmt" + "slices" "sort" "sync" "time" @@ -86,11 +87,7 @@ func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSour // Observe looks up all streams in the registry and populates a map of stream ID => value func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error { - var wg sync.WaitGroup - wg.Add(len(streamValues)) - var svmu sync.Mutex - var errs []ErrObservationFailed - var errmu sync.Mutex + now := time.Now() if opts.VerboseLogging() { streamIDs := make([]streams.StreamID, 0, len(streamValues)) @@ -101,7 +98,12 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) } - now := time.Now() + var wg sync.WaitGroup + wg.Add(len(streamValues)) + + var mu sync.Mutex + successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues)) + var errs []ErrObservationFailed for _, streamID := range maps.Keys(streamValues) { go func(streamID llotypes.StreamID) { @@ -111,17 +113,17 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, stream, exists := d.registry.Get(streamID) if !exists { - errmu.Lock() + mu.Lock() errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)}) - errmu.Unlock() + mu.Unlock() promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() return } run, trrs, err := stream.Run(ctx) if err != nil { - errmu.Lock() + mu.Lock() errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"}) - errmu.Unlock() + mu.Unlock() promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() // TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet // https://smartcontract-it.atlassian.net/browse/MERC-6290 @@ -132,42 +134,34 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, // https://smartcontract-it.atlassian.net/browse/MERC-6290 val, err = ExtractStreamValue(trrs) if err != nil { - errmu.Lock() + mu.Lock() errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"}) - errmu.Unlock() + mu.Unlock() return } d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil) + mu.Lock() + defer mu.Unlock() + + successfulStreamIDs = append(successfulStreamIDs, streamID) if val != nil { - svmu.Lock() - defer svmu.Unlock() streamValues[streamID] = val } }(streamID) } wg.Wait() - elapsed := time.Since(now) - // Failed observations are always logged at warn level - var successfulStreamIDs []streams.StreamID - var failedStreamIDs []streams.StreamID - var errStrs []string - - // Only log errors or if VerboseLogging is turned on + // Only log on errors or if VerboseLogging is turned on if len(errs) > 0 || opts.VerboseLogging() { - successfulStreamIDs = make([]streams.StreamID, 0, len(streamValues)) - for strmID := range streamValues { - successfulStreamIDs = append(successfulStreamIDs, strmID) - } - sort.Slice(successfulStreamIDs, func(i, j int) bool { return successfulStreamIDs[i] < successfulStreamIDs[j] }) - + slices.Sort(successfulStreamIDs) sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID }) - failedStreamIDs = make([]streams.StreamID, len(errs)) - errStrs = make([]string, len(errs)) + + failedStreamIDs := make([]streams.StreamID, len(errs)) + errStrs := make([]string, len(errs)) for i, e := range errs { errStrs[i] = e.String() failedStreamIDs[i] = e.streamID