Skip to content

Commit

Permalink
fix(server): Check rate limits with namespace in fast-path (#3447)
Browse files Browse the repository at this point in the history
Checks rate limits for metrics with their namespace in the fast-path.
Previously, the namespace was not passed to the check function, which
caused all namespace-specific rate limits to be skipped during the
check.

This also simplifies `check_buckets` and removes an indirection to an
inner method. The signature of the method could no longer be fulfilled,
as now there can be partial rate limiting inside of the function.
Originally, this function was introduced to test that no error log is
emitted for one of the failure branches. However, the test did not
actually assert that and so we remove this test and simplify instead.
  • Loading branch information
jan-auer authored Apr 18, 2024
1 parent 554a10c commit 680cf14
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 85 deletions.
5 changes: 5 additions & 0 deletions relay-quotas/src/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,12 @@ impl Quota {
/// There are a few conditions at which quotas are invalid:
/// - The quota only applies to `Unknown` data categories.
/// - The quota is counted (not limit `0`) but specifies categories with different units.
/// - The quota references an unsupported namespace.
pub fn is_valid(&self) -> bool {
if self.namespace == Some(MetricNamespace::Unsupported) {
return false;
}

let mut units = self.categories.iter().filter_map(CategoryUnit::from);

match units.next() {
Expand Down
136 changes: 51 additions & 85 deletions relay-server/src/services/project.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -1137,102 +1138,79 @@ impl Project {
pub fn check_buckets(
&mut self,
outcome_aggregator: Addr<TrackOutcome>,
buckets: Vec<Bucket>,
mut buckets: Vec<Bucket>,
) -> Option<(Scoping, ProjectMetrics)> {
match self.check_buckets_inner(outcome_aggregator, buckets) {
CheckedBuckets::Ok(scoping, metrics) => return Some((scoping, metrics)),
CheckedBuckets::ProjectExpired(len) => {
relay_log::error!(
tags.project_key = self.project_key.as_str(),
"there is no project state: dropping {len} buckets"
);
}
CheckedBuckets::ProjectDisabled(len) => {
relay_log::debug!("dropping {len} buckets for disabled project");
}

CheckedBuckets::NoScoping(len) => {
relay_log::error!(
tags.project_key = self.project_key.as_str(),
"there is no scoping: dropping {len} buckets"
);
}
CheckedBuckets::RateLimited(len) => {
relay_log::debug!("dropping {len} buckets due to rate limit");
}
};

None
}

/// Unit-testable helper function for [`Self::check_buckets`].
///
/// Returns [`CheckedBuckets::Ok`] if metrics are currently allowed, or the reject reason otherwise.
fn check_buckets_inner(
&mut self,
outcome_aggregator: Addr<TrackOutcome>,
buckets: Vec<Bucket>,
) -> CheckedBuckets {
let len = buckets.len();

let Some(project_state) = self.valid_state() else {
return CheckedBuckets::ProjectExpired(len);
relay_log::error!(
tags.project_key = self.project_key.as_str(),
"there is no project state: dropping {} buckets",
buckets.len(),
);
return None;
};

if project_state.invalid() || project_state.disabled() {
return CheckedBuckets::ProjectDisabled(len);
relay_log::debug!("dropping {} buckets for disabled project", buckets.len());
return None;
}

let Some(scoping) = self.scoping() else {
return CheckedBuckets::NoScoping(len);
relay_log::error!(
tags.project_key = self.project_key.as_str(),
"there is no scoping: dropping {} buckets",
buckets.len(),
);
return None;
};

let limits = self.rate_limits().check_with_quotas(
project_state.get_quotas(),
scoping.item(DataCategory::MetricBucket),
);
let namespaces: BTreeSet<MetricNamespace> = buckets
.iter()
.filter_map(|bucket| bucket.name.try_namespace())
.collect();

if limits.is_limited() {
let mode = project_state.get_extraction_mode();
let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
utils::reject_metrics::<Vec<Bucket>>(
&outcome_aggregator,
utils::extract_metric_quantities(&buckets, mode),
scoping,
Outcome::RateLimited(reason_code),
None,
None,
let mode = project_state.get_extraction_mode();
for namespace in namespaces {
let limits = self.rate_limits().check_with_quotas(
project_state.get_quotas(),
scoping.item(DataCategory::MetricBucket),
);

return CheckedBuckets::RateLimited(len);
if limits.is_limited() {
// NB: Until Vec::extract_if is stable, we have to iterate twice.
let matching_buckets = buckets
.iter()
.filter(|bucket| bucket.name.try_namespace() == Some(namespace));

let quantities = utils::extract_metric_quantities(matching_buckets, mode);
relay_log::debug!("dropping {} buckets due to rate limit", quantities.buckets);

let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
utils::reject_metrics::<Vec<Bucket>>(
&outcome_aggregator,
quantities,
scoping,
Outcome::RateLimited(reason_code),
None,
None,
);

buckets.retain(|bucket| bucket.name.try_namespace() != Some(namespace));
}
}

if buckets.is_empty() {
return None;
}

let project_metrics = ProjectMetrics {
buckets,
project_state,
};

CheckedBuckets::Ok(scoping, project_metrics)
Some((scoping, project_metrics))
}
}

/// Result of a bucket check.
#[derive(Debug)]
enum CheckedBuckets {
/// Metrics are allowed. Contains the metrics plus scoping information.
Ok(Scoping, ProjectMetrics),
/// The project has expired while the bucket was in aggregation.
///
/// This should not happen as long as the aggregation time is shorter than the refresh time.
ProjectExpired(usize),
/// The project is disabled or invalid.
ProjectDisabled(usize),
/// The project has no project ID, even though it is not disabled. This should never happen.
NoScoping(usize),
/// The buckets were rate limited.
RateLimited(usize),
}

/// Removes tags based on user configured deny list.
fn remove_matching_bucket_tags(metric_config: &Metrics, bucket: &mut Bucket) {
for tag_block in &metric_config.denied_tags {
Expand Down Expand Up @@ -1309,18 +1287,6 @@ mod tests {
}
}

#[test]
fn no_error_on_disabled_project() {
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut project = Project::new(project_key, Arc::new(Config::default()));
let mut project_state = ProjectState::allowed();
project_state.disabled = true;
project.state = State::Cached(project_state.into());

let result = project.check_buckets_inner(Addr::custom().0, vec![]);
assert!(matches!(result, CheckedBuckets::ProjectDisabled(0)));
}

#[tokio::test]
async fn test_stale_cache() {
let (addr, _) = mock_service("project_cache", (), |&mut (), _| {});
Expand Down

0 comments on commit 680cf14

Please sign in to comment.