diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index a214b23837..41d6eb8cd6 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -177,7 +177,8 @@ impl ServiceState { .start(); let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start(); - let global_config = GlobalConfigService::new(config.clone(), upstream_relay.clone()); + let (global_config, global_config_rx) = + GlobalConfigService::new(config.clone(), upstream_relay.clone()); let global_config_handle = global_config.handle(); // The global config service must start before dependant services are // started. Messages like subscription requests to the global config @@ -256,13 +257,13 @@ impl ServiceState { project_cache: project_cache.clone(), test_store: test_store.clone(), upstream_relay: upstream_relay.clone(), - global_config: global_config.clone(), }; ProjectCacheService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), project_cache_services, + global_config_rx, redis_pools .as_ref() .map(|pools| pools.project_configs.clone()), diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index 4f438967a6..346c8c9cd7 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -104,27 +104,14 @@ impl UpstreamQuery for GetGlobalConfig { /// The message for requesting the most recent global config from [`GlobalConfigService`]. pub struct Get; -/// The message for receiving a watch that subscribes to the [`GlobalConfigService`]. -/// -/// The global config service must be up and running, else the subscription -/// fails. Subscribers should use the initial value when they get the watch -/// rather than only waiting for the watch to update, in case a global config -/// is only updated once, such as is the case with the static config file. -pub struct Subscribe; - /// An interface to get [`GlobalConfig`]s through [`GlobalConfigService`]. /// /// For a one-off update, [`GlobalConfigService`] responds to /// [`GlobalConfigManager::Get`] messages with the latest instance of the -/// [`GlobalConfig`]. For continued updates, you can subscribe with -/// [`GlobalConfigManager::Subscribe`] to get a receiver back where up-to-date -/// instances will be sent to, while [`GlobalConfigService`] manages the update -/// frequency from upstream. +/// [`GlobalConfig`]. pub enum GlobalConfigManager { /// Returns the most recent global config. Get(relay_system::Sender), - /// Returns a [`watch::Receiver`] where global config updates will be sent to. - Subscribe(relay_system::Sender>), } impl Interface for GlobalConfigManager {} @@ -137,14 +124,6 @@ impl FromMessage for GlobalConfigManager { } } -impl FromMessage for GlobalConfigManager { - type Response = AsyncResponse>; - - fn from_message(_: Subscribe, sender: relay_system::Sender>) -> Self { - Self::Subscribe(sender) - } -} - /// Describes the current fetching status of the [`GlobalConfig`] from the upstream. #[derive(Debug, Clone, Default)] pub enum Status { @@ -201,10 +180,6 @@ impl fmt::Debug for GlobalConfigHandle { } /// Service implementing the [`GlobalConfigManager`] interface. -/// -/// The service offers two alternatives to fetch the [`GlobalConfig`]: -/// responding to a [`Get`] message with the config for one-off requests, or -/// subscribing to updates with [`Subscribe`] to keep up-to-date. #[derive(Debug)] pub struct GlobalConfigService { config: Arc, @@ -228,21 +203,27 @@ pub struct GlobalConfigService { impl GlobalConfigService { /// Creates a new [`GlobalConfigService`]. - pub fn new(config: Arc, upstream: Addr) -> Self { + pub fn new( + config: Arc, + upstream: Addr, + ) -> (Self, watch::Receiver) { let (internal_tx, internal_rx) = mpsc::channel(1); - let (global_config_watch, _) = watch::channel(Status::Pending); - - Self { - config, - global_config_watch, - internal_tx, - internal_rx, - upstream, - fetch_handle: SleepHandle::idle(), - last_fetched: Instant::now(), - upstream_failure_interval: Duration::from_secs(35), - shutdown: false, - } + let (global_config_watch, rx) = watch::channel(Status::Pending); + + ( + Self { + config, + global_config_watch, + internal_tx, + internal_rx, + upstream, + fetch_handle: SleepHandle::idle(), + last_fetched: Instant::now(), + upstream_failure_interval: Duration::from_secs(35), + shutdown: false, + }, + rx, + ) } /// Creates a [`GlobalConfigHandle`] which can be used to retrieve the current state @@ -259,9 +240,6 @@ impl GlobalConfigService { GlobalConfigManager::Get(sender) => { sender.send(self.global_config_watch.borrow().clone()); } - GlobalConfigManager::Subscribe(sender) => { - sender.send(self.global_config_watch.subscribe()); - } } } @@ -440,7 +418,9 @@ mod tests { config.regenerate_credentials(false).unwrap(); let fetch_interval = config.global_config_fetch_interval(); - let service = GlobalConfigService::new(Arc::new(config), upstream).start(); + let service = GlobalConfigService::new(Arc::new(config), upstream) + .0 + .start(); assert!(service.send(Get).await.is_ok()); @@ -469,7 +449,9 @@ mod tests { config.regenerate_credentials(false).unwrap(); let fetch_interval = config.global_config_fetch_interval(); - let service = GlobalConfigService::new(Arc::new(config), upstream).start(); + let service = GlobalConfigService::new(Arc::new(config), upstream) + .0 + .start(); service.send(Get).await.unwrap(); tokio::time::sleep(fetch_interval * 2).await; @@ -494,7 +476,9 @@ mod tests { let fetch_interval = config.global_config_fetch_interval(); - let service = GlobalConfigService::new(Arc::new(config), upstream).start(); + let service = GlobalConfigService::new(Arc::new(config), upstream) + .0 + .start(); service.send(Get).await.unwrap(); tokio::time::sleep(fetch_interval * 2).await; diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 507833ce78..ef2f3f813f 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -5,6 +5,7 @@ use std::time::Duration; use crate::extractors::RequestMeta; use crate::services::buffer::{EnvelopeBuffer, EnvelopeBufferError}; +use crate::services::global_config; use crate::services::processor::{ EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics, }; @@ -19,12 +20,11 @@ use relay_quotas::RateLimits; use relay_redis::RedisPool; use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, Sender, Service}; -use tokio::sync::mpsc; #[cfg(feature = "processing")] use tokio::sync::Semaphore; +use tokio::sync::{mpsc, watch}; use tokio::time::Instant; -use crate::services::global_config::{self, GlobalConfigManager, Subscribe}; use crate::services::metrics::{Aggregator, FlushBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::project::{Project, ProjectFetchState, ProjectSender, ProjectState}; @@ -585,7 +585,6 @@ pub struct Services { pub project_cache: Addr, pub test_store: Addr, pub upstream_relay: Addr, - pub global_config: Addr, } /// Main broker of the [`ProjectCacheService`]. @@ -1345,6 +1344,7 @@ pub struct ProjectCacheService { config: Arc, memory_checker: MemoryChecker, services: Services, + global_config_rx: watch::Receiver, redis: Option, } @@ -1354,12 +1354,14 @@ impl ProjectCacheService { config: Arc, memory_checker: MemoryChecker, services: Services, + global_config_rx: watch::Receiver, redis: Option, ) -> Self { Self { config, memory_checker, services, + global_config_rx, redis, } } @@ -1373,6 +1375,7 @@ impl Service for ProjectCacheService { config, memory_checker, services, + mut global_config_rx, redis, } = self; let project_cache = services.project_cache.clone(); @@ -1386,15 +1389,7 @@ impl Service for ProjectCacheService { // Channel for async project state responses back into the project cache. let (state_tx, mut state_rx) = mpsc::unbounded_channel(); - let Ok(mut subscription) = services.global_config.send(Subscribe).await else { - // TODO(iker): we accept this sub-optimal error handling. TBD - // the approach to deal with failures on the subscription - // mechanism. - relay_log::error!("failed to subscribe to GlobalConfigService"); - return; - }; - - let global_config = match subscription.borrow().clone() { + let global_config = match global_config_rx.borrow().clone() { global_config::Status::Ready(_) => { relay_log::info!("global config received"); GlobalConfigStatus::Ready @@ -1469,9 +1464,9 @@ impl Service for ProjectCacheService { tokio::select! { biased; - Ok(()) = subscription.changed() => { + Ok(()) = global_config_rx.changed() => { metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "update_global_config", { - match subscription.borrow().clone() { + match global_config_rx.borrow().clone() { global_config::Status::Ready(_) => broker.set_global_config_ready(), // The watch should only be updated if it gets a new value. // This would imply a logical bug. @@ -1591,7 +1586,6 @@ mod tests { let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {}); let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {}); - let (global_config, _) = mock_service("global_config", (), |&mut (), _| {}); Services { envelope_buffer: None, @@ -1601,7 +1595,6 @@ mod tests { outcome_aggregator, test_store, upstream_relay, - global_config, } }