Skip to content

Commit

Permalink
feat(spooler): Implement shutdown behavior in the spooler (#3980)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Sep 13, 2024
1 parent 5d351d5 commit 828b130
Show file tree
Hide file tree
Showing 11 changed files with 318 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Remove the `generate-schema` tool. Relay no longer exposes JSON schema for the event protocol. Consult the Rust type documentation of the `relay-event-schema` crate instead. ([#3974](https://github.com/getsentry/relay/pull/3974))
- Allow creation of `SqliteEnvelopeBuffer` from config, and load existing stacks from db on startup. ([#3967](https://github.com/getsentry/relay/pull/3967))
- Only tag `user.geo.subregion` on frontend and mobile projects. ([#4013](https://github.com/getsentry/relay/pull/4013), [#4023](https://github.com/getsentry/relay/pull/4023))
- Implement graceful shutdown mechanism in the `EnvelopeBuffer`. ([#3980](https://github.com/getsentry/relay/pull/3980))

## 24.8.0

Expand Down
35 changes: 30 additions & 5 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::convert::Infallible;
use std::error::Error;
use std::mem;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::Arc;
Expand Down Expand Up @@ -55,9 +56,11 @@ impl PolymorphicEnvelopeBuffer {
memory_checker: MemoryChecker,
) -> Result<Self, EnvelopeBufferError> {
let buffer = if config.spool_envelopes_path().is_some() {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(config).await?;
Self::Sqlite(buffer)
} else {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(memory_checker);
Self::InMemory(buffer)
};
Expand Down Expand Up @@ -137,6 +140,20 @@ impl PolymorphicEnvelopeBuffer {
Self::InMemory(buffer) => buffer.has_capacity(),
}
}

/// Shuts down the [`PolymorphicEnvelopeBuffer`].
pub async fn shutdown(&mut self) -> bool {
// Currently, we want to flush the buffer only for disk, since the in memory implementation
// tries to not do anything and pop as many elements as possible within the shutdown
// timeout.
let Self::Sqlite(buffer) = self else {
relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed");
return false;
};
buffer.flush().await;

true
}
}

/// Error that occurs while interacting with the envelope buffer.
Expand Down Expand Up @@ -374,6 +391,19 @@ where
});
}

/// Returns `true` if the underlying storage has the capacity to store more envelopes.
pub fn has_capacity(&self) -> bool {
self.stack_provider.has_store_capacity()
}

/// Flushes the envelope buffer.
pub async fn flush(&mut self) {
let priority_queue = mem::take(&mut self.priority_queue);
self.stack_provider
.flush(priority_queue.into_iter().map(|(q, _)| q.value))
.await;
}

/// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted.
async fn push_stack(
&mut self,
Expand Down Expand Up @@ -413,11 +443,6 @@ where
Ok(())
}

/// Returns `true` if the underlying storage has the capacity to store more envelopes.
pub fn has_capacity(&self) -> bool {
self.stack_provider.has_store_capacity()
}

/// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`].
fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
for project_key in project_key_pair.iter() {
Expand Down
4 changes: 4 additions & 0 deletions relay-server/src/services/buffer/envelope_stack/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ impl EnvelopeStack for MemoryEnvelopeStack {
async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
Ok(self.0.pop())
}

fn flush(self) -> Vec<Box<Envelope>> {
self.0
}
}
4 changes: 4 additions & 0 deletions relay-server/src/services/buffer/envelope_stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ pub trait EnvelopeStack: Send + std::fmt::Debug {

/// Pops the [`Envelope`] on top of the stack.
fn pop(&mut self) -> impl Future<Output = Result<Option<Box<Envelope>>, Self::Error>>;

/// Persists all envelopes in the [`EnvelopeStack`]s to external storage, if possible,
/// and consumes the stack provider.
fn flush(self) -> Vec<Box<Envelope>>;
}
32 changes: 32 additions & 0 deletions relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ impl EnvelopeStack for SqliteEnvelopeStack {

Ok(result)
}

fn flush(self) -> Vec<Box<Envelope>> {
self.batches_buffer.into_iter().flatten().collect()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -461,4 +465,32 @@ mod tests {
}
assert_eq!(stack.batches_buffer_size, 0);
}

#[tokio::test]
async fn test_drain() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
5,
1,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

let envelopes = mock_envelopes(5);

// We push 5 envelopes and check that there is nothing on disk.
for envelope in envelopes.clone() {
assert!(stack.push(envelope).await.is_ok());
}
assert_eq!(stack.batches_buffer_size, 5);
assert_eq!(envelope_store.total_count().await.unwrap(), 0);

// We drain the stack and make sure nothing was spooled to disk.
let drained_envelopes = stack.flush();
assert_eq!(drained_envelopes.into_iter().collect::<Vec<_>>().len(), 5);
assert_eq!(envelope_store.total_count().await.unwrap(), 0);
}
}
69 changes: 56 additions & 13 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
//! Types for buffering envelopes.

use std::error::Error;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_system::Request;
use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use relay_system::{Controller, Request, Shutdown};
use tokio::sync::watch;
use tokio::time::timeout;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::Peek;
Expand Down Expand Up @@ -208,18 +210,18 @@ impl EnvelopeBufferService {
&mut self,
buffer: &mut PolymorphicEnvelopeBuffer,
) -> Result<(), EnvelopeBufferError> {
relay_log::trace!("EnvelopeBufferService peek");
relay_log::trace!("EnvelopeBufferService: peeking the buffer");
match buffer.peek().await? {
Peek::Empty => {
relay_log::trace!("EnvelopeBufferService empty");
relay_log::trace!("EnvelopeBufferService: peek returned empty");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "empty"
);
self.sleep = Duration::MAX; // wait for reset by `handle_message`.
}
Peek::Ready(_) => {
relay_log::trace!("EnvelopeBufferService pop");
relay_log::trace!("EnvelopeBufferService: popping envelope");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "ready"
Expand All @@ -234,7 +236,7 @@ impl EnvelopeBufferService {
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, envelope) => {
relay_log::trace!("EnvelopeBufferService request update");
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready, requesting project update");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "not_ready"
Expand Down Expand Up @@ -268,23 +270,55 @@ impl EnvelopeBufferService {
// projects was already triggered (see XXX).
// For better separation of concerns, this prefetch should be triggered from here
// once buffer V1 has been removed.
relay_log::trace!("EnvelopeBufferService push");
relay_log::trace!("EnvelopeBufferService: received push message");
self.push(buffer, envelope).await;
}
EnvelopeBuffer::NotReady(project_key, envelope) => {
relay_log::trace!("EnvelopeBufferService project not ready");
relay_log::trace!(
"EnvelopeBufferService: received project not ready message for project key {}",
&project_key
);
buffer.mark_ready(&project_key, false);
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1);
self.push(buffer, envelope).await;
}
EnvelopeBuffer::Ready(project_key) => {
relay_log::trace!("EnvelopeBufferService project ready {}", &project_key);
relay_log::trace!(
"EnvelopeBufferService: received project ready message for project key {}",
&project_key
);
buffer.mark_ready(&project_key, true);
}
};
self.sleep = Duration::ZERO;
}

async fn handle_shutdown(
&mut self,
buffer: &mut PolymorphicEnvelopeBuffer,
message: Shutdown,
) -> bool {
// We gracefully shut down only if the shutdown has a timeout.
if let Some(shutdown_timeout) = message.timeout {
relay_log::trace!("EnvelopeBufferService: shutting down gracefully");

let shutdown_result = timeout(shutdown_timeout, buffer.shutdown()).await;
match shutdown_result {
Ok(shutdown_result) => {
return shutdown_result;
}
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
"the envelope buffer didn't shut down in time, some envelopes might be lost",
);
}
}
}

false
}

async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box<Envelope>) {
if let Err(e) = buffer.push(envelope).await {
relay_log::error!(
Expand Down Expand Up @@ -322,15 +356,17 @@ impl Service for EnvelopeBufferService {
};
buffer.initialize().await;

relay_log::info!("EnvelopeBufferService start");
let mut shutdown = Controller::shutdown_handle();

relay_log::info!("EnvelopeBufferService: starting");
let mut iteration = 0;
loop {
iteration += 1;
relay_log::trace!("EnvelopeBufferService loop iteration {iteration}");
relay_log::trace!("EnvelopeBufferService: loop iteration {iteration}");

tokio::select! {
// NOTE: we do not select a bias here.
// On the one hand, we might want to prioritize dequeing over enqueing
// On the one hand, we might want to prioritize dequeuing over enqueuing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
Ok(()) = self.ready_to_pop(&mut buffer) => {
Expand All @@ -344,8 +380,15 @@ impl Service for EnvelopeBufferService {
Some(message) = rx.recv() => {
self.handle_message(&mut buffer, message).await;
}
shutdown = shutdown.notified() => {
// In case the shutdown was handled, we break out of the loop signaling that
// there is no need to process anymore envelopes.
if self.handle_shutdown(&mut buffer, shutdown).await {
break;
}
}
_ = global_config_rx.changed() => {
relay_log::trace!("EnvelopeBufferService received global config");
relay_log::trace!("EnvelopeBufferService: received global config");
self.sleep = Duration::ZERO; // Try to pop
}
else => break,
Expand All @@ -354,7 +397,7 @@ impl Service for EnvelopeBufferService {
self.update_observable_state(&mut buffer);
}

relay_log::info!("EnvelopeBufferService stop");
relay_log::info!("EnvelopeBufferService: stopping");
});
}
}
Expand Down
8 changes: 8 additions & 0 deletions relay-server/src/services/buffer/stack_provider/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::services::buffer::stack_provider::{
InitializationState, StackCreationType, StackProvider,
};
use crate::utils::MemoryChecker;
use crate::EnvelopeStack;

#[derive(Debug)]
pub struct MemoryStackProvider {
Expand Down Expand Up @@ -41,4 +42,11 @@ impl StackProvider for MemoryStackProvider {
fn stack_type<'a>(&self) -> &'a str {
"memory"
}

async fn flush(&mut self, envelope_stacks: impl IntoIterator<Item = Self::Stack>) {
for envelope_stack in envelope_stacks {
// The flushed envelopes will be immediately dropped.
let _ = envelope_stack.flush();
}
}
}
6 changes: 6 additions & 0 deletions relay-server/src/services/buffer/stack_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,10 @@ pub trait StackProvider: std::fmt::Debug {

/// Returns the string representation of the stack type offered by this [`StackProvider`].
fn stack_type<'a>(&self) -> &'a str;

/// Flushes the supplied [`EnvelopeStack`]s and consumes the [`StackProvider`].
fn flush(
&mut self,
envelope_stacks: impl IntoIterator<Item = Self::Stack>,
) -> impl Future<Output = ()>;
}
Loading

0 comments on commit 828b130

Please sign in to comment.