Skip to content

Commit

Permalink
fix(project-cache): Merge instead of replace project state channel in…
Browse files Browse the repository at this point in the history
… upstream (#3952)

Also described in code, while there is a request in flight we might get
another request for the same project, when the response for the project
is pending we replace the channel instead of merging it.
  • Loading branch information
Dav1dde authored Oct 1, 2024
1 parent 8c47ddd commit 1030d15
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ hex = "0.4.3"
hmac = "0.12.1"
hostname = "0.4.0"
human-size = "0.4.1"
http = "1.1.0"
hyper-util = { version = "0.1.7", features = ["tokio"] }
indexmap = "2.2.5"
insta = { version = "1.31.0", features = ["json", "redactions", "ron"] }
Expand Down
3 changes: 2 additions & 1 deletion relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,13 @@ semver = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
tokio = { workspace = true, features = ['test-util'] }
http = { workspace = true }
insta = { workspace = true }
relay-protocol = { workspace = true, features = ["test"] }
relay-test = { workspace = true }
similar-asserts = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ['test-util'] }

[[bench]]
name = "benches"
Expand Down
131 changes: 130 additions & 1 deletion relay-server/src/services/project_upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ impl UpstreamQuery for GetProjectStates {
/// The wrapper struct for the incoming external requests which also keeps addition information.
#[derive(Debug)]
struct ProjectStateChannel {
// Main broadcast channel.
channel: BroadcastChannel<UpstreamProjectState>,
// Additional broadcast channels tracked from merge operations.
merged: Vec<BroadcastChannel<UpstreamProjectState>>,
revision: Option<String>,
deadline: Instant,
no_cache: bool,
Expand All @@ -120,6 +123,7 @@ impl ProjectStateChannel {
Self {
no_cache,
channel: sender.into_channel(),
merged: Vec::new(),
revision,
deadline: now + timeout,
attempts: 0,
Expand Down Expand Up @@ -150,12 +154,39 @@ impl ProjectStateChannel {
}

pub fn send(self, state: UpstreamProjectState) {
for channel in self.merged {
channel.send(state.clone());
}
self.channel.send(state)
}

pub fn expired(&self) -> bool {
Instant::now() > self.deadline
}

pub fn merge(&mut self, channel: ProjectStateChannel) {
let ProjectStateChannel {
channel,
merged,
revision,
deadline,
no_cache,
attempts,
errors,
pending,
} = channel;

self.merged.push(channel);
self.merged.extend(merged);
if self.revision != revision {
self.revision = None;
}
self.deadline = self.deadline.max(deadline);
self.no_cache |= no_cache;
self.attempts += attempts;
self.errors += errors;
self.pending += pending;
}
}

/// The map of project keys with their project state channels.
Expand Down Expand Up @@ -304,6 +335,23 @@ impl UpstreamProjectSourceService {
}
}

/// Merges a [`ProjectStateChannel`] into the existing list of tracked channels.
///
/// A channel is removed when querying the upstream for the project,
/// when the upstream returns pending for this project it needs to be returned to
/// the list of channels. If there is already another request for the same project
/// outstanding those two requests must be merged.
fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) {
match self.state_channels.entry(key) {
Entry::Vacant(e) => {
e.insert(channel);
}
Entry::Occupied(mut e) => {
e.get_mut().merge(channel);
}
}
}

/// Executes an upstream request to fetch project configs.
///
/// This assumes that currently no request is running. If the upstream request fails or new
Expand Down Expand Up @@ -412,7 +460,7 @@ impl UpstreamProjectSourceService {
for (key, mut channel) in channels_batch {
if response.pending.contains(&key) {
channel.pending += 1;
self.state_channels.insert(key, channel);
self.merge_channel(key, channel);
continue;
}

Expand Down Expand Up @@ -610,3 +658,84 @@ impl Service for UpstreamProjectSourceService {
});
}
}

#[cfg(test)]
mod tests {
use crate::http::Response;
use futures::future::poll_immediate;

use super::*;

fn to_response(body: &impl serde::Serialize) -> Response {
let body = serde_json::to_vec(body).unwrap();
let response = http::response::Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_LENGTH, body.len())
.body(body)
.unwrap();

Response(response.into())
}

#[tokio::test]
async fn test_schedule_merge_channels() {
let (upstream_addr, mut upstream_rx) = Addr::custom();
let config = Arc::new(Config::from_json_value(serde_json::json!({})).unwrap());
let project_key = ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap();

macro_rules! next_send_request {
() => {{
let UpstreamRelay::SendRequest(mut req) = upstream_rx.recv().await.unwrap() else {
panic!()
};
req.configure(&config);
req
}};
}

let service = UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start();

let mut response1 = service.send(FetchProjectState {
project_key,
current_revision: Some("123".to_owned()),
no_cache: false,
});

// Wait for the upstream request to make sure we're in the pending state.
let request1 = next_send_request!();

// Add another request for the same project, which should be combined into a single
// request, after responding to the first inflight request.
let mut response2 = service.send(FetchProjectState {
project_key,
current_revision: None,
no_cache: false,
});

// Return pending to the service.
// Now the two requests should be combined.
request1
.respond(Ok(to_response(&serde_json::json!({
"pending": [project_key],
}))))
.await;

// Make sure there is no response yet.
assert!(poll_immediate(&mut response1).await.is_none());
assert!(poll_immediate(&mut response2).await.is_none());

// Send a response to the second request which should successfully resolve both responses.
next_send_request!()
.respond(Ok(to_response(&serde_json::json!({
"unchanged": [project_key],
}))))
.await;

let (response1, response2) = futures::future::join(response1, response2).await;
assert!(matches!(response1, Ok(UpstreamProjectState::NotModified)));
assert!(matches!(response2, Ok(UpstreamProjectState::NotModified)));

// No more messages to upstream expected.
assert!(upstream_rx.try_recv().is_err());
}
}

0 comments on commit 1030d15

Please sign in to comment.