From 83f0be2998d69e724f2697eac9000c2bcbc3e6ec Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Sat, 7 Sep 2024 10:46:38 +0200 Subject: [PATCH] Fix test --- .../services/buffer/envelope_buffer/mod.rs | 2 + relay-server/src/services/buffer/mod.rs | 28 +++++--- .../services/buffer/stack_provider/memory.rs | 4 +- .../services/buffer/stack_provider/sqlite.rs | 2 + tests/integration/test_basic.py | 67 +++++++++++++++++-- 5 files changed, 88 insertions(+), 15 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 9ebe75e3da..9bb17bae2c 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -50,9 +50,11 @@ impl PolymorphicEnvelopeBuffer { memory_checker: MemoryChecker, ) -> Result { let buffer = if config.spool_envelopes_path().is_some() { + relay_log::trace!("Initializing sqlite envelope buffer"); let buffer = EnvelopeBuffer::::new(config).await?; Self::Sqlite(buffer) } else { + relay_log::trace!("Initializing memory envelope buffer"); let buffer = EnvelopeBuffer::::new(memory_checker); Self::InMemory(buffer) }; diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 3aea3daaef..a0e28fd874 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -142,14 +142,14 @@ impl EnvelopeBufferService { &mut self, buffer: &mut PolymorphicEnvelopeBuffer, ) -> Result<(), EnvelopeBufferError> { - relay_log::trace!("EnvelopeBufferService peek"); + relay_log::trace!("EnvelopeBufferService: peeking the buffer"); match buffer.peek().await? { Peek::Empty => { - relay_log::trace!("EnvelopeBufferService empty"); + relay_log::trace!("EnvelopeBufferService: peek returned empty"); self.sleep = Duration::MAX; // wait for reset by `handle_message`. } Peek::Ready(_) => { - relay_log::trace!("EnvelopeBufferService pop"); + relay_log::trace!("EnvelopeBufferService: popping envelope"); let envelope = buffer .pop() .await? @@ -160,7 +160,7 @@ impl EnvelopeBufferService { self.sleep = Duration::ZERO; // try next pop immediately } Peek::NotReady(stack_key, envelope) => { - relay_log::trace!("EnvelopeBufferService request update"); + relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready, requesting project update"); let project_key = envelope.meta().public_key(); self.project_cache.send(UpdateProject(project_key)); match envelope.sampling_key() { @@ -190,17 +190,23 @@ impl EnvelopeBufferService { // projects was already triggered (see XXX). // For better separation of concerns, this prefetch should be triggered from here // once buffer V1 has been removed. - relay_log::trace!("EnvelopeBufferService push"); + relay_log::trace!("EnvelopeBufferService: received push message"); self.push(buffer, envelope).await; } EnvelopeBuffer::NotReady(project_key, envelope) => { - relay_log::trace!("EnvelopeBufferService project not ready"); + relay_log::trace!( + "EnvelopeBufferService: received project not ready message for project key {}", + &project_key + ); buffer.mark_ready(&project_key, false); relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1); self.push(buffer, envelope).await; } EnvelopeBuffer::Ready(project_key) => { - relay_log::trace!("EnvelopeBufferService project ready {}", &project_key); + relay_log::trace!( + "EnvelopeBufferService: received project ready message for project key {}", + &project_key + ); buffer.mark_ready(&project_key, true); } }; @@ -210,6 +216,8 @@ impl EnvelopeBufferService { async fn handle_shutdown(&mut self, buffer: PolymorphicEnvelopeBuffer, message: Shutdown) { // We gracefully shut down only if the shutdown has a timeout. if let Some(shutdown_timeout) = message.timeout { + relay_log::trace!("EnvelopeBufferService: shutting down gracefully"); + let shutdown_result = timeout(shutdown_timeout, async { buffer.shutdown().await; }) @@ -261,9 +269,9 @@ impl Service for EnvelopeBufferService { let mut shutdown = Controller::shutdown_handle(); - relay_log::info!("EnvelopeBufferService start"); + relay_log::info!("EnvelopeBufferService: starting"); loop { - relay_log::trace!("EnvelopeBufferService loop"); + relay_log::trace!("EnvelopeBufferService: looping"); tokio::select! { // NOTE: we do not select a bias here. @@ -292,7 +300,7 @@ impl Service for EnvelopeBufferService { self.update_observable_state(&mut buffer); } - relay_log::info!("EnvelopeBufferService stop"); + relay_log::info!("EnvelopeBufferService: stopping"); }); } } diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs index 0b8118e378..25dad34405 100644 --- a/relay-server/src/services/buffer/stack_provider/memory.rs +++ b/relay-server/src/services/buffer/stack_provider/memory.rs @@ -42,5 +42,7 @@ impl StackProvider for MemoryStackProvider { "memory" } - async fn drain(self, _: impl IntoIterator) {} + async fn drain(self, _: impl IntoIterator) { + relay_log::trace!("Draining memory envelope buffer"); + } } diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index d287076a22..12ba371fa9 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -125,6 +125,8 @@ impl StackProvider for SqliteStackProvider { } async fn drain(mut self, envelope_stacks: impl IntoIterator) { + relay_log::trace!("Draining sqlite envelope buffer"); + relay_statsd::metric!(timer(RelayTimers::BufferDrain), { let mut envelopes = Vec::with_capacity(self.drain_batch_size); for envelope_stack in envelope_stacks { diff --git a/tests/integration/test_basic.py b/tests/integration/test_basic.py index 6b6a943b20..b6d30f1bac 100644 --- a/tests/integration/test_basic.py +++ b/tests/integration/test_basic.py @@ -2,12 +2,15 @@ import queue import os import gzip +import sqlite3 +import tempfile + import pytest import signal import zlib -def test_graceful_shutdown(mini_sentry, relay): +def test_graceful_shutdown_with_in_memory_buffer(mini_sentry, relay): from time import sleep get_project_config_original = mini_sentry.app.view_functions["get_project_config"] @@ -17,14 +20,70 @@ def get_project_config(): sleep(1) # Causes the process to wait for one second before shutting down return get_project_config_original() - relay = relay(mini_sentry, {"limits": {"shutdown_timeout": 2}}) project_id = 42 mini_sentry.add_basic_project_config(project_id) + + relay = relay( + mini_sentry, + { + "limits": {"shutdown_timeout": 2}, + "spool": {"envelopes": {"version": "experimental"}}, + }, + ) + relay.send_event(project_id) relay.shutdown(sig=signal.SIGTERM) - event = mini_sentry.captured_events.get(timeout=0).get_event() - assert event["logentry"] == {"formatted": "Hello, World!"} + + # When using the memory envelope buffer, we do not flush envelopes, so we lose all of them. + assert mini_sentry.captured_events.empty() + + +def test_graceful_shutdown_with_sqlite_buffer(mini_sentry, relay): + from time import sleep + + # Create a temporary directory for the sqlite db. + db_file_path = os.path.join(tempfile.mkdtemp(), "database.db") + + get_project_config_original = mini_sentry.app.view_functions["get_project_config"] + + @mini_sentry.app.endpoint("get_project_config") + def get_project_config(): + sleep(1) # Causes the process to wait for one second before shutting down + return get_project_config_original() + + project_id = 42 + mini_sentry.add_basic_project_config(project_id) + + relay = relay( + mini_sentry, + { + "limits": {"shutdown_timeout": 2}, + "spool": {"envelopes": {"version": "experimental", "path": db_file_path}}, + }, + ) + + n = 10 + for i in range(n): + relay.send_event(project_id) + + relay.shutdown(sig=signal.SIGTERM) + + # When using the disk envelope buffer, we don't forward envelopes, but we spool them to disk. + assert mini_sentry.captured_events.empty() + + # Check if there's data in the SQLite table `envelopes` + conn = sqlite3.connect(db_file_path) + cursor = conn.cursor() + + # Check if there's data in the `envelopes` table + cursor.execute("SELECT COUNT(*) FROM envelopes") + row_count = cursor.fetchone()[0] + assert ( + row_count == n + ), f"The 'envelopes' table is empty. Expected {n} rows, but found {row_count}" + + conn.close() @pytest.mark.skip("Flaky test")