
Commit

Merge
iambriccardo committed Sep 17, 2024
2 parents e77c328 + ea07659 commit 0bebf4c
Showing 3 changed files with 123 additions and 27 deletions.
8 changes: 6 additions & 2 deletions relay-server/src/service.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::{EnvelopeBufferService, ObservableEnvelopeBuffer};
use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer};
use crate::services::stats::RelayStats;
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
@@ -249,7 +249,11 @@ impl ServiceState {
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
global_config_rx.clone(),
project_cache.clone(),
buffer::Services {
project_cache: project_cache.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
project_cache_ready.clone(),
)
.map(|b| b.start_observable());
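The hunk above replaces the buffer's single project_cache address with a buffer::Services struct bundling every downstream address it needs (project cache, outcome aggregator, test store). A minimal, self-contained sketch of that pattern follows; the types here are placeholders, not Relay's real Addr, ProjectCache, TrackOutcome, or TestStore.

use std::marker::PhantomData;

// Placeholder stand-ins for Relay's actor addresses and service types; the
// real types live in relay-server and are not reproduced here.
struct Addr<T>(PhantomData<T>);
impl<T> Addr<T> {
    fn dummy() -> Self {
        Addr(PhantomData)
    }
}
struct ProjectCache;
struct TrackOutcome;
struct TestStore;

// All downstream services the buffer talks to, bundled in one struct so the
// constructor signature stays stable when another dependency is added later.
struct Services {
    project_cache: Addr<ProjectCache>,
    outcome_aggregator: Addr<TrackOutcome>,
    test_store: Addr<TestStore>,
}

struct EnvelopeBufferService {
    services: Services,
}

impl EnvelopeBufferService {
    fn new(services: Services) -> Self {
        Self { services }
    }
}

fn main() {
    let _service = EnvelopeBufferService::new(Services {
        project_cache: Addr::dummy(),
        outcome_aggregator: Addr::dummy(),
        test_store: Addr::dummy(),
    });
}
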
132 changes: 117 additions & 15 deletions relay-server/src/services/buffer/mod.rs
@@ -18,10 +18,16 @@ use tokio::time::timeout;
use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::Peek;
use crate::services::global_config;
use crate::services::outcome::DiscardReason;
use crate::services::outcome::Outcome;
use crate::services::outcome::TrackOutcome;
use crate::services::processor::ProcessingGroup;
use crate::services::project_cache::DequeuedEnvelope;
use crate::services::project_cache::ProjectCache;
use crate::services::project_cache::UpdateProject;
use crate::services::test_store::TestStore;
use crate::statsd::RelayCounters;
use crate::utils::ManagedEnvelope;
use crate::utils::MemoryChecker;

pub use envelope_buffer::EnvelopeBufferError;
@@ -89,13 +95,20 @@ impl ObservableEnvelopeBuffer {
}
}

/// Services that the buffer service communicates with.
pub struct Services {
pub project_cache: Addr<ProjectCache>,
pub outcome_aggregator: Addr<TrackOutcome>,
pub test_store: Addr<TestStore>,
}

/// Spool V2 service which buffers envelopes and forwards them to the project cache when a project
/// becomes ready.
pub struct EnvelopeBufferService {
config: Arc<Config>,
memory_checker: MemoryChecker,
global_config_rx: watch::Receiver<global_config::Status>,
project_cache: Addr<ProjectCache>,
services: Services,
has_capacity: Arc<AtomicBool>,
sleep: Duration,
project_cache_ready: Arc<AtomicBool>,
@@ -116,15 +129,14 @@ impl EnvelopeBufferService {
config: Arc<Config>,
memory_checker: MemoryChecker,
global_config_rx: watch::Receiver<global_config::Status>,
project_cache: Addr<ProjectCache>,
services: Services,
project_cache_ready: Arc<AtomicBool>,
) -> Option<Self> {
config.spool_v2().then(|| Self {
config,
memory_checker,

global_config_rx,
project_cache,
services,
has_capacity: Arc::new(AtomicBool::new(true)),
sleep: Duration::ZERO,
project_cache_ready,
@@ -214,6 +226,13 @@
);
self.sleep = Duration::MAX; // wait for reset by `handle_message`.
}
Peek::Ready(envelope) | Peek::NotReady(.., envelope) if self.expired(envelope) => {
let envelope = buffer
.pop()
.await?
.expect("Element disappeared despite exclusive access");
self.drop_expired(envelope);
}
Peek::Ready(_) => {
relay_log::trace!("EnvelopeBufferService: popping envelope");
relay_statsd::metric!(
@@ -227,7 +246,7 @@
// We assume that the project cache is now busy to process this envelope, so we flip
// the boolean flag, which will prioritize writes.
self.project_cache_ready.store(false, Ordering::SeqCst);
self.project_cache.send(DequeuedEnvelope(envelope));
self.services.project_cache.send(DequeuedEnvelope(envelope));

self.sleep = Duration::ZERO; // try next pop immediately
}
@@ -238,12 +257,14 @@
peek_result = "not_ready"
);
let project_key = envelope.meta().public_key();
self.project_cache.send(UpdateProject(project_key));
self.services.project_cache.send(UpdateProject(project_key));
match envelope.sampling_key() {
None => {}
Some(sampling_key) if sampling_key == project_key => {} // already sent.
Some(sampling_key) => {
self.project_cache.send(UpdateProject(sampling_key));
self.services
.project_cache
.send(UpdateProject(sampling_key));
}
}
// deprioritize the stack to prevent head-of-line blocking
@@ -255,6 +276,20 @@
Ok(())
}

fn expired(&self, envelope: &Envelope) -> bool {
envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age()
}

fn drop_expired(&self, envelope: Box<Envelope>) {
let mut managed_envelope = ManagedEnvelope::new(
envelope,
self.services.outcome_aggregator.clone(),
self.services.test_store.clone(),
ProcessingGroup::Ungrouped,
);
managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp));
}

async fn handle_message(
&mut self,
buffer: &mut PolymorphicEnvelopeBuffer,
@@ -400,9 +435,10 @@ impl Service for EnvelopeBufferService {

#[cfg(test)]
mod tests {
use std::time::Duration;
use std::time::{Duration, Instant};

use relay_dynamic_config::GlobalConfig;
use relay_quotas::DataCategory;
use tokio::sync::mpsc;
use uuid::Uuid;

@@ -415,6 +451,7 @@
EnvelopeBufferService,
watch::Sender<global_config::Status>,
mpsc::UnboundedReceiver<ProjectCache>,
mpsc::UnboundedReceiver<TrackOutcome>,
Arc<AtomicBool>,
) {
let config = Arc::new(
@@ -429,27 +466,33 @@
);
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
let (project_cache_addr, project_cache_rx) = Addr::custom();
let (project_cache, project_cache_rx) = Addr::custom();
let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom();
let project_cache_ready = Arc::new(AtomicBool::new(true));
(
EnvelopeBufferService::new(
config,
memory_checker,
global_rx,
project_cache_addr,
Services {
project_cache,
outcome_aggregator,
test_store: Addr::dummy(),
},
project_cache_ready.clone(),
)
.unwrap(),
global_tx,
project_cache_rx,
outcome_aggregator_rx,
project_cache_ready,
)
}

#[tokio::test]
async fn capacity_is_updated() {
tokio::time::pause();
let (service, _global_rx, _project_cache_tx, _project_cache_ready) = buffer_service();
let (service, _global_rx, _project_cache_tx, _, _) = buffer_service();

// Set capacity to false:
service.has_capacity.store(false, Ordering::Relaxed);
@@ -471,7 +514,7 @@
#[tokio::test]
async fn pop_requires_global_config() {
tokio::time::pause();
let (service, global_tx, project_cache_rx, _project_cache_ready) = buffer_service();
let (service, global_tx, project_cache_rx, _, _) = buffer_service();

let addr = service.start();

@@ -519,13 +562,17 @@
GlobalConfig::default(),
)));

let (project_cache_addr, project_cache_rx) = Addr::custom();
let (project_cache, project_cache_rx) = Addr::custom();
let project_cache_ready = Arc::new(AtomicBool::new(true));
let service = EnvelopeBufferService::new(
config,
memory_checker,
global_rx,
project_cache_addr,
Services {
project_cache,
outcome_aggregator: Addr::dummy(),
test_store: Addr::dummy(),
},
project_cache_ready,
)
.unwrap();
@@ -543,10 +590,65 @@
assert_eq!(project_cache_rx.len(), 0);
}

#[tokio::test]
async fn old_envelope_is_dropped() {
tokio::time::pause();

let config = Arc::new(
Config::from_json_value(serde_json::json!({
"spool": {
"envelopes": {
"version": "experimental",
"max_envelope_delay_secs": 1,
}
}
}))
.unwrap(),
);
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
let (project_cache, project_cache_rx) = Addr::custom();
let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom();
let project_cache_ready = Arc::new(AtomicBool::new(true));
let service = EnvelopeBufferService::new(
config,
memory_checker,
global_rx,
Services {
project_cache,
outcome_aggregator,
test_store: Addr::dummy(),
},
project_cache_ready,
)
.unwrap();

global_tx.send_replace(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));

let config = service.config.clone();
let addr = service.start();

// Send an envelope that is already older than the configured max age:
let mut envelope = new_envelope(false, "foo");
envelope
.meta_mut()
.set_start_time(Instant::now() - 2 * config.spool_envelopes_max_age());
addr.send(EnvelopeBuffer::Push(envelope));

tokio::time::sleep(Duration::from_millis(100)).await;

assert!(project_cache_rx.is_empty());
let outcome = outcome_aggregator_rx.try_recv().unwrap();
assert_eq!(outcome.category, DataCategory::TransactionIndexed);
assert_eq!(outcome.quantity, 1);
}

#[tokio::test]
async fn output_is_throttled() {
tokio::time::pause();
let (service, global_tx, mut project_cache_rx, _project_cache_ready) = buffer_service();
let (service, global_tx, mut project_cache_rx, _, _) = buffer_service();
global_tx.send_replace(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));
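Distilled from the expired helper and the old_envelope_is_dropped test above, here is a self-contained sketch of the age check that now lives in the buffer service; is_expired and received_at are illustrative names, and the threshold corresponds to spool.envelopes.max_envelope_delay_secs.

use std::time::{Duration, Instant};

// An envelope counts as expired once the time since it was received exceeds
// the configured maximum delay; per the diff above, expired envelopes are
// rejected with a Timestamp discard reason instead of being forwarded to the
// project cache.
fn is_expired(received_at: Instant, max_age: Duration) -> bool {
    received_at.elapsed() > max_age
}

fn main() {
    let max_age = Duration::from_secs(1);

    // Mirrors the test setup: a start time two max-ages in the past.
    let received_too_early = Instant::now() - 2 * max_age;
    assert!(is_expired(received_too_early, max_age));

    // A freshly received envelope is kept.
    assert!(!is_expired(Instant::now(), max_age));
}
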
10 changes: 0 additions & 10 deletions relay-server/src/services/project_cache.rs
@@ -1095,16 +1095,6 @@ impl ProjectCacheBroker {
envelope: Box<Envelope>,
envelope_buffer: Addr<EnvelopeBuffer>,
) -> Result<(), EnvelopeBufferError> {
if envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age() {
let mut managed_envelope = ManagedEnvelope::new(
envelope,
self.services.outcome_aggregator.clone(),
self.services.test_store.clone(),
ProcessingGroup::Ungrouped,
);
managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp));
return Ok(());
}
let sampling_key = envelope.sampling_key();
let services = self.services.clone();

