Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(project-cache): Merge instead of replace project state channel in upstream #3952

Merged
merged 4 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ hex = "0.4.3"
hmac = "0.12.1"
hostname = "0.4.0"
human-size = "0.4.1"
http = "1.1.0"
hyper-util = { version = "0.1.7", features = ["tokio"] }
indexmap = "2.2.5"
insta = { version = "1.31.0", features = ["json", "redactions", "ron"] }
Expand Down
3 changes: 2 additions & 1 deletion relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,13 @@ semver = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
tokio = { workspace = true, features = ['test-util'] }
http = { workspace = true }
insta = { workspace = true }
relay-protocol = { workspace = true, features = ["test"] }
relay-test = { workspace = true }
similar-asserts = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ['test-util'] }

[[bench]]
name = "benches"
Expand Down
131 changes: 130 additions & 1 deletion relay-server/src/services/project_upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ impl UpstreamQuery for GetProjectStates {
/// The wrapper struct for the incoming external requests which also keeps additional information.
#[derive(Debug)]
struct ProjectStateChannel {
// Main broadcast channel.
channel: BroadcastChannel<UpstreamProjectState>,
// Additional broadcast channels tracked from merge operations.
merged: Vec<BroadcastChannel<UpstreamProjectState>>,
revision: Option<String>,
deadline: Instant,
no_cache: bool,
Expand All @@ -120,6 +123,7 @@ impl ProjectStateChannel {
Self {
no_cache,
channel: sender.into_channel(),
merged: Vec::new(),
revision,
deadline: now + timeout,
attempts: 0,
Expand Down Expand Up @@ -150,12 +154,39 @@ impl ProjectStateChannel {
}

pub fn send(self, state: UpstreamProjectState) {
for channel in self.merged {
channel.send(state.clone());
}
self.channel.send(state)
}

/// Returns `true` once the channel's deadline lies in the past.
pub fn expired(&self) -> bool {
    self.deadline < Instant::now()
}

/// Merges another channel for the same project into this one.
///
/// All broadcast channels tracked by `channel` (its primary channel plus any
/// previously merged ones) are added to this channel's `merged` list, so a
/// single upstream response resolves every subscriber.
///
/// The remaining metadata is combined conservatively:
/// - differing revisions clear `revision`, forcing a full (non-revision)
///   fetch instead of a possibly incorrect "unchanged" answer,
/// - the later of the two deadlines wins,
/// - `no_cache` is sticky once set on either side,
/// - the attempt, error, and pending counters are summed.
pub fn merge(&mut self, channel: ProjectStateChannel) {
    let ProjectStateChannel {
        channel,
        merged,
        revision,
        deadline,
        no_cache,
        attempts,
        errors,
        pending,
    } = channel;

    self.merged.push(channel);
    self.merged.extend(merged);
    if self.revision != revision {
        self.revision = None;
    }
    // NOTE(review): `max` lets repeated merges extend the deadline
    // indefinitely; `min` would bound it by the oldest request. Keeping `max`
    // here — confirm this matches the intended timeout semantics.
    self.deadline = self.deadline.max(deadline);
    self.no_cache |= no_cache;
    self.attempts += attempts;
    self.errors += errors;
    self.pending += pending;
}
}

/// The map of project keys with their project state channels.
Expand Down Expand Up @@ -304,6 +335,23 @@ impl UpstreamProjectSourceService {
}
}

/// Merges a [`ProjectStateChannel`] into the existing list of tracked channels.
///
/// A channel is removed when querying the upstream for the project,
/// when the upstream returns pending for this project it needs to be returned to
/// the list of channels. If there is already another request for the same project
/// outstanding those two requests must be merged.
fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) {
match self.state_channels.entry(key) {
Entry::Vacant(e) => {
e.insert(channel);
}
Entry::Occupied(mut e) => {
e.get_mut().merge(channel);
}
}
}

/// Executes an upstream request to fetch project configs.
///
/// This assumes that currently no request is running. If the upstream request fails or new
Expand Down Expand Up @@ -412,7 +460,7 @@ impl UpstreamProjectSourceService {
for (key, mut channel) in channels_batch {
if response.pending.contains(&key) {
channel.pending += 1;
self.state_channels.insert(key, channel);
self.merge_channel(key, channel);
continue;
}

Expand Down Expand Up @@ -610,3 +658,84 @@ impl Service for UpstreamProjectSourceService {
});
}
}

#[cfg(test)]
mod tests {
    use crate::http::Response;
    use futures::future::poll_immediate;

    use super::*;

    // Builds an HTTP 200 response carrying the JSON-encoded `body`, shaped
    // like an upstream answer to a project-config query.
    fn to_response(body: &impl serde::Serialize) -> Response {
        let body = serde_json::to_vec(body).unwrap();
        let response = http::response::Response::builder()
            .status(http::StatusCode::OK)
            .header(http::header::CONTENT_LENGTH, body.len())
            .body(body)
            .unwrap();

        Response(response.into())
    }

    // Verifies that a channel re-registered after a "pending" upstream answer
    // is merged with a newer request for the same project, so a single
    // follow-up response resolves both callers.
    #[tokio::test]
    async fn test_schedule_merge_channels() {
        // Mocked upstream: requests sent by the service arrive on `upstream_rx`.
        let (upstream_addr, mut upstream_rx) = Addr::custom();
        let config = Arc::new(Config::from_json_value(serde_json::json!({})).unwrap());
        let project_key = ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap();

        // Receives the next `SendRequest` from the mocked upstream and
        // configures it so it can be responded to in the test.
        macro_rules! next_send_request {
            () => {{
                let UpstreamRelay::SendRequest(mut req) = upstream_rx.recv().await.unwrap() else {
                    panic!()
                };
                req.configure(&config);
                req
            }};
        }

        let service = UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start();

        let mut response1 = service.send(FetchProjectState {
            project_key,
            current_revision: Some("123".to_owned()),
            no_cache: false,
        });

        // Wait for the upstream request to make sure we're in the pending state.
        let request1 = next_send_request!();

        // Add another request for the same project, which should be combined into a single
        // request, after responding to the first inflight request.
        let mut response2 = service.send(FetchProjectState {
            project_key,
            current_revision: None,
            no_cache: false,
        });

        // Return pending to the service.
        // Now the two requests should be combined.
        request1
            .respond(Ok(to_response(&serde_json::json!({
                "pending": [project_key],
            }))))
            .await;

        // Make sure there is no response yet.
        assert!(poll_immediate(&mut response1).await.is_none());
        assert!(poll_immediate(&mut response2).await.is_none());

        // Send a response to the second request which should successfully resolve both responses.
        next_send_request!()
            .respond(Ok(to_response(&serde_json::json!({
                "unchanged": [project_key],
            }))))
            .await;

        let (response1, response2) = futures::future::join(response1, response2).await;
        assert!(matches!(response1, Ok(UpstreamProjectState::NotModified)));
        assert!(matches!(response2, Ok(UpstreamProjectState::NotModified)));

        // No more messages to upstream expected.
        assert!(upstream_rx.try_recv().is_err());
    }
}
Loading