ref(metrics): Better align flush times with intervals (#2904)
Align the flush times of backdated metric buckets with bucket intervals
so that global flushing becomes more effective. This is done through two
changes:

1. Allow configuring `ShiftKey::None` to disable shifting of flushed
   buckets. If set to `none`, buckets from all projects are flushed at
   the same time.
2. For backdated buckets, do not use the precise current timestamp.
   Instead, round it to the current bucket interval, then add the
   debounce delay. When shifting is configured, the shift is now also
   added to backdated buckets.

Integration tests now use `shift_key: none` by default to improve their
execution time, which also helps reduce flakiness.
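
As an illustration (not part of this commit), here is a minimal Rust sketch of the
alignment rule from point 2, using plain integer seconds instead of Relay's
`UnixTimestamp` and `Instant` types; the function and parameter names are assumptions:

// Sketch only: flush time of a backdated bucket, in unix seconds.
fn backdated_flush_secs(
    now_secs: u64,        // current unix time in seconds
    bucket_interval: u64, // aggregator bucket interval in seconds
    debounce_delay: u64,  // debounce delay for backdated buckets in seconds
    shift_secs: u64,      // per-project or per-bucket shift; 0 with `shift_key: none`
) -> u64 {
    // Round the current time down to the start of its bucket interval so that all
    // backdated buckets from the same interval receive the same flush time.
    let floor = (now_secs / bucket_interval) * bucket_interval;
    floor + bucket_interval + debounce_delay + shift_secs
}

fn main() {
    // Buckets arriving at different points within the same 10s interval flush together.
    assert_eq!(backdated_flush_secs(1_000_004, 10, 1, 0), 1_000_011);
    assert_eq!(backdated_flush_secs(1_000_007, 10, 1, 0), 1_000_011);
}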
jan-auer authored Jan 3, 2024
1 parent 66c2f82 commit 3494d06
Showing 4 changed files with 63 additions and 138 deletions.
58 changes: 9 additions & 49 deletions relay-common/src/time.rs
@@ -61,20 +61,6 @@ pub fn chrono_to_positive_millis(duration: chrono::Duration) -> f64 {
duration_to_millis(duration.to_std().unwrap_or_default())
}

/// The conversion result of [`UnixTimestamp::to_instant`].
///
/// If the time is outside of what can be represented in an [`Instant`], this is `Past` or
/// `Future`.
#[derive(Clone, Copy, Debug)]
pub enum MonotonicResult {
/// A time before the earliest representable `Instant`.
Past,
/// A representable `Instant`.
Instant(Instant),
/// A time after the latest representable `Instant`.
Future,
}

/// A unix timestamp (full seconds elapsed since 1970-01-01 00:00 UTC).
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct UnixTimestamp(u64);
@@ -133,41 +119,6 @@ impl UnixTimestamp {
NaiveDateTime::from_timestamp_opt(self.0 as i64, 0)
.map(|n| DateTime::from_naive_utc_and_offset(n, Utc))
}

/// Converts the UNIX timestamp into an `Instant` based on the current system timestamp.
///
/// Returns [`MonotonicResult::Instant`] if the timestamp can be represented. Otherwise, returns
/// [`MonotonicResult::Past`] or [`MonotonicResult::Future`].
///
/// Note that the system time is subject to skew, so subsequent calls to `to_instant` may return
/// different values.
///
/// # Example
///
/// ```
/// use std::time::{Duration, Instant};
/// use relay_common::time::{MonotonicResult, UnixTimestamp};
///
/// let timestamp = UnixTimestamp::now();
/// if let MonotonicResult::Instant(instant) = timestamp.to_instant() {
/// assert!((Instant::now() - instant) < Duration::from_millis(1));
/// }
/// ```
pub fn to_instant(self) -> MonotonicResult {
let now = Self::now();

if self > now {
match Instant::now().checked_add(self - now) {
Some(instant) => MonotonicResult::Instant(instant),
None => MonotonicResult::Future,
}
} else {
match Instant::now().checked_sub(now - self) {
Some(instant) => MonotonicResult::Instant(instant),
None => MonotonicResult::Past,
}
}
}
}

impl fmt::Debug for UnixTimestamp {
@@ -182,6 +133,15 @@ impl fmt::Display for UnixTimestamp {
}
}

/// Adds _whole_ seconds of the given duration to the timestamp.
impl std::ops::Add<Duration> for UnixTimestamp {
type Output = Self;

fn add(self, rhs: Duration) -> Self::Output {
Self(self.0.saturating_add(rhs.as_secs()))
}
}
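
Illustrative usage of the `Add<Duration>` impl just added above (not part of the diff):
only the whole seconds of the duration are added, so sub-second parts are dropped and
overflow saturates.

use std::time::Duration;
use relay_common::time::UnixTimestamp;

fn add_example() {
    let ts = UnixTimestamp::from_secs(100);
    // Only the whole second of the 1.5s duration is added.
    assert_eq!(ts + Duration::from_millis(1_500), UnixTimestamp::from_secs(101));
}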

impl std::ops::Sub for UnixTimestamp {
type Output = Duration;

98 changes: 47 additions & 51 deletions relay-metrics/src/aggregator.rs
@@ -10,7 +10,7 @@ use std::{fmt, mem};

use fnv::FnvHasher;
use relay_base_schema::project::ProjectKey;
use relay_common::time::{MonotonicResult, UnixTimestamp};
use relay_common::time::UnixTimestamp;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::time::Instant;
@@ -117,6 +117,9 @@ pub enum ShiftKey {
///
/// Only for use in processing Relays.
Bucket,

/// Do not apply shift. This should be set when `http.global_metrics` is used.
None,
}

/// Parameters used by the [`Aggregator`].
@@ -194,31 +197,35 @@ impl AggregatorConfig {
/// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
/// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
fn get_flush_time(&self, bucket_key: &BucketKey) -> Instant {
let now = Instant::now();
let mut flush = None;
let initial_flush = bucket_key.timestamp + self.bucket_interval() + self.initial_delay();

if let MonotonicResult::Instant(instant) = bucket_key.timestamp.to_instant() {
let instant = Instant::from_std(instant);
let bucket_end = instant + self.bucket_interval();
let initial_flush = bucket_end + self.initial_delay();
// If the initial flush is still pending, use that.
if initial_flush > now {
flush = Some(initial_flush + self.flush_time_shift(bucket_key));
}
}
let now = UnixTimestamp::now();
let backdated = initial_flush <= now;

let delay = UnixTimestamp::now().as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
relay_statsd::metric!(
histogram(MetricHistograms::BucketsDelay) = delay as f64,
backdated = if flush.is_none() { "true" } else { "false" },
backdated = if backdated { "true" } else { "false" },
);

// If the initial flush time has passed or cannot be represented, debounce future flushes
// with the `debounce_delay` starting now.
match flush {
Some(initial_flush) => initial_flush,
None => now + self.debounce_delay(),
}
let flush_timestamp = if backdated {
            // If the initial flush time has already passed, debounce future flushes with
            // the `debounce_delay` starting now. However, align the current timestamp
            // with the bucket interval for proper batching.
let floor = (now.as_secs() / self.bucket_interval) * self.bucket_interval;
UnixTimestamp::from_secs(floor) + self.bucket_interval() + self.debounce_delay()
} else {
// If the initial flush is still pending, use that.
initial_flush
};

let instant = if flush_timestamp > now {
Instant::now().checked_add(flush_timestamp - now)
} else {
Instant::now().checked_sub(now - flush_timestamp)
};

instant.unwrap_or_else(Instant::now) + self.flush_time_shift(bucket_key)
}

/// The delay to debounce backdated flushes.
@@ -247,32 +254,23 @@ impl AggregatorConfig {
hasher.finish()
}
ShiftKey::Bucket => bucket.hash64(),
ShiftKey::None => return Duration::ZERO,
};
let shift_millis = hash_value % (self.bucket_interval * 1000);

let shift_millis = hash_value % (self.bucket_interval * 1000);
Duration::from_millis(shift_millis)
}

/// Determines the target bucket for an incoming bucket timestamp and bucket width.
///
/// We select the output bucket which overlaps with the center of the incoming bucket.
/// Fails if timestamp is too old or too far into the future.
fn get_bucket_timestamp(
&self,
timestamp: UnixTimestamp,
bucket_width: u64,
) -> Result<UnixTimestamp, AggregateMetricsError> {
fn get_bucket_timestamp(&self, timestamp: UnixTimestamp, bucket_width: u64) -> UnixTimestamp {
// Find middle of the input bucket to select a target
let ts = timestamp.as_secs().saturating_add(bucket_width / 2);
// Align target_timestamp to output bucket width
let ts = (ts / self.bucket_interval) * self.bucket_interval;
let output_timestamp = UnixTimestamp::from_secs(ts);

if !self.timestamp_range().contains(&output_timestamp) {
return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into());
}

Ok(output_timestamp)
UnixTimestamp::from_secs(ts)
}
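
For a concrete sense of the center selection (illustrative numbers, not part of the diff),
assume a 10-second `bucket_interval`: an incoming bucket at second 15 with width 20 has its
center at second 25 and is therefore assigned to the output bucket starting at second 20.

fn center_selection_example() {
    let (bucket_interval, timestamp, bucket_width) = (10u64, 15u64, 20u64);
    // Middle of the incoming bucket.
    let center = timestamp.saturating_add(bucket_width / 2); // 25
    // Align the center to the output bucket width.
    let output = (center / bucket_interval) * bucket_interval; // 20
    assert_eq!(output, 20);
}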

/// Returns the valid range for metrics timestamps.
@@ -696,26 +694,27 @@ impl Aggregator {
key
}

// Wrapper for [`AggregatorConfig::get_bucket_timestamp`].
// Logs a statsd metric for invalid timestamps.
/// Wrapper for [`AggregatorConfig::get_bucket_timestamp`].
///
/// Logs a statsd metric for invalid timestamps.
fn get_bucket_timestamp(
&self,
timestamp: UnixTimestamp,
bucket_width: u64,
) -> Result<UnixTimestamp, AggregateMetricsError> {
let res = self.config.get_bucket_timestamp(timestamp, bucket_width);
if let Err(AggregateMetricsError {
kind: AggregateMetricsErrorKind::InvalidTimestamp(ts),
}) = res
{
let delta = (ts.as_secs() as i64) - (UnixTimestamp::now().as_secs() as i64);
let bucket_ts = self.config.get_bucket_timestamp(timestamp, bucket_width);

if !self.config.timestamp_range().contains(&bucket_ts) {
let delta = (bucket_ts.as_secs() as i64) - (UnixTimestamp::now().as_secs() as i64);
relay_statsd::metric!(
histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64,
aggregator = &self.name,
);

return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into());
}

res
Ok(bucket_ts)
}

/// Merge a preaggregated bucket into this aggregator.
@@ -863,7 +862,6 @@ impl fmt::Debug for Aggregator {

#[cfg(test)]
mod tests {

use similar_asserts::assert_eq;

use super::*;
@@ -880,7 +878,7 @@ mod tests {
max_tag_key_length: 200,
max_tag_value_length: 200,
max_project_key_bucket_bytes: None,
..Default::default()
shift_key: ShiftKey::default(),
}
}

@@ -1181,7 +1179,7 @@ mod tests {
assert_eq!(total_cost, current_cost + expected_added_cost);
}

aggregator.pop_flush_buckets(false);
aggregator.pop_flush_buckets(true);
assert_eq!(aggregator.cost_tracker.total_cost, 0);
}

@@ -1194,8 +1192,10 @@ mod tests {
..Default::default()
};

let aggregator = Aggregator::new(config);

assert!(matches!(
config
aggregator
.get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2)
.unwrap_err()
.kind,
@@ -1215,9 +1215,7 @@ mod tests {
let now = UnixTimestamp::now().as_secs();
let rounded_now = UnixTimestamp::from_secs(now / 10 * 10);
assert_eq!(
config
.get_bucket_timestamp(UnixTimestamp::from_secs(now), 0)
.unwrap(),
config.get_bucket_timestamp(UnixTimestamp::from_secs(now), 0),
rounded_now
);
}
@@ -1236,7 +1234,6 @@ mod tests {
assert_eq!(
config
.get_bucket_timestamp(UnixTimestamp::from_secs(now), 20)
.unwrap()
.as_secs(),
rounded_now + 10
);
@@ -1256,7 +1253,6 @@ mod tests {
assert_eq!(
config
.get_bucket_timestamp(UnixTimestamp::from_secs(now), 23)
.unwrap()
.as_secs(),
rounded_now + 10
);
37 changes: 0 additions & 37 deletions relay-metrics/src/aggregatorservice.rs
@@ -392,7 +392,6 @@ impl MergeBuckets {

#[cfg(test)]
mod tests {

use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

@@ -498,40 +497,4 @@ mod tests {
// receiver must have 1 bucket flushed
assert_eq!(receiver.bucket_count(), 1);
}

#[tokio::test]
async fn test_merge_back() {
relay_test::setup();
tokio::time::pause();

// Create a receiver which accepts nothing:
let receiver = TestReceiver {
reject_all: true,
..TestReceiver::default()
};
let recipient = receiver.clone().start().recipient();

let config = AggregatorServiceConfig {
bucket_interval: 1,
initial_delay: 0,
debounce_delay: 0,
..Default::default()
};
let aggregator = AggregatorService::new(config, Some(recipient)).start();

let mut bucket = some_bucket();
bucket.timestamp = UnixTimestamp::now();

aggregator.send(MergeBuckets {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
buckets: vec![bucket],
});

assert_eq!(receiver.bucket_count(), 0);

tokio::time::sleep(Duration::from_millis(1100)).await;
let bucket_count = aggregator.send(BucketCountInquiry).await.unwrap();
assert_eq!(bucket_count, 1);
assert_eq!(receiver.bucket_count(), 0);
}
}
8 changes: 7 additions & 1 deletion tests/integration/test_metrics.py
@@ -11,7 +11,12 @@
from .test_envelope import generate_transaction_item

TEST_CONFIG = {
"aggregator": {"bucket_interval": 1, "initial_delay": 0, "debounce_delay": 0}
"aggregator": {
"bucket_interval": 1,
"initial_delay": 0,
"debounce_delay": 0,
"shift_key": "none",
}
}


@@ -138,6 +143,7 @@ def test_metrics_partition_key(mini_sentry, relay, metrics_partitions, expected_
"max_secs_in_past": forever,
"max_secs_in_future": forever,
"flush_partitions": metrics_partitions,
"shift_key": "none",
},
}
relay = relay(mini_sentry, options=relay_config)
