Skip to content

Commit

Permalink
Fix throughput calculation from collectors metrics (#1621)
Browse files Browse the repository at this point in the history
Co-authored-by: Amir Blum <amirgiraffe@gmail.com>
  • Loading branch information
RonFed and blumamir authored Oct 25, 2024
1 parent 1f84889 commit df56ec0
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 23 deletions.
23 changes: 8 additions & 15 deletions frontend/endpoints/collector_metrics/cluster_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (dm *destinationsMetrics) updateDestinationMetricsByExporter(dp pmetric.Num

// From this point on, we are updating the existing destination metrics
var throughputPtr *int64
var dataSentInInterval int64
var dataSentInInterval float64
dtm := currentVal.clusterCollectorsTraffic[clusterCollectorID]

// the metric data in 'dp' represent the number of spans/metrics/logs sent by the exporter
Expand All @@ -208,20 +208,20 @@ func (dm *destinationsMetrics) updateDestinationMetricsByExporter(dp pmetric.Num
case exporterSentSpansMetricName:
throughputPtr = &dtm.tracesThroughput
spansInInterval := int64(dp.DoubleValue()) - dtm.sentSpans
dataSentInInterval = int64(float64(spansInInterval) * dm.avgCalculator.lastCalculatedAvgSpanSize())
dtm.tracesDataSent += dataSentInInterval
dataSentInInterval = float64(spansInInterval) * dm.avgCalculator.lastCalculatedAvgSpanSize()
dtm.tracesDataSent += int64(dataSentInInterval)
dtm.sentSpans = int64(dp.DoubleValue())
case exporterSentMetricsMetricName:
throughputPtr = &dtm.metricsThroughput
metricsInInterval := int64(dp.DoubleValue()) - dtm.sentMetrics
dataSentInInterval = int64(float64(metricsInInterval) * dm.avgCalculator.lastCalculatedAvgMetricSize())
dtm.metricsDataSent += dataSentInInterval
dataSentInInterval = float64(metricsInInterval) * dm.avgCalculator.lastCalculatedAvgMetricSize()
dtm.metricsDataSent += int64(dataSentInInterval)
dtm.sentMetrics = int64(dp.DoubleValue())
case exporterSentLogsMetricName:
throughputPtr = &dtm.logsThroughput
logsInInterval := int64(dp.DoubleValue()) - dtm.sentLogs
dataSentInInterval = int64(float64(logsInInterval) * dm.avgCalculator.lastCalculatedAvgLogSize())
dtm.logsDataSent += dataSentInInterval
dataSentInInterval = float64(logsInInterval) * dm.avgCalculator.lastCalculatedAvgLogSize()
dtm.logsDataSent += int64(dataSentInInterval)
dtm.sentLogs = int64(dp.DoubleValue())
}

Expand All @@ -235,14 +235,7 @@ func (dm *destinationsMetrics) updateDestinationMetricsByExporter(dp pmetric.Num
return
}

timeDiff := newTime.Sub(oldTime).Seconds()

var throughput int64
// calculate throughput only if the new value is greater than the old value and the time difference is positive
// otherwise, the throughput is set to 0
if dataSentInInterval > 0 && timeDiff > 0 {
throughput = (dataSentInInterval) / int64(timeDiff)
}
throughput := calculateThroughput(dataSentInInterval, newTime, oldTime)

*throughputPtr = throughput
}
Expand Down
9 changes: 1 addition & 8 deletions frontend/endpoints/collector_metrics/node_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,7 @@ func (sm *sourcesMetrics) updateSourceMetrics(dp pmetric.NumberDataPoint, metric
return
}

timeDiff := newTime.Sub(oldTime).Seconds()

var throughput int64
// calculate throughput only if the new value is greater than the old value and the time difference is positive
// otherwise, the throughput is set to 0
if newVal > oldVal && timeDiff > 0 {
throughput = (newVal - oldVal) / int64(timeDiff)
}
throughput := calculateThroughput(float64(newVal-oldVal), newTime, oldTime)

*throughputPtr = throughput
}
Expand Down
14 changes: 14 additions & 0 deletions frontend/endpoints/collector_metrics/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package collectormetrics

import "time"

func calculateThroughput(diff float64, currentTime, prevTime time.Time) int64 {
elapsed := currentTime.Sub(prevTime).Seconds()

var throughput int64
if diff > 0 && elapsed > 0 {
throughput = int64(diff / elapsed)
}

return throughput
}
33 changes: 33 additions & 0 deletions frontend/endpoints/collector_metrics/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package collectormetrics

import (
"testing"
"time"
)

func TestCalculateThroughput(t *testing.T) {
cases := []struct {
name string
diff float64
currentTime time.Time
prevTime time.Time
want int64
}{
{name: "no diff and no time change", diff: 0, currentTime: time.Now(), prevTime: time.Now(), want: 0},
{name: "diff of 1 in 1 second", diff: 1, currentTime: time.Now(), prevTime: time.Now().Add(-1 * time.Second), want: 1},
{name: "diff of 1 in 2 seconds", diff: 1, currentTime: time.Now(), prevTime: time.Now().Add(-2 * time.Second), want: 0},
{name: "diff of 0 in 1 second", diff: 0, currentTime: time.Now(), prevTime: time.Now().Add(-1 * time.Second), want: 0},
{name: "diff of 100 in 10 seconds", diff: 100, currentTime: time.Now(), prevTime: time.Now().Add(-10 * time.Second), want: 10},
{name: "diff of 100 in 0.1 seconds", diff: 100, currentTime: time.Now(), prevTime: time.Now().Add(-100 * time.Millisecond), want: 1000},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got := calculateThroughput(c.diff, c.currentTime, c.prevTime)
if got != c.want {
t.Errorf("calculateThroughput(%f, %v, %v) = %d, want %d", c.diff, c.currentTime, c.prevTime, got, c.want)
}
})
}

}

0 comments on commit df56ec0

Please sign in to comment.