Commit 26154ba: Fix

iambriccardo committed Sep 7, 2024
1 parent 88a9fb5 commit 26154ba
Showing 3 changed files with 112 additions and 54 deletions.
29 changes: 18 additions & 11 deletions relay-server/src/service.rs
@@ -5,17 +5,6 @@ use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::{EnvelopeBufferService, ObservableEnvelopeBuffer};
use crate::services::stats::RelayStats;
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use rayon::ThreadPool;
use relay_cogs::Cogs;
use relay_config::{Config, RedisConnection, RedisPoolConfigs};
use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
use relay_system::{channel, Addr, Service};
use tokio::runtime::Runtime;

use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
use crate::services::health_check::{HealthCheck, HealthCheckService};
@@ -25,11 +14,22 @@ use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService};
use crate::services::project_cache::{ProjectCache, ProjectCacheService, Services};
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::StoreService;
use crate::services::test_store::{TestStore, TestStoreService};
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat};
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use rayon::ThreadPool;
use relay_cogs::Cogs;
use relay_config::{Config, RedisConnection, RedisPoolConfigs};
use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
use relay_system::{channel, Addr, Service};
use tokio::runtime::Runtime;
use tokio::sync::watch;

/// Indicates the type of failure of the server.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
@@ -240,10 +240,16 @@ impl ServiceState {
)
.spawn_handler(processor_rx);

// We create a watch channel that is used to monitor availability of the project cache
// service for processing new envelopes. This is used as a simple backpressure mechanism
// between services.
let (envelope_processing_availability, _) = watch::channel(true);

let envelope_buffer = EnvelopeBufferService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache.clone(),
envelope_processing_availability.subscribe(),
)
.map(|b| b.start_observable());

@@ -266,6 +272,7 @@ impl ServiceState {
redis_pools
.as_ref()
.map(|pools| pools.project_configs.clone()),
envelope_processing_availability,
)
.spawn_handler(project_cache_rx);

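The change in service.rs wires the two services together with a plain `tokio::sync::watch` channel: the project cache side keeps the `Sender<bool>`, and the envelope buffer receives a `Receiver<bool>` via `.subscribe()`. As a rough, self-contained sketch of that backpressure pattern (the task names and timings below are illustrative, not Relay's actual services):

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // `false` means "the downstream service cannot accept new envelopes yet".
    let (availability_tx, availability_rx) = watch::channel(false);

    // Upstream side: parks on the receiver until availability flips to true.
    let mut rx = availability_rx.clone();
    let upstream = tokio::spawn(async move {
        if rx.wait_for(|available| *available).await.is_ok() {
            println!("downstream available, forwarding envelope");
        }
    });

    // Downstream side: finish the backlog, then signal availability.
    tokio::time::sleep(Duration::from_millis(50)).await;
    availability_tx.send(true).ok();

    upstream.await.unwrap();
}
```

The commit itself creates the channel with an initial value of `true`, so the buffer starts out treating the project cache as available and only blocks once the flag is flipped to `false`.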
108 changes: 74 additions & 34 deletions relay-server/src/services/buffer/mod.rs
@@ -1,15 +1,15 @@
//! Types for buffering envelopes.

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_system::Request;
use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use std::future;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::Peek;
@@ -91,11 +91,11 @@ pub struct EnvelopeBufferService {
memory_checker: MemoryChecker,
project_cache: Addr<ProjectCache>,
has_capacity: Arc<AtomicBool>,
sleep: Duration,
project_cache_ready: Option<Request<()>>,
pop_delay: Duration,
project_cache_envelope_processing_availability: watch::Receiver<bool>,
}

const DEFAULT_SLEEP: Duration = Duration::from_millis(100);
const DEFAULT_POP_DELAY: Duration = Duration::from_millis(100);

impl EnvelopeBufferService {
/// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config.
@@ -106,14 +106,15 @@ impl EnvelopeBufferService {
config: Arc<Config>,
memory_checker: MemoryChecker,
project_cache: Addr<ProjectCache>,
project_cache_envelope_processing_availability: watch::Receiver<bool>,
) -> Option<Self> {
config.spool_v2().then(|| Self {
config,
memory_checker,
project_cache,
has_capacity: Arc::new(AtomicBool::new(true)),
sleep: Duration::ZERO,
project_cache_ready: None,
pop_delay: Duration::ZERO,
project_cache_envelope_processing_availability,
})
}

@@ -128,11 +129,19 @@ impl EnvelopeBufferService {

/// Wait for the configured amount of time and make sure the project cache is ready to receive.
async fn ready_to_pop(&mut self) -> Result<(), SendError> {
tokio::time::sleep(self.sleep).await;
if let Some(project_cache_ready) = self.project_cache_ready.take() {
project_cache_ready.await?;
tokio::time::sleep(self.pop_delay).await;

if let Ok(availability) = self
.project_cache_envelope_processing_availability
.wait_for(|availability| *availability)
.await
{
if *availability {
return Ok(());
}
}
Ok(())

future::pending().await
}

/// Tries to pop an envelope for a ready project.
@@ -144,7 +153,8 @@ impl EnvelopeBufferService {
match buffer.peek().await? {
Peek::Empty => {
relay_log::trace!("EnvelopeBufferService empty");
self.sleep = Duration::MAX; // wait for reset by `handle_message`.
// Wait for reset by `handle_message`.
self.pop_delay = Duration::MAX;
}
Peek::Ready(_) => {
relay_log::trace!("EnvelopeBufferService pop");
@@ -153,9 +163,9 @@ impl EnvelopeBufferService {
.await?
.expect("Element disappeared despite exclusive excess");

self.project_cache_ready
.replace(self.project_cache.send(DequeuedEnvelope(envelope)));
self.sleep = Duration::ZERO; // try next pop immediately
self.project_cache.send(DequeuedEnvelope(envelope));
// Try next pop immediately.
self.pop_delay = Duration::ZERO;
}
Peek::NotReady(stack_key, envelope) => {
relay_log::trace!("EnvelopeBufferService request update");
@@ -168,9 +178,9 @@ impl EnvelopeBufferService {
self.project_cache.send(UpdateProject(sampling_key));
}
}
// deprioritize the stack to prevent head-of-line blocking
// Deprioritize the stack to prevent head-of-line blocking.
buffer.mark_seen(&stack_key);
self.sleep = DEFAULT_SLEEP;
self.pop_delay = DEFAULT_POP_DELAY;
}
}

@@ -202,7 +212,7 @@ impl EnvelopeBufferService {
buffer.mark_ready(&project_key, true);
}
};
self.sleep = Duration::ZERO;
self.pop_delay = Duration::ZERO;
}

/// Pushes an [`Envelope`] to the [`PolymorphicEnvelopeBuffer`].
@@ -235,6 +245,8 @@ impl Service for EnvelopeBufferService {
let config = self.config.clone();
let memory_checker = self.memory_checker.clone();
tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_millis(100));

let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await;

let mut buffer = match buffer {
@@ -249,15 +261,14 @@ impl Service for EnvelopeBufferService {
};
buffer.initialize().await;

let mut ticker = tokio::time::interval(Duration::from_millis(100));

relay_log::info!("EnvelopeBufferService start");
loop {
relay_log::trace!("EnvelopeBufferService loop");

tokio::select! {
// NOTE: we do not select a bias here.
// On the one hand, we might want to prioritize dequeing over enqueing
//
// On the one hand, we might want to prioritize dequeuing over enqueuing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
Ok(()) = self.ready_to_pop() => {
@@ -308,7 +319,13 @@ mod tests {
.unwrap(),
);
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
let service = EnvelopeBufferService::new(config, memory_checker, Addr::dummy()).unwrap();
let service = EnvelopeBufferService::new(
config,
memory_checker,
Addr::dummy(),
watch::channel(true).0.subscribe(),
)
.unwrap();

// Set capacity to false:
service.has_capacity.store(false, Ordering::Relaxed);
@@ -342,24 +359,47 @@ mod tests {
);
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
let (project_cache_addr, mut project_cache_rx) = Addr::custom();
let service =
EnvelopeBufferService::new(config, memory_checker, project_cache_addr).unwrap();
let (envelope_processing_availability, _) = watch::channel(true);
let service = EnvelopeBufferService::new(
config,
memory_checker,
project_cache_addr,
envelope_processing_availability.subscribe(),
)
.unwrap();

let addr = service.start();

// Send five messages:
// Send one message with availability set to true.
let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
addr.send(EnvelopeBuffer::Push(envelope.clone()));

// We wait for the message to be processed.
tokio::time::sleep(Duration::from_millis(100)).await;

// We simulate the availability set to false.
envelope_processing_availability.send(false).unwrap();

// We send five more messages.
for _ in 0..5 {
addr.send(EnvelopeBuffer::Push(envelope.clone()));
}
addr.send(EnvelopeBuffer::Ready(project_key));

tokio::time::sleep(Duration::from_millis(1000)).await;

// Project cache received only one envelope:
assert_eq!(project_cache_rx.len(), 1); // without throttling, this would be 5.
assert!(project_cache_rx.try_recv().is_ok());
// We expect the project cache to have received only one envelope since the other five
// have been blocked on the check.
assert_eq!(project_cache_rx.len(), 1);
assert!(project_cache_rx.recv().await.is_some());
assert_eq!(project_cache_rx.len(), 0);

// We set the availability back to true.
envelope_processing_availability.send(true).unwrap();

// We wait for all the envelopes to be processed.
tokio::time::sleep(Duration::from_millis(100)).await;

// We expect to have received all five of them.
assert_eq!(project_cache_rx.len(), 5);
}
}
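The reworked `ready_to_pop` above chains three things: the pop delay, a `wait_for` on the availability flag, and a `future::pending()` fallback so the branch never resolves if the sender side goes away, which lets the surrounding `select!` keep servicing its other arms. A minimal sketch of that shape, using made-up names and delays rather than the real service:

```rust
use std::future;
use std::time::Duration;
use tokio::sync::watch;

/// Resolves once the peer reports availability; never resolves if the
/// sender has been dropped, so an enclosing `select!` falls through to
/// its other branches instead.
async fn ready_to_pop(delay: Duration, rx: &mut watch::Receiver<bool>) {
    tokio::time::sleep(delay).await;

    // `wait_for` returns `Err` only when the sender side is gone.
    if rx.wait_for(|available| *available).await.is_ok() {
        return;
    }

    future::pending::<()>().await;
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(false);
    let mut ticker = tokio::time::interval(Duration::from_millis(10));

    // Simulate the project cache becoming available after a short while.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(30)).await;
        tx.send(true).ok();
    });

    loop {
        tokio::select! {
            _ = ready_to_pop(Duration::from_millis(5), &mut rx) => {
                println!("ready to pop an envelope");
                break;
            }
            _ = ticker.tick() => {
                println!("still waiting for availability");
            }
        }
    }
}
```

In the commit the function keeps its `Result<(), SendError>` signature and returns `Ok(())` once the flag is set; the sketch drops the error type to keep the gating shape visible.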