Skip to content

Commit

Permalink
Revert "feat(spooler): Implement backpressure mechanism based on atomics (#4040)"
Browse files Browse the repository at this point in the history

This reverts commit 0198a23.
  • Loading branch information
iambriccardo committed Sep 18, 2024
1 parent 0198a23 commit 911da87
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 79 deletions.
6 changes: 0 additions & 6 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::convert::Infallible;
use std::fmt;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -242,9 +241,6 @@ impl ServiceState {
)
.spawn_handler(processor_rx);

// We initialize a shared boolean that is used to manage backpressure between the
// EnvelopeBufferService and the ProjectCacheService.
let project_cache_ready = Arc::new(AtomicBool::new(true));
let envelope_buffer = EnvelopeBufferService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
Expand All @@ -254,7 +250,6 @@ impl ServiceState {
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
project_cache_ready.clone(),
)
.map(|b| b.start_observable());

Expand All @@ -277,7 +272,6 @@ impl ServiceState {
redis_pools
.as_ref()
.map(|pools| pools.project_configs.clone()),
project_cache_ready,
)
.spawn_handler(project_cache_rx);

Expand Down
57 changes: 2 additions & 55 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ pub struct EnvelopeBufferService {
services: Services,
has_capacity: Arc<AtomicBool>,
sleep: Duration,
project_cache_ready: Arc<AtomicBool>,
}

