diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go index ef333f821a1..8f35eabbd02 100644 --- a/core/services/llo/data_source.go +++ b/core/services/llo/data_source.go @@ -5,6 +5,7 @@ import ( "fmt" "sort" "sync" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -100,6 +101,8 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) } + now := time.Now() + for _, streamID := range maps.Keys(streamValues) { go func(streamID llotypes.StreamID) { defer wg.Done() @@ -147,26 +150,36 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, wg.Wait() + elapsed := time.Since(now) + // Failed observations are always logged at warn level + var successfulStreamIDs []streams.StreamID var failedStreamIDs []streams.StreamID - if len(errs) > 0 { + var errStrs []string + + // Only log errors or if VerboseLogging is turned on + if len(errs) > 0 || opts.VerboseLogging() { + successfulStreamIDs = make([]streams.StreamID, 0, len(streamValues)) + for strmID := range streamValues { + successfulStreamIDs = append(successfulStreamIDs, strmID) + } + sort.Slice(successfulStreamIDs, func(i, j int) bool { return successfulStreamIDs[i] < successfulStreamIDs[j] }) + sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID }) failedStreamIDs = make([]streams.StreamID, len(errs)) - errStrs := make([]string, len(errs)) + errStrs = make([]string, len(errs)) for i, e := range errs { errStrs[i] = e.String() failedStreamIDs[i] = e.streamID } - d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) - } - if opts.VerboseLogging() { - successes := make([]streams.StreamID, 0, len(streamValues)) - for strmID := range streamValues { - successes = append(successes, strmID) + lggr := logger.With(d.lggr, "elapsed", elapsed, "nSuccessfulStreams", len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "successfulStreamIDs", successfulStreamIDs, "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) + + if len(errs) == 0 && opts.VerboseLogging() { + lggr.Infow("Observation succeeded for all streams") + } else if len(errs) > 0 { + lggr.Warnw("Observation failed for streams") } - sort.Slice(successes, func(i, j int) bool { return successes[i] < successes[j] }) - d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "configDigest", opts.ConfigDigest(), "values", streamValues, "seqNr", opts.OutCtx().SeqNr) } return nil diff --git a/core/services/ocrcommon/telemetry.go b/core/services/ocrcommon/telemetry.go index e20b2485d86..58e57950cd3 100644 --- a/core/services/ocrcommon/telemetry.go +++ b/core/services/ocrcommon/telemetry.go @@ -164,7 +164,6 @@ func ParseMercuryEATelemetry(lggr logger.Logger, trrs pipeline.TaskRunResults, f bridgeRawResponse, ok := trr.Result.Value.(string) if !ok { - lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, id=%s, name=%q, expected string got %T", trr.Task.DotID(), bridgeName, trr.Result.Value), "dotID", trr.Task.DotID(), "bridgeName", bridgeName) continue } eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse))