diff --git a/CHANGELOG.md b/CHANGELOG.md index b7a942d49e..64ca467548 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +**Features**: + +- Extend project config API to be revision aware. ([#3947](https://github.com/getsentry/relay/pull/3947)) + **Bug Fixes**: - Keep frames from both ends of the stacktrace when trimming frames. ([#3905](https://github.com/getsentry/relay/pull/3905)) diff --git a/relay-server/src/endpoints/project_configs.rs b/relay-server/src/endpoints/project_configs.rs index 0a84430583..81711b7f45 100644 --- a/relay-server/src/endpoints/project_configs.rs +++ b/relay-server/src/endpoints/project_configs.rs @@ -6,7 +6,8 @@ use axum::handler::Handler; use axum::http::Request; use axum::response::{IntoResponse, Result}; use axum::{Json, RequestExt}; -use futures::future; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use relay_base_schema::project::ProjectKey; use relay_dynamic_config::{ErrorBoundary, GlobalConfig}; use serde::{Deserialize, Serialize}; @@ -79,6 +80,8 @@ struct GetProjectStatesResponseWrapper { configs: HashMap, #[serde(skip_serializing_if = "Vec::is_empty")] pending: Vec, + #[serde(skip_serializing_if = "Vec::is_empty")] + unchanged: Vec, #[serde(skip_serializing_if = "Option::is_none")] global: Option>, #[serde(skip_serializing_if = "Option::is_none")] @@ -92,7 +95,13 @@ struct GetProjectStatesResponseWrapper { #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] struct GetProjectStatesRequest { + /// The list of all requested project configs. public_keys: Vec>, + /// List of revisions for all project configs. + /// + /// The length of this list, if specified, must be the same as the length + /// of [`Self::public_keys`]; the items are associated by index. 
+ revisions: Option>>>, #[serde(default)] full_config: bool, #[serde(default)] @@ -101,6 +110,32 @@ struct GetProjectStatesRequest { global: bool, } +fn into_valid_keys( + public_keys: Vec>, + revisions: Option>>>, +) -> impl Iterator)> { + let mut revisions = revisions.and_then(|e| e.ok()).unwrap_or_default(); + if !revisions.is_empty() && revisions.len() != public_keys.len() { + // The downstream sent us a different amount of revisions than project keys, + // this indicates an error in the downstream code. Just to be safe, discard + // all revisions and carry on as if the downstream never sent any revisions. + relay_log::warn!( + "downstream sent {} project keys but {} revisions, discarding all revisions", + public_keys.len(), + revisions.len() + ); + revisions.clear(); + } + let revisions = revisions.into_iter().chain(std::iter::repeat(None)); + + std::iter::zip(public_keys, revisions).filter_map(|(public_key, revision)| { + // Skip unparsable public keys. + // The downstream Relay will consider them `ProjectState::missing`. + let public_key = public_key.ok()?; + Some((public_key, revision)) + }) +} + async fn inner( state: ServiceState, Query(version): Query, @@ -112,21 +147,21 @@ async fn inner( let no_cache = inner.no_cache; let keys_len = inner.public_keys.len(); - // Skip unparsable public keys. The downstream Relay will consider them `ProjectState::missing`. 
- let valid_keys = inner.public_keys.into_iter().filter_map(|e| e.ok()); - let futures = valid_keys.map(|project_key| async move { - let state_result = if version.version >= ENDPOINT_V3 && !no_cache { - project_cache - .send(GetCachedProjectState::new(project_key)) - .await - } else { - project_cache - .send(GetProjectState::new(project_key).no_cache(no_cache)) - .await - }; + let mut futures: FuturesUnordered<_> = into_valid_keys(inner.public_keys, inner.revisions) + .map(|(project_key, revision)| async move { + let state_result = if version.version >= ENDPOINT_V3 && !no_cache { + project_cache + .send(GetCachedProjectState::new(project_key)) + .await + } else { + project_cache + .send(GetProjectState::new(project_key).no_cache(no_cache)) + .await + }; - (project_key, state_result) - }); + (project_key, revision, state_result) + }) + .collect(); let (global, global_status) = if inner.global { match state.global_config().send(global_config::Get).await? { @@ -143,8 +178,10 @@ async fn inner( }; let mut pending = Vec::with_capacity(keys_len); + let mut unchanged = Vec::with_capacity(keys_len); let mut configs = HashMap::with_capacity(keys_len); - for (project_key, state_result) in future::join_all(futures).await { + + while let Some((project_key, revision, state_result)) = futures.next().await { let project_info = match state_result? { ProjectState::Enabled(info) => info, ProjectState::Disabled => { @@ -157,6 +194,12 @@ async fn inner( } }; + // Only ever omit responses when there was a valid revision in the first place. 
+ if revision.is_some() && project_info.rev == revision { + unchanged.push(project_key); + continue; + } + // If public key is known (even if rate-limited, which is Some(false)), it has // access to the project config let has_access = relay.internal @@ -187,6 +230,7 @@ async fn inner( Ok(Json(GetProjectStatesResponseWrapper { configs, pending, + unchanged, global, global_status, })) diff --git a/relay-server/src/services/project/state.rs b/relay-server/src/services/project/state.rs index dad7f5b4b9..70616d4a6e 100644 --- a/relay-server/src/services/project/state.rs +++ b/relay-server/src/services/project/state.rs @@ -114,3 +114,13 @@ pub struct LimitedParsedProjectState { #[serde(flatten)] pub info: ProjectInfo, } + +/// Response indicating whether a project state needs to be updated +/// or the upstream does not have a newer version. +#[derive(Debug, Clone)] +pub enum UpstreamProjectState { + /// The upstream sent a [`ProjectState`]. + New(ProjectState), + /// The upstream indicated that there is no newer version of the state available. 
+ NotModified, +} diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 63413e99bf..c7d7331445 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -8,6 +8,7 @@ use crate::services::buffer::{EnvelopeBufferError, EnvelopeBufferGuard, GuardedE use crate::services::processor::{ EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics, }; +use crate::services::project::state::UpstreamProjectState; use chrono::{DateTime, Utc}; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; @@ -459,7 +460,7 @@ impl ProjectSource { self, project_key: ProjectKey, no_cache: bool, - _cached_state: ProjectFetchState, + cached_state: ProjectFetchState, ) -> Result { let state_opt = self .local_source @@ -478,13 +479,14 @@ impl ProjectSource { RelayMode::Managed => (), // Proceed with loading the config from redis or upstream } + let current_revision = cached_state.revision().map(String::from); #[cfg(feature = "processing")] if let Some(redis_source) = self.redis_source { - let revision = _cached_state.revision().map(String::from); + let current_revision = current_revision.clone(); let redis_permit = self.redis_semaphore.acquire().await.map_err(|_| ())?; let state_fetch_result = tokio::task::spawn_blocking(move || { - redis_source.get_config_if_changed(project_key, revision.as_deref()) + redis_source.get_config_if_changed(project_key, current_revision.as_deref()) }) .await .map_err(|_| ())?; @@ -492,7 +494,7 @@ impl ProjectSource { match state_fetch_result { // New state fetched from Redis, possibly pending. 
- Ok(Some(state)) => { + Ok(UpstreamProjectState::New(state)) => { let state = state.sanitized(); if !state.is_pending() { return Ok(ProjectFetchState::new(state)); @@ -500,7 +502,9 @@ impl ProjectSource { } // Redis reported that we're holding an up-to-date version of the state already, // refresh the state and return the old cached state again. - Ok(None) => return Ok(ProjectFetchState::refresh(_cached_state)), + Ok(UpstreamProjectState::NotModified) => { + return Ok(ProjectFetchState::refresh(cached_state)) + } Err(error) => { relay_log::error!( error = &error as &dyn Error, @@ -510,13 +514,20 @@ impl ProjectSource { }; }; - self.upstream_source + let state = self + .upstream_source .send(FetchProjectState { project_key, + current_revision, no_cache, }) .await - .map_err(|_| ()) + .map_err(|_| ())?; + + match state { + UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())), + UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)), + } } } @@ -1502,6 +1513,16 @@ pub struct FetchProjectState { /// The public key to fetch the project by. pub project_key: ProjectKey, + /// Currently cached revision if available. + /// + /// The upstream is allowed to omit full project configs + /// for requests for which the requester already has the most + /// recent revision. + /// + /// Settings this to `None` will essentially always re-fetch + /// the project config. + pub current_revision: Option, + /// If true, all caches should be skipped and a fresh state should be computed. 
pub no_cache: bool, } diff --git a/relay-server/src/services/project_redis.rs b/relay-server/src/services/project_redis.rs index edee03a829..18e8dd0e53 100644 --- a/relay-server/src/services/project_redis.rs +++ b/relay-server/src/services/project_redis.rs @@ -5,6 +5,7 @@ use relay_config::Config; use relay_redis::{RedisError, RedisPool}; use relay_statsd::metric; +use crate::services::project::state::UpstreamProjectState; use crate::services::project::{ParsedProjectState, ProjectState}; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; @@ -54,16 +55,13 @@ impl RedisProjectSource { /// Fetches a project config from Redis. /// - /// Returns `None` if the the project config stored in Redis has the same `revision`. - /// Always returns a project state if the passed `revision` is `None`. - /// /// The returned project state is [`ProjectState::Pending`] if the requested project config is not /// stored in Redis. pub fn get_config_if_changed( &self, key: ProjectKey, revision: Option<&str>, - ) -> Result, RedisProjectError> { + ) -> Result { let mut client = self.redis.client()?; let mut connection = client.connection()?; @@ -82,7 +80,7 @@ impl RedisProjectSource { counter(RelayCounters::ProjectStateRedis) += 1, hit = "revision", ); - return Ok(None); + return Ok(UpstreamProjectState::NotModified); } } @@ -96,7 +94,7 @@ impl RedisProjectSource { counter(RelayCounters::ProjectStateRedis) += 1, hit = "false" ); - return Ok(Some(ProjectState::Pending)); + return Ok(UpstreamProjectState::New(ProjectState::Pending)); }; let response = ProjectState::from(parse_redis_response(response.as_slice())?); @@ -113,13 +111,13 @@ impl RedisProjectSource { counter(RelayCounters::ProjectStateRedis) += 1, hit = "project_config_revision" ); - Ok(None) + Ok(UpstreamProjectState::NotModified) } else { metric!( counter(RelayCounters::ProjectStateRedis) += 1, hit = "project_config" ); - Ok(Some(response)) + Ok(UpstreamProjectState::New(response)) } } diff --git 
a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs index 3334e98c31..4d7d5338ed 100644 --- a/relay-server/src/services/project_upstream.rs +++ b/relay-server/src/services/project_upstream.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -17,7 +18,9 @@ use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::time::Instant; -use crate::services::project::{ParsedProjectState, ProjectFetchState}; +use crate::services::project::state::UpstreamProjectState; +use crate::services::project::ParsedProjectState; +use crate::services::project::ProjectState; use crate::services::project_cache::FetchProjectState; use crate::services::upstream::{ Method, RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError, @@ -32,8 +35,18 @@ use crate::utils::{RetryBackoff, SleepHandle}; #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct GetProjectStates { + /// List of requested project keys. public_keys: Vec, + /// List of revisions for each project key. + /// + /// The revisions are mapped by index to the project key, + /// this is a separate field to keep the API compatible. + revisions: Vec>, + /// If `true` the upstream should return a full configuration. + /// + /// Upstreams will ignore this for non-internal Relays. full_config: bool, + /// If `true` the upstream should not serve from cache. no_cache: bool, } @@ -49,7 +62,13 @@ pub struct GetProjectStatesResponse { configs: HashMap>>, /// The [`ProjectKey`]'s that couldn't be immediately retrieved from the upstream. #[serde(default)] - pending: Vec, + pending: HashSet, + /// The [`ProjectKey`]'s that the upstream has no updates for. + /// + /// List is only populated when the request contains revision information + /// for all requested configurations. 
+ #[serde(default)] + unchanged: HashSet, } impl UpstreamQuery for GetProjectStates { @@ -79,7 +98,8 @@ impl UpstreamQuery for GetProjectStates { /// The wrapper struct for the incoming external requests which also keeps addition information. #[derive(Debug)] struct ProjectStateChannel { - channel: BroadcastChannel, + channel: BroadcastChannel, + revision: Option, deadline: Instant, no_cache: bool, attempts: u64, @@ -91,7 +111,8 @@ struct ProjectStateChannel { impl ProjectStateChannel { pub fn new( - sender: BroadcastSender, + sender: BroadcastSender, + revision: Option, timeout: Duration, no_cache: bool, ) -> Self { @@ -99,6 +120,7 @@ impl ProjectStateChannel { Self { no_cache, channel: sender.into_channel(), + revision, deadline: now + timeout, attempts: 0, errors: 0, @@ -110,11 +132,24 @@ impl ProjectStateChannel { self.no_cache = true; } - pub fn attach(&mut self, sender: BroadcastSender) { - self.channel.attach(sender) + /// Attaches a new sender to the same channel. + /// + /// Also makes sure the new sender's revision matches the already requested revision. + /// If the new revision is different from the contained revision this clears the revision. + /// To not have multiple fetches per revision per batch, we need to find a common denominator + /// for requests with different revisions, which is always to fetch the full project config. + pub fn attach( + &mut self, + sender: BroadcastSender, + revision: Option, + ) { + self.channel.attach(sender); + if self.revision != revision { + self.revision = None; + } } - pub fn send(self, state: ProjectFetchState) { + pub fn send(self, state: UpstreamProjectState) { self.channel.send(state) } @@ -132,16 +167,16 @@ type ProjectStateChannels = HashMap; /// Internally it maintains the buffer queue of the incoming requests, which got scheduled to fetch the /// state and takes care of the backoff in case there is a problem with the requests. 
#[derive(Debug)] -pub struct UpstreamProjectSource(FetchProjectState, BroadcastSender); +pub struct UpstreamProjectSource(FetchProjectState, BroadcastSender); impl Interface for UpstreamProjectSource {} impl FromMessage for UpstreamProjectSource { - type Response = BroadcastResponse; + type Response = BroadcastResponse; fn from_message( message: FetchProjectState, - sender: BroadcastSender, + sender: BroadcastSender, ) -> Self { Self(message, sender) } @@ -301,6 +336,10 @@ impl UpstreamProjectSourceService { let query = GetProjectStates { public_keys: channels_batch.keys().copied().collect(), + revisions: channels_batch + .values() + .map(|c| c.revision.clone()) + .collect(), full_config: config.processing_enabled() || config.request_full_project_config(), no_cache: channels_batch.values().any(|c| c.no_cache), }; @@ -371,25 +410,34 @@ impl UpstreamProjectSourceService { response.configs.len() as u64 ); for (key, mut channel) in channels_batch { - let mut result = "ok"; if response.pending.contains(&key) { channel.pending += 1; self.state_channels.insert(key, channel); continue; } - let state = response - .configs - .remove(&key) - .unwrap_or(ErrorBoundary::Ok(None)); - let state = match state { - ErrorBoundary::Err(error) => { - result = "invalid"; - let error = &error as &dyn std::error::Error; - relay_log::error!(error, "error fetching project state {key}"); - ProjectFetchState::pending() - } - ErrorBoundary::Ok(None) => ProjectFetchState::disabled(), - ErrorBoundary::Ok(Some(state)) => ProjectFetchState::new(state.into()), + + let mut result = "ok"; + let state = if response.unchanged.contains(&key) { + result = "ok_unchanged"; + UpstreamProjectState::NotModified + } else { + let state = response + .configs + .remove(&key) + .unwrap_or(ErrorBoundary::Ok(None)); + + let state = match state { + ErrorBoundary::Err(error) => { + result = "invalid"; + let error = &error as &dyn std::error::Error; + relay_log::error!(error, "error fetching project state {key}"); + 
ProjectState::Pending + } + ErrorBoundary::Ok(None) => ProjectState::Disabled, + ErrorBoundary::Ok(Some(state)) => state.into(), + }; + + UpstreamProjectState::New(state) }; metric!( @@ -401,7 +449,7 @@ impl UpstreamProjectSourceService { result = result, ); - channel.send(state.sanitized()); + channel.send(state); } } Err(err) => { @@ -503,6 +551,7 @@ impl UpstreamProjectSourceService { let UpstreamProjectSource( FetchProjectState { project_key, + current_revision, no_cache, }, sender, @@ -514,11 +563,16 @@ impl UpstreamProjectSourceService { // otherwise create a new one. match self.state_channels.entry(project_key) { Entry::Vacant(entry) => { - entry.insert(ProjectStateChannel::new(sender, query_timeout, no_cache)); + entry.insert(ProjectStateChannel::new( + sender, + current_revision, + query_timeout, + no_cache, + )); } Entry::Occupied(mut entry) => { let channel = entry.get_mut(); - channel.attach(sender); + channel.attach(sender, current_revision); // Ensure upstream skips caches if one of the recipients requests an uncached response. This // operation is additive across requests. 
if no_cache { diff --git a/tests/integration/fixtures/mini_sentry.py b/tests/integration/fixtures/mini_sentry.py index eb16b5bcfc..276704b6c0 100644 --- a/tests/integration/fixtures/mini_sentry.py +++ b/tests/integration/fixtures/mini_sentry.py @@ -47,6 +47,7 @@ def __init__(self, server_address, app): self.fail_on_relay_error = True self.request_log = [] self.project_config_simulate_pending = False + self.project_config_ignore_revision = False @property def internal_error_dsn(self): @@ -327,6 +328,7 @@ def get_project_config(): response = {} configs = {} pending = [] + unchanged = [] global_ = None version = flask_request.args.get("version") @@ -341,7 +343,12 @@ def get_project_config(): configs[project_id] = project_config elif version in ["2", "3", "4"]: - for public_key in flask_request.json["publicKeys"]: + for i, public_key in enumerate(flask_request.json["publicKeys"]): + try: + revision = flask_request.json.get("revisions")[i] + except IndexError: + revision = None + # We store projects by id, but need to return by key for project_config in sentry.project_configs.values(): for key in project_config["publicKeys"]: @@ -354,6 +361,13 @@ def get_project_config(): and sentry.project_config_simulate_pending ): pending.append(public_key) + elif ( + version == "3" + and not sentry.project_config_ignore_revision + and revision is not None + and project_config["rev"] == revision + ): + unchanged.append(public_key) else: # TODO 11 Nov 2020 (RaduW) horrible hack # For some reason returning multiple public keys breaks Relay @@ -367,6 +381,7 @@ def get_project_config(): response["configs"] = configs response["pending"] = pending + response["unchanged"] = unchanged if global_ is not None: response["global"] = global_ diff --git a/tests/integration/test_projectconfigs.py b/tests/integration/test_projectconfigs.py index 0c4f6735e4..1d040c8d9b 100644 --- a/tests/integration/test_projectconfigs.py +++ b/tests/integration/test_projectconfigs.py @@ -197,7 +197,7 @@ def 
request_config(): if data["configs"]: break else: - print("Relay did still not receive a project config from minisentry") + assert False, "Relay did still not receive a project config from minisentry" assert public_key in data["configs"] assert data.get("pending") is None @@ -302,6 +302,8 @@ def assert_clear_test_failures(): def test_cached_project_config(mini_sentry, relay): + mini_sentry.project_config_ignore_revision = True + project_key = 42 relay_config = { "cache": {"project_expiry": 2, "project_grace_period": 5, "miss_expiry": 2} @@ -397,3 +399,44 @@ def test_compression(mini_sentry, relay): assert response.ok assert response.headers["content-encoding"] == "gzip" assert public_key in response.json()["configs"] + + +def test_unchanged_projects(mini_sentry, relay): + relay = relay(mini_sentry) + cfg = mini_sentry.add_basic_project_config(42) + cfg["rev"] = "123" + public_key = mini_sentry.get_dsn_public_key(42) + + body = {"publicKeys": [public_key], "revisions": ["123"]} + packed, signature = SecretKey.parse(relay.secret_key).pack(body) + + def request_config(): + return relay.post( + "/api/0/relays/projectconfigs/?version=3", + data=packed, + headers={ + "X-Sentry-Relay-Id": relay.relay_id, + "X-Sentry-Relay-Signature": signature, + }, + ) + + response = request_config() + + assert response.ok + data = response.json() + assert public_key in data["pending"] + assert public_key not in data.get("unchanged", []) + + deadline = time.monotonic() + 15 + while time.monotonic() <= deadline: + response = request_config() + assert response.ok + data = response.json() + if data.get("unchanged"): + break + else: + assert False, "Relay did still not receive a project config from minisentry" + + assert public_key in data["unchanged"] + assert public_key not in data["configs"] + assert data.get("pending") is None diff --git a/tests/integration/test_query.py b/tests/integration/test_query.py index 417b63c759..f345904364 100644 --- a/tests/integration/test_query.py +++ 
b/tests/integration/test_query.py @@ -294,3 +294,55 @@ def test_processing_redis_query_with_revision( relay.send_event(project_id) event, _ = events_consumer.get_event() assert event["logentry"] == {"formatted": "Hello, World!"} + + +@pytest.mark.parametrize("with_revision_support", [True, False]) +def test_project_fetch_revision(mini_sentry, relay, with_revision_support): + project_config_fetch = threading.Event() + with_rev = threading.Event() + + get_project_config_original = mini_sentry.app.view_functions["get_project_config"] + + @mini_sentry.app.endpoint("get_project_config") + def get_project_config(): + if not flask_request.json.get("global"): + project_config_fetch.set() + if flask_request.json.get("revisions") == ["123"]: + with_rev.set() + + return get_project_config_original() + + mini_sentry.project_config_ignore_revision = not with_revision_support + config = mini_sentry.add_basic_project_config(42) + config["rev"] = "123" + + relay = relay( + mini_sentry, + { + "cache": { + "miss_expiry": 1, + "project_expiry": 1, + "project_grace_period": 0, + } + }, + ) + + relay.send_event(42) + + assert project_config_fetch.wait(timeout=1) + project_config_fetch.clear() + assert not with_rev.is_set() + + event = mini_sentry.captured_events.get(timeout=1).get_event() + assert event["logentry"] == {"formatted": "Hello, World!"} + + # Wait for project config to expire. + time.sleep(2) + + relay.send_event(42) + + assert project_config_fetch.wait(timeout=1) + assert with_rev.wait(timeout=1) + + event = mini_sentry.captured_events.get(timeout=1).get_event() + assert event["logentry"] == {"formatted": "Hello, World!"}