diff --git a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs index 3334e98c31e..e74eb0b6412 100644 --- a/relay-server/src/services/project_upstream.rs +++ b/relay-server/src/services/project_upstream.rs @@ -79,7 +79,10 @@ impl UpstreamQuery for GetProjectStates { /// The wrapper struct for the incoming external requests which also keeps addition information. #[derive(Debug)] struct ProjectStateChannel { + // Main broadcast channel. channel: BroadcastChannel, + // Additional broadcast channels tracked from merging operations. + other: Vec>, deadline: Instant, no_cache: bool, attempts: u64, @@ -99,6 +102,7 @@ impl ProjectStateChannel { Self { no_cache, channel: sender.into_channel(), + other: Vec::new(), deadline: now + timeout, attempts: 0, errors: 0, @@ -115,12 +119,35 @@ impl ProjectStateChannel { } pub fn send(self, state: ProjectFetchState) { + for channel in self.other { + channel.send(state.clone()); + } self.channel.send(state) } pub fn expired(&self) -> bool { Instant::now() > self.deadline } + + pub fn merge(&mut self, channel: ProjectStateChannel) { + let ProjectStateChannel { + channel, + other, + deadline, + no_cache, + attempts, + errors, + pending, + } = channel; + + self.other.push(channel); + self.other.extend(other); + self.deadline = self.deadline.max(deadline); + self.no_cache |= no_cache; + self.attempts += attempts; + self.errors += errors; + self.pending += pending; + } } /// The map of project keys with their project state channels. @@ -269,6 +296,23 @@ impl UpstreamProjectSourceService { } } + /// Merges a [`ProjectStateChannel`] into the existing list of tracked channels. + /// + /// A channel is removed when querying the upstream for the project, + /// when the upstream returns pending for this project it needs to be returned to + /// the list of channels. If there is already another request for the same project + /// outstanding those two requests must be merged. + fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) { + match self.state_channels.entry(key) { + Entry::Vacant(e) => { + e.insert(channel); + } + Entry::Occupied(mut e) => { + e.get_mut().merge(channel); + } + } + } + /// Executes an upstream request to fetch project configs. /// /// This assumes that currently no request is running. If the upstream request fails or new @@ -374,7 +418,7 @@ impl UpstreamProjectSourceService { let mut result = "ok"; if response.pending.contains(&key) { channel.pending += 1; - self.state_channels.insert(key, channel); + self.merge_channel(key, channel); continue; } let state = response