diff --git a/frontend/endpoints/collector_metrics/cluster_collector.go b/frontend/endpoints/collector_metrics/cluster_collector.go index 69a904844..df467ede3 100644 --- a/frontend/endpoints/collector_metrics/cluster_collector.go +++ b/frontend/endpoints/collector_metrics/cluster_collector.go @@ -199,7 +199,7 @@ func (dm *destinationsMetrics) updateDestinationMetricsByExporter(dp pmetric.Num // From this point on, we are updating the existing destination metrics var throughputPtr *int64 - var dataSentInInterval int64 + var dataSentInInterval float64 dtm := currentVal.clusterCollectorsTraffic[clusterCollectorID] // the metric data in 'dp' represent the number of spans/metrics/logs sent by the exporter @@ -208,20 +208,20 @@ func (dm *destinationsMetrics) updateDestinationMetricsByExporter(dp pmetric.Num case exporterSentSpansMetricName: throughputPtr = &dtm.tracesThroughput spansInInterval := int64(dp.DoubleValue()) - dtm.sentSpans - dataSentInInterval = int64(float64(spansInInterval) * dm.avgCalculator.lastCalculatedAvgSpanSize()) - dtm.tracesDataSent += dataSentInInterval + dataSentInInterval = float64(spansInInterval) * dm.avgCalculator.lastCalculatedAvgSpanSize() + dtm.tracesDataSent += int64(dataSentInInterval) dtm.sentSpans = int64(dp.DoubleValue()) case exporterSentMetricsMetricName: throughputPtr = &dtm.metricsThroughput metricsInInterval := int64(dp.DoubleValue()) - dtm.sentMetrics - dataSentInInterval = int64(float64(metricsInInterval) * dm.avgCalculator.lastCalculatedAvgMetricSize()) - dtm.metricsDataSent += dataSentInInterval + dataSentInInterval = float64(metricsInInterval) * dm.avgCalculator.lastCalculatedAvgMetricSize() + dtm.metricsDataSent += int64(dataSentInInterval) dtm.sentMetrics = int64(dp.DoubleValue()) case exporterSentLogsMetricName: throughputPtr = &dtm.logsThroughput logsInInterval := int64(dp.DoubleValue()) - dtm.sentLogs - dataSentInInterval = int64(float64(logsInInterval) * dm.avgCalculator.lastCalculatedAvgLogSize()) - dtm.logsDataSent += dataSentInInterval + dataSentInInterval = float64(logsInInterval) * dm.avgCalculator.lastCalculatedAvgLogSize() + dtm.logsDataSent += int64(dataSentInInterval) dtm.sentLogs = int64(dp.DoubleValue()) } @@ -235,14 +235,7 @@ func (dm *destinationsMetrics) updateDestinationMetricsByExporter(dp pmetric.Num return } - timeDiff := newTime.Sub(oldTime).Seconds() - - var throughput int64 - // calculate throughput only if the new value is greater than the old value and the time difference is positive - // otherwise, the throughput is set to 0 - if dataSentInInterval > 0 && timeDiff > 0 { - throughput = (dataSentInInterval) / int64(timeDiff) - } + throughput := calculateThroughput(dataSentInInterval, newTime, oldTime) *throughputPtr = throughput } diff --git a/frontend/endpoints/collector_metrics/node_collector.go b/frontend/endpoints/collector_metrics/node_collector.go index 61d6e2fa2..49e6623ee 100644 --- a/frontend/endpoints/collector_metrics/node_collector.go +++ b/frontend/endpoints/collector_metrics/node_collector.go @@ -95,14 +95,7 @@ func (sm *sourcesMetrics) updateSourceMetrics(dp pmetric.NumberDataPoint, metric return } - timeDiff := newTime.Sub(oldTime).Seconds() - - var throughput int64 - // calculate throughput only if the new value is greater than the old value and the time difference is positive - // otherwise, the throughput is set to 0 - if newVal > oldVal && timeDiff > 0 { - throughput = (newVal - oldVal) / int64(timeDiff) - } + throughput := calculateThroughput(float64(newVal-oldVal), newTime, oldTime) *throughputPtr = throughput } diff --git a/frontend/endpoints/collector_metrics/utils.go b/frontend/endpoints/collector_metrics/utils.go new file mode 100644 index 000000000..495b03690 --- /dev/null +++ b/frontend/endpoints/collector_metrics/utils.go @@ -0,0 +1,14 @@ +package collectormetrics + +import "time" + +func calculateThroughput(diff float64, currentTime, prevTime time.Time) int64 { + elapsed := currentTime.Sub(prevTime).Seconds() + + var throughput int64 + if diff > 0 && elapsed > 0 { + throughput = int64(diff / elapsed) + } + + return throughput +} \ No newline at end of file diff --git a/frontend/endpoints/collector_metrics/utils_test.go b/frontend/endpoints/collector_metrics/utils_test.go new file mode 100644 index 000000000..0f360d602 --- /dev/null +++ b/frontend/endpoints/collector_metrics/utils_test.go @@ -0,0 +1,33 @@ +package collectormetrics + +import ( + "testing" + "time" +) + +func TestCalculateThroughput(t *testing.T) { + cases := []struct { + name string + diff float64 + currentTime time.Time + prevTime time.Time + want int64 + }{ + {name: "no diff and no time change", diff: 0, currentTime: time.Now(), prevTime: time.Now(), want: 0}, + {name: "diff of 1 in 1 second", diff: 1, currentTime: time.Now(), prevTime: time.Now().Add(-1 * time.Second), want: 1}, + {name: "diff of 1 in 2 seconds", diff: 1, currentTime: time.Now(), prevTime: time.Now().Add(-2 * time.Second), want: 0}, + {name: "diff of 0 in 1 second", diff: 0, currentTime: time.Now(), prevTime: time.Now().Add(-1 * time.Second), want: 0}, + {name: "diff of 100 in 10 seconds", diff: 100, currentTime: time.Now(), prevTime: time.Now().Add(-10 * time.Second), want: 10}, + {name: "diff of 100 in 0.1 seconds", diff: 100, currentTime: time.Now(), prevTime: time.Now().Add(-100 * time.Millisecond), want: 1000}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got := calculateThroughput(c.diff, c.currentTime, c.prevTime) + if got != c.want { + t.Errorf("calculateThroughput(%f, %v, %v) = %d, want %d", c.diff, c.currentTime, c.prevTime, got, c.want) + } + }) + } + +} \ No newline at end of file