Skip to content

Commit

Permalink
ref(server): Simplify global config subscription (#4004)
Browse files Browse the repository at this point in the history
Remove the extra async message by returning the service's watch handle
directly from `GlobalConfigService`'s constructor.

This prepares for #3989, which
subscribes the envelope buffer service to global config.
  • Loading branch information
jjbayer authored Sep 9, 2024
1 parent 559323c commit 8cda2aa
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 64 deletions.
5 changes: 3 additions & 2 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ impl ServiceState {
.start();
let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start();

let global_config = GlobalConfigService::new(config.clone(), upstream_relay.clone());
let (global_config, global_config_rx) =
GlobalConfigService::new(config.clone(), upstream_relay.clone());
let global_config_handle = global_config.handle();
// The global config service must start before dependant services are
// started. Messages like subscription requests to the global config
Expand Down Expand Up @@ -256,13 +257,13 @@ impl ServiceState {
project_cache: project_cache.clone(),
test_store: test_store.clone(),
upstream_relay: upstream_relay.clone(),
global_config: global_config.clone(),
};

ProjectCacheService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache_services,
global_config_rx,
redis_pools
.as_ref()
.map(|pools| pools.project_configs.clone()),
Expand Down
76 changes: 30 additions & 46 deletions relay-server/src/services/global_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,14 @@ impl UpstreamQuery for GetGlobalConfig {
/// The message for requesting the most recent global config from [`GlobalConfigService`].
pub struct Get;

/// The message for receiving a watch that subscribes to the [`GlobalConfigService`].
///
/// The global config service must be up and running, else the subscription
/// fails. Subscribers should use the initial value when they get the watch
/// rather than only waiting for the watch to update, in case a global config
/// is only updated once, such as is the case with the static config file.
pub struct Subscribe;

/// An interface to get [`GlobalConfig`]s through [`GlobalConfigService`].
///
/// For a one-off update, [`GlobalConfigService`] responds to
/// [`GlobalConfigManager::Get`] messages with the latest instance of the
/// [`GlobalConfig`]. For continued updates, you can subscribe with
/// [`GlobalConfigManager::Subscribe`] to get a receiver back where up-to-date
/// instances will be sent to, while [`GlobalConfigService`] manages the update
/// frequency from upstream.
/// [`GlobalConfig`].
pub enum GlobalConfigManager {
/// Returns the most recent global config.
Get(relay_system::Sender<Status>),
/// Returns a [`watch::Receiver`] where global config updates will be sent to.
Subscribe(relay_system::Sender<watch::Receiver<Status>>),
}

impl Interface for GlobalConfigManager {}
Expand All @@ -137,14 +124,6 @@ impl FromMessage<Get> for GlobalConfigManager {
}
}

impl FromMessage<Subscribe> for GlobalConfigManager {
type Response = AsyncResponse<watch::Receiver<Status>>;

fn from_message(_: Subscribe, sender: relay_system::Sender<watch::Receiver<Status>>) -> Self {
Self::Subscribe(sender)
}
}

/// Describes the current fetching status of the [`GlobalConfig`] from the upstream.
#[derive(Debug, Clone, Default)]
pub enum Status {
Expand Down Expand Up @@ -201,10 +180,6 @@ impl fmt::Debug for GlobalConfigHandle {
}

/// Service implementing the [`GlobalConfigManager`] interface.
///
/// The service offers two alternatives to fetch the [`GlobalConfig`]:
/// responding to a [`Get`] message with the config for one-off requests, or
/// subscribing to updates with [`Subscribe`] to keep up-to-date.
#[derive(Debug)]
pub struct GlobalConfigService {
config: Arc<Config>,
Expand All @@ -228,21 +203,27 @@ pub struct GlobalConfigService {

impl GlobalConfigService {
/// Creates a new [`GlobalConfigService`].
pub fn new(config: Arc<Config>, upstream: Addr<UpstreamRelay>) -> Self {
pub fn new(
config: Arc<Config>,
upstream: Addr<UpstreamRelay>,
) -> (Self, watch::Receiver<Status>) {
let (internal_tx, internal_rx) = mpsc::channel(1);
let (global_config_watch, _) = watch::channel(Status::Pending);

Self {
config,
global_config_watch,
internal_tx,
internal_rx,
upstream,
fetch_handle: SleepHandle::idle(),
last_fetched: Instant::now(),
upstream_failure_interval: Duration::from_secs(35),
shutdown: false,
}
let (global_config_watch, rx) = watch::channel(Status::Pending);

(
Self {
config,
global_config_watch,
internal_tx,
internal_rx,
upstream,
fetch_handle: SleepHandle::idle(),
last_fetched: Instant::now(),
upstream_failure_interval: Duration::from_secs(35),
shutdown: false,
},
rx,
)
}

/// Creates a [`GlobalConfigHandle`] which can be used to retrieve the current state
Expand All @@ -259,9 +240,6 @@ impl GlobalConfigService {
GlobalConfigManager::Get(sender) => {
sender.send(self.global_config_watch.borrow().clone());
}
GlobalConfigManager::Subscribe(sender) => {
sender.send(self.global_config_watch.subscribe());
}
}
}

Expand Down Expand Up @@ -440,7 +418,9 @@ mod tests {
config.regenerate_credentials(false).unwrap();
let fetch_interval = config.global_config_fetch_interval();

let service = GlobalConfigService::new(Arc::new(config), upstream).start();
let service = GlobalConfigService::new(Arc::new(config), upstream)
.0
.start();

assert!(service.send(Get).await.is_ok());

Expand Down Expand Up @@ -469,7 +449,9 @@ mod tests {
config.regenerate_credentials(false).unwrap();

let fetch_interval = config.global_config_fetch_interval();
let service = GlobalConfigService::new(Arc::new(config), upstream).start();
let service = GlobalConfigService::new(Arc::new(config), upstream)
.0
.start();
service.send(Get).await.unwrap();

tokio::time::sleep(fetch_interval * 2).await;
Expand All @@ -494,7 +476,9 @@ mod tests {

let fetch_interval = config.global_config_fetch_interval();

let service = GlobalConfigService::new(Arc::new(config), upstream).start();
let service = GlobalConfigService::new(Arc::new(config), upstream)
.0
.start();
service.send(Get).await.unwrap();

tokio::time::sleep(fetch_interval * 2).await;
Expand Down
25 changes: 9 additions & 16 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Duration;

use crate::extractors::RequestMeta;
use crate::services::buffer::{EnvelopeBuffer, EnvelopeBufferError};
use crate::services::global_config;
use crate::services::processor::{
EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics,
};
Expand All @@ -19,12 +20,11 @@ use relay_quotas::RateLimits;
use relay_redis::RedisPool;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, Sender, Service};
use tokio::sync::mpsc;
#[cfg(feature = "processing")]
use tokio::sync::Semaphore;
use tokio::sync::{mpsc, watch};
use tokio::time::Instant;

use crate::services::global_config::{self, GlobalConfigManager, Subscribe};
use crate::services::metrics::{Aggregator, FlushBuckets};
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::project::{Project, ProjectFetchState, ProjectSender, ProjectState};
Expand Down Expand Up @@ -585,7 +585,6 @@ pub struct Services {
pub project_cache: Addr<ProjectCache>,
pub test_store: Addr<TestStore>,
pub upstream_relay: Addr<UpstreamRelay>,
pub global_config: Addr<GlobalConfigManager>,
}

/// Main broker of the [`ProjectCacheService`].
Expand Down Expand Up @@ -1345,6 +1344,7 @@ pub struct ProjectCacheService {
config: Arc<Config>,
memory_checker: MemoryChecker,
services: Services,
global_config_rx: watch::Receiver<global_config::Status>,
redis: Option<RedisPool>,
}

Expand All @@ -1354,12 +1354,14 @@ impl ProjectCacheService {
config: Arc<Config>,
memory_checker: MemoryChecker,
services: Services,
global_config_rx: watch::Receiver<global_config::Status>,
redis: Option<RedisPool>,
) -> Self {
Self {
config,
memory_checker,
services,
global_config_rx,
redis,
}
}
Expand All @@ -1373,6 +1375,7 @@ impl Service for ProjectCacheService {
config,
memory_checker,
services,
mut global_config_rx,
redis,
} = self;
let project_cache = services.project_cache.clone();
Expand All @@ -1386,15 +1389,7 @@ impl Service for ProjectCacheService {
// Channel for async project state responses back into the project cache.
let (state_tx, mut state_rx) = mpsc::unbounded_channel();

let Ok(mut subscription) = services.global_config.send(Subscribe).await else {
// TODO(iker): we accept this sub-optimal error handling. TBD
// the approach to deal with failures on the subscription
// mechanism.
relay_log::error!("failed to subscribe to GlobalConfigService");
return;
};

let global_config = match subscription.borrow().clone() {
let global_config = match global_config_rx.borrow().clone() {
global_config::Status::Ready(_) => {
relay_log::info!("global config received");
GlobalConfigStatus::Ready
Expand Down Expand Up @@ -1469,9 +1464,9 @@ impl Service for ProjectCacheService {
tokio::select! {
biased;

Ok(()) = subscription.changed() => {
Ok(()) = global_config_rx.changed() => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "update_global_config", {
match subscription.borrow().clone() {
match global_config_rx.borrow().clone() {
global_config::Status::Ready(_) => broker.set_global_config_ready(),
// The watch should only be updated if it gets a new value.
// This would imply a logical bug.
Expand Down Expand Up @@ -1591,7 +1586,6 @@ mod tests {
let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {});
let (test_store, _) = mock_service("test_store", (), |&mut (), _| {});
let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {});
let (global_config, _) = mock_service("global_config", (), |&mut (), _| {});

Services {
envelope_buffer: None,
Expand All @@ -1601,7 +1595,6 @@ mod tests {
outcome_aggregator,
test_store,
upstream_relay,
global_config,
}
}

Expand Down

0 comments on commit 8cda2aa

Please sign in to comment.