instr(metrics): Tag aggregator (#2559)
We've been seeing more invalid timestamps in the metrics aggregator over
the past weeks. The timing suggests these are span metrics, so tag
metrics and errors with the aggregator name (`"spans"` vs `"default"`)
to verify.
jjbayer authored Oct 3, 2023
1 parent ce02c1c commit e4148b3
Showing 1 changed file with 37 additions and 12 deletions.
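
The same tag is added everywhere the aggregator emits statsd metrics or error logs. As a minimal sketch of the pattern applied throughout the diff below (all names are taken from the changed code; this is not a standalone example):

    // Tag statsd metrics and error logs with the aggregator name ("spans" or "default").
    relay_statsd::metric!(
        histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64,
        aggregator = &self.name,
    );
    relay_log::error!(tags.aggregator = &self.name, "failed to merge buckets");

Moving the invalid-timestamp histogram out of `AggregatorConfig::get_bucket_timestamp` into a wrapper on `AggregatorService` is what makes `self.name` available for the tag.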
relay-metrics/src/aggregation.rs: 37 additions & 12 deletions
@@ -340,10 +340,6 @@ impl AggregatorConfig {
         let output_timestamp = UnixTimestamp::from_secs(ts);
 
         if !self.timestamp_range().contains(&output_timestamp) {
-            let delta = (ts as i64) - (UnixTimestamp::now().as_secs() as i64);
-            relay_statsd::metric!(
-                histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64,
-            );
             return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into());
         }
 
@@ -920,6 +916,28 @@ impl AggregatorService {
         key
     }
 
+    // Wrapper for [`AggregatorConfig::get_bucket_timestamp`].
+    // Logs a statsd metric for invalid timestamps.
+    fn get_bucket_timestamp(
+        &self,
+        timestamp: UnixTimestamp,
+        bucket_width: u64,
+    ) -> Result<UnixTimestamp, AggregateMetricsError> {
+        let res = self.config.get_bucket_timestamp(timestamp, bucket_width);
+        if let Err(AggregateMetricsError {
+            kind: AggregateMetricsErrorKind::InvalidTimestamp(ts),
+        }) = res
+        {
+            let delta = (ts.as_secs() as i64) - (UnixTimestamp::now().as_secs() as i64);
+            relay_statsd::metric!(
+                histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64,
+                aggregator = &self.name,
+            );
+        }
+
+        res
+    }
+
     /// Merge a preaggregated bucket into this aggregator.
     ///
     /// If no bucket exists for the given bucket key, a new bucket will be created.
@@ -928,9 +946,7 @@ impl AggregatorService {
         project_key: ProjectKey,
         bucket: Bucket,
     ) -> Result<(), AggregateMetricsError> {
-        let timestamp = self
-            .config
-            .get_bucket_timestamp(bucket.timestamp, bucket.width)?;
+        let timestamp = self.get_bucket_timestamp(bucket.timestamp, bucket.width)?;
         let key = BucketKey {
             project_key,
             timestamp,
@@ -1024,7 +1040,7 @@ impl AggregatorService {
     {
         for bucket in buckets.into_iter() {
             if let Err(error) = self.merge(project_key, bucket) {
-                relay_log::error!(error = &error as &dyn Error);
+                relay_log::error!(tags.aggregator = self.name, error = &error as &dyn Error);
             }
         }
 
@@ -1043,7 +1059,8 @@ impl AggregatorService {
         // We only emit statsd metrics for the cost on flush (and not when merging the buckets),
         // assuming that this gives us more than enough data points.
         relay_statsd::metric!(
-            gauge(MetricGauges::BucketsCost) = self.cost_tracker.total_cost as u64
+            gauge(MetricGauges::BucketsCost) = self.cost_tracker.total_cost as u64,
+            aggregator = &self.name,
         );
 
         let mut buckets = HashMap::<ProjectKey, Vec<HashedBucket>>::new();
@@ -1099,7 +1116,8 @@ impl AggregatorService {
             relay_statsd::metric!(
                 gauge(MetricGauges::AvgBucketSize) = item_count as f64 / bucket_count as f64,
                 metric_type = ty.as_str(),
-                namespace = namespace.as_str()
+                namespace = namespace.as_str(),
+                aggregator = &self.name,
             );
         }
 
@@ -1216,7 +1234,11 @@ impl AggregatorService {
             buckets,
         } = msg;
         if let Err(err) = self.merge_all(project_key, buckets) {
-            relay_log::error!(error = &err as &dyn Error, "failed to merge buckets");
+            relay_log::error!(
+                tags.aggregator = &self.name,
+                error = &err as &dyn Error,
+                "failed to merge buckets"
+            );
         }
     }
 
@@ -1275,7 +1297,10 @@ impl Drop for AggregatorService {
     fn drop(&mut self) {
         let remaining_buckets = self.buckets.len();
         if remaining_buckets > 0 {
-            relay_log::error!("metrics aggregator dropping {remaining_buckets} buckets");
+            relay_log::error!(
+                tags.aggregator = &self.name,
+                "metrics aggregator dropping {remaining_buckets} buckets"
+            );
             relay_statsd::metric!(
                 counter(MetricCounters::BucketsDropped) += remaining_buckets as i64,
                 aggregator = &self.name,
