Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Sep 7, 2024
1 parent 4bd1766 commit 88a9fb5
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 20 deletions.
3 changes: 3 additions & 0 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
envelope_store.clone(),
disk_batch_size,
2,
10,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
Expand Down Expand Up @@ -138,6 +139,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
envelope_store.clone(),
disk_batch_size,
2,
10,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
Expand Down Expand Up @@ -179,6 +181,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
envelope_store.clone(),
disk_batch_size,
2,
10,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
Expand Down
26 changes: 20 additions & 6 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ impl PolymorphicEnvelopeBuffer {
Self::InMemory(buffer) => buffer.has_capacity(),
}
}

/// Evicts the least recently used envelope stacks from the buffer.
pub async fn evict(&mut self) {
match self {
Self::Sqlite(buffer) => buffer.evict().await,
Self::InMemory(buffer) => buffer.evict().await,
}
}
}

/// Error that occurs while interacting with the envelope buffer.
Expand Down Expand Up @@ -330,8 +338,8 @@ where
/// Returns `true` if at least one priority was changed.
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
let mut changed = false;
if let Some(project_key_pair) = self.stacks_by_project.get(project) {
for project_key_pair in project_key_pair {
if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
for project_key_pair in project_key_pairs {
if let Some((
QueueItem {
key: _,
Expand Down Expand Up @@ -418,10 +426,15 @@ where
}
}

// We calculate how many envelope stacks we want to keep track.
// We calculate how many envelope stacks we want to evict.
let max_lru_length =
((self.priority_queue.len() as f32) * self.evictable_stacks_percentage) as usize;
let mut lru: BinaryHeap<LRUItem> = BinaryHeap::new();
((self.priority_queue.len() as f32) * self.evictable_stacks_percentage).ceil() as usize;
relay_log::trace!(
"Evicting {} elements from the envelope buffer",
max_lru_length
);

let mut lru: BinaryHeap<LRUItem> = BinaryHeap::with_capacity(max_lru_length);
relay_statsd::metric!(timer(RelayTimers::BufferEvictLRUConstruction), {
for (queue_item, priority) in self.priority_queue.iter() {
let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update);
Expand Down Expand Up @@ -743,7 +756,8 @@ mod tests {
Config::from_json_value(serde_json::json!({
"spool": {
"envelopes": {
"path": path
"path": path,
"evictable_stacks_percentage": 0.7
}
}
}))
Expand Down
20 changes: 7 additions & 13 deletions relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,12 @@ impl EnvelopeStack for SqliteEnvelopeStack {

impl Evictable for SqliteEnvelopeStack {
async fn evict(&mut self) {
// We want to evict all elements in memory.
// We want to evict all elements in memory, even though the capacity check for the sqlite
// implementation is triggered only based on disk usage.
self.batches_buffer.clear();
self.batches_buffer_size = 0;

// We remove a fixed number of envelopes from disk to free up some space.
if self
.envelope_store
.delete_many(
Expand All @@ -262,7 +264,6 @@ mod tests {
use std::time::{Duration, Instant};

use super::*;
use crate::services::buffer::envelope_store::sqlite::EnvelopesOrder;
use crate::services::buffer::stack_provider::Evictable;
use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db};
use relay_base_schema::project::ProjectKey;
Expand Down Expand Up @@ -493,7 +494,7 @@ mod tests {
#[tokio::test]
async fn test_evict() {
let db = setup_db(true).await;
let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
let mut stack =
Expand All @@ -509,15 +510,8 @@ mod tests {
// We expect 0 in-memory data since we flushed the 5 in-memory envelopes.
assert!(stack.batches_buffer.is_empty());
assert_eq!(stack.batches_buffer_size, 0);
// We expect 2 out of the 10 envelopes on disk to have been flushed, so if we load 15, we
// should get 8 back.
assert_eq!(
envelope_store
.delete_many(own_key, sampling_key, 15, EnvelopesOrder::MostRecent)
.await
.unwrap()
.len(),
8
);
// We expect 2 out of the 10 envelopes on disk to have been flushed, so we should have 8
// on disk.
assert_eq!(envelope_store.total_count().await.unwrap(), 8);
}
}
14 changes: 13 additions & 1 deletion relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ impl EnvelopeBufferService {
self.sleep = Duration::ZERO;
}

/// Pushes an [`Envelope`] to the [`PolymorphicEnvelopeBuffer`].
async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box<Envelope>) {
if let Err(e) = buffer.push(envelope).await {
relay_log::error!(
Expand All @@ -214,6 +215,13 @@ impl EnvelopeBufferService {
}
}

/// Tries to evict elements from the [`PolymorphicEnvelopeBuffer`] if no capacity
async fn try_evict(&self, buffer: &mut PolymorphicEnvelopeBuffer) {
if !buffer.has_capacity() {
buffer.evict().await;
}
}

fn update_observable_state(&self, buffer: &mut PolymorphicEnvelopeBuffer) {
self.has_capacity
.store(buffer.has_capacity(), Ordering::Relaxed);
Expand Down Expand Up @@ -241,6 +249,8 @@ impl Service for EnvelopeBufferService {
};
buffer.initialize().await;

let mut ticker = tokio::time::interval(Duration::from_millis(100));

relay_log::info!("EnvelopeBufferService start");
loop {
relay_log::trace!("EnvelopeBufferService loop");
Expand All @@ -261,7 +271,9 @@ impl Service for EnvelopeBufferService {
Some(message) = rx.recv() => {
self.handle_message(&mut buffer, message).await;
}

_ = ticker.tick() => {
self.try_evict(&mut buffer).await;
}
else => break,
}

Expand Down

0 comments on commit 88a9fb5

Please sign in to comment.