From c7de1f1c57b779b3b02c093681dff5b0d3408c79 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Tue, 27 Aug 2024 15:34:41 +0200 Subject: [PATCH] review changes --- relay-server/src/endpoints/project_configs.rs | 13 ++++++++++- relay-server/src/services/project/state.rs | 2 +- relay-server/src/services/project_cache.rs | 4 ++-- relay-server/src/services/project_redis.rs | 4 ++-- relay-server/src/services/project_upstream.rs | 22 +++++++++---------- relay-ua/uap-core | 2 +- 6 files changed, 29 insertions(+), 18 deletions(-) diff --git a/relay-server/src/endpoints/project_configs.rs b/relay-server/src/endpoints/project_configs.rs index 0e9ebe677d..81711b7f45 100644 --- a/relay-server/src/endpoints/project_configs.rs +++ b/relay-server/src/endpoints/project_configs.rs @@ -114,7 +114,18 @@ fn into_valid_keys( public_keys: Vec>, revisions: Option>>>, ) -> impl Iterator)> { - let revisions = revisions.and_then(|e| e.ok()).unwrap_or_default(); + let mut revisions = revisions.and_then(|e| e.ok()).unwrap_or_default(); + if !revisions.is_empty() && revisions.len() != public_keys.len() { + // The downstream sent us a different amount of revisions than project keys, + // this indicates an error in the downstream code. Just to be safe, discard + // all revisions and carry on as if the downstream never sent any revisions. + relay_log::warn!( + "downstream sent {} project keys but {} revisions, discarding all revisions", + public_keys.len(), + revisions.len() + ); + revisions.clear(); + } let revisions = revisions.into_iter().chain(std::iter::repeat(None)); std::iter::zip(public_keys, revisions).filter_map(|(public_key, revision)| { diff --git a/relay-server/src/services/project/state.rs b/relay-server/src/services/project/state.rs index 4f49a79229..70616d4a6e 100644 --- a/relay-server/src/services/project/state.rs +++ b/relay-server/src/services/project/state.rs @@ -122,5 +122,5 @@ pub enum UpstreamProjectState { /// The upstream sent a [`ProjectState`]. New(ProjectState), /// The upstream indicated that there is no newer version of the state available. - Unchanged, + NotModified, } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 54743933e5..c7d7331445 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -502,7 +502,7 @@ impl ProjectSource { } // Redis reported that we're holding an up-to-date version of the state already, // refresh the state and return the old cached state again. - Ok(UpstreamProjectState::Unchanged) => { + Ok(UpstreamProjectState::NotModified) => { return Ok(ProjectFetchState::refresh(cached_state)) } Err(error) => { @@ -526,7 +526,7 @@ impl ProjectSource { match state { UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())), - UpstreamProjectState::Unchanged => Ok(ProjectFetchState::refresh(cached_state)), + UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)), } } } diff --git a/relay-server/src/services/project_redis.rs b/relay-server/src/services/project_redis.rs index 4f75254de2..18e8dd0e53 100644 --- a/relay-server/src/services/project_redis.rs +++ b/relay-server/src/services/project_redis.rs @@ -80,7 +80,7 @@ impl RedisProjectSource { counter(RelayCounters::ProjectStateRedis) += 1, hit = "revision", ); - return Ok(UpstreamProjectState::Unchanged); + return Ok(UpstreamProjectState::NotModified); } } @@ -111,7 +111,7 @@ impl RedisProjectSource { counter(RelayCounters::ProjectStateRedis) += 1, hit = "project_config_revision" ); - Ok(UpstreamProjectState::Unchanged) + Ok(UpstreamProjectState::NotModified) } else { metric!( counter(RelayCounters::ProjectStateRedis) += 1, diff --git a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs index 157e4cda88..4d7d5338ed 100644 --- a/relay-server/src/services/project_upstream.rs +++ b/relay-server/src/services/project_upstream.rs @@ -132,18 +132,19 @@ impl ProjectStateChannel { self.no_cache = true; } - pub fn attach(&mut self, sender: BroadcastSender) { - self.channel.attach(sender) - } - - /// Updates the revision in the current channel. + /// Attaches a new sender to the same channel. /// - /// If the new revision is equal to the contained revision this is a no-op. + /// Also makes sure the new sender's revision matches the already requested revision. /// If the new revision is different from the contained revision this clears the revision. /// To not have multiple fetches per revision per batch, we need to find a common denominator /// for requests with different revisions, which is always to fetch the full project config. - pub fn sync_revision(&mut self, new_revision: Option) { - if self.revision != new_revision { + pub fn attach( + &mut self, + sender: BroadcastSender, + revision: Option, + ) { + self.channel.attach(sender); + if self.revision != revision { self.revision = None; } } @@ -418,7 +419,7 @@ impl UpstreamProjectSourceService { let mut result = "ok"; let state = if response.unchanged.contains(&key) { result = "ok_unchanged"; - UpstreamProjectState::Unchanged + UpstreamProjectState::NotModified } else { let state = response .configs @@ -571,8 +572,7 @@ impl UpstreamProjectSourceService { } Entry::Occupied(mut entry) => { let channel = entry.get_mut(); - channel.attach(sender); - channel.sync_revision(current_revision); + channel.attach(sender, current_revision); // Ensure upstream skips caches if one of the recipients requests an uncached response. This // operation is additive across requests. if no_cache { diff --git a/relay-ua/uap-core b/relay-ua/uap-core index bbd43aed9a..d668d6c615 160000 --- a/relay-ua/uap-core +++ b/relay-ua/uap-core @@ -1 +1 @@ -Subproject commit bbd43aed9a623486191a33c3af9e463e89c85f7a +Subproject commit d668d6c6157db7737edfc0280adc6610c1b88029