Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(project-cache): Check revision before loading from Redis #3892

Merged
merged 4 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions relay-server/src/services/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,11 @@ impl Project {
"project {} state requested {attempts} times",
self.project_key
);
project_cache.send(RequestUpdate::new(self.project_key, no_cache));
project_cache.send(RequestUpdate {
project_key: self.project_key,
no_cache,
cached_state: self.state.clone(),
});
}

channel
Expand Down Expand Up @@ -473,7 +477,11 @@ impl Project {
self.project_key
);

project_cache.send(RequestUpdate::new(self.project_key, no_cache));
project_cache.send(RequestUpdate {
project_key: self.project_key,
no_cache,
cached_state: self.state.clone(),
});
return old_state;
}

Expand Down
18 changes: 18 additions & 0 deletions relay-server/src/services/project/state/fetch_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@ impl ProjectFetchState {
}
}

/// Refreshes the expiry of the fetch state.
///
/// Keeps the contained project state untouched and only stamps the state
/// with a new `last_fetch` time, extending its lifetime in the cache.
pub fn refresh(old: ProjectFetchState) -> Self {
    Self {
        last_fetch: Some(Instant::now()),
        ..old
    }
}

/// Project state for an unknown but allowed project.
///
/// This state is used for forwarding in Proxy mode.
pub fn allowed() -> Self {
Self::enabled(ProjectInfo {
project_id: None,
last_change: None,
rev: None,
public_keys: Default::default(),
slug: None,
config: ProjectConfig::default(),
Expand Down Expand Up @@ -119,6 +128,15 @@ impl ProjectFetchState {
Expiry::Updated
}
}

/// Returns the revision of the contained project state.
///
/// Only an enabled project carries a revision; disabled and pending
/// states yield `None`.
pub fn revision(&self) -> Option<&str> {
    match &self.state {
        ProjectState::Enabled(info) => info.rev.as_deref(),
        ProjectState::Disabled | ProjectState::Pending => None,
    }
}
}

/// Wrapper for a project state, with expiry information.
Expand Down
4 changes: 3 additions & 1 deletion relay-server/src/services/project/state/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ pub struct ProjectInfo {
///
/// This might be `None` in some rare cases, such as when states
/// are faked locally.
#[serde(default)]
pub last_change: Option<DateTime<Utc>>,
/// The revision id of the project config.
pub rev: Option<String>,
/// Indicates that the project is disabled.
/// A container of known public keys in the project.
///
Expand All @@ -56,6 +57,7 @@ pub struct ProjectInfo {
pub struct LimitedProjectInfo {
pub project_id: Option<ProjectId>,
pub last_change: Option<DateTime<Utc>>,
pub rev: Option<String>,
pub public_keys: SmallVec<[PublicKeyConfig; 1]>,
pub slug: Option<String>,
#[serde(with = "LimitedProjectConfig")]
Expand Down
61 changes: 35 additions & 26 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,15 @@ use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff
#[derive(Clone, Debug)]
pub struct RequestUpdate {
/// The public key to fetch the project by.
project_key: ProjectKey,

pub project_key: ProjectKey,
/// If true, all caches should be skipped and a fresh state should be computed.
no_cache: bool,
}

impl RequestUpdate {
pub fn new(project_key: ProjectKey, no_cache: bool) -> Self {
Self {
project_key,
no_cache,
}
}
pub no_cache: bool,
/// Previously cached fetch state, if availabled.
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
///
/// The upstream request will include the revision of the currently cached state,
/// if the upstream does not have a different revision, this cached
/// state is re-used and its expiry bumped.
pub cached_state: ProjectFetchState,
}

/// Returns the project state.
Expand Down Expand Up @@ -459,7 +455,12 @@ impl ProjectSource {
}
}

async fn fetch(self, project_key: ProjectKey, no_cache: bool) -> Result<ProjectFetchState, ()> {
async fn fetch(
self,
project_key: ProjectKey,
no_cache: bool,
_cached_state: ProjectFetchState,
) -> Result<ProjectFetchState, ()> {
let state_opt = self
.local_source
.send(FetchOptionalProjectState { project_key })
Expand All @@ -479,27 +480,34 @@ impl ProjectSource {

#[cfg(feature = "processing")]
if let Some(redis_source) = self.redis_source {
let revision = _cached_state.revision().map(String::from);

let redis_permit = self.redis_semaphore.acquire().await.map_err(|_| ())?;
let state_fetch_result =
tokio::task::spawn_blocking(move || redis_source.get_config(project_key))
.await
.map_err(|_| ())?;
let state_fetch_result = tokio::task::spawn_blocking(move || {
redis_source.get_config_if_changed(project_key, revision.as_deref())
})
.await
.map_err(|_| ())?;
drop(redis_permit);

let state = match state_fetch_result {
Ok(state) => state.sanitized(),
match state_fetch_result {
// New state fetched from Redis, possibly pending.
Ok(Some(state)) => {
let state = state.sanitized();
if !state.is_pending() {
return Ok(ProjectFetchState::new(state));
}
}
// Redis reported that we're holding an up-to-date version of the state already,
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
// refresh the state and return the old cached state again.
Ok(None) => return Ok(ProjectFetchState::refresh(_cached_state)),
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
"failed to fetch project from Redis",
);
ProjectState::Pending
}
};

if !matches!(state, ProjectState::Pending) {
return Ok(ProjectFetchState::new(state));
}
};

self.upstream_source
Expand Down Expand Up @@ -734,6 +742,7 @@ impl ProjectCacheBroker {
let RequestUpdate {
project_key,
no_cache,
cached_state,
} = message;

// Bump the update time of the project in our hashmap to evade eviction.
Expand All @@ -750,14 +759,14 @@ impl ProjectCacheBroker {
tokio::time::sleep_until(next_attempt).await;
}
let state = source
.fetch(project_key, no_cache)
.fetch(project_key, no_cache, cached_state)
.await
.unwrap_or_else(|()| ProjectFetchState::disabled());

let message = UpdateProjectState {
project_key,
state,
no_cache,
state,
};

sender.send(message).ok();
Expand Down
53 changes: 44 additions & 9 deletions relay-server/src/services/project_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,44 @@ impl RedisProjectSource {
RedisProjectSource { config, redis }
}

pub fn get_config(&self, key: ProjectKey) -> Result<ProjectState, RedisProjectError> {
let mut command = relay_redis::redis::cmd("GET");

let prefix = self.config.projectconfig_cache_prefix();
command.arg(format!("{prefix}:{key}"));
pub fn get_config_if_changed(
&self,
key: ProjectKey,
revision: Option<&str>,
) -> Result<Option<ProjectState>, RedisProjectError> {
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
let mut client = self.redis.client()?;
let mut connection = client.connection()?;

// Only check for the revision if we were passed a revision.
if let Some(revision) = revision {
let current_revision: Option<String> = relay_redis::redis::cmd("GET")
.arg(self.get_redis_rev_key(key))
.query(&mut connection)
.map_err(RedisError::Redis)?;

relay_log::trace!(
"Redis revision {current_revision:?}, requested revision {revision:?}"
);
if current_revision.as_deref() == Some(revision) {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "revision",
);
return Ok(None);
}
}

let raw_response_opt: Option<Vec<u8>> = command
.query(&mut self.redis.client()?.connection()?)
let raw_response_opt: Option<Vec<u8>> = relay_redis::redis::cmd("GET")
.arg(self.get_redis_project_config_key(key))
.query(&mut connection)
.map_err(RedisError::Redis)?;

let response = match raw_response_opt {
Some(response) => {
metric!(counter(RelayCounters::ProjectStateRedis) += 1, hit = "true");
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "project_config"
);
let parsed = parse_redis_response(response.as_slice())?;
ProjectState::from(parsed)
}
Expand All @@ -77,7 +102,17 @@ impl RedisProjectSource {
}
};

Ok(response)
Ok(Some(response))
}

/// Builds the Redis key under which the project config for `key` is stored.
fn get_redis_project_config_key(&self, key: ProjectKey) -> String {
    format!("{}:{key}", self.config.projectconfig_cache_prefix())
}

/// Builds the Redis key holding the revision id for the project config of `key`.
fn get_redis_rev_key(&self, key: ProjectKey) -> String {
    format!("{}:{key}.rev", self.config.projectconfig_cache_prefix())
}
}

Expand Down
7 changes: 5 additions & 2 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,11 @@ pub enum RelayCounters {
ProjectStateNoCache,
/// Number of times a project state is requested from the central Redis cache.
///
/// This has a tag `hit` with values `true` or `false`. If false the request will be
/// sent to the sentry endpoint.
/// This metric is tagged with:
/// - `hit`: One of:
/// - `revision`: the cached version was validated to be up to date using its revision.
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
/// - `project_config`: the request was handled by the cache.
/// - `false`: the request will be sent to the sentry endpoint.
#[cfg(feature = "processing")]
ProjectStateRedis,
/// Number of times a project is looked up from the cache.
Expand Down
38 changes: 37 additions & 1 deletion tests/integration/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,5 +256,41 @@ def test_processing_redis_query_compressed(

relay.send_event(project_id)

event, v = events_consumer.get_event()
event, _ = events_consumer.get_event()
assert event["logentry"] == {"formatted": "Hello, World!"}


def test_processing_redis_query_with_revision(
    mini_sentry,
    redis_client,
    relay_with_processing,
    events_consumer,
    outcomes_consumer,
):
    """Events still process when the cached project config is revalidated by revision.

    Seeds Redis with a project config and its revision key, sends an event,
    waits past the project cache expiry so Relay re-fetches (hitting the
    revision fast-path), then sends a second event and checks it is processed.
    """
    outcomes_consumer = outcomes_consumer()
    events_consumer = events_consumer()

    relay = relay_with_processing(
        {"limits": {"query_timeout": 10}, "cache": {"project_expiry": 1}}
    )

    project_id = 42
    cfg = mini_sentry.add_full_project_config(project_id)
    cfg["rev"] = "123"

    key = mini_sentry.get_dsn_public_key(project_id)
    prefix = relay.options["processing"]["projectconfig_cache_prefix"]
    # Seed both the serialized config and its revision marker in Redis.
    redis_client.setex(f"{prefix}:{key}", 3600, json.dumps(cfg))
    redis_client.setex(f"{prefix}:{key}.rev", 3600, cfg["rev"])

    relay.send_event(project_id)
    event, _ = events_consumer.get_event()
    assert event["logentry"] == {"formatted": "Hello, World!"}

    # 1 second timeout on the project cache, make sure it times out
    time.sleep(2)

    relay.send_event(project_id)
    event, _ = events_consumer.get_event()
    assert event["logentry"] == {"formatted": "Hello, World!"}
Loading