Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Sep 7, 2024
1 parent 7cf5189 commit 83f0be2
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 15 deletions.
2 changes: 2 additions & 0 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ impl PolymorphicEnvelopeBuffer {
memory_checker: MemoryChecker,
) -> Result<Self, EnvelopeBufferError> {
let buffer = if config.spool_envelopes_path().is_some() {
relay_log::trace!("Initializing sqlite envelope buffer");
let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(config).await?;
Self::Sqlite(buffer)
} else {
relay_log::trace!("Initializing memory envelope buffer");
let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(memory_checker);
Self::InMemory(buffer)
};
Expand Down
28 changes: 18 additions & 10 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ impl EnvelopeBufferService {
&mut self,
buffer: &mut PolymorphicEnvelopeBuffer,
) -> Result<(), EnvelopeBufferError> {
relay_log::trace!("EnvelopeBufferService peek");
relay_log::trace!("EnvelopeBufferService: peeking the buffer");
match buffer.peek().await? {
Peek::Empty => {
relay_log::trace!("EnvelopeBufferService empty");
relay_log::trace!("EnvelopeBufferService: peek returned empty");
self.sleep = Duration::MAX; // wait for reset by `handle_message`.
}
Peek::Ready(_) => {
relay_log::trace!("EnvelopeBufferService pop");
relay_log::trace!("EnvelopeBufferService: popping envelope");
let envelope = buffer
.pop()
.await?
Expand All @@ -160,7 +160,7 @@ impl EnvelopeBufferService {
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, envelope) => {
relay_log::trace!("EnvelopeBufferService request update");
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready, requesting project update");
let project_key = envelope.meta().public_key();
self.project_cache.send(UpdateProject(project_key));
match envelope.sampling_key() {
Expand Down Expand Up @@ -190,17 +190,23 @@ impl EnvelopeBufferService {
// projects was already triggered (see XXX).
// For better separation of concerns, this prefetch should be triggered from here
// once buffer V1 has been removed.
relay_log::trace!("EnvelopeBufferService push");
relay_log::trace!("EnvelopeBufferService: received push message");
self.push(buffer, envelope).await;
}
EnvelopeBuffer::NotReady(project_key, envelope) => {
relay_log::trace!("EnvelopeBufferService project not ready");
relay_log::trace!(
"EnvelopeBufferService: received project not ready message for project key {}",
&project_key
);
buffer.mark_ready(&project_key, false);
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1);
self.push(buffer, envelope).await;
}
EnvelopeBuffer::Ready(project_key) => {
relay_log::trace!("EnvelopeBufferService project ready {}", &project_key);
relay_log::trace!(
"EnvelopeBufferService: received project ready message for project key {}",
&project_key
);
buffer.mark_ready(&project_key, true);
}
};
Expand All @@ -210,6 +216,8 @@ impl EnvelopeBufferService {
async fn handle_shutdown(&mut self, buffer: PolymorphicEnvelopeBuffer, message: Shutdown) {
// We gracefully shut down only if the shutdown has a timeout.
if let Some(shutdown_timeout) = message.timeout {
relay_log::trace!("EnvelopeBufferService: shutting down gracefully");

let shutdown_result = timeout(shutdown_timeout, async {
buffer.shutdown().await;
})
Expand Down Expand Up @@ -261,9 +269,9 @@ impl Service for EnvelopeBufferService {

let mut shutdown = Controller::shutdown_handle();

relay_log::info!("EnvelopeBufferService start");
relay_log::info!("EnvelopeBufferService: starting");
loop {
relay_log::trace!("EnvelopeBufferService loop");
relay_log::trace!("EnvelopeBufferService: looping");

tokio::select! {
// NOTE: we do not select a bias here.
Expand Down Expand Up @@ -292,7 +300,7 @@ impl Service for EnvelopeBufferService {
self.update_observable_state(&mut buffer);
}

relay_log::info!("EnvelopeBufferService stop");
relay_log::info!("EnvelopeBufferService: stopping");
});
}
}
Expand Down
4 changes: 3 additions & 1 deletion relay-server/src/services/buffer/stack_provider/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,7 @@ impl StackProvider for MemoryStackProvider {
"memory"
}

async fn drain(self, _: impl IntoIterator<Item = Self::Stack>) {}
async fn drain(self, _: impl IntoIterator<Item = Self::Stack>) {
relay_log::trace!("Draining memory envelope buffer");
}
}
2 changes: 2 additions & 0 deletions relay-server/src/services/buffer/stack_provider/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ impl StackProvider for SqliteStackProvider {
}

async fn drain(mut self, envelope_stacks: impl IntoIterator<Item = Self::Stack>) {
relay_log::trace!("Draining sqlite envelope buffer");

relay_statsd::metric!(timer(RelayTimers::BufferDrain), {
let mut envelopes = Vec::with_capacity(self.drain_batch_size);
for envelope_stack in envelope_stacks {
Expand Down
67 changes: 63 additions & 4 deletions tests/integration/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
import queue
import os
import gzip
import sqlite3
import tempfile

import pytest
import signal
import zlib


def test_graceful_shutdown(mini_sentry, relay):
def test_graceful_shutdown_with_in_memory_buffer(mini_sentry, relay):
from time import sleep

get_project_config_original = mini_sentry.app.view_functions["get_project_config"]
Expand All @@ -17,14 +20,70 @@ def get_project_config():
sleep(1) # Causes the process to wait for one second before shutting down
return get_project_config_original()

relay = relay(mini_sentry, {"limits": {"shutdown_timeout": 2}})
project_id = 42
mini_sentry.add_basic_project_config(project_id)

relay = relay(
mini_sentry,
{
"limits": {"shutdown_timeout": 2},
"spool": {"envelopes": {"version": "experimental"}},
},
)

relay.send_event(project_id)

relay.shutdown(sig=signal.SIGTERM)
event = mini_sentry.captured_events.get(timeout=0).get_event()
assert event["logentry"] == {"formatted": "Hello, World!"}

# When using the memory envelope buffer, we do not flush envelopes, so we lose all of them.
assert mini_sentry.captured_events.empty()


def test_graceful_shutdown_with_sqlite_buffer(mini_sentry, relay):
from time import sleep

# Create a temporary directory for the sqlite db.
db_file_path = os.path.join(tempfile.mkdtemp(), "database.db")

get_project_config_original = mini_sentry.app.view_functions["get_project_config"]

@mini_sentry.app.endpoint("get_project_config")
def get_project_config():
sleep(1) # Causes the process to wait for one second before shutting down
return get_project_config_original()

project_id = 42
mini_sentry.add_basic_project_config(project_id)

relay = relay(
mini_sentry,
{
"limits": {"shutdown_timeout": 2},
"spool": {"envelopes": {"version": "experimental", "path": db_file_path}},
},
)

n = 10
for i in range(n):
relay.send_event(project_id)

relay.shutdown(sig=signal.SIGTERM)

# When using the disk envelope buffer, we don't forward envelopes, but we spool them to disk.
assert mini_sentry.captured_events.empty()

# Check if there's data in the SQLite table `envelopes`
conn = sqlite3.connect(db_file_path)
cursor = conn.cursor()

# Check if there's data in the `envelopes` table
cursor.execute("SELECT COUNT(*) FROM envelopes")
row_count = cursor.fetchone()[0]
assert (
row_count == n
), f"The 'envelopes' table is empty. Expected {n} rows, but found {row_count}"

conn.close()


@pytest.mark.skip("Flaky test")
Expand Down

0 comments on commit 83f0be2

Please sign in to comment.