ref(project-cache): Split project cache into shared and private parts (#4147)

Makes the project cache concurrently accessible using a lock-free shared map.

- Removes support for `no_cache` queries; they have not been used for a long
time, and removing them simplifies the design (project accesses no longer need
to be awaitable).
- Removes support for the v2 project cache endpoint; v3 has been out for more
than 2 years.
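
Below is a minimal sketch (not Relay's actual code) of the shared-map access pattern described above, using the `papaya` crate added as a dependency in this PR; `ProjectKey` and `ProjectState` are simplified placeholder types.

```rust
// Minimal sketch of concurrent access to a shared, lock-free map.
// `ProjectKey` and `ProjectState` are placeholders, not Relay's types.
use std::sync::Arc;

type ProjectKey = String;

#[derive(Clone, Debug)]
struct ProjectState {
    disabled: bool,
}

fn main() {
    // `papaya::HashMap` supports concurrent reads and writes without a global lock.
    let shared: Arc<papaya::HashMap<ProjectKey, ProjectState>> =
        Arc::new(papaya::HashMap::new());

    let writer = Arc::clone(&shared);
    let handle = std::thread::spawn(move || {
        // `pin()` returns a guard through which the map is read and written.
        writer
            .pin()
            .insert("project-a".to_owned(), ProjectState { disabled: false });
    });
    handle.join().unwrap();

    // Readers on other threads observe the insert without blocking writers.
    if let Some(state) = shared.pin().get("project-a") {
        println!("disabled: {}", state.disabled);
    }
}
```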

The PR also moves all project cache handling out of the old (now called legacy)
project cache service into a new, simplified project cache service that only
does the book-keeping for the actual cache. The old spool v1 implementation and
some message handling remain in the legacy project cache. Spool v1 will
eventually be removed; the remaining messages no longer need to be messages and
can be replaced and removed in follow-up PRs. Some messages, like
`ProcessMetrics`, have already been removed.

The garbage disposal has been removed. With the new organization we generate
far less 'trash' that needs to be disposed of, and it is no longer necessary to
offload disposal to another thread: the service itself has much less work to
do, and service latency is no longer a major concern for stability.

It is now possible to subscribe to project cache events. This is done using a
tokio broadcast channel. The channel is inherently size-limited, which is
problematic because subscribers currently cannot deal with lag and must not
miss any messages. To prevent this (until there is a better way), the channel
is given a maximum size that should be practically impossible to reach. This is
described in more detail in the code itself.
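
As a rough illustration of the broadcast-channel pattern described above; the `ProjectChange` event type and the capacity are placeholders, not Relay's actual types or values.

```rust
use tokio::sync::broadcast;

// Hypothetical event type; Relay's actual project cache events may differ.
#[derive(Clone, Debug)]
enum ProjectChange {
    Ready(String),
    Evicted(String),
}

#[tokio::main]
async fn main() {
    // A generously sized channel: broadcast channels overwrite the oldest
    // messages once full, and slow receivers then see `RecvError::Lagged`
    // instead of the dropped events.
    let (tx, mut rx) = broadcast::channel::<ProjectChange>(1_000_000);

    tx.send(ProjectChange::Ready("project-a".to_owned())).unwrap();
    tx.send(ProjectChange::Evicted("project-b".to_owned())).unwrap();

    for _ in 0..2 {
        match rx.recv().await {
            Ok(event) => println!("received: {event:?}"),
            // With a large enough capacity this branch should be unreachable.
            Err(broadcast::error::RecvError::Lagged(skipped)) => {
                eprintln!("missed {skipped} events");
            }
            Err(broadcast::error::RecvError::Closed) => break,
        }
    }
}
```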

Functional changes:
- Projects are now only expired through the eviction mechanism, no longer on
access. This makes it possible to configure Relay in a way where expired
projects are used if the eviction interval is too long. To compensate, the
default configuration has been tuned; ideally the eviction interval is shorter
than the grace period (see the config sketch below).
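
A sketch of the relevant cache settings in a Relay configuration file; the key names mirror the `Cache` struct fields changed in this diff, but the exact YAML layout shown here is an assumption.

```yaml
cache:
  # Seconds a fetched project config is considered fresh.
  project_expiry: 300
  # Seconds an expired config may still be used while a new one is fetched.
  project_grace_period: 120
  # How often the eviction pass runs; keep this shorter than the grace period.
  eviction_interval: 15
```
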
Dav1dde authored Oct 30, 2024
1 parent bc70b9e commit 857c2b1
Showing 37 changed files with 2,198 additions and 2,339 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -553,7 +553,7 @@ jobs:
- run: make test-integration
env:
PYTEST_N: 6
RELAY_VERSION_CHAIN: "20.6.0,latest"
RELAY_VERSION_CHAIN: "23.12.0,latest"

sentry-relay-integration-tests:
name: Sentry-Relay Integration Tests
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
**Breaking Changes**:

- Removes support for metric meta envelope items. ([#4152](https://github.com/getsentry/relay/pull/4152))
+ - Removes support for the project cache endpoint version 2 and before. ([#4147](https://github.com/getsentry/relay/pull/4147))

**Bug Fixes**

93 changes: 89 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -60,6 +60,7 @@ relay-test = { path = "relay-test" }
relay-protocol-derive = { path = "relay-protocol-derive" }
relay-event-derive = { path = "relay-event-derive" }

+ ahash = "0.8.11"
android_trace_log = { version = "0.3.0", features = ["serde"] }
anyhow = "1.0.66"
axum = "0.7.5"
@@ -119,6 +120,7 @@ num-traits = "0.2.18"
num_cpus = "1.13.0"
once_cell = "1.13.1"
opentelemetry-proto = { version = "0.7.0", default-features = false }
+ papaya = "0.1.4"
parking_lot = "0.12.1"
path-slash = "0.2.1"
pest = "2.1.3"
14 changes: 8 additions & 6 deletions relay-config/src/config.rs
@@ -1017,7 +1017,9 @@ struct Cache {
/// The cache timeout for project configurations in seconds.
project_expiry: u32,
/// Continue using project state this many seconds after cache expiry while a new state is
- /// being fetched. This is added on top of `project_expiry` and `miss_expiry`. Default is 0.
+ /// being fetched. This is added on top of `project_expiry`.
+ ///
+ /// Default is 2 minutes.
project_grace_period: u32,
/// The cache timeout for downstream relay info (public keys) in seconds.
relay_expiry: u32,
@@ -1053,17 +1055,17 @@ impl Default for Cache {
fn default() -> Self {
Cache {
project_request_full_config: false,
- project_expiry: 300, // 5 minutes
- project_grace_period: 0,
- relay_expiry: 3600, // 1 hour
- envelope_expiry: 600, // 10 minutes
+ project_expiry: 300, // 5 minutes
+ project_grace_period: 120, // 2 minutes
+ relay_expiry: 3600, // 1 hour
+ envelope_expiry: 600, // 10 minutes
envelope_buffer_size: 1000,
miss_expiry: 60, // 1 minute
batch_interval: 100, // 100ms
downstream_relays_batch_interval: 100, // 100ms
batch_size: 500,
file_interval: 10, // 10 seconds
- eviction_interval: 60, // 60 seconds
+ eviction_interval: 15, // 15 seconds
global_config_fetch_interval: 10, // 10 seconds
}
}
8 changes: 8 additions & 0 deletions relay-dynamic-config/src/error_boundary.rs
@@ -42,6 +42,14 @@ impl<T> ErrorBoundary<T> {
}
}

+ /// Converts from `ErrorBoundary<T>` to `ErrorBoundary<&T>`.
+ pub fn as_ref(&self) -> ErrorBoundary<&T> {
+ match self {
+ Self::Ok(t) => ErrorBoundary::Ok(t),
+ Self::Err(e) => ErrorBoundary::Err(Arc::clone(e)),
+ }
+ }
+
/// Returns the contained [`Ok`] value or computes it from a closure.
#[inline]
pub fn unwrap_or_else<F>(self, op: F) -> T
40 changes: 23 additions & 17 deletions relay-quotas/src/rate_limit.rs
@@ -1,5 +1,6 @@
use std::fmt;
use std::str::FromStr;
+ use std::sync::{Arc, Mutex, PoisonError};
use std::time::{Duration, Instant};

use relay_base_schema::metrics::MetricNamespace;
@@ -279,15 +280,14 @@ impl RateLimits {
}

/// Removes expired rate limits from this instance.
- pub fn clean_expired(&mut self) {
- let now = Instant::now();
+ pub fn clean_expired(&mut self, now: Instant) {
self.limits
.retain(|limit| !limit.retry_after.expired_at(now));
}

/// Checks whether any rate limits apply to the given scoping.
///
- /// If no limits match, then the returned `RateLimits` instance evalutes `is_ok`. Otherwise, it
+ /// If no limits match, then the returned `RateLimits` instance evaluates `is_ok`. Otherwise, it
/// contains rate limits that match the given scoping.
pub fn check(&self, scoping: ItemScoping<'_>) -> Self {
self.check_with_quotas(&[], scoping)
@@ -298,7 +298,7 @@ impl RateLimits {
/// This is similar to `check`. Additionally, it checks for quotas with a static limit `0`, and
/// rejects items even if there is no active rate limit in this instance.
///
- /// If no limits or quotas match, then the returned `RateLimits` instance evalutes `is_ok`.
+ /// If no limits or quotas match, then the returned `RateLimits` instance evaluates `is_ok`.
/// Otherwise, it contains rate limits that match the given scoping.
pub fn check_with_quotas<'a>(
&self,
@@ -339,7 +339,7 @@ impl RateLimits {

/// Returns `true` if there are any limits contained.
///
- /// This is equavalent to checking whether [`Self::longest`] returns `Some`
+ /// This is equivalent to checking whether [`Self::longest`] returns `Some`
/// or [`Self::iter`] returns an iterator with at least one item.
pub fn is_empty(&self) -> bool {
self.limits.is_empty()
@@ -402,7 +402,7 @@ impl<'a> IntoIterator for &'a RateLimits {
///
/// The data structure makes sure no expired rate limits are enforced.
#[derive(Debug, Default)]
- pub struct CachedRateLimits(RateLimits);
+ pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);

impl CachedRateLimits {
/// Creates a new, empty instance without any rate limits enforced.
@@ -413,25 +413,31 @@ impl CachedRateLimits {
/// Adds a limit to this collection.
///
/// See also: [`RateLimits::add`].
- pub fn add(&mut self, limit: RateLimit) {
- self.0.add(limit);
+ pub fn add(&self, limit: RateLimit) {
+ let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
+ let current = Arc::make_mut(&mut inner);
+ current.add(limit);
}

/// Merges more rate limits into this instance.
///
/// See also: [`RateLimits::merge`].
- pub fn merge(&mut self, rate_limits: RateLimits) {
- for limit in rate_limits {
- self.add(limit)
+ pub fn merge(&self, limits: RateLimits) {
+ let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
+ let current = Arc::make_mut(&mut inner);
+ for limit in limits {
+ current.add(limit)
}
}

/// Returns a reference to the contained rate limits.
///
- /// This call gurantuess that at the time of call no returned rate limit is expired.
- pub fn current_limits(&mut self) -> &RateLimits {
- self.0.clean_expired();
- &self.0
+ /// This call guarantees that at the time of call no returned rate limit is expired.
+ pub fn current_limits(&self) -> Arc<RateLimits> {
+ let now = Instant::now();
+ let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
+ Arc::make_mut(&mut inner).clean_expired(now);
+ Arc::clone(&inner)
}
}

@@ -927,7 +933,7 @@ mod tests {
// Sanity check before running `clean_expired`
assert_eq!(rate_limits.iter().count(), 2);

- rate_limits.clean_expired();
+ rate_limits.clean_expired(Instant::now());

// Check that the expired limit has been removed
insta::assert_ron_snapshot!(rate_limits, @r###"
@@ -1120,7 +1126,7 @@

#[test]
fn test_cached_rate_limits_expired() {
- let mut cached = CachedRateLimits::new();
+ let cached = CachedRateLimits::new();

// Active error limit
cached.add(RateLimit {
