Merge branch 'master' into riccardo/fix/trimming-spans
jjbayer authored Aug 1, 2024
2 parents b82c766 + 824cdb2 commit 9b6ac1c
Showing 43 changed files with 3,184 additions and 1,089 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
@@ -8,8 +8,8 @@
 
 **Internal**:
 
-- Add `EnvelopeStack` and `SQLiteEnvelopeStack` to manage envelopes on disk. ([#3855](https://github.com/getsentry/relay/pull/3855))
-- Add `client_sample_rate` to spans, pulled from the trace context ([#3872](https://github.com/getsentry/relay/pull/3872)).
+- Add experimental support for V2 envelope buffering. ([#3855](https://github.com/getsentry/relay/pull/3855), [#3863](https://github.com/getsentry/relay/pull/3863))
+- Add `client_sample_rate` to spans, pulled from the trace context. ([#3872](https://github.com/getsentry/relay/pull/3872))
 - Introduce `trim = "disabled"` type attribute to prevent trimming of spans. ([#3877](https://github.com/getsentry/relay/pull/3877))
 
 ## 24.7.1
@@ -22,6 +22,8 @@
 
 - "Cardinality limit" outcomes now report which limit was exceeded. ([#3825](https://github.com/getsentry/relay/pull/3825))
 - Derive span browser name from user agent. ([#3834](https://github.com/getsentry/relay/pull/3834))
+- Redis pools for the `project_configs`, `cardinality`, `quotas`, and `misc` use cases
+  can now be configured individually. ([#3859](https://github.com/getsentry/relay/pull/3859))
 
 **Internal**:
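
For the per-pool Redis entry above, a minimal sketch of an individually configured setup, written in the style of this crate's own config tests. The JSON shape (four pool keys, `server` for a single host) is an assumption based on the changelog wording; only the unified form appears verbatim in this diff.

```rust
#[test]
fn redis_individual_pools_sketch() {
    // Hypothetical schema: the four pool names come from the changelog
    // entry; `server` mirrors the unified single-host form.
    let config = Config::from_json_value(serde_json::json!({
        "processing": {
            "redis": {
                "project_configs": { "server": "redis://10.0.0.1:6379" },
                "cardinality": { "server": "redis://10.0.0.1:6379" },
                "quotas": { "server": "redis://10.0.0.2:6379" },
                "misc": { "server": "redis://10.0.0.2:6379" }
            }
        }
    }))
    .unwrap();

    // One pool per use case is derived from this config; see redis() below.
    assert!(config.redis().is_some());
}
```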
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

131 changes: 100 additions & 31 deletions relay-config/src/config.rs
@@ -16,15 +16,14 @@ use relay_kafka::{
 };
 use relay_metrics::aggregator::{AggregatorConfig, FlushBatching};
 use relay_metrics::MetricNamespace;
-use relay_redis::RedisConfigOptions;
 use serde::de::{DeserializeOwned, Unexpected, Visitor};
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use uuid::Uuid;
 
 use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
 use crate::byte_size::ByteSize;
 use crate::upstream::UpstreamDescriptor;
-use crate::{RedisConfig, RedisConnection};
+use crate::{create_redis_pools, RedisConfig, RedisConfigs, RedisPoolConfigs};
 
 const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;
 
@@ -842,6 +841,20 @@ fn spool_envelopes_unspool_interval() -> u64 {
     100
 }
 
+/// Default batch size for the stack.
+fn spool_envelopes_stack_disk_batch_size() -> usize {
+    200
+}
+
+/// Default maximum number of batches for the stack.
+fn spool_envelopes_stack_max_batches() -> usize {
+    2
+}
+
+fn spool_envelopes_max_envelope_delay_secs() -> u64 {
+    24 * 60 * 60
+}
+
 /// Persistent buffering configuration for incoming envelopes.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct EnvelopeSpool {
@@ -868,6 +881,40 @@ pub struct EnvelopeSpool {
     /// The interval in milliseconds to trigger unspool.
     #[serde(default = "spool_envelopes_unspool_interval")]
     unspool_interval: u64,
+    /// Number of elements of the envelope stack that are flushed to disk.
+    #[serde(default = "spool_envelopes_stack_disk_batch_size")]
+    disk_batch_size: usize,
+    /// Number of batches of size [`Self::disk_batch_size`] that need to be accumulated before
+    /// flushing one batch to disk.
+    #[serde(default = "spool_envelopes_stack_max_batches")]
+    max_batches: usize,
+    /// Maximum time between receiving the envelope and processing it.
+    ///
+    /// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
+    /// they are dropped. Defaults to 24h.
+    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
+    max_envelope_delay_secs: u64,
+    /// Version of the spooler.
+    #[serde(default)]
+    version: EnvelopeSpoolVersion,
 }
 
+/// Version of the envelope buffering mechanism.
+#[derive(Debug, Default, Deserialize, Serialize)]
+pub enum EnvelopeSpoolVersion {
+    /// Use the spooler service, which only buffers envelopes for unloaded projects and
+    /// switches between an in-memory mode and a disk mode on demand.
+    ///
+    /// This mode will be removed soon.
+    #[default]
+    #[serde(rename = "1")]
+    V1,
+    /// Use the envelope buffer, through which all envelopes pass before getting unspooled.
+    /// Can be either disk-based or memory-based.
+    ///
+    /// This mode has not yet been stress-tested; do not use in production environments.
+    #[serde(rename = "experimental")]
+    V2,
+}
+
 impl Default for EnvelopeSpool {
@@ -879,6 +926,10 @@ impl Default for EnvelopeSpool {
             max_disk_size: spool_envelopes_max_disk_size(),
             max_memory_size: spool_envelopes_max_memory_size(),
             unspool_interval: spool_envelopes_unspool_interval(), // 100ms
+            disk_batch_size: spool_envelopes_stack_disk_batch_size(),
+            max_batches: spool_envelopes_stack_max_batches(),
+            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
+            version: EnvelopeSpoolVersion::default(),
         }
     }
 }
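
The new fields and the version switch can be exercised through `Config::from_json_value`, which the tests at the end of this diff use; a minimal sketch:

```rust
#[test]
fn spool_v2_opt_in_sketch() {
    // "experimental" maps to EnvelopeSpoolVersion::V2 via the serde rename
    // above; omitted fields keep their defaults (disk_batch_size = 200,
    // max_batches = 2, max_envelope_delay_secs = 86400).
    let config = Config::from_json_value(serde_json::json!({
        "spool": {
            "envelopes": { "version": "experimental" }
        }
    }))
    .unwrap();

    assert!(config.spool_v2());
}
```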
@@ -1024,7 +1075,7 @@ pub struct Processing {
     pub kafka_validate_topics: bool,
     /// Redis hosts to connect to for storing state for rate limits.
     #[serde(default)]
-    pub redis: Option<RedisConfig>,
+    pub redis: Option<RedisConfigs>,
     /// Maximum chunk size of attachments for Kafka.
     #[serde(default = "default_chunk_size")]
     pub attachment_chunk_size: ByteSize,
@@ -1585,7 +1636,7 @@ impl Config {
         }
 
         if let Some(redis) = overrides.redis_url {
-            processing.redis = Some(RedisConfig::single(redis))
+            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
         }
 
         if let Some(kafka_url) = overrides.kafka_url {
@@ -2077,6 +2128,31 @@ impl Config {
         self.values.spool.envelopes.max_memory_size.as_bytes()
     }
 
+    /// Number of elements of the envelope stack that are flushed to disk
+    /// in a single batch.
+    pub fn spool_envelopes_stack_disk_batch_size(&self) -> usize {
+        self.values.spool.envelopes.disk_batch_size
+    }
+
+    /// Number of batches of size `disk_batch_size` that need to be accumulated before
+    /// flushing one batch to disk.
+    pub fn spool_envelopes_stack_max_batches(&self) -> usize {
+        self.values.spool.envelopes.max_batches
+    }
+
+    /// Returns `true` if version 2 of the spooling mechanism is used.
+    pub fn spool_v2(&self) -> bool {
+        matches!(
+            self.values.spool.envelopes.version,
+            EnvelopeSpoolVersion::V2
+        )
+    }
+
+    /// Returns the time after which we drop envelopes as a [`Duration`] object.
+    pub fn spool_envelopes_max_age(&self) -> Duration {
+        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
+    }
+
     /// Returns the maximum size of an event payload in bytes.
     pub fn max_event_size(&self) -> usize {
         self.values.limits.max_event_size.as_bytes()
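
A sketch of the defaults these accessors surface, matching the `spool_envelopes_*` default functions earlier in this diff:

```rust
use std::time::Duration;

#[test]
fn spool_accessor_defaults_sketch() {
    let config = Config::from_json_value(serde_json::json!({})).unwrap();

    // Defaults: 200 envelopes per disk batch, 2 batches in memory,
    // and a 24-hour cap on envelope age.
    assert_eq!(config.spool_envelopes_stack_disk_batch_size(), 200);
    assert_eq!(config.spool_envelopes_stack_max_batches(), 2);
    assert_eq!(
        config.spool_envelopes_max_age(),
        Duration::from_secs(24 * 60 * 60)
    );
}
```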
@@ -2279,34 +2355,15 @@ impl Config {
         &self.values.processing.topics.unused
     }
 
-    /// Redis servers to connect to, for rate limiting.
-    pub fn redis(&self) -> Option<(&RedisConnection, RedisConfigOptions)> {
-        let cpu_concurrency = self.cpu_concurrency();
-
-        let redis = self.values.processing.redis.as_ref()?;
-
-        let max_connections = redis
-            .options
-            .max_connections
-            .unwrap_or(cpu_concurrency as u32 * 2)
-            .max(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS);
-
-        let min_idle = redis
-            .options
-            .min_idle
-            .unwrap_or_else(|| max_connections.div_ceil(crate::redis::DEFAULT_MIN_IDLE_RATIO));
-
-        let options = RedisConfigOptions {
-            max_connections,
-            min_idle: Some(min_idle),
-            connection_timeout: redis.options.connection_timeout,
-            max_lifetime: redis.options.max_lifetime,
-            idle_timeout: redis.options.idle_timeout,
-            read_timeout: redis.options.read_timeout,
-            write_timeout: redis.options.write_timeout,
-        };
-
-        Some((&redis.connection, options))
+    /// Redis servers to connect to for project configs, cardinality limits,
+    /// rate limiting, and metrics metadata.
+    pub fn redis(&self) -> Option<RedisPoolConfigs> {
+        let redis_configs = self.values.processing.redis.as_ref()?;
+
+        Some(create_redis_pools(
+            redis_configs,
+            self.cpu_concurrency() as u32,
+        ))
     }
 
     /// Chunk size of attachments in bytes.
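
The rewritten `redis()` no longer computes `max_connections` and `min_idle` inline; that sizing logic moves into `create_redis_pools`, which takes the CPU concurrency and returns one pool configuration per use case. A usage sketch with a unified config (the `server` key for a single host is an assumption not shown in this diff):

```rust
#[test]
fn redis_unified_pools_sketch() {
    // Unified form: one Redis server shared by all pools, equivalent to
    // what the --redis-url override earlier in this diff constructs.
    let config = Config::from_json_value(serde_json::json!({
        "processing": {
            "redis": { "server": "redis://127.0.0.1:6379" }
        }
    }))
    .unwrap();

    // RedisPoolConfigs itself stays opaque in this sketch.
    assert!(config.redis().is_some());
}
```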
@@ -2517,4 +2574,16 @@ cache:
     fn test_emit_outcomes_invalid() {
         assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
     }
+
+    #[test]
+    fn test_spool_defaults_to_v1() {
+        let config: ConfigValues = serde_json::from_str("{}").unwrap();
+        assert!(matches!(
+            config.spool.envelopes.version,
+            EnvelopeSpoolVersion::V1
+        ));
+
+        let config = Config::from_json_value(serde_json::json!({})).unwrap();
+        assert!(!config.spool_v2());
+    }
 }