diff --git a/Cargo.lock b/Cargo.lock index 68ac11118b..ea00552775 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3753,6 +3753,7 @@ dependencies = [ "fnv", "futures", "hashbrown 0.14.5", + "http", "hyper-util", "insta", "itertools 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index f582dbfb1f..fdcdf002ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ hex = "0.4.3" hmac = "0.12.1" hostname = "0.4.0" human-size = "0.4.1" +http = "1.1.0" hyper-util = { version = "0.1.7", features = ["tokio"] } indexmap = "2.2.5" insta = { version = "1.31.0", features = ["json", "redactions", "ron"] } diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index f079baea06..2466d577f3 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -127,12 +127,13 @@ semver = { workspace = true } [dev-dependencies] criterion = { workspace = true } -tokio = { workspace = true, features = ['test-util'] } +http = { workspace = true } insta = { workspace = true } relay-protocol = { workspace = true, features = ["test"] } relay-test = { workspace = true } similar-asserts = { workspace = true } tempfile = { workspace = true } +tokio = { workspace = true, features = ['test-util'] } [[bench]] name = "benches" diff --git a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs index 4d7d5338ed..0b54cc17e6 100644 --- a/relay-server/src/services/project_upstream.rs +++ b/relay-server/src/services/project_upstream.rs @@ -98,7 +98,10 @@ impl UpstreamQuery for GetProjectStates { /// The wrapper struct for the incoming external requests which also keeps addition information. #[derive(Debug)] struct ProjectStateChannel { + // Main broadcast channel. channel: BroadcastChannel, + // Additional broadcast channels tracked from merge operations. + merged: Vec>, revision: Option, deadline: Instant, no_cache: bool, @@ -120,6 +123,7 @@ impl ProjectStateChannel { Self { no_cache, channel: sender.into_channel(), + merged: Vec::new(), revision, deadline: now + timeout, attempts: 0, @@ -150,12 +154,39 @@ impl ProjectStateChannel { } pub fn send(self, state: UpstreamProjectState) { + for channel in self.merged { + channel.send(state.clone()); + } self.channel.send(state) } pub fn expired(&self) -> bool { Instant::now() > self.deadline } + + pub fn merge(&mut self, channel: ProjectStateChannel) { + let ProjectStateChannel { + channel, + merged, + revision, + deadline, + no_cache, + attempts, + errors, + pending, + } = channel; + + self.merged.push(channel); + self.merged.extend(merged); + if self.revision != revision { + self.revision = None; + } + self.deadline = self.deadline.max(deadline); + self.no_cache |= no_cache; + self.attempts += attempts; + self.errors += errors; + self.pending += pending; + } } /// The map of project keys with their project state channels. @@ -304,6 +335,23 @@ impl UpstreamProjectSourceService { } } + /// Merges a [`ProjectStateChannel`] into the existing list of tracked channels. + /// + /// A channel is removed when querying the upstream for the project, + /// when the upstream returns pending for this project it needs to be returned to + /// the list of channels. If there is already another request for the same project + /// outstanding those two requests must be merged. + fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) { + match self.state_channels.entry(key) { + Entry::Vacant(e) => { + e.insert(channel); + } + Entry::Occupied(mut e) => { + e.get_mut().merge(channel); + } + } + } + /// Executes an upstream request to fetch project configs. /// /// This assumes that currently no request is running. If the upstream request fails or new @@ -412,7 +460,7 @@ impl UpstreamProjectSourceService { for (key, mut channel) in channels_batch { if response.pending.contains(&key) { channel.pending += 1; - self.state_channels.insert(key, channel); + self.merge_channel(key, channel); continue; } @@ -610,3 +658,84 @@ impl Service for UpstreamProjectSourceService { }); } } + +#[cfg(test)] +mod tests { + use crate::http::Response; + use futures::future::poll_immediate; + + use super::*; + + fn to_response(body: &impl serde::Serialize) -> Response { + let body = serde_json::to_vec(body).unwrap(); + let response = http::response::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_LENGTH, body.len()) + .body(body) + .unwrap(); + + Response(response.into()) + } + + #[tokio::test] + async fn test_schedule_merge_channels() { + let (upstream_addr, mut upstream_rx) = Addr::custom(); + let config = Arc::new(Config::from_json_value(serde_json::json!({})).unwrap()); + let project_key = ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(); + + macro_rules! next_send_request { + () => {{ + let UpstreamRelay::SendRequest(mut req) = upstream_rx.recv().await.unwrap() else { + panic!() + }; + req.configure(&config); + req + }}; + } + + let service = UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start(); + + let mut response1 = service.send(FetchProjectState { + project_key, + current_revision: Some("123".to_owned()), + no_cache: false, + }); + + // Wait for the upstream request to make sure we're in the pending state. + let request1 = next_send_request!(); + + // Add another request for the same project, which should be combined into a single + // request, after responding to the first inflight request. + let mut response2 = service.send(FetchProjectState { + project_key, + current_revision: None, + no_cache: false, + }); + + // Return pending to the service. + // Now the two requests should be combined. + request1 + .respond(Ok(to_response(&serde_json::json!({ + "pending": [project_key], + })))) + .await; + + // Make sure there is no response yet. + assert!(poll_immediate(&mut response1).await.is_none()); + assert!(poll_immediate(&mut response2).await.is_none()); + + // Send a response to the second request which should successfully resolve both responses. + next_send_request!() + .respond(Ok(to_response(&serde_json::json!({ + "unchanged": [project_key], + })))) + .await; + + let (response1, response2) = futures::future::join(response1, response2).await; + assert!(matches!(response1, Ok(UpstreamProjectState::NotModified))); + assert!(matches!(response2, Ok(UpstreamProjectState::NotModified))); + + // No more messages to upstream expected. + assert!(upstream_rx.try_recv().is_err()); + } +}