From e39fd9490ef8fa44b8961c55b92d91961a68aca9 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Mon, 30 Sep 2024 16:49:07 +0200 Subject: [PATCH] add a test --- Cargo.lock | 1 + Cargo.toml | 1 + relay-server/Cargo.toml | 3 +- relay-server/src/services/project_upstream.rs | 81 +++++++++++++++++++ 4 files changed, 85 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 68ac11118b..ea00552775 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3753,6 +3753,7 @@ dependencies = [ "fnv", "futures", "hashbrown 0.14.5", + "http", "hyper-util", "insta", "itertools 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index f582dbfb1f..fdcdf002ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ hex = "0.4.3" hmac = "0.12.1" hostname = "0.4.0" human-size = "0.4.1" +http = "1.1.0" hyper-util = { version = "0.1.7", features = ["tokio"] } indexmap = "2.2.5" insta = { version = "1.31.0", features = ["json", "redactions", "ron"] } diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index f079baea06..2466d577f3 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -127,12 +127,13 @@ semver = { workspace = true } [dev-dependencies] criterion = { workspace = true } -tokio = { workspace = true, features = ['test-util'] } +http = { workspace = true } insta = { workspace = true } relay-protocol = { workspace = true, features = ["test"] } relay-test = { workspace = true } similar-asserts = { workspace = true } tempfile = { workspace = true } +tokio = { workspace = true, features = ['test-util'] } [[bench]] name = "benches" diff --git a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs index 0d83861ef9..0b54cc17e6 100644 --- a/relay-server/src/services/project_upstream.rs +++ b/relay-server/src/services/project_upstream.rs @@ -658,3 +658,84 @@ impl Service for UpstreamProjectSourceService { }); } } + +#[cfg(test)] +mod tests { + use crate::http::Response; + use futures::future::poll_immediate; + + use super::*; + + fn to_response(body: &impl serde::Serialize) -> Response { + let body = serde_json::to_vec(body).unwrap(); + let response = http::response::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_LENGTH, body.len()) + .body(body) + .unwrap(); + + Response(response.into()) + } + + #[tokio::test] + async fn test_schedule_merge_channels() { + let (upstream_addr, mut upstream_rx) = Addr::custom(); + let config = Arc::new(Config::from_json_value(serde_json::json!({})).unwrap()); + let project_key = ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(); + + macro_rules! next_send_request { + () => {{ + let UpstreamRelay::SendRequest(mut req) = upstream_rx.recv().await.unwrap() else { + panic!() + }; + req.configure(&config); + req + }}; + } + + let service = UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start(); + + let mut response1 = service.send(FetchProjectState { + project_key, + current_revision: Some("123".to_owned()), + no_cache: false, + }); + + // Wait for the upstream request to make sure we're in the pending state. + let request1 = next_send_request!(); + + // Add another request for the same project, which should be combined into a single + // request, after responding to the first inflight request. + let mut response2 = service.send(FetchProjectState { + project_key, + current_revision: None, + no_cache: false, + }); + + // Return pending to the service. + // Now the two requests should be combined. + request1 + .respond(Ok(to_response(&serde_json::json!({ + "pending": [project_key], + })))) + .await; + + // Make sure there is no response yet. + assert!(poll_immediate(&mut response1).await.is_none()); + assert!(poll_immediate(&mut response2).await.is_none()); + + // Send a response to the second request which should successfully resolve both responses. + next_send_request!() + .respond(Ok(to_response(&serde_json::json!({ + "unchanged": [project_key], + })))) + .await; + + let (response1, response2) = futures::future::join(response1, response2).await; + assert!(matches!(response1, Ok(UpstreamProjectState::NotModified))); + assert!(matches!(response2, Ok(UpstreamProjectState::NotModified))); + + // No more messages to upstream expected. + assert!(upstream_rx.try_recv().is_err()); + } +}