Skip to content

Commit

Permalink
feat(spooler): Implement backpressure in spooler via bounded queues
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Sep 20, 2024
1 parent ea9b631 commit e44a20f
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 51 deletions.
26 changes: 14 additions & 12 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,6 @@ use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer};
use crate::services::stats::RelayStats;
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use rayon::ThreadPool;
use relay_cogs::Cogs;
use relay_config::{Config, RedisConnection, RedisPoolConfigs};
use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
use relay_system::{channel, Addr, Service};
use tokio::runtime::Runtime;

use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
use crate::services::health_check::{HealthCheck, HealthCheckService};
Expand All @@ -25,11 +14,22 @@ use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService};
use crate::services::project_cache::{ProjectCache, ProjectCacheService, Services};
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::StoreService;
use crate::services::test_store::{TestStore, TestStoreService};
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat};
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use rayon::ThreadPool;
use relay_cogs::Cogs;
use relay_config::{Config, RedisConnection, RedisPoolConfigs};
use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
use relay_system::{channel, Addr, Service};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

/// Indicates the type of failure of the server.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
Expand Down Expand Up @@ -241,12 +241,13 @@ impl ServiceState {
)
.spawn_handler(processor_rx);

let (project_cache_bounded_tx, project_cache_bounded_rx) = mpsc::channel(500);
let envelope_buffer = EnvelopeBufferService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
global_config_rx.clone(),
buffer::Services {
project_cache: project_cache.clone(),
project_cache: project_cache_bounded_tx,
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
Expand All @@ -269,6 +270,7 @@ impl ServiceState {
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache_services,
global_config_rx,
project_cache_bounded_rx,
redis_pools
.as_ref()
.map(|pools| pools.project_configs.clone()),
Expand Down
75 changes: 44 additions & 31 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ use std::time::Duration;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, SendError, Service};
use relay_system::{Controller, Shutdown};
use tokio::sync::watch;
use tokio::sync::{mpsc, watch};
use tokio::time::{timeout, Instant};

use crate::envelope::Envelope;
Expand All @@ -21,9 +20,8 @@ use crate::services::outcome::DiscardReason;
use crate::services::outcome::Outcome;
use crate::services::outcome::TrackOutcome;
use crate::services::processor::ProcessingGroup;
use crate::services::project_cache::DequeuedEnvelope;
use crate::services::project_cache::ProjectCache;
use crate::services::project_cache::UpdateProject;

use crate::services::test_store::TestStore;
use crate::statsd::RelayCounters;
use crate::utils::ManagedEnvelope;
Expand Down Expand Up @@ -96,7 +94,7 @@ impl ObservableEnvelopeBuffer {

/// Services that the buffer service communicates with.
pub struct Services {
pub project_cache: Addr<ProjectCache>,
pub project_cache: mpsc::Sender<ProjectCache>,
pub outcome_aggregator: Addr<TrackOutcome>,
pub test_store: Addr<TestStore>,
}
Expand Down Expand Up @@ -149,10 +147,7 @@ impl EnvelopeBufferService {
}

/// Wait for the configured amount of time and make sure the project cache is ready to receive.
async fn ready_to_pop(
&mut self,
buffer: &mut PolymorphicEnvelopeBuffer,
) -> Result<(), SendError> {
async fn ready_to_pop(&mut self, buffer: &PolymorphicEnvelopeBuffer) -> Result<(), SendError> {
relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checking"
Expand All @@ -169,14 +164,13 @@ impl EnvelopeBufferService {
tokio::time::sleep(self.sleep).await;
}

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "slept"
);
while self.services.project_cache.capacity() == 0 {
tokio::time::sleep(Duration::from_millis(1)).await;
}

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checked"
status = "acquired"
);

Ok(())
Expand Down Expand Up @@ -234,7 +228,17 @@ impl EnvelopeBufferService {
.pop()
.await?
.expect("Element disappeared despite exclusive excess");
self.services.project_cache.send(DequeuedEnvelope(envelope));
if let Err(error) = self
.services
.project_cache
.send(ProjectCache::HandleDequeuedEnvelope(envelope))
.await
{
relay_log::error!(
error = &error as &dyn Error,
"the envelope buffer couldn't send an envelope to the project cache",
);
};
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, next_project_fetch, envelope) => {
Expand All @@ -249,15 +253,23 @@ impl EnvelopeBufferService {
// avoid flooding the project cache with `UpdateProject` messages.
if Instant::now() >= next_project_fetch {
relay_log::trace!("EnvelopeBufferService: requesting project(s) update");
let project_key = envelope.meta().public_key();
self.services.project_cache.send(UpdateProject(project_key));
let own_key = envelope.meta().public_key();

// TODO: do we want to handle an error?
let _ = self
.services
.project_cache
.send(ProjectCache::UpdateProject(own_key))
.await;
match envelope.sampling_key() {
None => {}
Some(sampling_key) if sampling_key == project_key => {} // already sent.
Some(sampling_key) if sampling_key == own_key => {} // already sent.
Some(sampling_key) => {
self.services
let _ = self
.services
.project_cache
.send(UpdateProject(sampling_key));
.send(ProjectCache::UpdateProject(sampling_key))
.await;
}
}

Expand Down Expand Up @@ -397,7 +409,7 @@ impl Service for EnvelopeBufferService {
// On the one hand, we might want to prioritize dequeuing over enqueuing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
Ok(()) = self.ready_to_pop(&mut buffer) => {
Ok(_) = self.ready_to_pop(&buffer) => {
if let Err(e) = self.try_pop(&mut buffer).await {
relay_log::error!(
error = &e as &dyn std::error::Error,
Expand Down Expand Up @@ -447,7 +459,7 @@ mod tests {
fn buffer_service() -> (
EnvelopeBufferService,
watch::Sender<global_config::Status>,
mpsc::UnboundedReceiver<ProjectCache>,
mpsc::Receiver<ProjectCache>,
mpsc::UnboundedReceiver<TrackOutcome>,
) {
let config = Arc::new(
Expand All @@ -462,15 +474,15 @@ mod tests {
);
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
let (project_cache, project_cache_rx) = Addr::custom();
let (project_cache, project_cache_rx) = mpsc::channel(5);
let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom();
(
EnvelopeBufferService::new(
config,
memory_checker,
global_rx,
Services {
project_cache,
project_cache: project_cache,
outcome_aggregator,
test_store: Addr::dummy(),
},
Expand Down Expand Up @@ -506,8 +518,9 @@ mod tests {

#[tokio::test]
async fn pop_requires_global_config() {
relay_log::init_test!();
tokio::time::pause();
let (service, global_tx, project_cache_rx, _) = buffer_service();
let (service, global_tx, mut project_cache_rx, _) = buffer_service();

let addr = service.start();

Expand Down Expand Up @@ -555,13 +568,13 @@ mod tests {
GlobalConfig::default(),
)));

let (project_cache, project_cache_rx) = Addr::custom();
let (project_cache, project_cache_rx) = mpsc::channel(20);
let service = EnvelopeBufferService::new(
config,
memory_checker,
global_rx,
Services {
project_cache,
project_cache: project_cache,
outcome_aggregator: Addr::dummy(),
test_store: Addr::dummy(),
},
Expand Down Expand Up @@ -598,14 +611,14 @@ mod tests {
);
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
let (project_cache, project_cache_rx) = Addr::custom();
let (project_cache, project_cache_rx) = mpsc::channel(20);
let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom();
let service = EnvelopeBufferService::new(
config,
memory_checker,
global_rx,
Services {
project_cache,
project_cache: project_cache,
outcome_aggregator,
test_store: Addr::dummy(),
},
Expand Down Expand Up @@ -653,7 +666,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(1)).await;

// We expect the project update request to be sent.
let Some(ProjectCache::HandleDequeuedEnvelope(envelope, _)) = project_cache_rx.recv().await
let Some(ProjectCache::HandleDequeuedEnvelope(envelope)) = project_cache_rx.recv().await
else {
panic!();
};
Expand Down
24 changes: 16 additions & 8 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ pub enum ProjectCache {
UpdateSpoolIndex(UpdateSpoolIndex),
SpoolHealth(Sender<bool>),
RefreshIndexCache(RefreshIndexCache),
HandleDequeuedEnvelope(Box<Envelope>, Sender<()>),
HandleDequeuedEnvelope(Box<Envelope>),
UpdateProject(ProjectKey),
}

Expand All @@ -314,7 +314,7 @@ impl ProjectCache {
Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex",
Self::SpoolHealth(_) => "SpoolHealth",
Self::RefreshIndexCache(_) => "RefreshIndexCache",
Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope",
Self::HandleDequeuedEnvelope(_) => "HandleDequeuedEnvelope",
Self::UpdateProject(_) => "UpdateProject",
}
}
Expand Down Expand Up @@ -422,11 +422,11 @@ impl FromMessage<SpoolHealth> for ProjectCache {
}

impl FromMessage<DequeuedEnvelope> for ProjectCache {
type Response = relay_system::AsyncResponse<()>;
type Response = relay_system::NoResponse;

fn from_message(message: DequeuedEnvelope, sender: Sender<()>) -> Self {
fn from_message(message: DequeuedEnvelope, _: ()) -> Self {
let DequeuedEnvelope(envelope) = message;
Self::HandleDequeuedEnvelope(envelope, sender)
Self::HandleDequeuedEnvelope(envelope)
}
}

Expand Down Expand Up @@ -1308,7 +1308,7 @@ impl ProjectCacheBroker {
ProjectCache::RefreshIndexCache(message) => {
self.handle_refresh_index_cache(message)
}
ProjectCache::HandleDequeuedEnvelope(message, sender) => {
ProjectCache::HandleDequeuedEnvelope(message) => {
let envelope_buffer = self
.services
.envelope_buffer
Expand All @@ -1321,8 +1321,6 @@ impl ProjectCacheBroker {
"Failed to handle popped envelope"
);
}
// Return response to signal readiness for next envelope:
sender.send(())
}
ProjectCache::UpdateProject(project) => self.handle_update_project(project),
}
Expand All @@ -1338,6 +1336,7 @@ pub struct ProjectCacheService {
memory_checker: MemoryChecker,
services: Services,
global_config_rx: watch::Receiver<global_config::Status>,
project_cache_bounded_rx: mpsc::Receiver<ProjectCache>,
redis: Option<RedisPool>,
}

Expand All @@ -1348,13 +1347,15 @@ impl ProjectCacheService {
memory_checker: MemoryChecker,
services: Services,
global_config_rx: watch::Receiver<global_config::Status>,
project_cache_bounded_rx: mpsc::Receiver<ProjectCache>,
redis: Option<RedisPool>,
) -> Self {
Self {
config,
memory_checker,
services,
global_config_rx,
project_cache_bounded_rx,
redis,
}
}
Expand All @@ -1369,6 +1370,7 @@ impl Service for ProjectCacheService {
memory_checker,
services,
mut global_config_rx,
mut project_cache_bounded_rx,
redis,
} = self;
let project_cache = services.project_cache.clone();
Expand Down Expand Up @@ -1489,6 +1491,12 @@ impl Service for ProjectCacheService {
broker.handle_periodic_unspool()
})
}
// TODO: this prioritization might stab us in the back.
Some(message) = project_cache_bounded_rx.recv() => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_message_from_bounded", {
broker.handle_message(message)
})
}
Some(message) = rx.recv() => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_message", {
broker.handle_message(message)
Expand Down

0 comments on commit e44a20f

Please sign in to comment.