From 8abf61f3ee991afd2c26bdaa858bb98e052d3d23 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 24 Jul 2024 13:59:11 +0200 Subject: [PATCH] replace all uses --- relay-server/src/services/cogs.rs | 2 +- relay-server/src/services/global_config.rs | 4 ++-- relay-server/src/services/health_check.rs | 4 ++-- relay-server/src/services/metrics/aggregator.rs | 4 ++-- relay-server/src/services/metrics/router.rs | 4 ++-- relay-server/src/services/outcome.rs | 8 ++++---- relay-server/src/services/outcome_aggregator.rs | 2 +- relay-server/src/services/processor.rs | 2 +- relay-server/src/services/project_cache.rs | 6 +++--- relay-server/src/services/project_local.rs | 4 ++-- relay-server/src/services/project_upstream.rs | 4 ++-- relay-server/src/services/relays.rs | 4 ++-- relay-server/src/services/server.rs | 4 ++-- relay-server/src/services/spooler/mod.rs | 6 +++--- relay-server/src/services/stats.rs | 2 +- relay-server/src/services/store.rs | 2 +- relay-server/src/services/test_store.rs | 2 +- relay-server/src/services/upstream.rs | 8 ++++---- relay-system/src/controller.rs | 5 +++-- relay-system/src/runtime.rs | 1 + relay-system/src/service.rs | 4 ++-- relay-test/src/lib.rs | 2 +- 22 files changed, 43 insertions(+), 41 deletions(-) diff --git a/relay-server/src/services/cogs.rs b/relay-server/src/services/cogs.rs index acbeb790ab..453e3de0a3 100644 --- a/relay-server/src/services/cogs.rs +++ b/relay-server/src/services/cogs.rs @@ -106,7 +106,7 @@ impl Service for CogsService { type Interface = CogsReport; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut shutdown = Controller::shutdown_handle(); loop { diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index bc08352626..a194fc9b04 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -284,7 +284,7 @@ impl GlobalConfigService { let upstream_relay = self.upstream.clone(); let internal_tx = self.internal_tx.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { metric!(timer(RelayTimers::GlobalConfigRequestDuration), { let query = GetGlobalConfig::new(); let res = upstream_relay.send(SendQuery(query)).await; @@ -360,7 +360,7 @@ impl Service for GlobalConfigService { type Interface = GlobalConfigManager; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut shutdown_handle = Controller::shutdown_handle(); relay_log::info!("global config service starting"); diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index a32750297c..ce8c778cbf 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -198,7 +198,7 @@ impl Service for HealthCheckService { // Add 10% buffer to the internal timeouts to avoid race conditions. let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1); - tokio::spawn(async move { + relay_system::spawn(async move { let shutdown = Controller::shutdown_handle(); while shutdown.get().is_none() { @@ -215,7 +215,7 @@ impl Service for HealthCheckService { update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok(); }); - tokio::spawn(async move { + relay_system::spawn(async move { while let Some(HealthCheck(message, sender)) = rx.recv().await { let update = update_rx.borrow(); diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index e76bf78bda..378f1b04c3 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -229,7 +229,7 @@ impl Service for AggregatorService { type Interface = Aggregator; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); let mut shutdown = Controller::shutdown_handle(); @@ -331,7 +331,7 @@ mod tests { type Interface = TestInterface; fn spawn_handler(self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { while let Some(message) = rx.recv().await { let buckets = message.0.buckets; relay_log::debug!(?buckets, "received buckets"); diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index 76b063e752..13072c5c88 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -44,7 +44,7 @@ impl Service for RouterService { type Interface = Aggregator; fn spawn_handler(self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut router = StartedRouter::start(self); relay_log::info!("metrics router started"); @@ -117,7 +117,7 @@ impl StartedRouter { .chain(Some(self.default.send(AcceptsMetrics))) .collect::>(); - tokio::spawn(async move { + relay_system::spawn(async move { let mut accepts = true; while let Some(req) = requests.next().await { accepts &= req.unwrap_or_default(); diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 55a4a8b73a..d48a696474 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -657,7 +657,7 @@ impl HttpOutcomeProducer { let upstream_relay = self.upstream_relay.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { match upstream_relay.send(SendQuery(request)).await { Ok(_) => relay_log::trace!("outcome batch sent"), Err(error) => { @@ -683,7 +683,7 @@ impl Service for HttpOutcomeProducer { type Interface = TrackRawOutcome; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { loop { tokio::select! { // Prioritize flush over receiving messages to prevent starving. @@ -776,7 +776,7 @@ impl Service for ClientReportOutcomeProducer { type Interface = TrackOutcome; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { loop { tokio::select! { // Prioritize flush over receiving messages to prevent starving. @@ -1037,7 +1037,7 @@ impl Service for OutcomeProducerService { fn spawn_handler(self, mut rx: relay_system::Receiver) { let Self { config, inner } = self; - tokio::spawn(async move { + relay_system::spawn(async move { let broker = inner.start(); relay_log::info!("OutcomeProducer started."); diff --git a/relay-server/src/services/outcome_aggregator.rs b/relay-server/src/services/outcome_aggregator.rs index 0a13cbf361..8728255d4a 100644 --- a/relay-server/src/services/outcome_aggregator.rs +++ b/relay-server/src/services/outcome_aggregator.rs @@ -139,7 +139,7 @@ impl Service for OutcomeAggregator { type Interface = TrackOutcome; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut shutdown = Controller::shutdown_handle(); relay_log::info!("outcome aggregator started"); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 99de389281..8006fdf691 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2814,7 +2814,7 @@ impl Service for EnvelopeProcessorService { type Interface = EnvelopeProcessor; fn spawn_handler(self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { while let Some(message) = rx.recv().await { let service = self.clone(); self.inner diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 910606b265..35189de481 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -724,7 +724,7 @@ impl ProjectCacheBroker { let source = self.source.clone(); let sender = self.state_tx.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { // Wait on the new attempt time when set. if let Some(next_attempt) = next_attempt { tokio::time::sleep_until(next_attempt).await; @@ -1148,7 +1148,7 @@ impl Service for ProjectCacheService { let outcome_aggregator = services.outcome_aggregator.clone(); let test_store = services.test_store.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { let mut ticker = tokio::time::interval(config.cache_eviction_interval()); relay_log::info!("project cache started"); @@ -1429,7 +1429,7 @@ mod tests { } // Emulate the project cache service loop. - tokio::task::spawn(async move { + relay_system::spawn(async move { loop { select! { diff --git a/relay-server/src/services/project_local.rs b/relay-server/src/services/project_local.rs index 2307462b3c..e21b13130a 100644 --- a/relay-server/src/services/project_local.rs +++ b/relay-server/src/services/project_local.rs @@ -146,7 +146,7 @@ async fn spawn_poll_local_states( poll_local_states(&project_path, &tx).await; // Start a background loop that polls periodically: - tokio::spawn(async move { + relay_system::spawn(async move { // To avoid running two load tasks simultaneously at startup, we delay the interval by one period: let start_at = Instant::now() + period; let mut ticker = tokio::time::interval_at(start_at, period); @@ -166,7 +166,7 @@ impl Service for LocalProjectSourceService { // collect the result, the producer will block, which is acceptable. let (state_tx, mut state_rx) = mpsc::channel(1); - tokio::spawn(async move { + relay_system::spawn(async move { relay_log::info!("project local cache started"); // Start the background task that periodically reloads projects from disk: diff --git a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs index 668d3196ae..1f3d22ade3 100644 --- a/relay-server/src/services/project_upstream.rs +++ b/relay-server/src/services/project_upstream.rs @@ -481,7 +481,7 @@ impl UpstreamProjectSourceService { let channels = self.prepare_batches(); let upstream_relay = self.upstream_relay.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { let responses = Self::fetch_states(config, upstream_relay, channels).await; // Send back all resolved responses and also unused channels. // These responses will be handled by `handle_responses` function. @@ -532,7 +532,7 @@ impl Service for UpstreamProjectSourceService { type Interface = UpstreamProjectSource; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { relay_log::info!("project upstream cache started"); loop { tokio::select! { diff --git a/relay-server/src/services/relays.rs b/relay-server/src/services/relays.rs index 922b1c7dda..5f2d2c37eb 100644 --- a/relay-server/src/services/relays.rs +++ b/relay-server/src/services/relays.rs @@ -238,7 +238,7 @@ impl RelayCacheService { let fetch_tx = self.fetch_tx(); let upstream_relay = self.upstream_relay.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { let request = GetRelays { relay_ids: channels.keys().cloned().collect(), }; @@ -335,7 +335,7 @@ impl Service for RelayCacheService { type Interface = RelayCache; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { relay_log::info!("key cache started"); loop { diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index 32a7c532c1..b4ae639b3b 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -135,7 +135,7 @@ impl Service for HttpServer { relay_log::info!("spawning http server"); relay_log::info!(" listening on http://{}/", config.listen_addr()); relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1); - tokio::spawn(server.serve(app)); + relay_system::spawn(server.serve(app)); } Err(err) => { relay_log::error!("Failed to start the HTTP server: {err}"); @@ -143,7 +143,7 @@ impl Service for HttpServer { } } - tokio::spawn(async move { + relay_system::spawn(async move { let Shutdown { timeout } = Controller::shutdown_handle().notified().await; relay_log::info!("Shutting down HTTP server"); diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index bce1b33ad5..b99c178117 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -1184,7 +1184,7 @@ impl BufferService { BufferState::Disk(ref disk) => { let db = disk.db.clone(); let project_cache = self.services.project_cache.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { match OnDisk::get_spooled_index(&db).await { Ok(index) => { relay_log::trace!( @@ -1255,7 +1255,7 @@ impl Service for BufferService { type Interface = Buffer; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut shutdown = Controller::shutdown_handle(); loop { @@ -1574,7 +1574,7 @@ mod tests { type Interface = TestHealth; fn spawn_handler(self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { loop { tokio::select! { Some(TestHealth::SpoolHealth(sender)) = rx.recv() => self.buffer.send(Health(sender)), diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index a2a1a960d7..2babe2bc10 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -123,7 +123,7 @@ impl Service for RelayStats { return; }; - tokio::spawn(async move { + relay_system::spawn(async move { loop { let _ = tokio::join!( self.upstream_status(), diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 1e7d3ec2c4..c2cdf2b0d9 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -1088,7 +1088,7 @@ impl Service for StoreService { fn spawn_handler(self, mut rx: relay_system::Receiver) { let this = Arc::new(self); - tokio::spawn(async move { + relay_system::spawn(async move { relay_log::info!("store forwarder started"); while let Some(message) = rx.recv().await { diff --git a/relay-server/src/services/test_store.rs b/relay-server/src/services/test_store.rs index c61621fc0c..589c8c8c05 100644 --- a/relay-server/src/services/test_store.rs +++ b/relay-server/src/services/test_store.rs @@ -135,7 +135,7 @@ impl relay_system::Service for TestStoreService { type Interface = TestStore; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { while let Some(message) = rx.recv().await { self.handle_message(message); } diff --git a/relay-server/src/services/upstream.rs b/relay-server/src/services/upstream.rs index b2e53e4b29..86efe021a7 100644 --- a/relay-server/src/services/upstream.rs +++ b/relay-server/src/services/upstream.rs @@ -1344,7 +1344,7 @@ impl ConnectionMonitor { // Only take action if we exceeded the grace period. if first_error + self.client.config.http_outage_grace_period() <= now { let return_tx = return_tx.clone(); - let task = tokio::spawn(Self::connect(self.client.clone(), return_tx)); + let task = relay_system::spawn(Self::connect(self.client.clone(), return_tx)); self.state = ConnectionState::Reconnecting(task); } } @@ -1429,7 +1429,7 @@ impl UpstreamBroker { let client = self.client.clone(); let action_tx = self.action_tx.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { let send_start = Instant::now(); let result = client.send(entry.request.as_mut()).await; emit_response_metrics(send_start, &entry, &result); @@ -1512,7 +1512,7 @@ impl Service for UpstreamRelayService { state: AuthState::Unknown, tx: action_tx.clone(), }; - tokio::spawn(auth.run()); + relay_system::spawn(auth.run()); // Main broker that serializes public and internal messages, as well as maintains connection // and authentication state. @@ -1525,7 +1525,7 @@ impl Service for UpstreamRelayService { action_tx, }; - tokio::spawn(async move { + relay_system::spawn(async move { loop { tokio::select! { biased; diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 60c5c829a1..acec9d0d4e 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -130,7 +130,7 @@ impl ShutdownHandle { /// type Interface = (); /// /// fn spawn_handler(self, mut rx: relay_system::Receiver) { -/// tokio::spawn(async move { +/// relay_system::spawn(async move { /// let mut shutdown = Controller::shutdown_handle(); /// /// loop { @@ -166,8 +166,9 @@ pub struct Controller; impl Controller { /// Starts a controller that monitors shutdown signals. + #[track_caller] pub fn start(shutdown_timeout: Duration) { - tokio::spawn(monitor_shutdown(shutdown_timeout)); + crate::spawn(monitor_shutdown(shutdown_timeout)); } /// Manually initiates the shutdown process of the system. diff --git a/relay-system/src/runtime.rs b/relay-system/src/runtime.rs index d5c3f1041c..abfada72c7 100644 --- a/relay-system/src/runtime.rs +++ b/relay-system/src/runtime.rs @@ -10,6 +10,7 @@ use crate::statsd::{SystemCounters, SystemTimers}; /// /// This is in instrumented spawn variant of Tokio's [`tokio::spawn`]. #[track_caller] +#[allow(clippy::disallowed_methods)] pub fn spawn(future: F) -> JoinHandle where F: Future + Send + 'static, diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs index a09d019e13..7d858618fa 100644 --- a/relay-system/src/service.rs +++ b/relay-system/src/service.rs @@ -932,7 +932,7 @@ pub fn channel(name: &'static str) -> (Addr, Receiver) { /// type Interface = MyMessage; /// /// fn spawn_handler(self, mut rx: Receiver) { -/// tokio::spawn(async move { +/// relay_system::spawn(async move { /// while let Some(message) = rx.recv().await { /// // handle the message /// } @@ -1047,7 +1047,7 @@ mod tests { type Interface = MockMessage; fn spawn_handler(self, mut rx: Receiver) { - tokio::spawn(async move { + crate::spawn(async move { while rx.recv().await.is_some() { tokio::time::sleep(BACKLOG_INTERVAL * 2).await; } diff --git a/relay-test/src/lib.rs b/relay-test/src/lib.rs index 138c76724d..6eebc52828 100644 --- a/relay-test/src/lib.rs +++ b/relay-test/src/lib.rs @@ -44,7 +44,7 @@ where { let (addr, mut rx) = channel(name); - let handle = tokio::spawn(async move { + let handle = relay_system::spawn(async move { while let Some(msg) = rx.recv().await { f(&mut state, msg); }