Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Sep 11, 2024
1 parent 25a3b7a commit fd0ae7a
Show file tree
Hide file tree
Showing 23 changed files with 143 additions and 75 deletions.
8 changes: 8 additions & 0 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ mod testutils;

use std::sync::Arc;

use futures::StreamExt;
use relay_config::Config;
use relay_system::{Controller, Service};

Expand All @@ -303,6 +304,13 @@ pub fn run(config: Config) -> anyhow::Result<()> {
Controller::start(config.shutdown_timeout());
let service = ServiceState::start(config.clone())?;
HttpServer::new(config, service.clone())?.start();

for x in service.join_handles() {}
// while let Some(res) = service.join_handles().next() {

// }

// TODO: await simultaneously
Controller::shutdown_handle().finished().await;
anyhow::Ok(())
})?;
Expand Down
20 changes: 17 additions & 3 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ use crate::services::stats::RelayStats;
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use futures::stream::FuturesUnordered;
use rayon::ThreadPool;
use relay_cogs::Cogs;
use relay_config::{Config, RedisConnection, RedisPoolConfigs};
use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
use relay_system::{channel, Addr, Service};
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;

use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
Expand Down Expand Up @@ -141,6 +143,7 @@ struct StateInner {
config: Arc<Config>,
memory_checker: MemoryChecker,
registry: Registry,
join_handles: Vec<JoinHandle<()>>,
}

/// Server state.
Expand Down Expand Up @@ -221,7 +224,7 @@ impl ServiceState {
let cogs = CogsService::new(&config);
let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start()));

EnvelopeProcessorService::new(
let processor_handle = EnvelopeProcessorService::new(
create_processor_pool(&config)?,
config.clone(),
global_config_handle,
Expand Down Expand Up @@ -251,7 +254,7 @@ impl ServiceState {

// Keep all the services in one context.
let project_cache_services = Services {
envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr),
envelope_buffer: envelope_buffer.as_ref().map(|(b, _)| b.addr()),
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
Expand Down Expand Up @@ -301,13 +304,20 @@ impl ServiceState {
global_config,
project_cache,
upstream_relay,
envelope_buffer,
envelope_buffer: envelope_buffer.as_ref().map(|(b, _)| b.clone()),
};

let state = StateInner {
config: config.clone(),
memory_checker: MemoryChecker::new(memory_stat, config.clone()),
registry,
join_handles: {
let mut j = Vec::from_iter([processor_handle]);
if let Some((_, handle)) = envelope_buffer {
j.push(handle);
}
j
},
};

Ok(ServiceState {
Expand Down Expand Up @@ -376,6 +386,10 @@ impl ServiceState {
pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
&self.inner.registry.outcome_aggregator
}

pub fn join_handles(&self) -> &[JoinHandle<()>] {
self.inner.join_handles.as_slice()
}
}

fn create_redis_pool(
Expand Down
15 changes: 7 additions & 8 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use relay_system::Request;
use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use tokio::sync::watch;
use tokio::task::JoinHandle;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::Peek;
Expand Down Expand Up @@ -128,12 +129,10 @@ impl EnvelopeBufferService {
}

/// Returns both the [`Addr`] to this service, and a reference to the capacity flag.
pub fn start_observable(self) -> ObservableEnvelopeBuffer {
pub fn start_observable(self) -> (ObservableEnvelopeBuffer, JoinHandle<()>) {
let has_capacity = self.has_capacity.clone();
ObservableEnvelopeBuffer {
addr: self.start(),
has_capacity,
}
let (addr, join_handle) = self.start_joinable();
(ObservableEnvelopeBuffer { addr, has_capacity }, join_handle)
}

/// Wait for the configured amount of time and make sure the project cache is ready to receive.
Expand Down Expand Up @@ -259,7 +258,7 @@ impl EnvelopeBufferService {
impl Service for EnvelopeBufferService {
type Interface = EnvelopeBuffer;

fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) -> JoinHandle<()> {
let config = self.config.clone();
let memory_checker = self.memory_checker.clone();
let mut global_config_rx = self.global_config_rx.clone();
Expand Down Expand Up @@ -312,7 +311,7 @@ impl Service for EnvelopeBufferService {
}

relay_log::info!("EnvelopeBufferService stop");
});
})
}
}

Expand Down Expand Up @@ -364,7 +363,7 @@ mod tests {
service.has_capacity.store(false, Ordering::Relaxed);

// Observable has correct value:
let ObservableEnvelopeBuffer { addr, has_capacity } = service.start_observable();
let (ObservableEnvelopeBuffer { addr, has_capacity }, _) = service.start_observable();
assert!(!has_capacity.load(Ordering::Relaxed));

// Send a message to trigger update of `has_capacity` flag:
Expand Down
8 changes: 4 additions & 4 deletions relay-server/src/services/cogs.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::atomic::{AtomicBool, Ordering};

use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId};
use relay_config::Config;
use relay_system::{Addr, FromMessage, Interface, Service};
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::task::JoinHandle;

use crate::statsd::RelayCounters;

Expand Down Expand Up @@ -54,12 +54,12 @@ impl CogsService {
impl Service for CogsService {
type Interface = CogsReport;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
self.handle_report(message);
}
});
})
}
}

Expand Down
5 changes: 3 additions & 2 deletions relay-server/src/services/global_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Serv
use reqwest::Method;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
use tokio::time::Instant;

