Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(project-config): Query project configs with revision #3947

Merged
merged 4 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

**Features:**

- Extend project config API to be revision aware. ([#3947](https://github.com/getsentry/relay/pull/3947)).

**Bug Fixes**:

- Keep frames from both ends of the stacktrace when trimming frames. ([#3905](https://github.com/getsentry/relay/pull/3905))
Expand Down
66 changes: 50 additions & 16 deletions relay-server/src/endpoints/project_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use axum::handler::Handler;
use axum::http::Request;
use axum::response::{IntoResponse, Result};
use axum::{Json, RequestExt};
use futures::future;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use relay_base_schema::project::ProjectKey;
use relay_dynamic_config::{ErrorBoundary, GlobalConfig};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -79,6 +80,8 @@ struct GetProjectStatesResponseWrapper {
configs: HashMap<ProjectKey, ProjectStateWrapper>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pending: Vec<ProjectKey>,
#[serde(skip_serializing_if = "Vec::is_empty")]
unchanged: Vec<ProjectKey>,
#[serde(skip_serializing_if = "Option::is_none")]
global: Option<Arc<GlobalConfig>>,
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -92,7 +95,13 @@ struct GetProjectStatesResponseWrapper {
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetProjectStatesRequest {
/// The list of all requested project configs.
public_keys: Vec<ErrorBoundary<ProjectKey>>,
/// List of revisions for all project configs.
///
/// This length of this list if specified must be the same length
/// as [`Self::public_keys`], the items are asssociated by index.
revisions: Option<ErrorBoundary<Vec<Option<String>>>>,
#[serde(default)]
full_config: bool,
#[serde(default)]
Expand All @@ -101,6 +110,21 @@ struct GetProjectStatesRequest {
global: bool,
}

fn into_valid_keys(
public_keys: Vec<ErrorBoundary<ProjectKey>>,
revisions: Option<ErrorBoundary<Vec<Option<String>>>>,
) -> impl Iterator<Item = (ProjectKey, Option<String>)> {
let revisions = revisions.and_then(|e| e.ok()).unwrap_or_default();
let revisions = revisions.into_iter().chain(std::iter::repeat(None));
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved

std::iter::zip(public_keys, revisions).filter_map(|(public_key, revision)| {
// Skip unparsable public keys.
// The downstream Relay will consider them `ProjectState::missing`.
let public_key = public_key.ok()?;
Some((public_key, revision))
})
}

async fn inner(
state: ServiceState,
Query(version): Query<VersionQuery>,
Expand All @@ -112,22 +136,23 @@ async fn inner(
let no_cache = inner.no_cache;
let keys_len = inner.public_keys.len();

// Skip unparsable public keys. The downstream Relay will consider them `ProjectState::missing`.
let valid_keys = inner.public_keys.into_iter().filter_map(|e| e.ok());
let futures = valid_keys.map(|project_key| async move {
let state_result = if version.version >= ENDPOINT_V3 && !no_cache {
project_cache
.send(GetCachedProjectState::new(project_key))
.await
} else {
project_cache
.send(GetProjectState::new(project_key).no_cache(no_cache))
.await
};
let mut futures: FuturesUnordered<_> = into_valid_keys(inner.public_keys, inner.revisions)
.map(|(project_key, revision)| async move {
let state_result = if version.version >= ENDPOINT_V3 && !no_cache {
project_cache
.send(GetCachedProjectState::new(project_key))
.await
} else {
project_cache
.send(GetProjectState::new(project_key).no_cache(no_cache))
.await
};

(project_key, state_result)
});
(project_key, revision, state_result)
})
.collect();

// Skip unparsable public keys. The downstream Relay will consider them `ProjectState::missing`.
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
let (global, global_status) = if inner.global {
match state.global_config().send(global_config::Get).await? {
global_config::Status::Ready(config) => (Some(config), Some(StatusResponse::Ready)),
Expand All @@ -143,8 +168,10 @@ async fn inner(
};

let mut pending = Vec::with_capacity(keys_len);
let mut unchanged = Vec::with_capacity(keys_len);
let mut configs = HashMap::with_capacity(keys_len);
for (project_key, state_result) in future::join_all(futures).await {

while let Some((project_key, revision, state_result)) = futures.next().await {
let project_info = match state_result? {
ProjectState::Enabled(info) => info,
ProjectState::Disabled => {
Expand All @@ -157,6 +184,12 @@ async fn inner(
}
};

// Only ever omit responses when there was a valid revision in the first place.
if revision.is_some() && project_info.rev == revision {
unchanged.push(project_key);
continue;
}

// If public key is known (even if rate-limited, which is Some(false)), it has
// access to the project config
let has_access = relay.internal
Expand Down Expand Up @@ -187,6 +220,7 @@ async fn inner(
Ok(Json(GetProjectStatesResponseWrapper {
configs,
pending,
unchanged,
global,
global_status,
}))
Expand Down
10 changes: 10 additions & 0 deletions relay-server/src/services/project/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,13 @@ pub struct LimitedParsedProjectState {
#[serde(flatten)]
pub info: ProjectInfo,
}

/// Response indicating whether a project state needs to be updated
/// or the upstream does not have a newer version.
#[derive(Debug, Clone)]
pub enum UpstreamProjectState {
/// The upstream sent a [`ProjectState`].
New(ProjectState),
/// The upstream indicated that there is no newer version of the state available.
Unchanged,
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
}
35 changes: 28 additions & 7 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::services::buffer::{EnvelopeBufferError, EnvelopeBufferGuard, GuardedE
use crate::services::processor::{
EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics,
};
use crate::services::project::state::UpstreamProjectState;
use chrono::{DateTime, Utc};
use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
Expand Down Expand Up @@ -459,7 +460,7 @@ impl ProjectSource {
self,
project_key: ProjectKey,
no_cache: bool,
_cached_state: ProjectFetchState,
cached_state: ProjectFetchState,
) -> Result<ProjectFetchState, ()> {
let state_opt = self
.local_source
Expand All @@ -478,29 +479,32 @@ impl ProjectSource {
RelayMode::Managed => (), // Proceed with loading the config from redis or upstream
}

let current_revision = cached_state.revision().map(String::from);
#[cfg(feature = "processing")]
if let Some(redis_source) = self.redis_source {
let revision = _cached_state.revision().map(String::from);
let current_revision = current_revision.clone();

let redis_permit = self.redis_semaphore.acquire().await.map_err(|_| ())?;
let state_fetch_result = tokio::task::spawn_blocking(move || {
redis_source.get_config_if_changed(project_key, revision.as_deref())
redis_source.get_config_if_changed(project_key, current_revision.as_deref())
})
.await
.map_err(|_| ())?;
drop(redis_permit);

match state_fetch_result {
// New state fetched from Redis, possibly pending.
Ok(Some(state)) => {
Ok(UpstreamProjectState::New(state)) => {
let state = state.sanitized();
if !state.is_pending() {
return Ok(ProjectFetchState::new(state));
}
}
// Redis reported that we're holding an up-to-date version of the state already,
// refresh the state and return the old cached state again.
Ok(None) => return Ok(ProjectFetchState::refresh(_cached_state)),
Ok(UpstreamProjectState::Unchanged) => {
return Ok(ProjectFetchState::refresh(cached_state))
}
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
Expand All @@ -510,13 +514,20 @@ impl ProjectSource {
};
};

self.upstream_source
let state = self
.upstream_source
.send(FetchProjectState {
project_key,
current_revision,
no_cache,
})
.await
.map_err(|_| ())
.map_err(|_| ())?;

match state {
UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())),
UpstreamProjectState::Unchanged => Ok(ProjectFetchState::refresh(cached_state)),
}
}
}

Expand Down Expand Up @@ -1502,6 +1513,16 @@ pub struct FetchProjectState {
/// The public key to fetch the project by.
pub project_key: ProjectKey,

/// Currently cached revision if available.
///
/// The upstream is allowed to omit full project configs
/// for requests for which the requester already has the most
/// recent revision.
///
/// Settings this to `None` will essentially always re-fetch
/// the project config.
pub current_revision: Option<String>,

/// If true, all caches should be skipped and a fresh state should be computed.
pub no_cache: bool,
}
Expand Down
14 changes: 6 additions & 8 deletions relay-server/src/services/project_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use relay_config::Config;
use relay_redis::{RedisError, RedisPool};
use relay_statsd::metric;

use crate::services::project::state::UpstreamProjectState;
use crate::services::project::{ParsedProjectState, ProjectState};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};

Expand Down Expand Up @@ -54,16 +55,13 @@ impl RedisProjectSource {

/// Fetches a project config from Redis.
///
/// Returns `None` if the the project config stored in Redis has the same `revision`.
/// Always returns a project state if the passed `revision` is `None`.
///
/// The returned project state is [`ProjectState::Pending`] if the requested project config is not
/// stored in Redis.
pub fn get_config_if_changed(
&self,
key: ProjectKey,
revision: Option<&str>,
) -> Result<Option<ProjectState>, RedisProjectError> {
) -> Result<UpstreamProjectState, RedisProjectError> {
let mut client = self.redis.client()?;
let mut connection = client.connection()?;

Expand All @@ -82,7 +80,7 @@ impl RedisProjectSource {
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "revision",
);
return Ok(None);
return Ok(UpstreamProjectState::Unchanged);
}
}

Expand All @@ -96,7 +94,7 @@ impl RedisProjectSource {
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "false"
);
return Ok(Some(ProjectState::Pending));
return Ok(UpstreamProjectState::New(ProjectState::Pending));
};

let response = ProjectState::from(parse_redis_response(response.as_slice())?);
Expand All @@ -113,13 +111,13 @@ impl RedisProjectSource {
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "project_config_revision"
);
Ok(None)
Ok(UpstreamProjectState::Unchanged)
} else {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "project_config"
);
Ok(Some(response))
Ok(UpstreamProjectState::New(response))
}
}

Expand Down
Loading
Loading