diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index afe471d9a0..778eca9f89 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -310,7 +310,11 @@ impl Project { "project {} state requested {attempts} times", self.project_key ); - project_cache.send(RequestUpdate::new(self.project_key, no_cache)); + project_cache.send(RequestUpdate { + project_key: self.project_key, + no_cache, + cached_state: self.state.clone(), + }); } channel @@ -473,7 +477,11 @@ impl Project { self.project_key ); - project_cache.send(RequestUpdate::new(self.project_key, no_cache)); + project_cache.send(RequestUpdate { + project_key: self.project_key, + no_cache, + cached_state: self.state.clone(), + }); return old_state; } diff --git a/relay-server/src/services/project/state.rs b/relay-server/src/services/project/state.rs index 6c770b2d5b..dad7f5b4b9 100644 --- a/relay-server/src/services/project/state.rs +++ b/relay-server/src/services/project/state.rs @@ -53,6 +53,17 @@ impl ProjectState { } } + /// Returns the revision of the contained project info. + /// + /// `None` if the revision is missing or not available. + pub fn revision(&self) -> Option<&str> { + match &self { + ProjectState::Enabled(info) => info.rev.as_deref(), + ProjectState::Disabled => None, + ProjectState::Pending => None, + } + } + /// Creates `Scoping` for this project if the state is loaded. /// /// Returns `Some` if the project state has been fetched and contains a project identifier, diff --git a/relay-server/src/services/project/state/fetch_state.rs b/relay-server/src/services/project/state/fetch_state.rs index a9cbbdf31a..abc4a9f79e 100644 --- a/relay-server/src/services/project/state/fetch_state.rs +++ b/relay-server/src/services/project/state/fetch_state.rs @@ -25,6 +25,14 @@ impl ProjectFetchState { } } + /// Refreshes the expiry of the fetch state. 
+ pub fn refresh(old: ProjectFetchState) -> Self { + Self { + last_fetch: Some(Instant::now()), + state: old.state, + } + } + /// Project state for an unknown but allowed project. /// /// This state is used for forwarding in Proxy mode. @@ -32,6 +40,7 @@ impl ProjectFetchState { Self::enabled(ProjectInfo { project_id: None, last_change: None, + rev: None, public_keys: Default::default(), slug: None, config: ProjectConfig::default(), @@ -119,6 +128,13 @@ impl ProjectFetchState { Expiry::Updated } } + + /// Returns the revision of the contained project state. + /// + /// See: [`ProjectState::revision`]. + pub fn revision(&self) -> Option<&str> { + self.state.revision() + } } /// Wrapper for a project state, with expiry information. diff --git a/relay-server/src/services/project/state/info.rs b/relay-server/src/services/project/state/info.rs index 08e62b62fe..75216a2af5 100644 --- a/relay-server/src/services/project/state/info.rs +++ b/relay-server/src/services/project/state/info.rs @@ -30,8 +30,9 @@ pub struct ProjectInfo { /// /// This might be `None` in some rare cases like where states /// are faked locally. - #[serde(default)] pub last_change: Option>, + /// The revision id of the project config. + pub rev: Option, /// Indicates that the project is disabled. /// A container of known public keys in the project. 
/// @@ -56,6 +57,7 @@ pub struct ProjectInfo { pub struct LimitedProjectInfo { pub project_id: Option, pub last_change: Option>, + pub rev: Option, pub public_keys: SmallVec<[PublicKeyConfig; 1]>, pub slug: Option, #[serde(with = "LimitedProjectConfig")] diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 774efbb445..55aef03be8 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -53,19 +53,15 @@ use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff #[derive(Clone, Debug)] pub struct RequestUpdate { /// The public key to fetch the project by. - project_key: ProjectKey, - + pub project_key: ProjectKey, /// If true, all caches should be skipped and a fresh state should be computed. - no_cache: bool, -} - -impl RequestUpdate { - pub fn new(project_key: ProjectKey, no_cache: bool) -> Self { - Self { - project_key, - no_cache, - } - } + pub no_cache: bool, + /// Previously cached fetch state, if available. + /// + /// The upstream request will include the revision of the currently cached state, + /// if the upstream does not have a different revision, this cached + /// state is re-used and its expiry bumped. + pub cached_state: ProjectFetchState, } /// Returns the project state. 
@@ -459,7 +455,12 @@ impl ProjectSource { } } - async fn fetch(self, project_key: ProjectKey, no_cache: bool) -> Result { + async fn fetch( + self, + project_key: ProjectKey, + no_cache: bool, + _cached_state: ProjectFetchState, + ) -> Result { let state_opt = self .local_source .send(FetchOptionalProjectState { project_key }) @@ -479,27 +480,34 @@ impl ProjectSource { #[cfg(feature = "processing")] if let Some(redis_source) = self.redis_source { + let revision = _cached_state.revision().map(String::from); + let redis_permit = self.redis_semaphore.acquire().await.map_err(|_| ())?; - let state_fetch_result = - tokio::task::spawn_blocking(move || redis_source.get_config(project_key)) - .await - .map_err(|_| ())?; + let state_fetch_result = tokio::task::spawn_blocking(move || { + redis_source.get_config_if_changed(project_key, revision.as_deref()) + }) + .await + .map_err(|_| ())?; drop(redis_permit); - let state = match state_fetch_result { - Ok(state) => state.sanitized(), + match state_fetch_result { + // New state fetched from Redis, possibly pending. + Ok(Some(state)) => { + let state = state.sanitized(); + if !state.is_pending() { + return Ok(ProjectFetchState::new(state)); + } + } + // Redis reported that we're holding an up-to-date version of the state already, + // refresh the state and return the old cached state again. + Ok(None) => return Ok(ProjectFetchState::refresh(_cached_state)), Err(error) => { relay_log::error!( error = &error as &dyn Error, "failed to fetch project from Redis", ); - ProjectState::Pending } }; - - if !matches!(state, ProjectState::Pending) { - return Ok(ProjectFetchState::new(state)); - } }; self.upstream_source @@ -734,6 +742,7 @@ impl ProjectCacheBroker { let RequestUpdate { project_key, no_cache, + cached_state, } = message; // Bump the update time of the project in our hashmap to evade eviction. 
@@ -750,14 +759,14 @@ impl ProjectCacheBroker { tokio::time::sleep_until(next_attempt).await; } let state = source - .fetch(project_key, no_cache) + .fetch(project_key, no_cache, cached_state) .await .unwrap_or_else(|()| ProjectFetchState::disabled()); let message = UpdateProjectState { project_key, - state, no_cache, + state, }; sender.send(message).ok(); diff --git a/relay-server/src/services/project_redis.rs b/relay-server/src/services/project_redis.rs index 275817d760..edee03a829 100644 --- a/relay-server/src/services/project_redis.rs +++ b/relay-server/src/services/project_redis.rs @@ -52,32 +52,85 @@ impl RedisProjectSource { RedisProjectSource { config, redis } } - pub fn get_config(&self, key: ProjectKey) -> Result { - let mut command = relay_redis::redis::cmd("GET"); - - let prefix = self.config.projectconfig_cache_prefix(); - command.arg(format!("{prefix}:{key}")); - - let raw_response_opt: Option> = command - .query(&mut self.redis.client()?.connection()?) - .map_err(RedisError::Redis)?; - - let response = match raw_response_opt { - Some(response) => { - metric!(counter(RelayCounters::ProjectStateRedis) += 1, hit = "true"); - let parsed = parse_redis_response(response.as_slice())?; - ProjectState::from(parsed) - } - None => { + /// Fetches a project config from Redis. + /// + /// Returns `None` if the project config stored in Redis has the same `revision`. + /// Always returns a project state if the passed `revision` is `None`. + /// + /// The returned project state is [`ProjectState::Pending`] if the requested project config is not + /// stored in Redis. + pub fn get_config_if_changed( + &self, + key: ProjectKey, + revision: Option<&str>, + ) -> Result, RedisProjectError> { + let mut client = self.redis.client()?; + let mut connection = client.connection()?; + + // Only check for the revision if we were passed a revision. 
+ if let Some(revision) = revision { + let current_revision: Option = relay_redis::redis::cmd("GET") + .arg(self.get_redis_rev_key(key)) + .query(&mut connection) + .map_err(RedisError::Redis)?; + + relay_log::trace!( + "Redis revision {current_revision:?}, requested revision {revision:?}" + ); + if current_revision.as_deref() == Some(revision) { metric!( counter(RelayCounters::ProjectStateRedis) += 1, - hit = "false" + hit = "revision", ); - ProjectState::Pending + return Ok(None); } + } + + let raw_response_opt: Option> = relay_redis::redis::cmd("GET") + .arg(self.get_redis_project_config_key(key)) + .query(&mut connection) + .map_err(RedisError::Redis)?; + + let Some(response) = raw_response_opt else { + metric!( + counter(RelayCounters::ProjectStateRedis) += 1, + hit = "false" + ); + return Ok(Some(ProjectState::Pending)); }; - Ok(response) + let response = ProjectState::from(parse_redis_response(response.as_slice())?); + + // If we were passed a revision, check if we just loaded the same revision from Redis. + // + // We always want to keep the old revision alive if possible, since the already loaded + // version has already initialized caches. + // + // While this is theoretically possible, this should always be handled using the above revision + // check using the additional Redis key. 
+ if revision.is_some() && response.revision() == revision { + metric!( + counter(RelayCounters::ProjectStateRedis) += 1, + hit = "project_config_revision" + ); + Ok(None) + } else { + metric!( + counter(RelayCounters::ProjectStateRedis) += 1, + hit = "project_config" + ); + Ok(Some(response)) + } + } + + fn get_redis_project_config_key(&self, key: ProjectKey) -> String { + let prefix = self.config.projectconfig_cache_prefix(); + format!("{prefix}:{key}") + } + + fn get_redis_rev_key(&self, key: ProjectKey) -> String { + let prefix = self.config.projectconfig_cache_prefix(); + format!("{prefix}:{key}.rev") } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 489d01d84d..1113d528fc 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -647,8 +647,13 @@ pub enum RelayCounters { ProjectStateNoCache, /// Number of times a project state is requested from the central Redis cache. /// - /// This has a tag `hit` with values `true` or `false`. If false the request will be - /// sent to the sentry endpoint. + /// This metric is tagged with: + /// - `hit`: One of: + /// - `revision`: the cached version was validated to be up to date using its revision. + /// - `project_config`: the request was handled by the cache. + /// - `project_config_revision`: the request was handled by the cache and the revision did + /// not change. + /// - `false`: the request will be sent to the sentry endpoint. #[cfg(feature = "processing")] ProjectStateRedis, /// Number of times a project is looked up from the cache. 
diff --git a/tests/integration/test_query.py b/tests/integration/test_query.py index d9bcc14556..417b63c759 100644 --- a/tests/integration/test_query.py +++ b/tests/integration/test_query.py @@ -256,5 +256,41 @@ def test_processing_redis_query_compressed( relay.send_event(project_id) - event, v = events_consumer.get_event() + event, _ = events_consumer.get_event() + assert event["logentry"] == {"formatted": "Hello, World!"} + + +def test_processing_redis_query_with_revision( + mini_sentry, + redis_client, + relay_with_processing, + events_consumer, + outcomes_consumer, +): + outcomes_consumer = outcomes_consumer() + events_consumer = events_consumer() + + relay = relay_with_processing( + {"limits": {"query_timeout": 10}, "cache": {"project_expiry": 1}} + ) + project_id = 42 + cfg = mini_sentry.add_full_project_config(project_id) + cfg["rev"] = "123" + + key = mini_sentry.get_dsn_public_key(project_id) + projectconfig_cache_prefix = relay.options["processing"][ + "projectconfig_cache_prefix" + ] + redis_client.setex(f"{projectconfig_cache_prefix}:{key}", 3600, json.dumps(cfg)) + redis_client.setex(f"{projectconfig_cache_prefix}:{key}.rev", 3600, cfg["rev"]) + + relay.send_event(project_id) + event, _ = events_consumer.get_event() + assert event["logentry"] == {"formatted": "Hello, World!"} + + # 1 second timeout on the project cache, make sure it times out + time.sleep(2) + + relay.send_event(project_id) + event, _ = events_consumer.get_event() assert event["logentry"] == {"formatted": "Hello, World!"}