feat(metrics): Configurable flush time shift (#2349)
We currently shift the flush time of each metrics bucket by an offset
based on the project key. This prevents large peaks every interval, but
still allows flushing buckets from the same project together, which
makes sense especially when the buckets are then sent to the upstream in
per-project envelopes.

With span metrics, we have seen that even buckets from the same project
can cause a lot of traffic. With
#2341, we want to increase the
bucket interval for span metrics to one minute. To prevent large,
project-specific peaks for span metrics, allow configuring a more
random distribution of flush times within the interval.

This should be used only in processing Relays where buckets are sent via
Kafka.
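
For processing Relays, the new behavior is opt-in. As a minimal sketch, this is the aggregator configuration JSON that selects the bucket-based shift; the shape follows the `test_parse_shift_key` test added below, and every other field falls back to its default via `#[serde(default)]`:

```json
{
  "shift_key": "bucket"
}
```

Omitting the field (or setting `"project"`) keeps the default per-project grouping.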
jjbayer authored Jul 25, 2023
1 parent b6a1531 commit aae44c0
Showing 2 changed files with 55 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
 **Internal**:

 - Add capability to configure metrics aggregators per use case. ([#2341](https://github.com/getsentry/relay/pull/2341))
+- Configurable flush time offsets for metrics buckets. ([#2349](https://github.com/getsentry/relay/pull/2349))

## 23.7.0

67 changes: 54 additions & 13 deletions relay-metrics/src/aggregation.rs
@@ -926,6 +926,24 @@ impl BucketKey {
     }
 }

+/// Configuration value for [`AggregatorConfig::shift_key`].
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum ShiftKey {
+    /// Shifts the flush time by an offset based on the [`ProjectKey`].
+    ///
+    /// This allows buckets from the same project to be flushed together.
+    #[default]
+    Project,
+
+    /// Shifts the flush time by an offset based on the bucket key itself.
+    ///
+    /// This allows for a completely random distribution of bucket flush times.
+    ///
+    /// Only for use in processing Relays.
+    Bucket,
+}
+
 /// Parameters used by the [`AggregatorService`].
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(default)]
@@ -1010,6 +1028,12 @@ pub struct AggregatorConfig {
     ///
     /// Defaults to `None`, i.e. no limit.
     pub max_project_key_bucket_bytes: Option<usize>,
+
+    /// Key used to shift the flush time of a bucket.
+    ///
+    /// This prevents flushing all buckets from a bucket interval at the same
+    /// time by computing an offset from the hash of the given key.
+    pub shift_key: ShiftKey,
 }

 impl AggregatorConfig {
@@ -1068,28 +1092,21 @@ impl AggregatorConfig {
     ///
     /// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
     /// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
-    fn get_flush_time(&self, bucket_timestamp: UnixTimestamp, project_key: ProjectKey) -> Instant {
+    fn get_flush_time(&self, bucket_key: &BucketKey) -> Instant {
         let now = Instant::now();
         let mut flush = None;

-        if let MonotonicResult::Instant(instant) = bucket_timestamp.to_instant() {
+        if let MonotonicResult::Instant(instant) = bucket_key.timestamp.to_instant() {
             let instant = Instant::from_std(instant);
             let bucket_end = instant + self.bucket_interval();
             let initial_flush = bucket_end + self.initial_delay();
             // If the initial flush is still pending, use that.
             if initial_flush > now {
-                // Shift deterministically within one bucket interval based on the project key. This
-                // distributes buckets over time while also flushing all buckets of the same project
-                // key together.
-                let mut hasher = FnvHasher::default();
-                hasher.write(project_key.as_str().as_bytes());
-                let shift_millis = hasher.finish() % (self.bucket_interval * 1000);
-
-                flush = Some(initial_flush + Duration::from_millis(shift_millis));
+                flush = Some(initial_flush + self.flush_time_shift(bucket_key));
             }
         }

-        let delay = UnixTimestamp::now().as_secs() as i64 - bucket_timestamp.as_secs() as i64;
+        let delay = UnixTimestamp::now().as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
         relay_statsd::metric!(
             histogram(MetricHistograms::BucketsDelay) = delay as f64,
             backdated = if flush.is_none() { "true" } else { "false" },
@@ -1102,6 +1119,23 @@ impl AggregatorConfig {
             None => now + self.debounce_delay(),
         }
     }
+
+    // Shift deterministically within one bucket interval based on the project or bucket key.
+    //
+    // This distributes buckets over time to prevent peaks.
+    fn flush_time_shift(&self, bucket: &BucketKey) -> Duration {
+        let hash_value = match self.shift_key {
+            ShiftKey::Project => {
+                let mut hasher = FnvHasher::default();
+                hasher.write(bucket.project_key.as_str().as_bytes());
+                hasher.finish()
+            }
+            ShiftKey::Bucket => bucket.hash64(),
+        };
+        let shift_millis = hash_value % (self.bucket_interval * 1000);
+
+        Duration::from_millis(shift_millis)
+    }
 }

@@ -1119,6 +1153,7 @@ impl Default for AggregatorConfig {
             max_tag_value_length: 200,
             max_total_bucket_bytes: None,
             max_project_key_bucket_bytes: None,
+            shift_key: ShiftKey::default(),
         }
     }
 }
@@ -1671,7 +1706,6 @@ impl AggregatorService {
         key: BucketKey,
         value: T,
     ) -> Result<(), AggregateMetricsError> {
-        let timestamp = key.timestamp;
         let project_key = key.project_key;

         let key = Self::validate_bucket_key(key, &self.config)?;
@@ -1734,7 +1768,7 @@
                     metric_name = metric_name_tag(&entry.key().metric_name),
                 );

-                let flush_at = self.config.get_flush_time(timestamp, project_key);
+                let flush_at = self.config.get_flush_time(entry.key());
                 let bucket = value.into();
                 added_cost = entry.key().cost() + bucket.cost();
                 entry.insert(QueuedBucket::new(flush_at, bucket));
@@ -3241,4 +3275,11 @@ mod tests {
     fn test_capped_iter_completeness_100() {
         test_capped_iter_completeness(100, 4);
     }
+
+    #[test]
+    fn test_parse_shift_key() {
+        let json = r#"{"shift_key": "bucket"}"#;
+        let parsed: AggregatorConfig = serde_json::from_str(json).unwrap();
+        assert!(matches!(parsed.shift_key, ShiftKey::Bucket));
+    }
 }
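
For illustration, a minimal standalone sketch of the offset computation that `flush_time_shift` performs in the default `ShiftKey::Project` mode; it assumes the external `fnv` crate for `FnvHasher`, and the project key literal is purely hypothetical:

```rust
use std::hash::Hasher;
use std::time::Duration;

use fnv::FnvHasher;

/// Hash the project key and reduce the hash into one bucket interval,
/// yielding a deterministic flush offset in milliseconds.
fn project_shift(project_key: &str, bucket_interval_secs: u64) -> Duration {
    let mut hasher = FnvHasher::default();
    hasher.write(project_key.as_bytes());
    Duration::from_millis(hasher.finish() % (bucket_interval_secs * 1000))
}

fn main() {
    // Hypothetical project key; with a 60s interval the shift always lands
    // in [0ms, 60_000ms), spreading flushes across the whole interval.
    let shift = project_shift("a94ae32be2584e0bbd7a4cbb95971fee", 60);
    assert!(shift < Duration::from_secs(60));
    println!("deterministic flush shift: {shift:?}");
}
```

Because the hash is deterministic, a given project always receives the same offset; `ShiftKey::Bucket` instead hashes the entire bucket key, so even buckets of a single project spread across the interval.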
