Skip to content

Commit

Permalink
ref
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Sep 12, 2024
1 parent 734c9cb commit 71137de
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 46 deletions.
12 changes: 4 additions & 8 deletions relay-server/src/services/metrics/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use relay_config::AggregatorServiceConfig;
use relay_metrics::aggregator::AggregateMetricsError;
use relay_metrics::{aggregator, Bucket, UnixTimestamp};
use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown};
use tokio::task::JoinHandle;

use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};

Expand Down Expand Up @@ -246,10 +247,7 @@ impl AggregatorService {
impl Service for AggregatorService {
type Interface = Aggregator;

fn spawn_handler(
mut self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms));
let mut shutdown = Controller::shutdown_handle();
Expand Down Expand Up @@ -324,6 +322,7 @@ mod tests {

use relay_common::time::UnixTimestamp;
use relay_metrics::{aggregator::AggregatorConfig, BucketMetadata, BucketValue};
use tokio::task::JoinHandle;

use super::*;

Expand Down Expand Up @@ -364,10 +363,7 @@ mod tests {
impl Service for TestReceiver {
type Interface = TestInterface;

fn spawn_handler(
self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
let buckets = message.0.buckets;
Expand Down
6 changes: 2 additions & 4 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{smallvec, SmallVec};
use tokio::task::JoinHandle;

#[cfg(feature = "processing")]
use {
Expand Down Expand Up @@ -2891,10 +2892,7 @@ impl Service for EnvelopeProcessorService {
type Interface = EnvelopeProcessor;

#[must_use]
fn spawn_handler(
self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
let service = self.clone();
Expand Down
6 changes: 2 additions & 4 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use relay_system::{Addr, FromMessage, Interface, Sender, Service};
#[cfg(feature = "processing")]
use tokio::sync::Semaphore;
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
use tokio::time::Instant;

use crate::services::metrics::{Aggregator, FlushBuckets};
Expand Down Expand Up @@ -1370,10 +1371,7 @@ impl ProjectCacheService {
impl Service for ProjectCacheService {
type Interface = ProjectCache;

fn spawn_handler(
self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
let Self {
config,
memory_checker,
Expand Down
8 changes: 3 additions & 5 deletions relay-server/src/services/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use relay_config::Config;
use relay_system::{Controller, Service, Shutdown};
use socket2::TcpKeepalive;
use tokio::net::{TcpSocket, TcpStream};
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use tower_http::compression::predicate::SizeAbove;
use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate};
Expand Down Expand Up @@ -167,7 +168,7 @@ impl<S> Accept<TcpStream, S> for KeepAliveAcceptor {
}
}

fn serve(listener: TcpListener, app: App, config: Arc<Config>) -> tokio::task::JoinHandle<()> {
fn serve(listener: TcpListener, app: App, config: Arc<Config>) -> JoinHandle<()> {
let handle = Handle::new();

let mut server = axum_server::from_tcp(listener)
Expand Down Expand Up @@ -227,10 +228,7 @@ impl HttpServer {
impl Service for HttpServer {
type Interface = ();

fn spawn_handler(
self,
_rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(self, _rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
let Self {
config,
service,
Expand Down
12 changes: 4 additions & 8 deletions relay-server/src/services/spooler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use sqlx::sqlite::{
use sqlx::{Pool, Row, Sqlite};
use tokio::fs::DirBuilder;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::envelope::{Envelope, EnvelopeError};
use crate::extractors::StartTime;
Expand Down Expand Up @@ -1272,10 +1273,7 @@ impl BufferService {
impl Service for BufferService {
type Interface = Buffer;

fn spawn_handler(
mut self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut shutdown = Controller::shutdown_handle();

Expand Down Expand Up @@ -1331,6 +1329,7 @@ mod tests {
use std::str::FromStr;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use uuid::Uuid;

use crate::services::project_cache::SpoolHealth;
Expand Down Expand Up @@ -1594,10 +1593,7 @@ mod tests {
impl Service for TestHealthService {
type Interface = TestHealth;

fn spawn_handler(
self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::select! {
Expand Down
6 changes: 2 additions & 4 deletions relay-server/src/services/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use relay_config::{Config, RelayMode};
use relay_redis::{RedisPool, RedisPools};
use relay_statsd::metric;
use relay_system::{Addr, Service};
use tokio::task::JoinHandle;
use tokio::time::interval;

use crate::services::upstream::{IsNetworkOutage, UpstreamRelay};
Expand Down Expand Up @@ -136,10 +137,7 @@ impl RelayStats {
impl Service for RelayStats {
type Interface = ();

fn spawn_handler(
self,
_rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(self, _rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
let Some(mut ticker) = self.config.metrics_periodic_interval().map(interval) else {
return tokio::spawn(async {});
};
Expand Down
6 changes: 2 additions & 4 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;
use std::time::Instant;
use tokio::task::JoinHandle;

use bytes::Bytes;
use relay_base_schema::data_category::DataCategory;
Expand Down Expand Up @@ -1044,10 +1045,7 @@ impl StoreService {
impl Service for StoreService {
type Interface = Store;

fn spawn_handler(
self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
let this = Arc::new(self);

tokio::spawn(async move {
Expand Down
6 changes: 2 additions & 4 deletions relay-server/src/services/test_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use relay_config::{Config, RelayMode};
use relay_event_schema::protocol::EventId;
use relay_system::{AsyncResponse, FromMessage, NoResponse, Sender};
use tokio::task::JoinHandle;

use crate::envelope::Envelope;
use crate::services::outcome::Outcome;
Expand Down Expand Up @@ -134,10 +135,7 @@ impl TestStoreService {
impl relay_system::Service for TestStoreService {
type Interface = TestStore;

fn spawn_handler(
mut self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
self.handle_message(message);
Expand Down
8 changes: 3 additions & 5 deletions relay-server/src/services/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use reqwest::Method;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::Instant;

use crate::http::{HttpError, Request, RequestBuilder, Response, StatusCode};
Expand Down Expand Up @@ -1255,7 +1256,7 @@ enum ConnectionState {
/// The connection is interrupted and reconnection is in progress.
///
/// If the task has finished, connection should be considered `Connected`.
Reconnecting(tokio::task::JoinHandle<()>),
Reconnecting(JoinHandle<()>),
}

/// Maintains outage state of the connection to the upstream.
Expand Down Expand Up @@ -1498,10 +1499,7 @@ impl UpstreamRelayService {
impl Service for UpstreamRelayService {
type Interface = UpstreamRelay;

fn spawn_handler(
self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
let Self { config } = self;

let client = SharedClient::build(config.clone());
Expand Down

0 comments on commit 71137de

Please sign in to comment.