Skip to content

Commit

Permalink
fix(project-cache): Merge instead of replace project state channel in…
Browse files Browse the repository at this point in the history
… upstream
  • Loading branch information
Dav1dde committed Aug 27, 2024
1 parent 7da488e commit baa1864
Showing 1 changed file with 45 additions and 1 deletion.
46 changes: 45 additions & 1 deletion relay-server/src/services/project_upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ impl UpstreamQuery for GetProjectStates {
/// The wrapper struct for the incoming external requests which also keeps addition information.
#[derive(Debug)]
struct ProjectStateChannel {
// Main broadcast channel.
channel: BroadcastChannel<ProjectFetchState>,
// Additional broadcast channels tracked from merge operations.
other: Vec<BroadcastChannel<ProjectFetchState>>,
deadline: Instant,
no_cache: bool,
attempts: u64,
Expand All @@ -99,6 +102,7 @@ impl ProjectStateChannel {
Self {
no_cache,
channel: sender.into_channel(),
other: Vec::new(),
deadline: now + timeout,
attempts: 0,
errors: 0,
Expand All @@ -115,12 +119,35 @@ impl ProjectStateChannel {
}

pub fn send(self, state: ProjectFetchState) {
for channel in self.other {
channel.send(state.clone());
}
self.channel.send(state)
}

pub fn expired(&self) -> bool {
Instant::now() > self.deadline
}

pub fn merge(&mut self, channel: ProjectStateChannel) {
let ProjectStateChannel {
channel,
other,
deadline,
no_cache,
attempts,
errors,
pending,
} = channel;

self.other.push(channel);
self.other.extend(other);
self.deadline = self.deadline.max(deadline);
self.no_cache |= no_cache;
self.attempts += attempts;
self.errors += errors;
self.pending += pending;
}
}

/// The map of project keys with their project state channels.
Expand Down Expand Up @@ -269,6 +296,23 @@ impl UpstreamProjectSourceService {
}
}

/// Merges a [`ProjectStateChannel`] into the existing list of tracked channels.
///
/// A channel is removed when querying the upstream for the project,
/// when the upstream returns pending for this project it needs to be returned to
/// the list of channels. If there is already another request for the same project
/// outstanding those two requests must be merged.
fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) {
match self.state_channels.entry(key) {
Entry::Vacant(e) => {
e.insert(channel);
}
Entry::Occupied(mut e) => {
e.get_mut().merge(channel);
}
}
}

/// Executes an upstream request to fetch project configs.
///
/// This assumes that currently no request is running. If the upstream request fails or new
Expand Down Expand Up @@ -374,7 +418,7 @@ impl UpstreamProjectSourceService {
let mut result = "ok";
if response.pending.contains(&key) {
channel.pending += 1;
self.state_channels.insert(key, channel);
self.merge_channel(key, channel);
continue;
}
let state = response
Expand Down

0 comments on commit baa1864

Please sign in to comment.