Skip to content

Commit

Permalink
bulkerapp: improve queue size metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 23, 2025
1 parent b3739ed commit 582b1b3
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion bulkerapp/app/abstract_batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error
bc.Debugf("Consumer should not consume. offsets: %d-%d", lowOffset, highOffset)
return BatchCounters{}, nil
}
metrics.ConsumerQueueSize(bc.topicId, bc.mode, bc.destinationId, bc.tableName).Set(math.Max(float64(highOffset-lowOffset-int64(maxBatchSize)), 0))
metrics.ConsumerQueueSize(bc.topicId, bc.mode, bc.destinationId, bc.tableName).Set(math.Max(float64(highOffset-lowOffset), 0))
lastMetricTime := time.Now()
bc.Debugf("Starting consuming messages from topic. Messages in topic: ~%d. ", highOffset-lowOffset)
batchNumber := 1
for {
Expand All @@ -303,6 +304,16 @@ func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error
}
totalState.Merge(batchState)
counters.accumulate(batchStats)
if time.Since(lastMetricTime) > 5*time.Minute {
_, newHighOffset, err := bc.consumer.Load().QueryWatermarkOffsets(bc.topicId, 0, 10_000)
if err != nil {
bc.Errorf("Failed to query watermark offsets: %v", err)
bc.errorMetric("query_watermark_failed")
} else {
metrics.ConsumerQueueSize(bc.topicId, bc.mode, bc.destinationId, bc.tableName).Set(math.Max(float64(newHighOffset-lowOffset-int64(counters.consumed)), 0))
}
lastMetricTime = time.Now()
}
if !nextBatch {
err = err2
return
Expand Down

0 comments on commit 582b1b3

Please sign in to comment.