feat(project-config): Query project configs with revision (#3947)
Follow-up to #3892, now also implementing the revision fetching in the
project config API.

Sentry does not need to be updated; this is not a breaking change and
works with any combination of Relays.

Extends the project config API with another field `revisions` which
optionally contains the revisions for all requested public keys. If
there are revisions, the response is extended with a field `unchanged`
which contains all project keys that were unchanged (have the same
revision), similar to how `pending` is handled.
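
For illustration, a minimal sketch of what a revision-aware exchange might look like on the wire, built with `serde_json`. The key values and revision strings are made up; field names follow the serde structs in the diff below, and the exact payload shape is an assumption rather than an excerpt from the implementation:

```rust
use serde_json::json;

fn main() {
    // Hypothetical request from a downstream Relay: one entry in `revisions`
    // per entry in `publicKeys`, associated by index; `null` where no revision
    // is cached yet.
    let request = json!({
        "publicKeys": [
            "a94ae32be2584e0bbd7a4cbb95971fee",
            "deadbeefdeadbeefdeadbeefdeadbeef"
        ],
        "revisions": ["8a331ae4", null],
        "fullConfig": true
    });

    // Hypothetical response: the first key still has the same revision, so its
    // config is omitted and the key is listed under `unchanged`; the second
    // key's config is not ready yet and stays in `pending`.
    let response = json!({
        "configs": {},
        "unchanged": ["a94ae32be2584e0bbd7a4cbb95971fee"],
        "pending": ["deadbeefdeadbeefdeadbeefdeadbeef"]
    });

    println!("{request:#}");
    println!("{response:#}");
}
```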
Dav1dde committed Aug 28, 2024
1 parent 7bd2c99 commit c064654
Showing 9 changed files with 301 additions and 60 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

## Unreleased

**Features:**

- Extend project config API to be revision-aware. ([#3947](https://github.com/getsentry/relay/pull/3947))

**Bug Fixes**:

- Keep frames from both ends of the stacktrace when trimming frames. ([#3905](https://github.com/getsentry/relay/pull/3905))
76 changes: 60 additions & 16 deletions relay-server/src/endpoints/project_configs.rs
@@ -6,7 +6,8 @@ use axum::handler::Handler;
use axum::http::Request;
use axum::response::{IntoResponse, Result};
use axum::{Json, RequestExt};
use futures::future;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use relay_base_schema::project::ProjectKey;
use relay_dynamic_config::{ErrorBoundary, GlobalConfig};
use serde::{Deserialize, Serialize};
@@ -79,6 +80,8 @@ struct GetProjectStatesResponseWrapper {
configs: HashMap<ProjectKey, ProjectStateWrapper>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pending: Vec<ProjectKey>,
#[serde(skip_serializing_if = "Vec::is_empty")]
unchanged: Vec<ProjectKey>,
#[serde(skip_serializing_if = "Option::is_none")]
global: Option<Arc<GlobalConfig>>,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -92,7 +95,13 @@ struct GetProjectStatesResponseWrapper {
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetProjectStatesRequest {
/// The list of all requested project configs.
public_keys: Vec<ErrorBoundary<ProjectKey>>,
/// List of revisions for all project configs.
///
/// The length of this list, if specified, must match the length of
/// [`Self::public_keys`]; the items are associated by index.
revisions: Option<ErrorBoundary<Vec<Option<String>>>>,
#[serde(default)]
full_config: bool,
#[serde(default)]
@@ -101,6 +110,32 @@ struct GetProjectStatesRequest {
global: bool,
}

fn into_valid_keys(
public_keys: Vec<ErrorBoundary<ProjectKey>>,
revisions: Option<ErrorBoundary<Vec<Option<String>>>>,
) -> impl Iterator<Item = (ProjectKey, Option<String>)> {
let mut revisions = revisions.and_then(|e| e.ok()).unwrap_or_default();
if !revisions.is_empty() && revisions.len() != public_keys.len() {
// The downstream sent us a different number of revisions than project keys;
// this indicates an error in the downstream code. Just to be safe, discard
// all revisions and carry on as if the downstream never sent any revisions.
relay_log::warn!(
"downstream sent {} project keys but {} revisions, discarding all revisions",
public_keys.len(),
revisions.len()
);
revisions.clear();
}
let revisions = revisions.into_iter().chain(std::iter::repeat(None));

std::iter::zip(public_keys, revisions).filter_map(|(public_key, revision)| {
// Skip unparsable public keys.
// The downstream Relay will consider them `ProjectState::missing`.
let public_key = public_key.ok()?;
Some((public_key, revision))
})
}
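
A standalone sketch of the padding behavior above, with plain string slices standing in for `ProjectKey` and `ErrorBoundary` (all values are made up):

```rust
fn main() {
    let public_keys = vec!["key-a", "key-b", "key-c"];
    // Revisions are associated with keys by index; here the lengths do not
    // match, which mirrors the error case handled above.
    let revisions = vec![Some("rev-1"), None];

    // As in `into_valid_keys`: on a length mismatch, discard all revisions and
    // carry on as if none were sent.
    let revisions = if revisions.len() == public_keys.len() {
        revisions
    } else {
        Vec::new()
    };

    // Pad with `None` so the zip below never truncates the key list.
    let revisions = revisions.into_iter().chain(std::iter::repeat(None));

    for (key, revision) in std::iter::zip(public_keys, revisions) {
        println!("{key}: {revision:?}");
    }
    // Prints `None` for every key because the revision list was discarded.
}
```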

async fn inner(
state: ServiceState,
Query(version): Query<VersionQuery>,
@@ -112,21 +147,21 @@ async fn inner(
let no_cache = inner.no_cache;
let keys_len = inner.public_keys.len();

// Skip unparsable public keys. The downstream Relay will consider them `ProjectState::missing`.
let valid_keys = inner.public_keys.into_iter().filter_map(|e| e.ok());
let futures = valid_keys.map(|project_key| async move {
let state_result = if version.version >= ENDPOINT_V3 && !no_cache {
project_cache
.send(GetCachedProjectState::new(project_key))
.await
} else {
project_cache
.send(GetProjectState::new(project_key).no_cache(no_cache))
.await
};
let mut futures: FuturesUnordered<_> = into_valid_keys(inner.public_keys, inner.revisions)
.map(|(project_key, revision)| async move {
let state_result = if version.version >= ENDPOINT_V3 && !no_cache {
project_cache
.send(GetCachedProjectState::new(project_key))
.await
} else {
project_cache
.send(GetProjectState::new(project_key).no_cache(no_cache))
.await
};

(project_key, state_result)
});
(project_key, revision, state_result)
})
.collect();

let (global, global_status) = if inner.global {
match state.global_config().send(global_config::Get).await? {
@@ -143,8 +178,10 @@ async fn inner(
};

let mut pending = Vec::with_capacity(keys_len);
let mut unchanged = Vec::with_capacity(keys_len);
let mut configs = HashMap::with_capacity(keys_len);
for (project_key, state_result) in future::join_all(futures).await {

while let Some((project_key, revision, state_result)) = futures.next().await {
let project_info = match state_result? {
ProjectState::Enabled(info) => info,
ProjectState::Disabled => {
@@ -157,6 +194,12 @@
}
};

// Only ever omit responses when there was a valid revision in the first place.
if revision.is_some() && project_info.rev == revision {
unchanged.push(project_key);
continue;
}

// If public key is known (even if rate-limited, which is Some(false)), it has
// access to the project config
let has_access = relay.internal
@@ -187,6 +230,7 @@ async fn inner(
Ok(Json(GetProjectStatesResponseWrapper {
configs,
pending,
unchanged,
global,
global_status,
}))
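
The endpoint above also switches from `future::join_all` to a `FuturesUnordered` stream, so project states are processed as they resolve instead of only after all of them have completed. A standalone sketch of that pattern, assuming a `tokio` runtime and the `futures` crate, with trivial async blocks standing in for the project-cache messages:

```rust
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let keys = ["key-a", "key-b", "key-c"];

    // Collect one future per key; `FuturesUnordered` polls them concurrently.
    let mut futures: FuturesUnordered<_> = keys
        .iter()
        .map(|key| async move { (*key, format!("state for {key}")) })
        .collect();

    // Drain results in completion order rather than waiting for all of them.
    while let Some((key, state)) = futures.next().await {
        println!("{key}: {state}");
    }
}
```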
10 changes: 10 additions & 0 deletions relay-server/src/services/project/state.rs
@@ -114,3 +114,13 @@ pub struct LimitedParsedProjectState {
#[serde(flatten)]
pub info: ProjectInfo,
}

/// Response indicating whether a project state needs to be updated
/// or the upstream does not have a newer version.
#[derive(Debug, Clone)]
pub enum UpstreamProjectState {
/// The upstream sent a [`ProjectState`].
New(ProjectState),
/// The upstream indicated that there is no newer version of the state available.
NotModified,
}
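
A runnable sketch of how a caller might fold this enum back into a single state, with stand-in types in place of the real `ProjectState` and `ProjectFetchState` (the `resolve` helper and the `rev` field are hypothetical):

```rust
// Stand-ins for the real relay-server types.
#[derive(Debug, Clone)]
struct ProjectState {
    rev: Option<String>,
}

#[derive(Debug, Clone)]
enum UpstreamProjectState {
    New(ProjectState),
    NotModified,
}

// Keep the cached state when the upstream reports `NotModified`,
// otherwise take the freshly fetched state.
fn resolve(cached: ProjectState, fetched: UpstreamProjectState) -> ProjectState {
    match fetched {
        UpstreamProjectState::New(state) => state,
        UpstreamProjectState::NotModified => cached,
    }
}

fn main() {
    let cached = ProjectState {
        rev: Some("8a331ae4".to_owned()),
    };
    let unchanged = resolve(cached, UpstreamProjectState::NotModified);
    println!("{unchanged:?}");

    let fresh = resolve(
        ProjectState { rev: None },
        UpstreamProjectState::New(ProjectState {
            rev: Some("f00dcafe".to_owned()),
        }),
    );
    println!("{fresh:?}");
}
```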
35 changes: 28 additions & 7 deletions relay-server/src/services/project_cache.rs
@@ -8,6 +8,7 @@ use crate::services::buffer::{EnvelopeBufferError, EnvelopeBufferGuard, GuardedE
use crate::services::processor::{
EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics,
};
use crate::services::project::state::UpstreamProjectState;
use chrono::{DateTime, Utc};
use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
@@ -459,7 +460,7 @@ impl ProjectSource {
self,
project_key: ProjectKey,
no_cache: bool,
_cached_state: ProjectFetchState,
cached_state: ProjectFetchState,
) -> Result<ProjectFetchState, ()> {
let state_opt = self
.local_source
@@ -478,29 +479,32 @@
RelayMode::Managed => (), // Proceed with loading the config from redis or upstream
}

let current_revision = cached_state.revision().map(String::from);
#[cfg(feature = "processing")]
if let Some(redis_source) = self.redis_source {
let revision = _cached_state.revision().map(String::from);
let current_revision = current_revision.clone();

let redis_permit = self.redis_semaphore.acquire().await.map_err(|_| ())?;
let state_fetch_result = tokio::task::spawn_blocking(move || {
redis_source.get_config_if_changed(project_key, revision.as_deref())
redis_source.get_config_if_changed(project_key, current_revision.as_deref())
})
.await
.map_err(|_| ())?;
drop(redis_permit);

match state_fetch_result {
// New state fetched from Redis, possibly pending.
Ok(Some(state)) => {
Ok(UpstreamProjectState::New(state)) => {
let state = state.sanitized();
if !state.is_pending() {
return Ok(ProjectFetchState::new(state));
}
}
// Redis reported that we're holding an up-to-date version of the state already,
// refresh the state and return the old cached state again.
Ok(None) => return Ok(ProjectFetchState::refresh(_cached_state)),
Ok(UpstreamProjectState::NotModified) => {
return Ok(ProjectFetchState::refresh(cached_state))
}
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
@@ -510,13 +514,20 @@
};
};

self.upstream_source
let state = self
.upstream_source
.send(FetchProjectState {
project_key,
current_revision,
no_cache,
})
.await
.map_err(|_| ())
.map_err(|_| ())?;

match state {
UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())),
UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)),
}
}
}

@@ -1502,6 +1513,16 @@ pub struct FetchProjectState {
/// The public key to fetch the project by.
pub project_key: ProjectKey,

/// Currently cached revision if available.
///
/// The upstream is allowed to omit full project configs
/// for requests for which the requester already has the most
/// recent revision.
///
/// Setting this to `None` will essentially always re-fetch
/// the project config.
pub current_revision: Option<String>,

/// If true, all caches should be skipped and a fresh state should be computed.
pub no_cache: bool,
}
14 changes: 6 additions & 8 deletions relay-server/src/services/project_redis.rs
@@ -5,6 +5,7 @@ use relay_config::Config;
use relay_redis::{RedisError, RedisPool};
use relay_statsd::metric;

use crate::services::project::state::UpstreamProjectState;
use crate::services::project::{ParsedProjectState, ProjectState};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};

@@ -54,16 +55,13 @@ impl RedisProjectSource {

/// Fetches a project config from Redis.
///
/// Returns `None` if the project config stored in Redis has the same `revision`.
/// Always returns a project state if the passed `revision` is `None`.
///
/// The returned project state is [`ProjectState::Pending`] if the requested project config is not
/// stored in Redis.
pub fn get_config_if_changed(
&self,
key: ProjectKey,
revision: Option<&str>,
) -> Result<Option<ProjectState>, RedisProjectError> {
) -> Result<UpstreamProjectState, RedisProjectError> {
let mut client = self.redis.client()?;
let mut connection = client.connection()?;

@@ -82,7 +80,7 @@
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "revision",
);
return Ok(None);
return Ok(UpstreamProjectState::NotModified);
}
}

@@ -96,7 +94,7 @@
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "false"
);
return Ok(Some(ProjectState::Pending));
return Ok(UpstreamProjectState::New(ProjectState::Pending));
};

let response = ProjectState::from(parse_redis_response(response.as_slice())?);
@@ -113,13 +111,13 @@
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "project_config_revision"
);
Ok(None)
Ok(UpstreamProjectState::NotModified)
} else {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "project_config"
);
Ok(Some(response))
Ok(UpstreamProjectState::New(response))
}
}
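
The revision short-circuit above can be illustrated with an in-memory stand-in for Redis (a plain `HashMap` instead of `relay_redis`); all names and values here are illustrative only:

```rust
use std::collections::HashMap;

#[derive(Debug)]
enum Lookup {
    NotModified,
    New(String), // stand-in for the parsed project config
}

// Stand-in store: project key -> (revision, serialized config).
fn get_config_if_changed(
    store: &HashMap<&str, (&str, &str)>,
    key: &str,
    cached_revision: Option<&str>,
) -> Lookup {
    match store.get(key) {
        // The stored revision matches what the caller already has:
        // skip fetching and parsing the config entirely.
        Some((revision, _)) if cached_revision == Some(*revision) => Lookup::NotModified,
        // The config exists but the revision differs (or none was supplied).
        Some((_, config)) => Lookup::New((*config).to_owned()),
        // Unknown key: the real code returns a pending project state here.
        None => Lookup::New("pending".to_owned()),
    }
}

fn main() {
    let mut store = HashMap::new();
    store.insert("a94ae32be2584e0bbd7a4cbb95971fee", ("8a331ae4", "{}"));

    let key = "a94ae32be2584e0bbd7a4cbb95971fee";
    println!("{:?}", get_config_if_changed(&store, key, Some("8a331ae4")));
    println!("{:?}", get_config_if_changed(&store, key, None));
}
```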
