diff --git a/relay-metrics/src/aggregation.rs b/relay-metrics/src/aggregation.rs index 034b2dd5a5..683468134d 100644 --- a/relay-metrics/src/aggregation.rs +++ b/relay-metrics/src/aggregation.rs @@ -340,10 +340,6 @@ impl AggregatorConfig { let output_timestamp = UnixTimestamp::from_secs(ts); if !self.timestamp_range().contains(&output_timestamp) { - let delta = (ts as i64) - (UnixTimestamp::now().as_secs() as i64); - relay_statsd::metric!( - histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64, - ); return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into()); } @@ -920,6 +916,28 @@ impl AggregatorService { key } + // Wrapper for [`AggregatorConfig::get_bucket_timestamp`]. + // Logs a statsd metric for invalid timestamps. + fn get_bucket_timestamp( + &self, + timestamp: UnixTimestamp, + bucket_width: u64, + ) -> Result { + let res = self.config.get_bucket_timestamp(timestamp, bucket_width); + if let Err(AggregateMetricsError { + kind: AggregateMetricsErrorKind::InvalidTimestamp(ts), + }) = res + { + let delta = (ts.as_secs() as i64) - (UnixTimestamp::now().as_secs() as i64); + relay_statsd::metric!( + histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64, + aggregator = &self.name, + ); + } + + res + } + /// Merge a preaggregated bucket into this aggregator. /// /// If no bucket exists for the given bucket key, a new bucket will be created. @@ -928,9 +946,7 @@ impl AggregatorService { project_key: ProjectKey, bucket: Bucket, ) -> Result<(), AggregateMetricsError> { - let timestamp = self - .config - .get_bucket_timestamp(bucket.timestamp, bucket.width)?; + let timestamp = self.get_bucket_timestamp(bucket.timestamp, bucket.width)?; let key = BucketKey { project_key, timestamp, @@ -1024,7 +1040,7 @@ impl AggregatorService { { for bucket in buckets.into_iter() { if let Err(error) = self.merge(project_key, bucket) { - relay_log::error!(error = &error as &dyn Error); + relay_log::error!(tags.aggregator = self.name, error = &error as &dyn Error); } } @@ -1043,7 +1059,8 @@ impl AggregatorService { // We only emit statsd metrics for the cost on flush (and not when merging the buckets), // assuming that this gives us more than enough data points. relay_statsd::metric!( - gauge(MetricGauges::BucketsCost) = self.cost_tracker.total_cost as u64 + gauge(MetricGauges::BucketsCost) = self.cost_tracker.total_cost as u64, + aggregator = &self.name, ); let mut buckets = HashMap::>::new(); @@ -1099,7 +1116,8 @@ impl AggregatorService { relay_statsd::metric!( gauge(MetricGauges::AvgBucketSize) = item_count as f64 / bucket_count as f64, metric_type = ty.as_str(), - namespace = namespace.as_str() + namespace = namespace.as_str(), + aggregator = &self.name, ); } @@ -1216,7 +1234,11 @@ impl AggregatorService { buckets, } = msg; if let Err(err) = self.merge_all(project_key, buckets) { - relay_log::error!(error = &err as &dyn Error, "failed to merge buckets"); + relay_log::error!( + tags.aggregator = &self.name, + error = &err as &dyn Error, + "failed to merge buckets" + ); } } @@ -1275,7 +1297,10 @@ impl Drop for AggregatorService { fn drop(&mut self) { let remaining_buckets = self.buckets.len(); if remaining_buckets > 0 { - relay_log::error!("metrics aggregator dropping {remaining_buckets} buckets"); + relay_log::error!( + tags.aggregator = &self.name, + "metrics aggregator dropping {remaining_buckets} buckets" + ); relay_statsd::metric!( counter(MetricCounters::BucketsDropped) += remaining_buckets as i64, aggregator = &self.name,