use crate::services::upstream::{
Expand Down Expand Up @@ -338,7 +339,7 @@ impl GlobalConfigService {
impl Service for GlobalConfigService {
type Interface = GlobalConfigManager;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut shutdown_handle = Controller::shutdown_handle();

Expand Down Expand Up @@ -384,7 +385,7 @@ impl Service for GlobalConfigService {
}
}
relay_log::info!("global config service stopped");
});
})
}
}

Expand Down
9 changes: 6 additions & 3 deletions relay-server/src/services/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use relay_config::Config;
use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
use std::future::Future;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::{timeout, Instant};

use crate::services::metrics::RouterHandle;
Expand Down Expand Up @@ -189,13 +190,13 @@ impl HealthCheckService {
impl Service for HealthCheckService {
type Interface = HealthCheck;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy));
let check_interval = self.config.health_refresh_interval();
// Add 10% buffer to the internal timeouts to avoid race conditions.
let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);

tokio::spawn(async move {
let j1 = tokio::spawn(async move {
let shutdown = Controller::shutdown_handle();

while shutdown.get().is_none() {
Expand All @@ -212,7 +213,7 @@ impl Service for HealthCheckService {
update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
});

tokio::spawn(async move {
let _j2 = tokio::spawn(async move {
while let Some(HealthCheck(message, sender)) = rx.recv().await {
let update = update_rx.borrow();

Expand All @@ -225,6 +226,8 @@ impl Service for HealthCheckService {
});
}
});

j1 // TODO: should return j1 + j2
}
}

Expand Down
14 changes: 10 additions & 4 deletions relay-server/src/services/metrics/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,10 @@ impl AggregatorService {
impl Service for AggregatorService {
type Interface = Aggregator;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(
mut self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms));
let mut shutdown = Controller::shutdown_handle();
Expand All @@ -264,7 +267,7 @@ impl Service for AggregatorService {
else => break,
}
}
});
})
}
}

Expand Down Expand Up @@ -361,7 +364,10 @@ mod tests {
impl Service for TestReceiver {
type Interface = TestInterface;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(
self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
let buckets = message.0.buckets;
Expand All @@ -370,7 +376,7 @@ mod tests {
self.add_buckets(buckets);
}
}
});
})
}
}

Expand Down
5 changes: 3 additions & 2 deletions relay-server/src/services/metrics/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use relay_config::aggregator::Condition;
use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig};
use relay_metrics::MetricNamespace;
use relay_system::{Addr, NoResponse, Recipient, Service};
use tokio::task::JoinHandle;

use crate::services::metrics::{
Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets,
Expand Down Expand Up @@ -53,7 +54,7 @@ impl RouterService {
impl Service for RouterService {
type Interface = Aggregator;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut router = StartedRouter::start(self);
relay_log::info!("metrics router started");
Expand All @@ -72,7 +73,7 @@ impl Service for RouterService {
}
}
relay_log::info!("metrics router stopped");
});
})
}
}

Expand Down
13 changes: 7 additions & 6 deletions relay-server/src/services/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use relay_sampling::evaluation::MatchedRuleIds;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;

#[cfg(feature = "processing")]
use crate::service::ServiceError;
Expand Down Expand Up @@ -682,7 +683,7 @@ impl HttpOutcomeProducer {
impl Service for HttpOutcomeProducer {
type Interface = TrackRawOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::select! {
Expand All @@ -694,7 +695,7 @@ impl Service for HttpOutcomeProducer {
else => break,
}
}
});
})
}
}

Expand Down Expand Up @@ -775,7 +776,7 @@ impl ClientReportOutcomeProducer {
impl Service for ClientReportOutcomeProducer {
type Interface = TrackOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::select! {
Expand All @@ -787,7 +788,7 @@ impl Service for ClientReportOutcomeProducer {
else => break,
}
}
});
})
}
}

Expand Down Expand Up @@ -1034,7 +1035,7 @@ impl OutcomeProducerService {
impl Service for OutcomeProducerService {
type Interface = OutcomeProducer;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
let Self { config, inner } = self;

tokio::spawn(async move {
Expand All @@ -1045,7 +1046,7 @@ impl Service for OutcomeProducerService {
broker.handle_message(message, &config);
}
relay_log::info!("OutcomeProducer stopped.");
});
})
}
}

Expand Down
5 changes: 3 additions & 2 deletions relay-server/src/services/outcome_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use relay_config::{Config, EmitOutcomes};
use relay_quotas::{DataCategory, Scoping};
use relay_statsd::metric;
use relay_system::{Addr, Controller, Service, Shutdown};
use tokio::task::JoinHandle;

use crate::services::outcome::{Outcome, OutcomeProducer, TrackOutcome};
use crate::statsd::RelayTimers;
Expand Down Expand Up @@ -138,7 +139,7 @@ impl OutcomeAggregator {
impl Service for OutcomeAggregator {
type Interface = TrackOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut shutdown = Controller::shutdown_handle();
relay_log::info!("outcome aggregator started");
Expand All @@ -157,6 +158,6 @@ impl Service for OutcomeAggregator {
}

relay_log::info!("outcome aggregator stopped");
});
})
}
}
Loading

0 comments on commit fd0ae7a

Please sign in to comment.