Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spool): Introduce in-memory EnvelopeBuffer as alternative to the old spooler #3863

Merged
merged 65 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
35805f3
wip
jjbayer Jul 25, 2024
06ff0ac
wip: Borrow<K>
jjbayer Jul 25, 2024
cb71cce
it compiles
jjbayer Jul 25, 2024
b60cb8b
some tests
jjbayer Jul 25, 2024
ed56465
test again
jjbayer Jul 25, 2024
005c093
Push to envelope buffer
jjbayer Jul 25, 2024
9350674
async interface
jjbayer Jul 25, 2024
5942fff
Merge remote-tracking branch 'origin/master' into feat/envelope-buffer
jjbayer Jul 26, 2024
9428e5c
files
jjbayer Jul 26, 2024
26eb70b
return ready
jjbayer Jul 26, 2024
b7abdad
prefetch
jjbayer Jul 26, 2024
d1b8bea
wip project cache
jjbayer Jul 26, 2024
faf72fe
borrow checker
jjbayer Jul 26, 2024
d0dae29
v1 vs v2
jjbayer Jul 26, 2024
0fb8830
make envelope buffer optional for now
jjbayer Jul 26, 2024
a7d1cf8
fix: health check
jjbayer Jul 26, 2024
82f3a95
awaiting pop
jjbayer Jul 27, 2024
db1ec70
mark_ready on updated state
jjbayer Jul 27, 2024
8c5167e
test: peek
jjbayer Jul 28, 2024
472cecd
notify on change
jjbayer Jul 28, 2024
6abed12
Notify on ready
jjbayer Jul 28, 2024
9eb1fcc
fix: only one envelope buffer
jjbayer Jul 28, 2024
3d21861
fix: derive envelope group
jjbayer Jul 28, 2024
29fe242
Notify only if changed
jjbayer Jul 28, 2024
c686d93
ref: Eliminate duplicate stack keys
jjbayer Jul 28, 2024
d9f7895
Merge remote-tracking branch 'origin/master' into feat/envelope-buffer
jjbayer Jul 29, 2024
b50e19d
Merge
jjbayer Jul 29, 2024
054778a
wip: On-disk creation
jjbayer Jul 29, 2024
761b6c1
Improve
iambriccardo Jul 29, 2024
956b249
Improve
iambriccardo Jul 29, 2024
8864e61
Improve
iambriccardo Jul 29, 2024
990bb4e
Improve
iambriccardo Jul 29, 2024
c5cd261
Improve
iambriccardo Jul 29, 2024
8972444
Improve
iambriccardo Jul 30, 2024
a822832
Improve
iambriccardo Jul 30, 2024
8325990
Improve
iambriccardo Jul 30, 2024
8de1010
Improve
iambriccardo Jul 30, 2024
fc5a0e9
Improve
iambriccardo Jul 30, 2024
1729051
Improve
iambriccardo Jul 30, 2024
af36538
Improve
iambriccardo Jul 30, 2024
5f5228a
Improve
iambriccardo Jul 30, 2024
6c7cd9f
little fixes
jjbayer Jul 30, 2024
2ccd34d
error handling
jjbayer Jul 30, 2024
b6262f2
metrics
jjbayer Jul 30, 2024
3a12589
Improve
iambriccardo Jul 30, 2024
be9e4e5
Improve
iambriccardo Jul 30, 2024
4f9030d
doc & config
jjbayer Jul 30, 2024
0e0ab80
Improve
iambriccardo Jul 30, 2024
1d57809
Improve
iambriccardo Jul 30, 2024
07ec7e9
Improve
iambriccardo Jul 30, 2024
e3e62ec
Improve
iambriccardo Jul 30, 2024
f5c8df0
Improve
iambriccardo Jul 30, 2024
71da426
fix test_query
jjbayer Jul 30, 2024
8e57db9
changelog
jjbayer Jul 30, 2024
675b29f
count inflight
jjbayer Jul 30, 2024
107ec5b
metric for in flight pushes
jjbayer Jul 30, 2024
007bd36
bench
jjbayer Jul 30, 2024
3586303
bench: More projects
jjbayer Jul 31, 2024
ece3f32
Merge branch 'master' into feat/envelope-buffer
jjbayer Jul 31, 2024
67aea56
fix: org_id check
jjbayer Jul 31, 2024
2de15b0
ref: Variables are your friend
jjbayer Jul 31, 2024
09d0b24
ref: review comments
jjbayer Jul 31, 2024
d58c64d
fix: guard changed
jjbayer Jul 31, 2024
d59d2b1
Update relay-config/src/config.rs
jjbayer Jul 31, 2024
76db84d
instr(buffer): Metric for number of stacks (#3878)
jjbayer Jul 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

**Internal**:

- Add `EnvelopeStack` and `SQLiteEnvelopeStack` to manage envelopes on disk. ([#3855](https://github.com/getsentry/relay/pull/3855))
- Add `client_sample_rate` to spans, pulled from the trace context ([#3872](https://github.com/getsentry/relay/pull/3872)).

- Add experimental support for V2 envelope buffering. ([#3855](https://github.com/getsentry/relay/pull/3855), [#3863](https://github.com/getsentry/relay/pull/3863))
- Add `client_sample_rate` to spans, pulled from the trace context. ([#3872](https://github.com/getsentry/relay/pull/3872))

## 24.7.1

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 77 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,20 @@ fn spool_envelopes_unspool_interval() -> u64 {
100
}

/// Default batch size for the stack.
fn spool_envelopes_stack_disk_batch_size() -> usize {
200
}

/// Default maximum number of batches for the stack.
fn spool_envelopes_stack_max_batches() -> usize {
2
}

fn spool_envelopes_max_envelope_delay_secs() -> u64 {
24 * 60 * 60
}

/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
Expand All @@ -868,6 +882,40 @@ pub struct EnvelopeSpool {
/// The interval in milliseconds to trigger unspool.
#[serde(default = "spool_envelopes_unspool_interval")]
unspool_interval: u64,
/// Number of elements of the envelope stack that are flushed to disk.
#[serde(default = "spool_envelopes_stack_disk_batch_size")]
disk_batch_size: usize,
/// Number of batches of size [`Self::disk_batch_size`] that need to be accumulated before
/// flushing one batch to disk.
#[serde(default = "spool_envelopes_stack_max_batches")]
max_batches: usize,
/// Maximum time between receiving the envelope and processing it.
///
/// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
/// they are dropped. Defaults to 24h.
#[serde(default = "spool_envelopes_max_envelope_delay_secs")]
max_envelope_delay_secs: u64,
/// Version of the spooler.
#[serde(default = "EnvelopeSpoolVersion::default")]
jjbayer marked this conversation as resolved.
Show resolved Hide resolved
version: EnvelopeSpoolVersion,
}

/// Version of the envelope buffering mechanism.
#[derive(Debug, Default, Deserialize, Serialize)]
pub enum EnvelopeSpoolVersion {
/// Use the spooler service, which only buffers envelopes for unloaded projects and
/// switches between an in-memory mode and a disk mode on-demand.
///
/// This mode will be removed soon.
#[default]
#[serde(rename = "1")]
V1,
/// Use the envelope buffer, through which all envelopes pass before getting unspooled.
/// Can be either disk based or memory based.
///
/// This mode has not yet been stress-tested, do not use in production environments.
#[serde(rename = "experimental")]
V2,
}

impl Default for EnvelopeSpool {
Expand All @@ -879,6 +927,10 @@ impl Default for EnvelopeSpool {
max_disk_size: spool_envelopes_max_disk_size(),
max_memory_size: spool_envelopes_max_memory_size(),
unspool_interval: spool_envelopes_unspool_interval(), // 100ms
disk_batch_size: spool_envelopes_stack_disk_batch_size(),
max_batches: spool_envelopes_stack_max_batches(),
max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
version: EnvelopeSpoolVersion::V2,
}
}
}
Expand Down Expand Up @@ -2077,6 +2129,31 @@ impl Config {
self.values.spool.envelopes.max_memory_size.as_bytes()
}

/// Number of batches of size `stack_disk_batch_size` that need to be accumulated before
/// flushing one batch to disk.
pub fn spool_envelopes_stack_disk_batch_size(&self) -> usize {
self.values.spool.envelopes.disk_batch_size
}

/// Number of batches of size `stack_disk_batch_size` that need to be accumulated before
/// flushing one batch to disk.
pub fn spool_envelopes_stack_max_batches(&self) -> usize {
self.values.spool.envelopes.max_batches
}

/// Returns `true` if version 2 of the spooling mechanism is used.
pub fn spool_v2(&self) -> bool {
matches!(
self.values.spool.envelopes.version,
EnvelopeSpoolVersion::V2
)
}

/// Returns the time after which we drop envelopes as a [`Duration`] object.
pub fn spool_envelopes_max_age(&self) -> Duration {
Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
}

/// Returns the maximum size of an event payload in bytes.
pub fn max_event_size(&self) -> usize {
self.values.limits.max_event_size.as_bytes()
Expand Down
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ minidump = { workspace = true, optional = true }
multer = { workspace = true }
once_cell = { workspace = true }
pin-project-lite = { workspace = true }
priority-queue = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
regex = { workspace = true }
Expand Down
121 changes: 109 additions & 12 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use relay_config::Config;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{Pool, Sqlite};
use std::path::PathBuf;
use std::time::Duration;
use std::time::{Duration, Instant};
use tempfile::TempDir;
use tokio::runtime::Runtime;

use relay_base_schema::project::ProjectKey;
use relay_server::{Envelope, EnvelopeStack, SQLiteEnvelopeStack};
use relay_server::{
Envelope, EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
};

fn setup_db(path: &PathBuf) -> Pool<Sqlite> {
let options = SqliteConnectOptions::new()
Expand Down Expand Up @@ -37,6 +40,11 @@ async fn reset_db(db: Pool<Sqlite>) {
}

fn mock_envelope(size: &str) -> Box<Envelope> {
let project_key = "e12d836b15bb49d7bbf99e64295d995b";
mock_envelope_with_project_key(&ProjectKey::parse(project_key).unwrap(), size)
}

fn mock_envelope_with_project_key(project_key: &ProjectKey, size: &str) -> Box<Envelope> {
let payload = match size {
"small" => "small_payload".to_string(),
"medium" => "medium_payload".repeat(100),
Expand All @@ -47,20 +55,24 @@ fn mock_envelope(size: &str) -> Box<Envelope> {

let bytes = Bytes::from(format!(
"\
{{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\",\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}}\n\
{{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\",\"dsn\":\"https://{}:@sentry.io/42\"}}\n\
{{\"type\":\"attachment\"}}\n\
{}\n\
",
project_key,
payload
));

Envelope::parse_bytes(bytes).unwrap()
let mut envelope = Envelope::parse_bytes(bytes).unwrap();
envelope.set_start_time(Instant::now());
envelope
}

fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.db");
let db = setup_db(&db_path);
let envelope_store = SqliteEnvelopeStore::new(db.clone());

let runtime = Runtime::new().unwrap();

Expand All @@ -83,8 +95,8 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
reset_db(db.clone()).await;
});

let stack = SQLiteEnvelopeStack::new(
db.clone(),
let stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
Expand Down Expand Up @@ -119,8 +131,8 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
runtime.block_on(async {
reset_db(db.clone()).await;

let mut stack = SQLiteEnvelopeStack::new(
db.clone(),
let mut stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
Expand Down Expand Up @@ -159,8 +171,8 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
reset_db(db.clone()).await;
});

let stack = SQLiteEnvelopeStack::new(
db.clone(),
let stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
Expand Down Expand Up @@ -199,5 +211,90 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
group.finish();
}

criterion_group!(benches, benchmark_sqlite_envelope_stack);
criterion_main!(benches);
fn benchmark_envelope_buffer(c: &mut Criterion) {
use rand::seq::SliceRandom;
let mut group = c.benchmark_group("envelope_buffer");
group.sample_size(10);

let runtime = Runtime::new().unwrap();

let num_projects = 100000;
let envelopes_per_project = 10;

group.throughput(Throughput::Elements(
num_projects * envelopes_per_project as u64,
));

group.bench_function("push_only", |b| {
b.iter_with_setup(
|| {
let project_keys: Vec<_> = (0..num_projects)
.map(|i| ProjectKey::parse(&format!("{:#032x}", i)).unwrap())
.collect();

let mut envelopes = vec![];
for project_key in &project_keys {
for _ in 0..envelopes_per_project {
envelopes.push(mock_envelope_with_project_key(project_key, "small"))
}
}

envelopes.shuffle(&mut rand::thread_rng());

envelopes
},
|envelopes| {
runtime.block_on(async {
let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default());
for envelope in envelopes.into_iter() {
buffer.push(envelope).await.unwrap();
}
})
},
);
});

group.bench_function("push_pop", |b| {
b.iter_with_setup(
|| {
let project_keys: Vec<_> = (0..num_projects)
.map(|i| ProjectKey::parse(&format!("{:#032x}", i)).unwrap())
.collect();

let mut envelopes = vec![];
for project_key in &project_keys {
for _ in 0..envelopes_per_project {
envelopes.push(mock_envelope_with_project_key(project_key, "big"))
}
}

envelopes.shuffle(&mut rand::thread_rng());

envelopes
},
|envelopes| {
runtime.block_on(async {
let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default());
let n = envelopes.len();
for envelope in envelopes.into_iter() {
let public_key = envelope.meta().public_key();
buffer.push(envelope).await.unwrap();
// Mark as ready:
buffer.mark_ready(&public_key, true);
}
for _ in 0..n {
let envelope = buffer.pop().await.unwrap().unwrap();
// Send back to end of queue to get worse-case behavior:
buffer.mark_ready(&envelope.meta().public_key(), false);
}
})
},
);
});

group.finish();
}

criterion_group!(sqlite, benchmark_sqlite_envelope_stack);
criterion_group!(buffer, benchmark_envelope_buffer);
criterion_main!(sqlite, buffer);
17 changes: 16 additions & 1 deletion relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,21 @@ fn queue_envelope(
);
envelope.scope(scoping);

state.project_cache().send(ValidateEnvelope::new(envelope));
match state.envelope_buffer() {
Some(buffer) => {
// NOTE: This assumes that a `prefetch` has already been scheduled for both the
// envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

// TODO: Sync-check whether the buffer has capacity.
// Otherwise return `QueueFailed`.
buffer.defer_push(envelope);
}
None => {
relay_log::trace!("Sending envelope to project cache for V1 buffer");
state.project_cache().send(ValidateEnvelope::new(envelope));
}
}
}
// The entire envelope is taken for a split above, and it's empty at this point, we can just
// accept it without additional checks.
Expand Down Expand Up @@ -333,6 +347,7 @@ pub async fn handle_envelope(
)
}

// TODO(jjbayer): Move this check to spool impl
if state.memory_checker().check_memory().is_exceeded() {
// NOTE: Long-term, we should not reject the envelope here, but spool it to disk instead.
// This will be fixed with the new spool implementation.
Expand Down
7 changes: 7 additions & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,13 @@ impl Envelope {
self.dsc().map(|dsc| dsc.public_key)
}

/// Returns the time at which the envelope was received at this Relay.
///
/// This is the date time equivalent to [`start_time`](Self::start_time).
pub fn received_at(&self) -> DateTime<Utc> {
relay_common::time::instant_to_date_time(self.meta().start_time())
}

/// Sets the event id on the envelope.
pub fn set_event_id(&mut self, event_id: EventId) {
self.headers.event_id = Some(event_id);
Expand Down
8 changes: 4 additions & 4 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,11 @@ mod services;
mod statsd;
mod utils;

pub use self::envelope::Envelope; // pub for benchmarks
pub use self::services::buffer::{
EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
}; // pub for benchmarks
pub use self::services::spooler::spool_utils;
// Public just for benchmarks.
pub use self::envelope::Envelope;
pub use self::services::spooler::envelope_stack::sqlite::SQLiteEnvelopeStack;
pub use self::services::spooler::envelope_stack::EnvelopeStack;

#[cfg(test)]
mod testutils;
Expand Down
Loading
Loading