/// The maximum amount of time between evaluations of dequeue conditions.
Expand All @@ -129,7 +128,6 @@ impl EnvelopeBufferService {
memory_checker: MemoryChecker,
global_config_rx: watch::Receiver<global_config::Status>,
services: Services,
project_cache_ready: Arc<AtomicBool>,
) -> Option<Self> {
config.spool_v2().then(|| Self {
config,
Expand All @@ -138,7 +136,6 @@ impl EnvelopeBufferService {
services,
has_capacity: Arc::new(AtomicBool::new(true)),
sleep: Duration::ZERO,
project_cache_ready,
})
}

Expand Down Expand Up @@ -172,12 +169,6 @@ impl EnvelopeBufferService {
tokio::time::sleep(self.sleep).await;
}

// In case the project cache is not ready, we defer popping to first try and handle incoming
// messages and only come back to this in case within the timeout no data was received.
while !self.project_cache_ready.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await;
}

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "slept"
Expand Down Expand Up @@ -243,11 +234,7 @@ impl EnvelopeBufferService {
.pop()
.await?
.expect("Element disappeared despite exclusive excess");
// We assume that the project cache is now busy to process this envelope, so we flip
// the boolean flag, which will prioritize writes.
self.project_cache_ready.store(false, Ordering::SeqCst);
self.services.project_cache.send(DequeuedEnvelope(envelope));

self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, envelope) => {
Expand Down Expand Up @@ -452,7 +439,6 @@ mod tests {
watch::Sender<global_config::Status>,
mpsc::UnboundedReceiver<ProjectCache>,
mpsc::UnboundedReceiver<TrackOutcome>,
Arc<AtomicBool>,
) {
let config = Arc::new(
Config::from_json_value(serde_json::json!({
Expand All @@ -468,7 +454,6 @@ mod tests {
let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
let (project_cache, project_cache_rx) = Addr::custom();
let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom();
let project_cache_ready = Arc::new(AtomicBool::new(true));
(
EnvelopeBufferService::new(
config,
Expand All @@ -479,20 +464,18 @@ mod tests {
outcome_aggregator,
test_store: Addr::dummy(),
},
project_cache_ready.clone(),
)
.unwrap(),
global_tx,
project_cache_rx,
outcome_aggregator_rx,
project_cache_ready,
)
}

#[tokio::test]
async fn capacity_is_updated() {
tokio::time::pause();
let (service, _global_rx, _project_cache_tx, _, _) = buffer_service();
let (service, _global_rx, _project_cache_tx, _) = buffer_service();

// Set capacity to false:
service.has_capacity.store(false, Ordering::Relaxed);
Expand All @@ -514,7 +497,7 @@ mod tests {
#[tokio::test]
async fn pop_requires_global_config() {
tokio::time::pause();
let (service, global_tx, project_cache_rx, _, _) = buffer_service();
let (service, global_tx, project_cache_rx, _) = buffer_service();

let addr = service.start();

Expand Down Expand Up @@ -563,7 +546,6 @@ mod tests {
)));

let (project_cache, project_cache_rx) = Addr::custom();
let project_cache_ready = Arc::new(AtomicBool::new(true));
let service = EnvelopeBufferService::new(
config,
memory_checker,
Expand All @@ -573,7 +555,6 @@ mod tests {
outcome_aggregator: Addr::dummy(),
test_store: Addr::dummy(),
},
project_cache_ready,
)
.unwrap();
let addr = service.start();
Expand Down Expand Up @@ -609,7 +590,6 @@ mod tests {
let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
let (project_cache, project_cache_rx) = Addr::custom();
let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom();
let project_cache_ready = Arc::new(AtomicBool::new(true));
let service = EnvelopeBufferService::new(
config,
memory_checker,
Expand All @@ -619,7 +599,6 @@ mod tests {
outcome_aggregator,
test_store: Addr::dummy(),
},
project_cache_ready,
)
.unwrap();

Expand All @@ -644,36 +623,4 @@ mod tests {
assert_eq!(outcome.category, DataCategory::TransactionIndexed);
assert_eq!(outcome.quantity, 1);
}

#[tokio::test]
async fn output_is_throttled() {
tokio::time::pause();
let (service, global_tx, mut project_cache_rx, _, _) = buffer_service();
global_tx.send_replace(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));

let addr = service.start();

// Send five messages:
let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
for _ in 0..5 {
addr.send(EnvelopeBuffer::Push(envelope.clone()));
}
addr.send(EnvelopeBuffer::Ready(project_key));

tokio::time::sleep(Duration::from_millis(100)).await;

let mut messages = vec![];
project_cache_rx.recv_many(&mut messages, 100).await;

assert_eq!(
messages
.iter()
.filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..)))
.count(),
1
);
}
}
26 changes: 8 additions & 18 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::{BTreeMap, BTreeSet};
use std::error::Error;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -293,7 +292,7 @@ pub enum ProjectCache {
UpdateSpoolIndex(UpdateSpoolIndex),
SpoolHealth(Sender<bool>),
RefreshIndexCache(RefreshIndexCache),
HandleDequeuedEnvelope(Box<Envelope>),
HandleDequeuedEnvelope(Box<Envelope>, Sender<()>),
UpdateProject(ProjectKey),
}

Expand All @@ -312,7 +311,7 @@ impl ProjectCache {
Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex",
Self::SpoolHealth(_) => "SpoolHealth",
Self::RefreshIndexCache(_) => "RefreshIndexCache",
Self::HandleDequeuedEnvelope(_) => "HandleDequeuedEnvelope",
Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope",
Self::UpdateProject(_) => "UpdateProject",
}
}
Expand Down Expand Up @@ -420,11 +419,11 @@ impl FromMessage<SpoolHealth> for ProjectCache {
}

impl FromMessage<DequeuedEnvelope> for ProjectCache {
type Response = relay_system::NoResponse;
type Response = relay_system::AsyncResponse<()>;

fn from_message(message: DequeuedEnvelope, _: ()) -> Self {
fn from_message(message: DequeuedEnvelope, sender: Sender<()>) -> Self {
let DequeuedEnvelope(envelope) = message;
Self::HandleDequeuedEnvelope(envelope)
Self::HandleDequeuedEnvelope(envelope, sender)
}
}

Expand Down Expand Up @@ -611,8 +610,6 @@ struct ProjectCacheBroker {
spool_v1: Option<SpoolV1>,
/// Status of the global configuration, used to determine readiness for processing.
global_config: GlobalConfigStatus,
/// Atomic boolean signaling whether the project cache is ready to accept a new envelope.
project_cache_ready: Arc<AtomicBool>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -1308,7 +1305,7 @@ impl ProjectCacheBroker {
ProjectCache::RefreshIndexCache(message) => {
self.handle_refresh_index_cache(message)
}
ProjectCache::HandleDequeuedEnvelope(message) => {
ProjectCache::HandleDequeuedEnvelope(message, sender) => {
let envelope_buffer = self
.services
.envelope_buffer
Expand All @@ -1321,9 +1318,8 @@ impl ProjectCacheBroker {
"Failed to handle popped envelope"
);
}

// We mark the project cache as ready to accept new traffic.
self.project_cache_ready.store(true, Ordering::SeqCst);
// Return response to signal readiness for next envelope:
sender.send(())
}
ProjectCache::UpdateProject(project) => self.handle_update_project(project),
}
Expand All @@ -1340,7 +1336,6 @@ pub struct ProjectCacheService {
services: Services,
global_config_rx: watch::Receiver<global_config::Status>,
redis: Option<RedisPool>,
project_cache_ready: Arc<AtomicBool>,
}

impl ProjectCacheService {
Expand All @@ -1351,15 +1346,13 @@ impl ProjectCacheService {
services: Services,
global_config_rx: watch::Receiver<global_config::Status>,
redis: Option<RedisPool>,
project_cache_ready: Arc<AtomicBool>,
) -> Self {
Self {
config,
memory_checker,
services,
global_config_rx,
redis,
project_cache_ready,
}
}
}
Expand All @@ -1374,7 +1367,6 @@ impl Service for ProjectCacheService {
services,
mut global_config_rx,
redis,
project_cache_ready,
} = self;
let project_cache = services.project_cache.clone();
let outcome_aggregator = services.outcome_aggregator.clone();
Expand Down Expand Up @@ -1456,7 +1448,6 @@ impl Service for ProjectCacheService {
spool_v1_unspool_handle: SleepHandle::idle(),
spool_v1,
global_config,
project_cache_ready,
};

loop {
Expand Down Expand Up @@ -1651,7 +1642,6 @@ mod tests {
buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)),
}),
global_config: GlobalConfigStatus::Pending,
project_cache_ready: Arc::new(AtomicBool::new(true)),
},
buffer,
)
Expand Down

0 comments on commit 911da87

Please sign in to comment.