Skip to content

Commit

Permalink
fix(metrics): Improve flush time calculation in metrics aggregator (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Jun 14, 2024
1 parent 50dbab7 commit c2b7f38
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
**Internal**:

- Treat arrays of pairs as key-value mappings during PII scrubbing. ([#3639](https://github.com/getsentry/relay/pull/3639))
- Improve flush time calculation in metrics aggregator. ([#3726](https://github.com/getsentry/relay/pull/3726))

## 24.5.1

Expand Down
128 changes: 91 additions & 37 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,42 +231,6 @@ pub struct AggregatorConfig {
}

impl AggregatorConfig {
/// Returns the instant at which a bucket should be flushed.
///
/// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
/// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
fn get_flush_time(&self, bucket_key: &BucketKey) -> Instant {
    // The regular flush time: one full bucket interval past the bucket's own timestamp, plus
    // the configured grace period for late submissions.
    let initial_flush = bucket_key.timestamp + self.bucket_interval() + self.initial_delay();

    let now = UnixTimestamp::now();
    // The bucket is "backdated" when its regular flush time has already passed.
    let backdated = initial_flush <= now;

    // Record how far behind "now" the bucket's timestamp is, tagged by whether it is backdated.
    let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
    relay_statsd::metric!(
        histogram(MetricHistograms::BucketsDelay) = delay as f64,
        backdated = if backdated { "true" } else { "false" },
    );

    let flush_timestamp = if backdated {
        // If the initial flush time has passed or cannot be represented, debounce future
        // flushes with the `debounce_delay` starting now. However, align the current timestamp
        // with the bucket interval for proper batching.
        let floor = (now.as_secs() / self.bucket_interval) * self.bucket_interval;
        UnixTimestamp::from_secs(floor) + self.bucket_interval() + self.debounce_delay()
    } else {
        // If the initial flush is still pending, use that.
        initial_flush
    };

    // Convert the wall-clock flush timestamp into a monotonic `Instant` by offsetting from the
    // current instant. `checked_add`/`checked_sub` return `None` if the offset cannot be
    // represented.
    let instant = if flush_timestamp > now {
        Instant::now().checked_add(flush_timestamp - now)
    } else {
        Instant::now().checked_sub(now - flush_timestamp)
    };

    // Fall back to "flush immediately" when the conversion overflowed, then apply the
    // per-bucket shift used to spread flushes (e.g., across partitions).
    instant.unwrap_or_else(Instant::now) + self.flush_time_shift(bucket_key)
}

/// The delay to debounce backdated flushes.
fn debounce_delay(&self) -> Duration {
Duration::from_secs(self.debounce_delay)
Expand Down Expand Up @@ -510,6 +474,56 @@ impl fmt::Debug for CostTracker {
}
}

/// Returns the instant at which a bucket should be flushed.
///
/// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
/// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
///
/// The result is rounded down to whole seconds elapsed since `reference_time`, so buckets
/// computed at slightly different wall-clock moments still resolve to identical flush instants
/// and can be batched together.
fn get_flush_time(
    config: &AggregatorConfig,
    reference_time: Instant,
    bucket_key: &BucketKey,
) -> Instant {
    // The regular flush time: one full bucket interval past the bucket's own timestamp, plus
    // the configured grace period for late submissions.
    let initial_flush = bucket_key.timestamp + config.bucket_interval() + config.initial_delay();

    let now = UnixTimestamp::now();
    // The bucket is "backdated" when its regular flush time has already passed.
    let backdated = initial_flush <= now;

    // Record how far behind "now" the bucket's timestamp is, tagged by whether it is backdated.
    let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
    relay_statsd::metric!(
        histogram(MetricHistograms::BucketsDelay) = delay as f64,
        backdated = if backdated { "true" } else { "false" },
    );

    let flush_timestamp = if backdated {
        // If the initial flush time has passed or can't be represented, we want to treat the
        // flush of the bucket as if it came in with the timestamp of current bucket based on
        // the now timestamp.
        //
        // The rationale behind this is that we want to flush this bucket in the earliest slot
        // together with buckets that have similar characteristics (e.g., same partition,
        // project...).
        let floored_timestamp = (now.as_secs() / config.bucket_interval) * config.bucket_interval;
        UnixTimestamp::from_secs(floored_timestamp)
            + config.bucket_interval()
            + config.debounce_delay()
    } else {
        // If the initial flush is still pending, use that.
        initial_flush
    };

    // Capture a single `Instant` so the conversion branch and the overflow fallback measure
    // against the same point in time. Calling `Instant::now()` again in the fallback (as the
    // previous version did) could yield a slightly later instant than the one the branch used.
    let now_instant = Instant::now();
    let instant = if flush_timestamp > now {
        now_instant.checked_add(flush_timestamp - now)
    } else {
        now_instant.checked_sub(now - flush_timestamp)
    }
    .unwrap_or(now_instant);

    // Since `Instant` doesn't allow to get directly how many seconds elapsed, we leverage the
    // diffing to get a duration and round it to the smallest second to get consistent times.
    let instant = reference_time + Duration::from_secs((instant - reference_time).as_secs());
    instant + config.flush_time_shift(bucket_key)
}

/// A collector of [`Bucket`] submissions.
///
/// # Aggregation
Expand All @@ -535,6 +549,7 @@ pub struct Aggregator {
config: AggregatorConfig,
buckets: HashMap<BucketKey, QueuedBucket>,
cost_tracker: CostTracker,
reference_time: Instant,
}

impl Aggregator {
Expand All @@ -550,6 +565,7 @@ impl Aggregator {
config,
buckets: HashMap::new(),
cost_tracker: CostTracker::default(),
reference_time: Instant::now(),
}
}

Expand Down Expand Up @@ -767,7 +783,7 @@ impl Aggregator {
namespace = entry.key().namespace().as_str(),
);

let flush_at = self.config.get_flush_time(entry.key());
let flush_at = get_flush_time(&self.config, self.reference_time, entry.key());
let value = bucket.value;
added_cost = entry.key().cost() + value.cost();
entry.insert(QueuedBucket::new(flush_at, value, bucket.metadata));
Expand Down Expand Up @@ -1571,4 +1587,42 @@ mod tests {
]
"###);
}

#[test]
fn test_get_flush_time_with_backdated_bucket() {
    let mut config = test_config();
    config.bucket_interval = 3600;
    config.initial_delay = 1300;
    config.debounce_delay = 1300;
    config.flush_partitions = Some(10);
    config.flush_batching = FlushBatching::Partition;

    let reference_time = Instant::now();

    // Align "now" to the start of the current bucket interval.
    let interval_start =
        (UnixTimestamp::now().as_secs() / config.bucket_interval) * config.bucket_interval;

    // Builds a bucket key differing only in its timestamp.
    let make_key = |secs: u64| BucketKey {
        project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
        timestamp: UnixTimestamp::from_secs(secs),
        metric_name: "c:transactions/foo".into(),
        tags: BTreeMap::new(),
    };

    // One bucket backdated by two hours, one in the current interval.
    let backdated_key = make_key(interval_start - 7200);
    let current_key = make_key(interval_start);

    // Both must resolve to the same flush instant so that backdated buckets get batched
    // together with fresh buckets of similar characteristics.
    assert_eq!(
        get_flush_time(&config, reference_time, &backdated_key),
        get_flush_time(&config, reference_time, &current_key),
    );
}
}

0 comments on commit c2b7f38

Please sign in to comment.