diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index e7d1bfe21d..5f11e152ac 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -8,6 +8,7 @@ use relay_config::AggregatorServiceConfig; use relay_metrics::aggregator::AggregateMetricsError; use relay_metrics::{aggregator, Bucket, UnixTimestamp}; use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown}; +use tokio::task::JoinHandle; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; @@ -246,10 +247,7 @@ impl AggregatorService { impl Service for AggregatorService { type Interface = Aggregator; - fn spawn_handler( - mut self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); let mut shutdown = Controller::shutdown_handle(); @@ -324,6 +322,7 @@ mod tests { use relay_common::time::UnixTimestamp; use relay_metrics::{aggregator::AggregatorConfig, BucketMetadata, BucketValue}; + use tokio::task::JoinHandle; use super::*; @@ -364,10 +363,7 @@ mod tests { impl Service for TestReceiver { type Interface = TestInterface; - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { let buckets = message.0.buckets; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 97f9153545..2bfb6383e3 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -41,6 +41,7 @@ use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse, Service}; use reqwest::header; use smallvec::{smallvec, SmallVec}; +use tokio::task::JoinHandle; #[cfg(feature = "processing")] use { @@ -2891,10 +2892,7 @@ impl Service for EnvelopeProcessorService { type Interface = EnvelopeProcessor; #[must_use] - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { let service = self.clone(); diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 4185f07149..0abded5c9e 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -23,6 +23,7 @@ use relay_system::{Addr, FromMessage, Interface, Sender, Service}; #[cfg(feature = "processing")] use tokio::sync::Semaphore; use tokio::sync::{mpsc, watch}; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::services::metrics::{Aggregator, FlushBuckets}; @@ -1370,10 +1371,7 @@ impl ProjectCacheService { impl Service for ProjectCacheService { type Interface = ProjectCache; - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config, memory_checker, diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index ec2670fbbd..3f2cad4750 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -13,6 +13,7 @@ use relay_config::Config; use relay_system::{Controller, Service, Shutdown}; use socket2::TcpKeepalive; use tokio::net::{TcpSocket, TcpStream}; +use tokio::task::JoinHandle; use tower::ServiceBuilder; use tower_http::compression::predicate::SizeAbove; use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate}; @@ -167,7 +168,7 @@ impl Accept for KeepAliveAcceptor { } } -fn serve(listener: TcpListener, app: App, config: Arc) -> tokio::task::JoinHandle<()> { +fn serve(listener: TcpListener, app: App, config: Arc) -> JoinHandle<()> { let handle = Handle::new(); let mut server = axum_server::from_tcp(listener) @@ -227,10 +228,7 @@ impl HttpServer { impl Service for HttpServer { type Interface = (); - fn spawn_handler( - self, - _rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, _rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config, service, diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index f94adde8da..1b98af2f7e 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -50,6 +50,7 @@ use sqlx::sqlite::{ use sqlx::{Pool, Row, Sqlite}; use tokio::fs::DirBuilder; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use crate::envelope::{Envelope, EnvelopeError}; use crate::extractors::StartTime; @@ -1272,10 +1273,7 @@ impl BufferService { impl Service for BufferService { type Interface = Buffer; - fn spawn_handler( - mut self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut shutdown = Controller::shutdown_handle(); @@ -1331,6 +1329,7 @@ mod tests { use std::str::FromStr; use std::sync::Mutex; use std::time::{Duration, Instant}; + use tokio::task::JoinHandle; use uuid::Uuid; use crate::services::project_cache::SpoolHealth; @@ -1594,10 +1593,7 @@ mod tests { impl Service for TestHealthService { type Interface = TestHealth; - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { loop { tokio::select! { diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index 0a4cb15119..7e27d60ce1 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -5,6 +5,7 @@ use relay_config::{Config, RelayMode}; use relay_redis::{RedisPool, RedisPools}; use relay_statsd::metric; use relay_system::{Addr, Service}; +use tokio::task::JoinHandle; use tokio::time::interval; use crate::services::upstream::{IsNetworkOutage, UpstreamRelay}; @@ -136,10 +137,7 @@ impl RelayStats { impl Service for RelayStats { type Interface = (); - fn spawn_handler( - self, - _rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, _rx: relay_system::Receiver) -> JoinHandle<()> { let Some(mut ticker) = self.config.metrics_periodic_interval().map(interval) else { return tokio::spawn(async {}); }; diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 7ab2d77ae9..8ea8bb54c5 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -7,6 +7,7 @@ use std::collections::BTreeMap; use std::error::Error; use std::sync::Arc; use std::time::Instant; +use tokio::task::JoinHandle; use bytes::Bytes; use relay_base_schema::data_category::DataCategory; @@ -1044,10 +1045,7 @@ impl StoreService { impl Service for StoreService { type Interface = Store; - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let this = Arc::new(self); tokio::spawn(async move { diff --git a/relay-server/src/services/test_store.rs b/relay-server/src/services/test_store.rs index e57da07d1b..fbd02a8ccb 100644 --- a/relay-server/src/services/test_store.rs +++ b/relay-server/src/services/test_store.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use relay_config::{Config, RelayMode}; use relay_event_schema::protocol::EventId; use relay_system::{AsyncResponse, FromMessage, NoResponse, Sender}; +use tokio::task::JoinHandle; use crate::envelope::Envelope; use crate::services::outcome::Outcome; @@ -134,10 +135,7 @@ impl TestStoreService { impl relay_system::Service for TestStoreService { type Interface = TestStore; - fn spawn_handler( - mut self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { self.handle_message(message); diff --git a/relay-server/src/services/upstream.rs b/relay-server/src/services/upstream.rs index 59b7ea4b76..59fbec2a45 100644 --- a/relay-server/src/services/upstream.rs +++ b/relay-server/src/services/upstream.rs @@ -28,6 +28,7 @@ pub use reqwest::Method; use serde::de::DeserializeOwned; use serde::Serialize; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::http::{HttpError, Request, RequestBuilder, Response, StatusCode}; @@ -1255,7 +1256,7 @@ enum ConnectionState { /// The connection is interrupted and reconnection is in progress. /// /// If the task has finished, connection should be considered `Connected`. - Reconnecting(tokio::task::JoinHandle<()>), + Reconnecting(JoinHandle<()>), } /// Maintains outage state of the connection to the upstream. @@ -1498,10 +1499,7 @@ impl UpstreamRelayService { impl Service for UpstreamRelayService { type Interface = UpstreamRelay; - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config } = self; let client = SharedClient::build(config.clone());