From 382d427d145f022bc6cca0d200707001d85c7c09 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 2 Aug 2024 11:15:39 +0200 Subject: [PATCH] ref(project-cache): Check revision before loading from Redis --- relay-server/src/services/project.rs | 12 +++- .../src/services/project/state/fetch_state.rs | 18 ++++++ .../src/services/project/state/info.rs | 4 +- relay-server/src/services/project_cache.rs | 61 +++++++++++-------- relay-server/src/services/project_redis.rs | 50 ++++++++++++--- relay-server/src/statsd.rs | 7 ++- 6 files changed, 112 insertions(+), 40 deletions(-) diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index afe471d9a00..778eca9f897 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -310,7 +310,11 @@ impl Project { "project {} state requested {attempts} times", self.project_key ); - project_cache.send(RequestUpdate::new(self.project_key, no_cache)); + project_cache.send(RequestUpdate { + project_key: self.project_key, + no_cache, + cached_state: self.state.clone(), + }); } channel @@ -473,7 +477,11 @@ impl Project { self.project_key ); - project_cache.send(RequestUpdate::new(self.project_key, no_cache)); + project_cache.send(RequestUpdate { + project_key: self.project_key, + no_cache, + cached_state: self.state.clone(), + }); return old_state; } diff --git a/relay-server/src/services/project/state/fetch_state.rs b/relay-server/src/services/project/state/fetch_state.rs index a9cbbdf31ae..67ff3fb21ef 100644 --- a/relay-server/src/services/project/state/fetch_state.rs +++ b/relay-server/src/services/project/state/fetch_state.rs @@ -25,6 +25,14 @@ impl ProjectFetchState { } } + /// Refreshes the expiry of the fetch state. + pub fn refresh(old: ProjectFetchState) -> Self { + Self { + last_fetch: Some(Instant::now()), + state: old.state, + } + } + /// Project state for an unknown but allowed project. /// /// This state is used for forwarding in Proxy mode. @@ -32,6 +40,7 @@ impl ProjectFetchState { Self::enabled(ProjectInfo { project_id: None, last_change: None, + rev: None, public_keys: Default::default(), slug: None, config: ProjectConfig::default(), @@ -119,6 +128,15 @@ impl ProjectFetchState { Expiry::Updated } } + + /// Returns the revision of the contained project state. + pub fn revision(&self) -> Option<&str> { + match &self.state { + ProjectState::Enabled(info) => info.rev.as_deref(), + ProjectState::Disabled => None, // TODO: disabled can also have a revision + ProjectState::Pending => None, + } + } } /// Wrapper for a project state, with expiry information. diff --git a/relay-server/src/services/project/state/info.rs b/relay-server/src/services/project/state/info.rs index 08e62b62feb..75216a2af5c 100644 --- a/relay-server/src/services/project/state/info.rs +++ b/relay-server/src/services/project/state/info.rs @@ -30,8 +30,9 @@ pub struct ProjectInfo { /// /// This might be `None` in some rare cases like where states /// are faked locally. - #[serde(default)] pub last_change: Option>, + /// The revision id of the project config. + pub rev: Option, /// Indicates that the project is disabled. /// A container of known public keys in the project. /// @@ -56,6 +57,7 @@ pub struct ProjectInfo { pub struct LimitedProjectInfo { pub project_id: Option, pub last_change: Option>, + pub rev: Option, pub public_keys: SmallVec<[PublicKeyConfig; 1]>, pub slug: Option, #[serde(with = "LimitedProjectConfig")] diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 774efbb4454..9e689761523 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -53,19 +53,15 @@ use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff #[derive(Clone, Debug)] pub struct RequestUpdate { /// The public key to fetch the project by. - project_key: ProjectKey, - + pub project_key: ProjectKey, /// If true, all caches should be skipped and a fresh state should be computed. - no_cache: bool, -} - -impl RequestUpdate { - pub fn new(project_key: ProjectKey, no_cache: bool) -> Self { - Self { - project_key, - no_cache, - } - } + pub no_cache: bool, + /// Previously cached fetch state, if availabled. + /// + /// The upstream request will include the revision of the currently cached state, + /// if the upstream does not have a different revision, this cached + /// state is re-used and its expiry bumped. + pub cached_state: ProjectFetchState, } /// Returns the project state. @@ -459,7 +455,12 @@ impl ProjectSource { } } - async fn fetch(self, project_key: ProjectKey, no_cache: bool) -> Result { + async fn fetch( + self, + project_key: ProjectKey, + no_cache: bool, + cached_state: ProjectFetchState, + ) -> Result { let state_opt = self .local_source .send(FetchOptionalProjectState { project_key }) @@ -479,27 +480,34 @@ impl ProjectSource { #[cfg(feature = "processing")] if let Some(redis_source) = self.redis_source { + let revision = cached_state.revision().map(String::from); + let redis_permit = self.redis_semaphore.acquire().await.map_err(|_| ())?; - let state_fetch_result = - tokio::task::spawn_blocking(move || redis_source.get_config(project_key)) - .await - .map_err(|_| ())?; + let state_fetch_result = tokio::task::spawn_blocking(move || { + redis_source.get_config_if_changed(project_key, revision.as_deref()) + }) + .await + .map_err(|_| ())?; drop(redis_permit); - let state = match state_fetch_result { - Ok(state) => state.sanitized(), + match state_fetch_result { + // New state fetched from Redis, possibly pending. + Ok(Some(state)) => { + let state = state.sanitized(); + if !state.is_pending() { + return Ok(ProjectFetchState::new(state)); + } + } + // Redis reported that we're holding an up to date version of the state alread, + // refresh the state and return the old cached state again. + Ok(None) => return Ok(ProjectFetchState::refresh(cached_state)), Err(error) => { relay_log::error!( error = &error as &dyn Error, "failed to fetch project from Redis", ); - ProjectState::Pending } }; - - if !matches!(state, ProjectState::Pending) { - return Ok(ProjectFetchState::new(state)); - } }; self.upstream_source @@ -734,6 +742,7 @@ impl ProjectCacheBroker { let RequestUpdate { project_key, no_cache, + cached_state, } = message; // Bump the update time of the project in our hashmap to evade eviction. @@ -750,14 +759,14 @@ impl ProjectCacheBroker { tokio::time::sleep_until(next_attempt).await; } let state = source - .fetch(project_key, no_cache) + .fetch(project_key, no_cache, cached_state) .await .unwrap_or_else(|()| ProjectFetchState::disabled()); let message = UpdateProjectState { project_key, - state, no_cache, + state, }; sender.send(message).ok(); diff --git a/relay-server/src/services/project_redis.rs b/relay-server/src/services/project_redis.rs index 275817d7606..52eff1bbe39 100644 --- a/relay-server/src/services/project_redis.rs +++ b/relay-server/src/services/project_redis.rs @@ -52,19 +52,41 @@ impl RedisProjectSource { RedisProjectSource { config, redis } } - pub fn get_config(&self, key: ProjectKey) -> Result { - let mut command = relay_redis::redis::cmd("GET"); - - let prefix = self.config.projectconfig_cache_prefix(); - command.arg(format!("{prefix}:{key}")); + pub fn get_config_if_changed( + &self, + key: ProjectKey, + revision: Option<&str>, + ) -> Result, RedisProjectError> { + let mut client = self.redis.client()?; + let mut connection = client.connection()?; + + // Only check for the revision if we were passed a revision. + if let Some(revision) = revision { + let current_revision: Option = relay_redis::redis::cmd("GET") + .arg(self.get_redis_rev_key(key)) + .query(&mut connection) + .map_err(RedisError::Redis)?; + + if current_revision.as_deref() == Some(revision) { + metric!( + counter(RelayCounters::ProjectStateRedis) += 1, + hit = "revision", + ); + return Ok(None); + } + } - let raw_response_opt: Option> = command - .query(&mut self.redis.client()?.connection()?) + let raw_response_opt: Option> = relay_redis::redis::cmd("GET") + .arg(self.get_redis_project_config_key(key)) + .query(&mut connection) .map_err(RedisError::Redis)?; let response = match raw_response_opt { Some(response) => { - metric!(counter(RelayCounters::ProjectStateRedis) += 1, hit = "true"); + metric!( + counter(RelayCounters::ProjectStateRedis) += 1, + hit = "project_config" + ); let parsed = parse_redis_response(response.as_slice())?; ProjectState::from(parsed) } @@ -77,7 +99,17 @@ impl RedisProjectSource { } }; - Ok(response) + Ok(Some(response)) + } + + fn get_redis_project_config_key(&self, key: ProjectKey) -> String { + let prefix = self.config.projectconfig_cache_prefix(); + format!("{prefix}:{key}") + } + + fn get_redis_rev_key(&self, key: ProjectKey) -> String { + let prefix = self.config.projectconfig_cache_prefix(); + format!("{prefix}:{key}.rev") } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 489d01d84d6..c0b3db26e2e 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -647,8 +647,11 @@ pub enum RelayCounters { ProjectStateNoCache, /// Number of times a project state is requested from the central Redis cache. /// - /// This has a tag `hit` with values `true` or `false`. If false the request will be - /// sent to the sentry endpoint. + /// This metric is tagged with: + /// - `hit`: One of: + /// - `revision`: the in cached version was validated with the revision. + /// - `project_config`: the request was handled by the cache. + /// - `false`: the request will be sent to the sentry endpoint. #[cfg(feature = "processing")] ProjectStateRedis, /// Number of times a project is looked up from the cache.