From 4c34323fd7522d45289d0887106249d47866d17a Mon Sep 17 00:00:00 2001
From: Matvey Arye
Date: Tue, 13 Dec 2022 15:41:32 -0500
Subject: [PATCH] bug fix

---
 pkg/pgmodel/ingestor/metric_batcher.go | 9 +++++----
 pkg/pgmodel/ingestor/reservation.go    | 2 +-
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go
index 29c98e9adc..840b73aaa4 100644
--- a/pkg/pgmodel/ingestor/metric_batcher.go
+++ b/pkg/pgmodel/ingestor/metric_batcher.go
@@ -256,7 +256,8 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 
 		numSeries := pending.batch.CountSeries()
 		numSamples, numExemplars := pending.batch.Count()
-
+		wasFull := pending.IsFull()
+		start := pending.Start
 		select {
 		//try to batch as much as possible before sending
 		case req, ok := <-recvCh:
@@ -265,7 +266,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 				span.AddEvent("Sending last non-empty batch")
 				copySender <- copyRequest{pending, info}
 				metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
-				metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds())
+				metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(start).Seconds())
 			}
 			span.AddEvent("Exiting metric batcher batch loop")
 			span.SetAttributes(attribute.Int("num_series", numSeries))
@@ -276,8 +277,8 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 		case copySender <- copyRequest{pending, info}:
 			metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
 			metrics.IngestorFlushInsertables.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSamples + numExemplars))
-			metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds())
-			if pending.IsFull() {
+			metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(start).Seconds())
+			if wasFull {
 				metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "size"}).Inc()
 			} else {
 				metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "requested"}).Inc()
diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go
index 5d1f769037..7b87e4fbca 100644
--- a/pkg/pgmodel/ingestor/reservation.go
+++ b/pkg/pgmodel/ingestor/reservation.go
@@ -165,7 +165,7 @@ func (rq *ReservationQueue) Peek() (time.Time, bool) {
 		case <-waitch:
 		case <-time.After(250 * time.Millisecond):
 		}
-		log.TraceRequest("component", "reservation", "event", "peek", "batched_metrics", rq.q.Len(), "waited", waited, "took", time.Since((*rq.q)[0].GetStartTime()))
+		log.TraceRequest("component", "reservation", "event", "peek", "batched_metrics", rq.Len(), "waited", waited, "took", time.Since(reservation.GetStartTime()))
 	}
 	return reservation.GetStartTime(), ok
 }