Skip to content

Commit

Permalink
ref(project-cache): Check revision before loading from Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Aug 2, 2024
1 parent 871ba75 commit 382d427
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 40 deletions.
12 changes: 10 additions & 2 deletions relay-server/src/services/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,11 @@ impl Project {
"project {} state requested {attempts} times",
self.project_key
);
project_cache.send(RequestUpdate::new(self.project_key, no_cache));
project_cache.send(RequestUpdate {
project_key: self.project_key,
no_cache,
cached_state: self.state.clone(),
});
}

channel
Expand Down Expand Up @@ -473,7 +477,11 @@ impl Project {
self.project_key
);

project_cache.send(RequestUpdate::new(self.project_key, no_cache));
project_cache.send(RequestUpdate {
project_key: self.project_key,
no_cache,
cached_state: self.state.clone(),
});
return old_state;
}

Expand Down
18 changes: 18 additions & 0 deletions relay-server/src/services/project/state/fetch_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@ impl ProjectFetchState {
}
}

/// Refreshes the expiry of the fetch state.
pub fn refresh(old: ProjectFetchState) -> Self {
Self {
last_fetch: Some(Instant::now()),
state: old.state,
}
}

/// Project state for an unknown but allowed project.
///
/// This state is used for forwarding in Proxy mode.
pub fn allowed() -> Self {
Self::enabled(ProjectInfo {
project_id: None,
last_change: None,
rev: None,
public_keys: Default::default(),
slug: None,
config: ProjectConfig::default(),
Expand Down Expand Up @@ -119,6 +128,15 @@ impl ProjectFetchState {
Expiry::Updated
}
}

/// Returns the revision of the contained project state.
pub fn revision(&self) -> Option<&str> {
match &self.state {
ProjectState::Enabled(info) => info.rev.as_deref(),
ProjectState::Disabled => None, // TODO: disabled can also have a revision
ProjectState::Pending => None,
}
}
}

/// Wrapper for a project state, with expiry information.
Expand Down
4 changes: 3 additions & 1 deletion relay-server/src/services/project/state/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ pub struct ProjectInfo {
///
/// This might be `None` in some rare cases like where states
/// are faked locally.
#[serde(default)]
pub last_change: Option<DateTime<Utc>>,
/// The revision id of the project config.
pub rev: Option<String>,
/// Indicates that the project is disabled.
/// A container of known public keys in the project.
///
Expand All @@ -56,6 +57,7 @@ pub struct ProjectInfo {
pub struct LimitedProjectInfo {
pub project_id: Option<ProjectId>,
pub last_change: Option<DateTime<Utc>>,
pub rev: Option<String>,
pub public_keys: SmallVec<[PublicKeyConfig; 1]>,
pub slug: Option<String>,
#[serde(with = "LimitedProjectConfig")]
Expand Down
61 changes: 35 additions & 26 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,15 @@ use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff
#[derive(Clone, Debug)]
pub struct RequestUpdate {
/// The public key to fetch the project by.
project_key: ProjectKey,

pub project_key: ProjectKey,
/// If true, all caches should be skipped and a fresh state should be computed.
no_cache: bool,
}

impl RequestUpdate {
pub fn new(project_key: ProjectKey, no_cache: bool) -> Self {
Self {
project_key,
no_cache,
}
}
pub no_cache: bool,
/// Previously cached fetch state, if availabled.
///
/// The upstream request will include the revision of the currently cached state,
/// if the upstream does not have a different revision, this cached
/// state is re-used and its expiry bumped.
pub cached_state: ProjectFetchState,
}

/// Returns the project state.
Expand Down Expand Up @@ -459,7 +455,12 @@ impl ProjectSource {
}
}

async fn fetch(self, project_key: ProjectKey, no_cache: bool) -> Result<ProjectFetchState, ()> {
async fn fetch(
self,
project_key: ProjectKey,
no_cache: bool,
cached_state: ProjectFetchState,
) -> Result<ProjectFetchState, ()> {
let state_opt = self
.local_source
.send(FetchOptionalProjectState { project_key })
Expand All @@ -479,27 +480,34 @@ impl ProjectSource {

#[cfg(feature = "processing")]
if let Some(redis_source) = self.redis_source {
let revision = cached_state.revision().map(String::from);

let redis_permit = self.redis_semaphore.acquire().await.map_err(|_| ())?;
let state_fetch_result =
tokio::task::spawn_blocking(move || redis_source.get_config(project_key))
.await
.map_err(|_| ())?;
let state_fetch_result = tokio::task::spawn_blocking(move || {
redis_source.get_config_if_changed(project_key, revision.as_deref())
})
.await
.map_err(|_| ())?;
drop(redis_permit);

let state = match state_fetch_result {
Ok(state) => state.sanitized(),
match state_fetch_result {
// New state fetched from Redis, possibly pending.
Ok(Some(state)) => {
let state = state.sanitized();
if !state.is_pending() {
return Ok(ProjectFetchState::new(state));
}
}
// Redis reported that we're holding an up to date version of the state alread,
// refresh the state and return the old cached state again.
Ok(None) => return Ok(ProjectFetchState::refresh(cached_state)),
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
"failed to fetch project from Redis",
);
ProjectState::Pending
}
};

if !matches!(state, ProjectState::Pending) {
return Ok(ProjectFetchState::new(state));
}
};

self.upstream_source
Expand Down Expand Up @@ -734,6 +742,7 @@ impl ProjectCacheBroker {
let RequestUpdate {
project_key,
no_cache,
cached_state,
} = message;

// Bump the update time of the project in our hashmap to evade eviction.
Expand All @@ -750,14 +759,14 @@ impl ProjectCacheBroker {
tokio::time::sleep_until(next_attempt).await;
}
let state = source
.fetch(project_key, no_cache)
.fetch(project_key, no_cache, cached_state)
.await
.unwrap_or_else(|()| ProjectFetchState::disabled());

let message = UpdateProjectState {
project_key,
state,
no_cache,
state,
};

sender.send(message).ok();
Expand Down
50 changes: 41 additions & 9 deletions relay-server/src/services/project_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,41 @@ impl RedisProjectSource {
RedisProjectSource { config, redis }
}

pub fn get_config(&self, key: ProjectKey) -> Result<ProjectState, RedisProjectError> {
let mut command = relay_redis::redis::cmd("GET");

let prefix = self.config.projectconfig_cache_prefix();
command.arg(format!("{prefix}:{key}"));
pub fn get_config_if_changed(
&self,
key: ProjectKey,
revision: Option<&str>,
) -> Result<Option<ProjectState>, RedisProjectError> {
let mut client = self.redis.client()?;
let mut connection = client.connection()?;

// Only check for the revision if we were passed a revision.
if let Some(revision) = revision {
let current_revision: Option<String> = relay_redis::redis::cmd("GET")
.arg(self.get_redis_rev_key(key))
.query(&mut connection)
.map_err(RedisError::Redis)?;

if current_revision.as_deref() == Some(revision) {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "revision",
);
return Ok(None);
}
}

let raw_response_opt: Option<Vec<u8>> = command
.query(&mut self.redis.client()?.connection()?)
let raw_response_opt: Option<Vec<u8>> = relay_redis::redis::cmd("GET")
.arg(self.get_redis_project_config_key(key))
.query(&mut connection)
.map_err(RedisError::Redis)?;

let response = match raw_response_opt {
Some(response) => {
metric!(counter(RelayCounters::ProjectStateRedis) += 1, hit = "true");
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "project_config"
);
let parsed = parse_redis_response(response.as_slice())?;
ProjectState::from(parsed)
}
Expand All @@ -77,7 +99,17 @@ impl RedisProjectSource {
}
};

Ok(response)
Ok(Some(response))
}

fn get_redis_project_config_key(&self, key: ProjectKey) -> String {
let prefix = self.config.projectconfig_cache_prefix();
format!("{prefix}:{key}")
}

fn get_redis_rev_key(&self, key: ProjectKey) -> String {
let prefix = self.config.projectconfig_cache_prefix();
format!("{prefix}:{key}.rev")
}
}

Expand Down
7 changes: 5 additions & 2 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,11 @@ pub enum RelayCounters {
ProjectStateNoCache,
/// Number of times a project state is requested from the central Redis cache.
///
/// This has a tag `hit` with values `true` or `false`. If false the request will be
/// sent to the sentry endpoint.
/// This metric is tagged with:
/// - `hit`: One of:
/// - `revision`: the in cached version was validated with the revision.
/// - `project_config`: the request was handled by the cache.
/// - `false`: the request will be sent to the sentry endpoint.
#[cfg(feature = "processing")]
ProjectStateRedis,
/// Number of times a project is looked up from the cache.
Expand Down

0 comments on commit 382d427

Please sign in to comment.