diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cb086f04b..15c9b13791 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Apply globally defined metric tags to legacy transaction metrics. ([#3615](https://github.com/getsentry/relay/pull/3615)) - Limit the maximum size of spans in an transaction to 800 kib. ([#3645](https://github.com/getsentry/relay/pull/3645)) - Scrub identifiers in spans with `op:db` and `db_system:redis`. ([#3642](https://github.com/getsentry/relay/pull/3642)) +- Stop trimming important span fields by marking them `trim = "false"`. ([#3670](https://github.com/getsentry/relay/pull/3670)) **Features**: @@ -23,6 +24,8 @@ - Limit metric name to 150 characters. ([#3628](https://github.com/getsentry/relay/pull/3628)) - Add validation of Kafka topics on startup. ([#3543](https://github.com/getsentry/relay/pull/3543)) - Send `attachment` data inline when possible. ([#3654](https://github.com/getsentry/relay/pull/3654)) +- Drop support for transaction metrics extraction versions < 3. ([#3672](https://github.com/getsentry/relay/pull/3672)) +- Move partitioning into the `Aggregator` and add a new `Partition` bucket shift mode. ([#3661](https://github.com/getsentry/relay/pull/3661)) ## 24.5.0 diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index cc3e1ffdf0..95db8c1dce 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -14,7 +14,7 @@ use relay_kafka::{ ConfigError as KafkaConfigError, KafkaConfigParam, KafkaParams, KafkaTopic, TopicAssignment, TopicAssignments, }; -use relay_metrics::aggregator::{AggregatorConfig, ShiftKey}; +use relay_metrics::aggregator::{AggregatorConfig, FlushBatching}; use relay_metrics::{AggregatorServiceConfig, MetricNamespace, ScopedAggregatorConfig}; use relay_redis::RedisConfig; use serde::de::{DeserializeOwned, Unexpected, Visitor}; @@ -2276,11 +2276,6 @@ impl Config { self.values.processing.attachment_chunk_size.as_bytes() } - /// Amount of metric partitions. - pub fn metrics_partitions(&self) -> Option { - self.values.aggregator.flush_partitions - } - /// Maximum metrics batch size in bytes. pub fn metrics_max_batch_size_bytes(&self) -> usize { self.values.aggregator.max_flush_bytes @@ -2398,7 +2393,8 @@ impl Config { max_project_key_bucket_bytes, initial_delay: 30, debounce_delay: 10, - shift_key: ShiftKey::Project, + flush_partitions: None, + flush_batching: FlushBatching::Project, } } diff --git a/relay-dynamic-config/src/metrics.rs b/relay-dynamic-config/src/metrics.rs index bb641af2cc..191516582e 100644 --- a/relay-dynamic-config/src/metrics.rs +++ b/relay-dynamic-config/src/metrics.rs @@ -122,6 +122,9 @@ pub struct CustomMeasurementConfig { /// - 6: Bugfix to make transaction metrics extraction apply globally defined tag mappings. const TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION: u16 = 6; +/// Minimum supported version of metrics extraction from transactions. +const TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION: u16 = 3; + /// Deprecated. Defines whether URL transactions should be considered low cardinality. #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "camelCase")] @@ -160,19 +163,15 @@ impl TransactionMetricsConfig { /// Creates an enabled configuration with empty defaults. pub fn new() -> Self { Self { - version: 1, + version: TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION, ..Self::default() } } /// Returns `true` if metrics extraction is enabled and compatible with this Relay.
pub fn is_enabled(&self) -> bool { - self.version > 0 && self.version <= TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION - } - - /// Returns `true` if usage should be tracked through a dedicated metric. - pub fn usage_metric(&self) -> bool { - self.version >= 3 + self.version >= TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + && self.version <= TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION } } diff --git a/relay-event-derive/src/lib.rs b/relay-event-derive/src/lib.rs index 0812bba1f7..814802af2b 100644 --- a/relay-event-derive/src/lib.rs +++ b/relay-event-derive/src/lib.rs @@ -323,6 +323,7 @@ struct FieldAttrs { max_chars_allowance: Option, max_depth: Option, max_bytes: Option, + trim: Option, } impl FieldAttrs { @@ -366,6 +367,14 @@ impl FieldAttrs { quote!(crate::processor::Pii::False) }; + let trim = if let Some(trim) = self.trim { + quote!(#trim) + } else if let Some(ref parent_attrs) = inherit_from_field_attrs { + quote!(#parent_attrs.trim) + } else { + quote!(true) + }; + let retain = self.retain; let max_chars = if let Some(ref max_chars) = self.max_chars { @@ -421,6 +430,7 @@ impl FieldAttrs { max_bytes: #max_bytes, pii: #pii, retain: #retain, + trim: #trim, } }) } @@ -595,6 +605,17 @@ fn parse_field_attributes( panic!("Got non string literal for retain"); } } + } else if ident == "trim" { + match name_value.lit { + Lit::Str(litstr) => match litstr.value().as_str() { + "true" => rv.trim = None, + "false" => rv.trim = Some(false), + other => panic!("Unknown value {other}"), + }, + _ => { + panic!("Got non string literal for trim"); + } + } } else if ident == "legacy_alias" || ident == "skip_serialization" { // Skip } else { diff --git a/relay-event-normalization/src/normalize/mod.rs b/relay-event-normalization/src/normalize/mod.rs index 57c43b4fa8..884c95ecbe 100644 --- a/relay-event-normalization/src/normalize/mod.rs +++ b/relay-event-normalization/src/normalize/mod.rs @@ -1706,7 +1706,6 @@ mod tests { timestamp: ~, start_timestamp: ~, exclusive_time: ~, - description: ~, op: "app_start_cold", span_id: ~, parent_span_id: ~, @@ -1714,6 +1713,7 @@ mod tests { segment_id: ~, is_segment: ~, status: ~, + description: ~, tags: ~, origin: ~, profile_id: ~, @@ -1751,7 +1751,6 @@ mod tests { timestamp: ~, start_timestamp: ~, exclusive_time: ~, - description: ~, op: "app.start.cold", span_id: ~, parent_span_id: ~, @@ -1759,6 +1758,7 @@ mod tests { segment_id: ~, is_segment: ~, status: ~, + description: ~, tags: ~, origin: ~, profile_id: ~, @@ -1796,7 +1796,6 @@ mod tests { timestamp: ~, start_timestamp: ~, exclusive_time: ~, - description: ~, op: "app.start.warm", span_id: ~, parent_span_id: ~, @@ -1804,6 +1803,7 @@ mod tests { segment_id: ~, is_segment: ~, status: ~, + description: ~, tags: ~, origin: ~, profile_id: ~, diff --git a/relay-event-normalization/src/trimming.rs b/relay-event-normalization/src/trimming.rs index dc210d6ed1..c5bb4a9ee1 100644 --- a/relay-event-normalization/src/trimming.rs +++ b/relay-event-normalization/src/trimming.rs @@ -74,15 +74,16 @@ impl Processor for TrimmingProcessor { }); } - if self.remaining_size() == Some(0) { - // TODO: Create remarks (ensure they do not bloat event) - return Err(ProcessingAction::DeleteValueHard); - } - if self.remaining_depth(state) == Some(0) { - // TODO: Create remarks (ensure they do not bloat event) - return Err(ProcessingAction::DeleteValueHard); + if state.attrs().trim { + if self.remaining_size() == Some(0) { + // TODO: Create remarks (ensure they do not bloat event) + return Err(ProcessingAction::DeleteValueHard); + } + if 
self.remaining_depth(state) == Some(0) { + // TODO: Create remarks (ensure they do not bloat event) + return Err(ProcessingAction::DeleteValueHard); + } } - Ok(()) } @@ -131,6 +132,10 @@ impl Processor for TrimmingProcessor { trim_string(value, meta, max_chars, state.attrs().max_chars_allowance); } + if !state.attrs().trim { + return Ok(()); + } + if let Some(size_state) = self.size_state.last() { if let Some(size_remaining) = size_state.size_remaining { trim_string(value, meta, size_remaining, 0); @@ -149,6 +154,10 @@ impl Processor for TrimmingProcessor { where T: ProcessValue, { + if !state.attrs().trim { + return Ok(()); + } + // If we need to check the bag size, then we go down a different path if !self.size_state.is_empty() { let original_length = value.len(); @@ -159,7 +168,7 @@ impl Processor for TrimmingProcessor { let mut split_index = None; for (index, item) in value.iter_mut().enumerate() { - if self.remaining_size().unwrap() == 0 { + if self.remaining_size() == Some(0) { split_index = Some(index); break; } @@ -191,6 +200,10 @@ impl Processor for TrimmingProcessor { where T: ProcessValue, { + if !state.attrs().trim { + return Ok(()); + } + // If we need to check the bag size, then we go down a different path if !self.size_state.is_empty() { let original_length = value.len(); @@ -201,7 +214,7 @@ impl Processor for TrimmingProcessor { let mut split_key = None; for (key, item) in value.iter_mut() { - if self.remaining_size().unwrap() == 0 { + if self.remaining_size() == Some(0) { split_key = Some(key.to_owned()); break; } @@ -230,6 +243,10 @@ impl Processor for TrimmingProcessor { _meta: &mut Meta, state: &ProcessingState<'_>, ) -> ProcessingResult { + if !state.attrs().trim { + return Ok(()); + } + match value { Value::Array(_) | Value::Object(_) => { if self.remaining_depth(state) == Some(1) { @@ -252,6 +269,10 @@ impl Processor for TrimmingProcessor { _meta: &mut Meta, state: &ProcessingState<'_>, ) -> ProcessingResult { + if !state.attrs().trim { + return Ok(()); + } + processor::apply(&mut stacktrace.frames, |frames, meta| { enforce_frame_hard_limit(frames, meta, 250); Ok(()) @@ -393,9 +414,10 @@ mod tests { use std::iter::repeat; use relay_event_schema::protocol::{ - Breadcrumb, Context, Contexts, Event, Exception, ExtraValue, Span, TagEntry, Tags, Values, + Breadcrumb, Context, Contexts, Event, Exception, ExtraValue, Span, SpanId, TagEntry, Tags, + TraceId, Values, }; - use relay_protocol::{Map, Remark, SerializableAnnotated}; + use relay_protocol::{get_value, Map, Remark, SerializableAnnotated}; use similar_asserts::assert_eq; use crate::MaxChars; @@ -930,4 +952,109 @@ mod tests { assert_eq!(event.0.unwrap().spans.0.unwrap().len(), 8); } + + #[test] + fn test_too_many_spans_trimmed_trace_id() { + let original_description = "a".repeat(819163); + let original_trace_id = TraceId("b".repeat(48)); + let mut event = Annotated::new(Event { + spans: Annotated::new(vec![ + Span { + description: original_description.clone().into(), + ..Default::default() + } + .into(), + Span { + trace_id: original_trace_id.clone().into(), + ..Default::default() + } + .into(), + ]), + ..Default::default() + }); + + let mut processor = TrimmingProcessor::new(); + processor::process_value(&mut event, &mut processor, ProcessingState::root()).unwrap(); + + assert_eq!( + get_value!(event.spans[0].description!), + &original_description + ); + // Trace ID would be trimmed without `trim = "false"` + assert_eq!(get_value!(event.spans[1].trace_id!), &original_trace_id); + } + + #[test] + fn 
test_too_many_spans_trimmed_trace_id_drop() { + let original_description = "a".repeat(819163); + let original_span_id = SpanId("b".repeat(48)); + let original_trace_id = TraceId("c".repeat(48)); + let original_segment_id = SpanId("d".repeat(48)); + let original_op = "e".repeat(129); + + let mut event = Annotated::new(Event { + spans: Annotated::new(vec![ + Span { + description: original_description.clone().into(), + ..Default::default() + } + .into(), + Span { + span_id: original_span_id.clone().into(), + trace_id: original_trace_id.clone().into(), + segment_id: original_segment_id.clone().into(), + is_segment: false.into(), + op: original_op.clone().into(), + ..Default::default() + } + .into(), + ]), + ..Default::default() + }); + + let mut processor = TrimmingProcessor::new(); + processor::process_value(&mut event, &mut processor, ProcessingState::root()).unwrap(); + + assert_eq!( + get_value!(event.spans[0].description!), + &original_description + ); + // These fields would be dropped without `trim = "false"` + assert_eq!(get_value!(event.spans[1].span_id!), &original_span_id); + assert_eq!(get_value!(event.spans[1].trace_id!), &original_trace_id); + assert_eq!(get_value!(event.spans[1].segment_id!), &original_segment_id); + assert_eq!(get_value!(event.spans[1].is_segment!), &false); + + // span.op is trimmed to its max_chars, but not dropped: + assert_eq!(get_value!(event.spans[1].op!).len(), 128); + } + + #[test] + fn test_trim_false_contributes_to_budget() { + for span_id in ["short", "looooooooooooooooooooooooooong"] { + let original_span_id = SpanId(span_id.to_owned()); + let original_description = "a".repeat(900000); + + let mut event = Annotated::new(Event { + spans: Annotated::new(vec![Span { + span_id: original_span_id.clone().into(), + description: original_description.clone().into(), + ..Default::default() + } + .into()]), + ..Default::default() + }); + + let mut processor = TrimmingProcessor::new(); + processor::process_value(&mut event, &mut processor, ProcessingState::root()).unwrap(); + + assert_eq!(get_value!(event.spans[0].span_id!).as_ref(), span_id); + + // The amount of trimming on the description depends on the length of the span id. + assert_eq!( + get_value!(event.spans[0].description!).len(), + 1024 * 800 - 12 - span_id.len(), + ); + } + } } diff --git a/relay-event-schema/src/processor/attrs.rs b/relay-event-schema/src/processor/attrs.rs index 5cfaa0beab..c80a9fe7c6 100644 --- a/relay-event-schema/src/processor/attrs.rs +++ b/relay-event-schema/src/processor/attrs.rs @@ -128,6 +128,8 @@ pub struct FieldAttrs { pub pii: Pii, /// Whether additional properties should be retained during normalization. pub retain: bool, + /// Whether the trimming processor is allowed to shorten or drop this field. + pub trim: bool, } /// A set of characters allowed or denied for a (string) field. @@ -167,6 +169,7 @@ impl FieldAttrs { max_bytes: None, pii: Pii::False, retain: false, + trim: true, } } diff --git a/relay-event-schema/src/protocol/span.rs b/relay-event-schema/src/protocol/span.rs index 08199c0f38..933d71f06d 100644 --- a/relay-event-schema/src/protocol/span.rs +++ b/relay-event-schema/src/protocol/span.rs @@ -26,37 +26,40 @@ pub struct Span { /// excluding its immediate child spans. pub exclusive_time: Annotated, - /// Human readable description of a span (e.g. method URL). - #[metastructure(pii = "maybe")] - pub description: Annotated, - /// Span type (see `OperationType` docs). 
- #[metastructure(max_chars = 128)] + #[metastructure(max_chars = 128, trim = "false")] pub op: Annotated, /// The Span id. - #[metastructure(required = "true")] + #[metastructure(required = "true", trim = "false")] pub span_id: Annotated, /// The ID of the span enclosing this span. + #[metastructure(trim = "false")] pub parent_span_id: Annotated, /// The ID of the trace the span belongs to. - #[metastructure(required = "true")] + #[metastructure(required = "true", trim = "false")] pub trace_id: Annotated, /// A unique identifier for a segment within a trace (8 byte hexadecimal string). /// /// For spans embedded in transactions, the `segment_id` is the `span_id` of the containing /// transaction. + #[metastructure(trim = "false")] pub segment_id: Annotated, /// Whether or not the current span is the root of the segment. + #[metastructure(trim = "false")] pub is_segment: Annotated, /// The status of a span. pub status: Annotated, + /// Human readable description of a span (e.g. method URL). + #[metastructure(pii = "maybe")] + pub description: Annotated, + /// Arbitrary tags on a span, like on the top-level event. #[metastructure(pii = "maybe")] pub tags: Annotated>, @@ -450,11 +453,11 @@ mod tests { "timestamp": 0.0, "start_timestamp": -63158400.0, "exclusive_time": 1.23, - "description": "desc", "op": "operation", "span_id": "fa90fdead5f74052", "trace_id": "4c79f60c11214eb38604f4ae0781bfb2", "status": "ok", + "description": "desc", "origin": "auto.http", "measurements": { "memory": { diff --git a/relay-event-schema/src/protocol/span/convert.rs b/relay-event-schema/src/protocol/span/convert.rs index 1246e3c649..d283c2acd6 100644 --- a/relay-event-schema/src/protocol/span/convert.rs +++ b/relay-event-schema/src/protocol/span/convert.rs @@ -248,7 +248,6 @@ mod tests { timestamp: ~, start_timestamp: ~, exclusive_time: 123.4, - description: "my 1st transaction", op: "myop", span_id: SpanId( "fa90fdead5f74052", @@ -264,6 +263,7 @@ ), is_segment: true, status: Ok, + description: "my 1st transaction", tags: ~, origin: "manual", profile_id: EventId( diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index 2a4db7c723..05112604db 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -3,7 +3,7 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::error::Error; -use std::hash::Hasher; +use std::hash::{Hash, Hasher}; use std::time::Duration; use std::{fmt, mem}; @@ -88,6 +88,24 @@ impl BucketKey { fn namespace(&self) -> MetricNamespace { self.metric_name.namespace() } + + /// Computes a stable partitioning key for this [`Bucket`]. + /// + /// The partitioning key inherently produces collisions, since the output of the hasher is + /// reduced into an interval of size `partitions`. This means that buckets with totally + /// different values might end up in the same partition. + /// + /// The role of partitioning is to let Relays forward the same metric to the same upstream + /// instance with the goal of increasing bucketing efficiency. + fn partition_key(&self, partitions: u64) -> u64 { + let key = (self.project_key, &self.metric_name, &self.tags); + + let mut hasher = fnv::FnvHasher::default(); + key.hash(&mut hasher); + + let partitions = partitions.max(1); + hasher.finish() % partitions + } } /// Estimates the number of bytes needed to encode the tags.
@@ -98,10 +116,10 @@ pub fn tags_cost(tags: &BTreeMap) -> usize { tags.iter().map(|(k, v)| k.len() + v.len()).sum() } -/// Configuration value for [`AggregatorConfig::shift_key`]. +/// Configuration value for [`AggregatorConfig::flush_batching`]. #[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] -pub enum ShiftKey { +pub enum FlushBatching { /// Shifts the flush time by an offset based on the [`ProjectKey`]. /// /// This allows buckets from the same project to be flushed together. @@ -112,10 +130,21 @@ /// /// This allows for a completely random distribution of bucket flush times. /// - /// Only for use in processing Relays. + /// It should only be used in processing Relays, since this flushing behavior is better + /// suited for how Relay emits metrics to Kafka. Bucket, - /// Do not apply shift. This should be set when `http.global_metrics` is used. + /// Shifts the flush time by an offset based on the partition key. + /// + /// This allows buckets belonging to the same partition to be flushed together. Requires + /// [`flush_partitions`](AggregatorConfig::flush_partitions) to be set, otherwise this will fall + /// back to [`FlushBatching::Project`]. + /// + /// It should only be used in Relays with the `http.global_metrics` option set, since + /// partitioning can be leveraged when encoding metrics via the global endpoint. + Partition, + + /// Do not apply shift. None, } @@ -181,11 +210,24 @@ pub struct AggregatorConfig { /// Defaults to `None`, i.e. no limit. pub max_project_key_bucket_bytes: Option, - /// Key used to shift the flush time of a bucket. + /// The number of logical partitions that can receive flushed buckets. + /// + /// If set, buckets are partitioned by (bucket key % flush_partitions), and routed + /// by setting the header `X-Sentry-Relay-Shard`. /// - /// This prevents flushing all buckets from a bucket interval at the same - /// time by computing an offset from the hash of the given key. - pub shift_key: ShiftKey, + /// This setting will take effect only when paired with [`FlushBatching::Partition`]. + pub flush_partitions: Option, + + /// The batching mode for the flushing of the aggregator. + /// + /// Batching is applied via shifts to the flushing time that is determined when the first bucket + /// is inserted. Thanks to the shifts, Relay is able to prevent flushing all buckets from a + /// bucket interval at the same time. + /// + /// For example, the aggregator can choose to shift by the same value all buckets within a given + /// partition, effectively allowing all the elements of that partition to be flushed together. + #[serde(alias = "shift_key")] + pub flush_batching: FlushBatching, } impl AggregatorConfig { @@ -240,21 +282,29 @@ impl AggregatorConfig { Duration::from_secs(self.initial_delay) } - // Shift deterministically within one bucket interval based on the project or bucket key. - // - // This distributes buckets over time to prevent peaks. + /// Shift deterministically within one bucket interval based on the configured [`FlushBatching`]. + /// + /// This distributes buckets over time to prevent peaks. fn flush_time_shift(&self, bucket: &BucketKey) -> Duration { - let hash_value = match self.shift_key { - ShiftKey::Project => { + let shift_range = self.bucket_interval * 1000; + + // Fall back to default flushing by project if no partitioning is configured.
+ let (batching, partitions) = match (self.flush_batching, self.flush_partitions) { + (FlushBatching::Partition, None | Some(0)) => (FlushBatching::Project, 0), + (flush_batching, partitions) => (flush_batching, partitions.unwrap_or(0)), + }; + + let shift_millis = match batching { + FlushBatching::Project => { let mut hasher = FnvHasher::default(); hasher.write(bucket.project_key.as_str().as_bytes()); - hasher.finish() + hasher.finish() % shift_range } - ShiftKey::Bucket => bucket.hash64(), - ShiftKey::None => return Duration::ZERO, + FlushBatching::Bucket => bucket.hash64() % shift_range, + FlushBatching::Partition => shift_range * bucket.partition_key(partitions) / partitions, + FlushBatching::None => 0, }; - let shift_millis = hash_value % (self.bucket_interval * 1000); Duration::from_millis(shift_millis) } @@ -293,7 +343,8 @@ impl Default for AggregatorConfig { max_tag_key_length: 200, max_tag_value_length: 200, max_project_key_bucket_bytes: None, - shift_key: ShiftKey::default(), + flush_batching: FlushBatching::default(), + flush_partitions: None, } } } @@ -539,10 +590,16 @@ impl Aggregator { .collect() } - /// Pop and return the buckets that are eligible for flushing out according to bucket interval. + /// Pop and return the partitions with buckets that are eligible for flushing out according to + /// bucket interval. + /// + /// If no partitioning is enabled, the function will return a single `None` partition. /// /// Note that this function is primarily intended for tests. - pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap> { + pub fn pop_flush_buckets( + &mut self, + force: bool, + ) -> HashMap, HashMap>> { relay_statsd::metric!( gauge(MetricGauges::Buckets) = self.bucket_count() as u64, aggregator = &self.name, @@ -555,7 +612,7 @@ impl Aggregator { aggregator = &self.name, ); - let mut buckets = HashMap::new(); + let mut partitions = HashMap::new(); let mut stats = HashMap::new(); relay_statsd::metric!( @@ -587,7 +644,9 @@ impl Aggregator { metadata, }; - buckets + partitions + .entry(self.config.flush_partitions.map(|p| key.partition_key(p))) + .or_insert_with(HashMap::new) .entry(key.project_key) .or_insert_with(Vec::new) .push(bucket); @@ -609,7 +668,7 @@ impl Aggregator { ); } - buckets + partitions } /// Wrapper for [`AggregatorConfig::get_bucket_timestamp`]. 
@@ -891,7 +950,8 @@ mod tests { max_tag_key_length: 200, max_tag_value_length: 200, max_project_key_bucket_bytes: None, - shift_key: ShiftKey::default(), + flush_batching: FlushBatching::default(), + flush_partitions: None, } } @@ -1463,10 +1523,10 @@ mod tests { } #[test] - fn test_parse_shift_key() { - let json = r#"{"shift_key": "bucket"}"#; + fn test_parse_flush_batching() { + let json = r#"{"shift_key": "partition"}"#; let parsed: AggregatorConfig = serde_json::from_str(json).unwrap(); - assert!(matches!(parsed.shift_key, ShiftKey::Bucket)); + assert!(matches!(parsed.flush_batching, FlushBatching::Partition)); } #[test] diff --git a/relay-metrics/src/aggregatorservice.rs b/relay-metrics/src/aggregatorservice.rs index 9bffb02948..91b958c6e0 100644 --- a/relay-metrics/src/aggregatorservice.rs +++ b/relay-metrics/src/aggregatorservice.rs @@ -8,7 +8,7 @@ use relay_system::{ }; use serde::{Deserialize, Serialize}; -use crate::aggregator::{self, AggregatorConfig, ShiftKey}; +use crate::aggregator::{self, AggregatorConfig, FlushBatching}; use crate::bucket::Bucket; use crate::statsd::{MetricCounters, MetricHistograms, MetricTimers}; @@ -86,12 +86,6 @@ pub struct AggregatorServiceConfig { /// Defaults to `None`, i.e. no limit. pub max_project_key_bucket_bytes: Option, - /// Key used to shift the flush time of a bucket. - /// - /// This prevents flushing all buckets from a bucket interval at the same - /// time by computing an offset from the hash of the given key. - pub shift_key: ShiftKey, - // TODO(dav1dde): move these config values to a better spot /// The approximate maximum number of bytes submitted within one flush cycle. /// @@ -100,11 +94,23 @@ pub struct AggregatorServiceConfig { /// adds some additional overhead, this number is approxmate and some safety margin should be /// left to hard limits. pub max_flush_bytes: usize, + /// The number of logical partitions that can receive flushed buckets. /// /// If set, buckets are partitioned by (bucket key % flush_partitions), and routed /// by setting the header `X-Sentry-Relay-Shard`. pub flush_partitions: Option, + + /// The batching mode for the flushing of the aggregator. + /// + /// Batching is applied via shifts to the flushing time that is determined when the first bucket + /// is inserted. Thanks to the shifts, Relay is able to prevent flushing all buckets from a + /// bucket interval at the same time. + /// + /// For example, the aggregator can choose to shift by the same value all buckets within a given + /// partition, effectively allowing all the elements of that partition to be flushed together. + #[serde(alias = "shift_key")] + pub flush_batching: FlushBatching, } impl Default for AggregatorServiceConfig { @@ -120,9 +126,9 @@ impl Default for AggregatorServiceConfig { max_tag_key_length: 200, max_tag_value_length: 200, max_project_key_bucket_bytes: None, - shift_key: ShiftKey::default(), max_flush_bytes: 5_000_000, // 5 MB flush_partitions: None, + flush_batching: FlushBatching::Project, } } } @@ -139,7 +145,8 @@ impl From<&AggregatorServiceConfig> for AggregatorConfig { max_tag_key_length: value.max_tag_key_length, max_tag_value_length: value.max_tag_value_length, max_project_key_bucket_bytes: value.max_project_key_bucket_bytes, - shift_key: value.shift_key, + flush_partitions: value.flush_partitions, + flush_batching: value.flush_batching, } } } @@ -219,6 +226,10 @@ pub struct BucketCountInquiry; /// failed buckets. They will be merged back into the aggregator and flushed at a later time. 
#[derive(Clone, Debug)] pub struct FlushBuckets { + /// The partition to which the buckets belong. + /// + /// When set to `Some` it means that partitioning was enabled in the [`Aggregator`]. + pub partition_key: Option, /// The buckets to be flushed. pub buckets: HashMap>, } @@ -277,23 +288,25 @@ impl AggregatorService { /// If `force` is true, flush all buckets unconditionally and do not attempt to merge back. fn try_flush(&mut self) { let force_flush = matches!(&self.state, AggregatorState::ShuttingDown); - let buckets = self.aggregator.pop_flush_buckets(force_flush); + let partitions = self.aggregator.pop_flush_buckets(force_flush); - if buckets.is_empty() { + if partitions.is_empty() { return; } - relay_log::trace!("flushing {} projects to receiver", buckets.len()); + relay_log::trace!("flushing {} partitions to receiver", partitions.len()); let mut total_bucket_count = 0u64; - for buckets in buckets.values() { - let bucket_count = buckets.len() as u64; - total_bucket_count += bucket_count; - - relay_statsd::metric!( - histogram(MetricHistograms::BucketsFlushedPerProject) = bucket_count, - aggregator = self.aggregator.name(), - ); + for buckets_by_project in partitions.values() { + for buckets in buckets_by_project.values() { + let bucket_count = buckets.len() as u64; + total_bucket_count += bucket_count; + + relay_statsd::metric!( + histogram(MetricHistograms::BucketsFlushedPerProject) = bucket_count, + aggregator = self.aggregator.name(), + ); + } } relay_statsd::metric!( @@ -302,7 +315,12 @@ impl AggregatorService { ); if let Some(ref receiver) = self.receiver { - receiver.send(FlushBuckets { buckets }) + for (partition_key, buckets_by_project) in partitions { + receiver.send(FlushBuckets { + partition_key, + buckets: buckets_by_project, + }) + } } } diff --git a/relay-protocol-derive/src/lib.rs b/relay-protocol-derive/src/lib.rs index 4fb7be5959..11f9b2d6ab 100644 --- a/relay-protocol-derive/src/lib.rs +++ b/relay-protocol-derive/src/lib.rs @@ -730,7 +730,7 @@ fn parse_field_attributes( rv.skip_serialization = FromStr::from_str(&litstr.value()) .expect("Unknown value for skip_serialization"); } - _ => panic!("Got non string literal for legacy_alias"), + _ => panic!("Got non string literal for skip_serialization"), } } } diff --git a/relay-server/src/metrics/minimal.rs b/relay-server/src/metrics/minimal.rs index 7865bff787..37889ad6e1 100644 --- a/relay-server/src/metrics/minimal.rs +++ b/relay-server/src/metrics/minimal.rs @@ -2,9 +2,9 @@ use relay_metrics::{ BucketMetadata, CounterType, MetricName, MetricNamespace, MetricResourceIdentifier, MetricType, }; use serde::de::IgnoredAny; -use serde::{de, Deserialize, Deserializer}; +use serde::Deserialize; -use crate::metrics::{BucketSummary, ExtractionMode, TrackableBucket}; +use crate::metrics::{BucketSummary, TrackableBucket}; /// Bucket which parses only the minimally required information to implement [`TrackableBucket`]. 
/// @@ -30,7 +30,7 @@ impl TrackableBucket for MinimalTrackableBucket { self.value.ty() } - fn summary(&self, mode: ExtractionMode) -> BucketSummary { + fn summary(&self) -> BucketSummary { let mri = match MetricResourceIdentifier::parse(self.name()) { Ok(mri) => mri, Err(_) => return BucketSummary::default(), @@ -38,10 +38,8 @@ impl TrackableBucket for MinimalTrackableBucket { match mri.namespace { MetricNamespace::Transactions => { - let usage = matches!(mode, ExtractionMode::Usage); let count = match self.value { - MinimalValue::Counter(c) if usage && mri.name == "usage" => c.to_f64() as usize, - MinimalValue::Distribution(d) if !usage && mri.name == "duration" => d.0, + MinimalValue::Counter(c) if mri.name == "usage" => c.to_f64() as usize, _ => 0, }; let has_profile = matches!(mri.name.as_ref(), "usage" | "duration") @@ -67,7 +65,7 @@ enum MinimalValue { #[serde(rename = "c")] Counter(CounterType), #[serde(rename = "d")] - Distribution(SeqCount), + Distribution(IgnoredAny), #[serde(rename = "s")] Set(IgnoredAny), #[serde(rename = "g")] @@ -91,41 +89,6 @@ struct Tags { has_profile: Option, } -/// Deserializes only the count of a sequence ingoring all individual items. -#[derive(Clone, Copy, Debug, Default)] -struct SeqCount(usize); - -impl<'de> Deserialize<'de> for SeqCount { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct Visitor; - - impl<'a> de::Visitor<'a> for Visitor { - type Value = SeqCount; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("a sequence") - } - - fn visit_seq(self, mut seq: A) -> Result - where - A: de::SeqAccess<'a>, - { - let mut count = 0; - while seq.next_element::()?.is_some() { - count += 1; - } - - Ok(SeqCount(count)) - } - } - - deserializer.deserialize_seq(Visitor) - } -} - #[cfg(test)] mod tests { use insta::assert_debug_snapshot; @@ -133,18 +96,6 @@ mod tests { use super::*; - #[test] - fn test_seq_count() { - let SeqCount(s) = serde_json::from_str("[1, 2, 3, 4, 5]").unwrap(); - assert_eq!(s, 5); - - let SeqCount(s) = serde_json::from_str("[1, 2, \"mixed\", 4, 5]").unwrap(); - assert_eq!(s, 5); - - let SeqCount(s) = serde_json::from_str("[]").unwrap(); - assert_eq!(s, 0); - } - #[test] fn test_buckets() { let json = r#"[ @@ -214,44 +165,12 @@ mod tests { for (b, mb) in buckets.iter().zip(min_buckets.iter()) { assert_eq!(b.name(), mb.name()); - assert_eq!( - b.summary(ExtractionMode::Usage), - mb.summary(ExtractionMode::Usage) - ); - assert_eq!( - b.summary(ExtractionMode::Duration), - mb.summary(ExtractionMode::Duration) - ); + assert_eq!(b.summary(), mb.summary()); assert_eq!(b.metadata, mb.metadata); } - let duration = min_buckets - .iter() - .map(|b| b.summary(ExtractionMode::Duration)) - .collect::>(); - let usage = min_buckets - .iter() - .map(|b| b.summary(ExtractionMode::Usage)) - .collect::>(); - - assert_debug_snapshot!(duration, @r###" - [ - Transactions { - count: 4, - has_profile: true, - }, - Transactions { - count: 0, - has_profile: false, - }, - Spans( - 3, - ), - None, - None, - ] - "###); - assert_debug_snapshot!(usage, @r###" + let summary = min_buckets.iter().map(|b| b.summary()).collect::>(); + assert_debug_snapshot!(summary, @r###" [ Transactions { count: 0, diff --git a/relay-server/src/metrics/outcomes.rs b/relay-server/src/metrics/outcomes.rs index 401d0f5af8..97aea4f582 100644 --- a/relay-server/src/metrics/outcomes.rs +++ b/relay-server/src/metrics/outcomes.rs @@ -7,33 +7,13 @@ use relay_quotas::{DataCategory, Scoping}; use 
relay_system::Addr; use crate::envelope::SourceQuantities; -use crate::metrics::{ExtractionMode, MetricStats}; +use crate::metrics::MetricStats; use crate::services::outcome::{Outcome, TrackOutcome}; #[cfg(feature = "processing")] use relay_cardinality::{CardinalityLimit, CardinalityReport}; pub const PROFILE_TAG: &str = "has_profile"; -/// Indicates where quantities should be taken from. -pub enum Quantities { - /// Calculates the quantities from the passed buckets using [`ExtractionMode`]. - FromBuckets(ExtractionMode), - /// Uses the provided [`SourceQuantities`]. - Value(SourceQuantities), -} - -impl From for Quantities { - fn from(value: ExtractionMode) -> Self { - Self::FromBuckets(value) - } -} - -impl From for Quantities { - fn from(value: SourceQuantities) -> Self { - Self::Value(value) - } -} - /// [`MetricOutcomes`] takes care of creating the right outcomes for metrics at the end of their /// lifecycle. /// @@ -55,13 +35,7 @@ impl MetricOutcomes { } /// Tracks an outcome for a list of buckets and generates the necessary outcomes. - pub fn track( - &self, - scoping: Scoping, - buckets: &[impl TrackableBucket], - quantities: impl Into, - outcome: Outcome, - ) { + pub fn track(&self, scoping: Scoping, buckets: &[impl TrackableBucket], outcome: Outcome) { let timestamp = Utc::now(); // Never emit accepted outcomes for surrogate metrics. @@ -72,10 +46,7 @@ impl MetricOutcomes { spans, profiles, buckets, - } = match quantities.into() { - Quantities::FromBuckets(mode) => extract_quantities(buckets, mode), - Quantities::Value(source) => source, - }; + } = extract_quantities(buckets); let categories = [ (DataCategory::Transaction, transactions as u32), @@ -151,7 +122,7 @@ pub trait TrackableBucket { /// of datapoints contained in the bucket. /// /// Additionally tracks whether the transactions also contained profiling information. - fn summary(&self, mode: ExtractionMode) -> BucketSummary; + fn summary(&self) -> BucketSummary; /// Metric bucket metadata. fn metadata(&self) -> BucketMetadata; @@ -166,8 +137,8 @@ impl TrackableBucket for &T { (**self).ty() } - fn summary(&self, mode: ExtractionMode) -> BucketSummary { - (**self).summary(mode) + fn summary(&self) -> BucketSummary { + (**self).summary() } fn metadata(&self) -> BucketMetadata { @@ -184,8 +155,8 @@ impl TrackableBucket for Bucket { self.value.ty() } - fn summary(&self, mode: ExtractionMode) -> BucketSummary { - BucketView::new(self).summary(mode) + fn summary(&self) -> BucketSummary { + BucketView::new(self).summary() } fn metadata(&self) -> BucketMetadata { @@ -202,7 +173,7 @@ impl TrackableBucket for BucketView<'_> { self.ty() } - fn summary(&self, mode: ExtractionMode) -> BucketSummary { + fn summary(&self) -> BucketSummary { let mri = match MetricResourceIdentifier::parse(self.name()) { Ok(mri) => mri, Err(_) => return BucketSummary::default(), @@ -210,12 +181,8 @@ impl TrackableBucket for BucketView<'_> { match mri.namespace { MetricNamespace::Transactions => { - let usage = matches!(mode, ExtractionMode::Usage); let count = match self.value() { - BucketViewValue::Counter(c) if usage && mri.name == "usage" => { - c.to_f64() as usize - } - BucketViewValue::Distribution(d) if !usage && mri.name == "duration" => d.len(), + BucketViewValue::Counter(c) if mri.name == "usage" => c.to_f64() as usize, _ => 0, }; let has_profile = matches!(mri.name.as_ref(), "usage" | "duration") @@ -239,7 +206,7 @@ impl TrackableBucket for BucketView<'_> { } /// Extracts quota information from a list of metric buckets. 
-pub fn extract_quantities(buckets: I, mode: ExtractionMode) -> SourceQuantities +pub fn extract_quantities(buckets: I) -> SourceQuantities where I: IntoIterator, T: TrackableBucket, @@ -248,7 +215,7 @@ where for bucket in buckets { quantities.buckets += 1; - let summary = bucket.summary(mode); + let summary = bucket.summary(); match summary { BucketSummary::Transactions { count, has_profile } => { quantities.transactions += count; diff --git a/relay-server/src/metrics/rate_limits.rs b/relay-server/src/metrics/rate_limits.rs index 87feebfecc..50bcc9a2ed 100644 --- a/relay-server/src/metrics/rate_limits.rs +++ b/relay-server/src/metrics/rate_limits.rs @@ -17,9 +17,6 @@ pub struct MetricsLimiter> = Vec> { /// A list of aggregated metric buckets with some counters. buckets: Vec, - /// Mode used to extract transaction and span counts. - mode: ExtractionMode, - /// The quotas set on the current project. quotas: Q, @@ -30,15 +27,6 @@ pub struct MetricsLimiter> = Vec> { counts: EntityCounts, } -/// Whether to extract transaction and profile count based on the usage or duration metric. -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum ExtractionMode { - /// Use the usage count metric. - Usage, - /// Use the duration distribution metric. - Duration, -} - fn to_counts(summary: &BucketSummary) -> EntityCounts { match *summary { BucketSummary::Transactions { count, has_profile } => EntityCounts { @@ -120,12 +108,11 @@ impl>> MetricsLimiter { buckets: impl IntoIterator, quotas: Q, scoping: Scoping, - mode: ExtractionMode, ) -> Result> { let buckets: Vec<_> = buckets .into_iter() .map(|bucket| { - let summary = bucket.summary(mode); + let summary = bucket.summary(); SummarizedBucket { bucket, summary } }) .collect(); @@ -138,7 +125,6 @@ impl>> MetricsLimiter { if let Some(counts) = total_counts { Ok(Self { buckets, - mode, quotas, scoping, counts, @@ -194,7 +180,7 @@ impl>> MetricsLimiter { }); self.buckets = buckets; - metric_outcomes.track(self.scoping, &dropped, self.mode, outcome); + metric_outcomes.track(self.scoping, &dropped, outcome); } fn drop_profiles( @@ -321,7 +307,6 @@ mod tests { project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, - ExtractionMode::Usage, ) .unwrap(); diff --git a/relay-server/src/metrics_extraction/snapshots/relay_server__metrics_extraction__event__tests__extract_span_metrics_mobile.snap b/relay-server/src/metrics_extraction/snapshots/relay_server__metrics_extraction__event__tests__extract_span_metrics_mobile.snap index 954b08971b..48ef885df0 100644 --- a/relay-server/src/metrics_extraction/snapshots/relay_server__metrics_extraction__event__tests__extract_span_metrics_mobile.snap +++ b/relay-server/src/metrics_extraction/snapshots/relay_server__metrics_extraction__event__tests__extract_span_metrics_mobile.snap @@ -12,7 +12,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 2000.0, - description: ~, op: "app.start.cold", span_id: SpanId( "bd429c44b67a3eb4", @@ -24,6 +23,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: ~, tags: ~, origin: ~, profile_id: ~, @@ -60,7 +60,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 3000.0, - description: ~, op: "ui.load.initial_display", span_id: SpanId( "bd429c44b67a3eb2", @@ -72,6 +71,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: ~, tags: ~, origin: ~, 
profile_id: ~, @@ -185,7 +185,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 3000.0, - description: "Cold Start", op: "app.start.cold", span_id: SpanId( "bd429c44b67a3eb2", @@ -197,6 +196,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: "Cold Start", tags: ~, origin: ~, profile_id: ~, @@ -235,7 +235,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 3000.0, - description: "Custom Op", op: "custom.op", span_id: SpanId( "bd429c44b67a3eb2", @@ -247,6 +246,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: "Custom Op", tags: ~, origin: ~, profile_id: ~, @@ -282,7 +282,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 3000.0, - description: "io.sentry.android.core.SentryPerformanceProvider.onCreate", op: "contentprovider.load", span_id: SpanId( "bd429c44b67a3eb2", @@ -294,6 +293,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: "io.sentry.android.core.SentryPerformanceProvider.onCreate", tags: ~, origin: ~, profile_id: ~, @@ -331,7 +331,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 3000.0, - description: "io.sentry.samples.android.MyApplication.onCreate", op: "application.load", span_id: SpanId( "bd429c44b67a3eb2", @@ -343,6 +342,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: "io.sentry.samples.android.MyApplication.onCreate", tags: ~, origin: ~, profile_id: ~, @@ -380,7 +380,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 3000.0, - description: "io.sentry.samples.android.MainActivity.onCreate", op: "activity.load", span_id: SpanId( "bd429c44b67a3eb2", @@ -392,6 +391,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: "io.sentry.samples.android.MainActivity.onCreate", tags: ~, origin: ~, profile_id: ~, @@ -480,7 +480,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 3000.0, - description: "Process Initialization", op: "process.load", span_id: SpanId( "bd429c44b67a3eb2", @@ -492,6 +491,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: "Process Initialization", tags: ~, origin: ~, profile_id: ~, @@ -578,7 +578,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 3000.0, - description: "somebackup.212321", op: "file.read", span_id: SpanId( "bd429c44b67a3eb2", @@ -590,6 +589,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: "somebackup.212321", tags: ~, origin: ~, profile_id: ~, @@ -628,7 +628,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 3000.0, - description: "www.example.com", op: "http.client", span_id: SpanId( "bd429c44b67a3eb2", @@ -640,6 +639,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: "www.example.com", tags: ~, origin: ~, profile_id: ~, @@ -728,7 +728,6 @@ expression: "(&event.value().unwrap().spans, metrics)" 2020-08-21T02:18:20Z, ), exclusive_time: 
3000.0, - description: "www.example.com", op: "http.client", span_id: SpanId( "bd429c44b67a3eb2", @@ -740,6 +739,7 @@ expression: "(&event.value().unwrap().spans, metrics)" segment_id: ~, is_segment: ~, status: ~, + description: "www.example.com", tags: ~, origin: ~, profile_id: ~, diff --git a/relay-server/src/metrics_extraction/transactions/mod.rs b/relay-server/src/metrics_extraction/transactions/mod.rs index 4df374bd7e..08d919796e 100644 --- a/relay-server/src/metrics_extraction/transactions/mod.rs +++ b/relay-server/src/metrics_extraction/transactions/mod.rs @@ -13,7 +13,7 @@ use relay_metrics::{Bucket, DurationUnit, FiniteF64}; use crate::metrics_extraction::generic; use crate::metrics_extraction::transactions::types::{ CommonTag, CommonTags, ExtractMetricsError, LightTransactionTags, TransactionCPRTags, - TransactionDurationTags, TransactionMeasurementTags, TransactionMetric, UsageTags, + TransactionMeasurementTags, TransactionMetric, UsageTags, }; use crate::metrics_extraction::IntoMetric; use crate::statsd::RelayCounters; @@ -394,20 +394,11 @@ impl TransactionExtractor<'_> { // Duration let duration = relay_common::time::chrono_to_positive_millis(end - start); if let Some(duration) = FiniteF64::new(duration) { - let has_profile = if self.config.version >= 3 { - false - } else { - self.has_profile - }; - metrics.project_metrics.push( TransactionMetric::Duration { unit: DurationUnit::MilliSecond, value: duration, - tags: TransactionDurationTags { - has_profile, - universal_tags: tags.clone(), - }, + tags: tags.clone(), } .into_metric(timestamp), ); @@ -641,7 +632,6 @@ mod tests { 2020-08-21T02:18:20Z, ), exclusive_time: 2000.0, - description: "", op: "react.mount", span_id: SpanId( "bd429c44b67a3eb4", @@ -655,6 +645,7 @@ mod tests { segment_id: ~, is_segment: ~, status: ~, + description: "", tags: ~, origin: ~, profile_id: ~, diff --git a/relay-server/src/metrics_extraction/transactions/types.rs b/relay-server/src/metrics_extraction/transactions/types.rs index 69c24176c8..07f09e82b7 100644 --- a/relay-server/src/metrics_extraction/transactions/types.rs +++ b/relay-server/src/metrics_extraction/transactions/types.rs @@ -21,7 +21,7 @@ pub enum TransactionMetric { Duration { unit: DurationUnit, value: DistributionType, - tags: TransactionDurationTags, + tags: CommonTags, }, /// A distribution metric for the transaction duration with limited tags. 
DurationLight { @@ -133,22 +133,6 @@ impl IntoMetric for TransactionMetric { } } -#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)] -pub struct TransactionDurationTags { - pub has_profile: bool, - pub universal_tags: CommonTags, -} - -impl From for BTreeMap { - fn from(tags: TransactionDurationTags) -> Self { - let mut map: BTreeMap = tags.universal_tags.into(); - if tags.has_profile { - map.insert("has_profile".to_string(), "true".to_string()); - } - map - } -} - #[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)] pub struct LightTransactionTags { pub transaction_op: Option, diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 7ebbe4f1fb..91eb22e18e 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -5,7 +5,7 @@ use std::fmt::{Debug, Display}; use std::future::Future; use std::io::Write; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Once}; use std::time::{Duration, Instant}; use anyhow::Context; @@ -14,7 +14,6 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use flate2::write::{GzEncoder, ZlibEncoder}; use flate2::Compression; -use fnv::FnvHasher; use relay_base_schema::project::{ProjectId, ProjectKey}; use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token}; use relay_common::time::UnixTimestamp; @@ -65,7 +64,7 @@ use { use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType}; use crate::extractors::{PartialDsn, RequestMeta}; -use crate::metrics::{ExtractionMode, MetricOutcomes, MinimalTrackableBucket}; +use crate::metrics::{MetricOutcomes, MinimalTrackableBucket}; use crate::metrics_extraction::transactions::types::ExtractMetricsError; use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor}; use crate::service::ServiceError; @@ -811,6 +810,7 @@ pub struct ProjectMetrics { /// Encodes metrics into an envelope ready to be sent upstream. 
#[derive(Debug)] pub struct EncodeMetrics { + pub partition_key: Option, pub scopes: BTreeMap, } @@ -1262,6 +1262,16 @@ impl EnvelopeProcessorService { }; if !tx_config.is_enabled() { + static TX_CONFIG_ERROR: Once = Once::new(); + TX_CONFIG_ERROR.call_once(|| { + if self.inner.config.processing_enabled() { + relay_log::error!( + "Processing Relay outdated, received tx config in version {}, which is not supported", + tx_config.version + ); + } + }); + return Ok(()); } @@ -2246,7 +2256,6 @@ impl EnvelopeProcessorService { scoping: Scoping, buckets: Vec, quotas: CombinedQuotas<'_>, - mode: ExtractionMode, ) -> Vec { let Some(rate_limiter) = self.inner.rate_limiter.as_ref() else { return buckets; @@ -2261,7 +2270,7 @@ impl EnvelopeProcessorService { .into_iter() .flat_map(|(namespace, buckets)| { let item_scoping = scoping.metric_bucket(namespace); - self.rate_limit_buckets(item_scoping, buckets, quotas, mode, rate_limiter) + self.rate_limit_buckets(item_scoping, buckets, quotas, rate_limiter) }) .collect() } @@ -2273,7 +2282,6 @@ impl EnvelopeProcessorService { item_scoping: relay_quotas::ItemScoping, buckets: Vec, quotas: CombinedQuotas<'_>, - mode: ExtractionMode, rate_limiter: &RedisRateLimiter, ) -> Vec { let batch_size = self.inner.config.metrics_max_batch_size_bytes(); @@ -2293,7 +2301,6 @@ impl EnvelopeProcessorService { self.inner.metric_outcomes.track( *item_scoping.scoping, &buckets, - mode, Outcome::RateLimited(reason_code), ); @@ -2323,7 +2330,6 @@ impl EnvelopeProcessorService { scoping: Scoping, limits: &[CardinalityLimit], buckets: Vec, - mode: ExtractionMode, ) -> Vec { let global_config = self.inner.global_config.current(); let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode; @@ -2381,7 +2387,7 @@ impl EnvelopeProcessorService { if !rejected.is_empty() { self.inner .metric_outcomes - .track(scoping, &rejected, mode, Outcome::CardinalityLimited); + .track(scoping, &rejected, Outcome::CardinalityLimited); } accepted @@ -2406,13 +2412,11 @@ impl EnvelopeProcessorService { project_state, } = message; - let mode = project_state.get_extraction_mode(); - let quotas = CombinedQuotas::new(&global_config, project_state.get_quotas()); - let buckets = self.rate_limit_buckets_by_namespace(scoping, buckets, quotas, mode); + let buckets = self.rate_limit_buckets_by_namespace(scoping, buckets, quotas); let limits = project_state.get_cardinality_limits(); - let buckets = self.cardinality_limit_buckets(scoping, limits, buckets, mode); + let buckets = self.cardinality_limit_buckets(scoping, limits, buckets); if buckets.is_empty() { continue; @@ -2429,7 +2433,6 @@ impl EnvelopeProcessorService { buckets, scoping, retention, - mode, }); } } @@ -2446,72 +2449,56 @@ impl EnvelopeProcessorService { /// access to the central Redis instance. Cached rate limits are applied in the project cache /// already. fn encode_metrics_envelope(&self, message: EncodeMetrics) { + let EncodeMetrics { + partition_key, + scopes, + } = message; + let batch_size = self.inner.config.metrics_max_batch_size_bytes(); let upstream = self.inner.config.upstream_descriptor(); - for (scoping, message) in message.scopes { - let ProjectMetrics { - buckets, - project_state, - } = message; + for (scoping, message) in scopes { + let ProjectMetrics { buckets, .. 
} = message; - let project_key = scoping.project_key; let dsn = PartialDsn::outbound(&scoping, upstream); - let mode = project_state.get_extraction_mode(); - - let partitions = if let Some(count) = self.inner.config.metrics_partitions() { - let mut partitions: BTreeMap, Vec> = BTreeMap::new(); - for bucket in buckets { - let partition_key = partition_key(project_key, &bucket, Some(count)); - partitions.entry(partition_key).or_default().push(bucket); - } - partitions - } else { - BTreeMap::from([(None, buckets)]) - }; - - for (partition_key, buckets) in partitions { - if let Some(key) = partition_key { - relay_statsd::metric!(histogram(RelayHistograms::PartitionKeys) = key); - } - - let mut num_batches = 0; - for batch in BucketsView::from(&buckets).by_size(batch_size) { - let mut envelope = - Envelope::from_request(None, RequestMeta::outbound(dsn.clone())); - let mut item = Item::new(ItemType::MetricBuckets); - item.set_source_quantities(metrics::extract_quantities(batch, mode)); - item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap()); - envelope.add_item(item); + if let Some(key) = partition_key { + relay_statsd::metric!(histogram(RelayHistograms::PartitionKeys) = key); + } - let mut envelope = ManagedEnvelope::standalone( - envelope, - self.inner.addrs.outcome_aggregator.clone(), - self.inner.addrs.test_store.clone(), - ProcessingGroup::Metrics, - ); - envelope.set_partition_key(partition_key).scope(scoping); + let mut num_batches = 0; + for batch in BucketsView::from(&buckets).by_size(batch_size) { + let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone())); - relay_statsd::metric!( - histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64 - ); + let mut item = Item::new(ItemType::MetricBuckets); + item.set_source_quantities(metrics::extract_quantities(batch)); + item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap()); + envelope.add_item(item); - self.handle_submit_envelope(SubmitEnvelope { - envelope: envelope.into_processed(), - }); - num_batches += 1; - } + let mut envelope = ManagedEnvelope::standalone( + envelope, + self.inner.addrs.outcome_aggregator.clone(), + self.inner.addrs.test_store.clone(), + ProcessingGroup::Metrics, + ); + envelope.set_partition_key(partition_key).scope(scoping); relay_statsd::metric!( - histogram(RelayHistograms::BatchesPerPartition) = num_batches + histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64 ); + + self.handle_submit_envelope(SubmitEnvelope { + envelope: envelope.into_processed(), + }); + num_batches += 1; } + + relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches); } } /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay. - fn send_global_partition(&self, key: Option, partition: &mut Partition<'_>) { + fn send_global_partition(&self, partition_key: Option, partition: &mut Partition<'_>) { if partition.is_empty() { return; } @@ -2528,7 +2515,7 @@ impl EnvelopeProcessorService { }; let request = SendMetricsRequest { - partition_key: key.map(|k| k.to_string()), + partition_key: partition_key.map(|k| k.to_string()), unencoded, encoded, project_info, @@ -2554,42 +2541,33 @@ impl EnvelopeProcessorService { /// access to the central Redis instance. Cached rate limits are applied in the project cache /// already. 
fn encode_metrics_global(&self, message: EncodeMetrics) { - let partition_count = self.inner.config.metrics_partitions(); - let batch_size = self.inner.config.metrics_max_batch_size_bytes(); - - let mut partitions = BTreeMap::new(); + let EncodeMetrics { + partition_key, + scopes, + } = message; - for (scoping, message) in &message.scopes { - let ProjectMetrics { - buckets, - project_state, - } = message; + let batch_size = self.inner.config.metrics_max_batch_size_bytes(); + let mut partition = Partition::new(batch_size); - let mode = project_state.get_extraction_mode(); + for (scoping, message) in &scopes { + let ProjectMetrics { buckets, .. } = message; for bucket in buckets { - let partition_key = partition_key(scoping.project_key, bucket, partition_count); - let mut remaining = Some(BucketView::new(bucket)); - while let Some(bucket) = remaining.take() { - let partition = partitions - .entry(partition_key) - .or_insert_with(|| Partition::new(batch_size)); - if let Some(next) = partition.insert(bucket, *scoping, mode) { + while let Some(bucket) = remaining.take() { + if let Some(next) = partition.insert(bucket, *scoping) { // A part of the bucket could not be inserted. Take the partition and submit // it immediately. Repeat until the final part was inserted. This should // always result in a request, otherwise we would enter an endless loop. - self.send_global_partition(partition_key, partition); + self.send_global_partition(partition_key, &mut partition); remaining = Some(next); } } } } - for (partition_key, mut partition) in partitions { - self.send_global_partition(partition_key, &mut partition); - } + self.send_global_partition(partition_key, &mut partition); } fn handle_encode_metrics(&self, message: EncodeMetrics) { @@ -2869,18 +2847,6 @@ impl UpstreamRequest for SendEnvelope { } } -/// Computes a stable partitioning key for sharded metric requests. -fn partition_key(project_key: ProjectKey, bucket: &Bucket, partitions: Option) -> Option { - use std::hash::{Hash, Hasher}; - - let partitions = partitions?.max(1); - let key = (project_key, &bucket.name, &bucket.tags); - - let mut hasher = FnvHasher::default(); - key.hash(&mut hasher); - Some(hasher.finish() % partitions) -} - /// A container for metric buckets from multiple projects. /// /// This container is used to send metrics to the upstream in global batches as part of the @@ -2892,7 +2858,7 @@ struct Partition<'a> { max_size: usize, remaining: usize, views: HashMap>>, - project_info: HashMap, + project_info: HashMap, } impl<'a> Partition<'a> { @@ -2916,12 +2882,7 @@ impl<'a> Partition<'a> { /// upstream immediately. Use [`Self::take`] to retrieve the contents of the /// partition. Afterwards, the caller is responsible to call this function again with the /// remaining bucket until it is fully inserted. - pub fn insert( - &mut self, - bucket: BucketView<'a>, - scoping: Scoping, - mode: ExtractionMode, - ) -> Option> { + pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option> { let (current, next) = bucket.split(self.remaining, Some(self.max_size)); if let Some(current) = current { @@ -2933,7 +2894,7 @@ impl<'a> Partition<'a> { self.project_info .entry(scoping.project_key) - .or_insert((scoping, mode)); + .or_insert(scoping); } next @@ -2947,7 +2908,7 @@ impl<'a> Partition<'a> { /// Returns the serialized buckets for this partition. /// /// This empties the partition, so that it can be reused. 
- fn take(&mut self) -> (Bytes, HashMap) { + fn take(&mut self) -> (Bytes, HashMap) { #[derive(serde::Serialize)] struct Wrapper<'a> { buckets: &'a HashMap>>, @@ -2980,7 +2941,7 @@ struct SendMetricsRequest { /// Mapping of all contained project keys to their scoping and extraction mode. /// /// Used to track outcomes for transmission failures. - project_info: HashMap, + project_info: HashMap, /// Encoding (compression) of the payload. http_encoding: HttpEncoding, /// Metric outcomes instance to send outcomes on error. @@ -3006,7 +2967,7 @@ impl SendMetricsRequest { }; for (key, buckets) in buckets { - let Some(&(scoping, mode)) = self.project_info.get(&key) else { + let Some(&scoping) = self.project_info.get(&key) else { relay_log::error!("missing scoping for project key"); continue; }; @@ -3014,7 +2975,6 @@ impl SendMetricsRequest { self.metric_outcomes.track( scoping, &buckets, - mode, Outcome::Invalid(DiscardReason::Internal), ); } @@ -3230,7 +3190,10 @@ mod tests { scopes.insert(scoping_by_org_id(rate_limited_org), project_metrics.clone()); scopes.insert(scoping_by_org_id(not_ratelimited_org), project_metrics); - EncodeMetrics { scopes } + EncodeMetrics { + partition_key: None, + scopes, + } }; // ensure the order of the map while iterating is as expected. diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index 843e0d3ca2..63156bb4ea 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -482,13 +482,13 @@ mod tests { let sampling_result = run(&mut state, &config); assert!(sampling_result.should_keep()); - // Current version is 1, so it won't run DS if it's outdated - let mut state = get_state(Some(0)); + // Current version is 3, so it won't run DS if it's outdated + let mut state = get_state(Some(2)); let sampling_result = run(&mut state, &config); assert!(sampling_result.should_keep()); // Dynamic sampling is run, as the transactionmetrics version is up to date. - let mut state = get_state(Some(1)); + let mut state = get_state(Some(3)); let sampling_result = run(&mut state, &config); assert!(sampling_result.should_drop()); } diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index c983a98504..c7e16db4bb 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -23,7 +23,7 @@ use tokio::time::Instant; use url::Url; use crate::envelope::Envelope; -use crate::metrics::{ExtractionMode, MetricOutcomes, MetricsLimiter}; +use crate::metrics::{MetricOutcomes, MetricsLimiter}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; #[cfg(feature = "processing")] use crate::services::processor::RateLimitBuckets; @@ -282,14 +282,6 @@ impl ProjectState { scoping } - /// Returns the transaction/profile extraction mode for this project. - pub fn get_extraction_mode(&self) -> ExtractionMode { - match self.config.transaction_metrics { - Some(ErrorBoundary::Ok(ref c)) if c.usage_metric() => ExtractionMode::Usage, - _ => ExtractionMode::Duration, - } - } - /// Returns quotas declared in this project state. pub fn get_quotas(&self) -> &[Quota] { self.config.quotas.as_slice() @@ -617,8 +609,7 @@ impl Project { // Check rate limits if necessary. 
let quotas = state.config.quotas.clone(); - let extraction_mode = state.get_extraction_mode(); - let buckets = match MetricsLimiter::create(buckets, quotas, scoping, extraction_mode) { + let buckets = match MetricsLimiter::create(buckets, quotas, scoping) { Ok(mut bucket_limiter) => { let cached_rate_limits = self.rate_limits().clone(); #[allow(unused_variables)] @@ -1133,7 +1124,6 @@ impl Project { .filter_map(|bucket| bucket.name.try_namespace()) .collect(); - let mode = project_state.get_extraction_mode(); for namespace in namespaces { let limits = self.rate_limits().check_with_quotas( project_state.get_quotas(), @@ -1147,7 +1137,7 @@ impl Project { }); let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); - metric_outcomes.track(scoping, &rejected, mode, Outcome::RateLimited(reason_code)); + metric_outcomes.track(scoping, &rejected, Outcome::RateLimited(reason_code)); } } diff --git a/relay-server/src/services/project/metrics.rs b/relay-server/src/services/project/metrics.rs index c36f65bbd5..47040eb3ef 100644 --- a/relay-server/src/services/project/metrics.rs +++ b/relay-server/src/services/project/metrics.rs @@ -101,13 +101,10 @@ impl Buckets { }) .collect(); - let mode = project_state.get_extraction_mode(); - if !disabled_namespace_buckets.is_empty() { metric_outcomes.track( scoping, &disabled_namespace_buckets, - mode, Outcome::Filtered(FilterStatKey::DisabledNamespace), ); } @@ -116,7 +113,6 @@ impl Buckets { metric_outcomes.track( scoping, &denied_buckets, - mode, Outcome::Filtered(FilterStatKey::DeniedName), ); } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 4a3c157da9..ade2b36bdc 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -899,17 +899,18 @@ impl ProjectCacheBroker { fn handle_flush_buckets(&mut self, message: FlushBuckets) { let metric_outcomes = self.metric_outcomes.clone(); - let mut output = BTreeMap::new(); + let mut scoped_buckets = BTreeMap::new(); for (project_key, buckets) in message.buckets { let project = self.get_or_create_project(project_key); if let Some((scoping, b)) = project.check_buckets(&metric_outcomes, buckets) { - output.insert(scoping, b); + scoped_buckets.insert(scoping, b); } } - self.services - .envelope_processor - .send(EncodeMetrics { scopes: output }) + self.services.envelope_processor.send(EncodeMetrics { + partition_key: message.partition_key, + scopes: scoped_buckets, + }) } fn handle_buffer_index(&mut self, message: UpdateSpoolIndex) { diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 96c15c5d93..063794ffdf 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -30,7 +30,7 @@ use uuid::Uuid; use crate::envelope::{AttachmentType, Envelope, Item, ItemType}; -use crate::metrics::{ArrayEncoding, BucketEncoder, ExtractionMode, MetricOutcomes}; +use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes}; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; @@ -88,7 +88,6 @@ pub struct StoreMetrics { pub buckets: Vec, pub scoping: Scoping, pub retention: u16, - pub mode: ExtractionMode, } #[derive(Debug)] @@ -205,15 +204,6 @@ impl StoreService { KafkaTopic::Attachments } else if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) { KafkaTopic::Transactions - } else if event_item.as_ref().map(|x| 
x.ty()) == Some(&ItemType::UserReportV2) { - if is_rolled_out( - scoping.organization_id, - global_options.feedback_ingest_topic_rollout_rate, - ) { - KafkaTopic::Feedback - } else { - KafkaTopic::Events - } } else { KafkaTopic::Events }; @@ -394,7 +384,6 @@ impl StoreService { buckets, scoping, retention, - mode, } = message; let batch_size = self.config.metrics_max_batch_size_bytes(); @@ -433,7 +422,7 @@ impl StoreService { } }; - self.metric_outcomes.track(scoping, &[view], mode, outcome); + self.metric_outcomes.track(scoping, &[view], outcome); } } diff --git a/relay-spans/src/span.rs b/relay-spans/src/span.rs index d043efbbf1..507e868f56 100644 --- a/relay-spans/src/span.rs +++ b/relay-spans/src/span.rs @@ -602,7 +602,6 @@ mod tests { 1970-01-01T00:02:03Z, ), exclusive_time: 500.0, - description: ~, op: "myop", span_id: SpanId( "fa90fdead5f74052", @@ -618,6 +617,7 @@ mod tests { ), is_segment: ~, status: Ok, + description: ~, tags: ~, origin: ~, profile_id: EventId( diff --git a/tests/integration/consts.py b/tests/integration/consts.py new file mode 100644 index 0000000000..63d7e8065d --- /dev/null +++ b/tests/integration/consts.py @@ -0,0 +1,4 @@ +# Minimum supported version for metric transaction by Relay. +TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION = 3 +# Maximum supported version for metric transaction by Relay. +TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION = 6 diff --git a/tests/integration/test_dynamic_sampling.py b/tests/integration/test_dynamic_sampling.py index 34bf3397b4..c0e805d301 100644 --- a/tests/integration/test_dynamic_sampling.py +++ b/tests/integration/test_dynamic_sampling.py @@ -1,6 +1,10 @@ from datetime import datetime import uuid import json +from .consts import ( + TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, + TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION, +) import pytest from sentry_sdk.envelope import Envelope, Item, PayloadRef @@ -233,7 +237,9 @@ def test_it_removes_events(mini_sentry, relay): # create a basic project config config = mini_sentry.add_basic_project_config(project_id) - config["config"]["transactionMetrics"] = {"version": 1} + config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } public_key = config["publicKeys"][0]["publicKey"] @@ -345,7 +351,9 @@ def test_sample_on_parametrized_root_transaction(mini_sentry, relay): parametrized_transaction = "/auth/login/*/" config = mini_sentry.add_basic_project_config(project_id) - config["config"]["transactionMetrics"] = {"version": 1} + config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION + } sampling_config = mini_sentry.add_basic_project_config(43) sampling_public_key = sampling_config["publicKeys"][0]["publicKey"] @@ -455,14 +463,18 @@ def test_uses_trace_public_key(mini_sentry, relay): # create basic project configs project_id1 = 42 config1 = mini_sentry.add_basic_project_config(project_id1) - config1["config"]["transactionMetrics"] = {"version": 1} + config1["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } public_key1 = config1["publicKeys"][0]["publicKey"] _add_sampling_config(config1, sample_rate=0, rule_type="trace") project_id2 = 43 config2 = mini_sentry.add_basic_project_config(project_id2) - config2["config"]["transactionMetrics"] = {"version": 1} + config2["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } public_key2 = config2["publicKeys"][0]["publicKey"] _add_sampling_config(config2, sample_rate=1, rule_type="trace") 
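The `metrics_partitions` cases in `test_metrics_partition_key` further below exercise the routing scheme that used to live in the `partition_key` helper removed above: hash the project key together with the bucket's metric name and tags, then reduce the hash modulo the configured partition count, where `None` disables partitioning altogether and `0` is clamped to a single partition. The following is a minimal, self-contained Rust sketch of that scheme, not Relay's actual code; the function and parameter names are illustrative, and it uses std's `DefaultHasher` in place of the FNV hasher Relay uses for stable keys, so the concrete partition numbers (such as the expected `"17"` for 128 partitions) will differ.

// Illustrative sketch only: stable partition selection for metric buckets,
// keyed by (project key, metric name, tags). Relay hashes with FNV; the
// std DefaultHasher is used here just to keep the example dependency-free.
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

fn partition_for(
    project_key: &str,
    metric_name: &str,
    tags: &BTreeMap<String, String>,
    flush_partitions: Option<u64>,
) -> Option<u64> {
    // `None` means "do not partition at all"; `0` is clamped to one partition,
    // so both 0 and 1 route every bucket to partition 0.
    let partitions = flush_partitions?.max(1);
    let mut hasher = DefaultHasher::new();
    (project_key, metric_name, tags).hash(&mut hasher);
    Some(hasher.finish() % partitions)
}

fn main() {
    let tags = BTreeMap::from([("platform".to_owned(), "android".to_owned())]);
    let key = "31a5a894b4524f74a9a8d0e27e21ba91";
    let name = "d:transactions/duration@millisecond";

    assert_eq!(partition_for(key, name, &tags, None), None);
    assert_eq!(partition_for(key, name, &tags, Some(0)), Some(0));
    assert_eq!(partition_for(key, name, &tags, Some(1)), Some(0));
    // With 128 partitions the bucket lands on one of them, chosen by its hash.
    println!("{:?}", partition_for(key, name, &tags, Some(128)));
}

Only the partition count, not the hash function, determines the outcome for the `None`, `0`, and `1` cases, which is why the test can assert exact header values there while the multi-partition case depends on the hash of the specific bucket.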
@@ -525,7 +537,9 @@ def test_multi_item_envelope(mini_sentry, relay, rule_type, event_factory): # create a basic project config config = mini_sentry.add_basic_project_config(project_id) - config["config"]["transactionMetrics"] = {"version": 1} + config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } # add a sampling rule to project config that removes all transactions (sample_rate=0) public_key = config["publicKeys"][0]["publicKey"] # add a sampling rule to project config that drops all events (sample_rate=0), it should be ignored @@ -575,7 +589,9 @@ def test_client_sample_rate_adjusted(mini_sentry, relay, rule_type, event_factor project_id = 42 relay = relay(mini_sentry) config = mini_sentry.add_basic_project_config(project_id) - config["config"]["transactionMetrics"] = {"version": 1} + config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } public_key = config["publicKeys"][0]["publicKey"] # the closer to 0, the less flaky the test is @@ -681,7 +697,9 @@ def make_envelope(public_key): project_id = 42 relay = relay(relay_with_processing()) config = mini_sentry.add_basic_project_config(project_id) - config["config"]["transactionMetrics"] = {"version": 1} + config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } config["config"]["features"] = ["projects:profiling-ingest-unsampled-profiles"] public_key = config["publicKeys"][0]["publicKey"] @@ -824,7 +842,9 @@ def test_invalid_global_generic_filters_skip_dynamic_sampling(mini_sentry, relay project_id = 42 config = mini_sentry.add_basic_project_config(project_id) - config["config"]["transactionMetrics"] = {"version": 1} + config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } public_key = config["publicKeys"][0]["publicKey"] # Reject all transactions with dynamic sampling diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index b9638d8b55..ebafe5fdca 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -7,6 +7,10 @@ import signal import time import queue +from .consts import ( + TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, + TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION, +) import pytest import requests @@ -253,7 +257,16 @@ def test_metrics_backdated(mini_sentry, relay): @pytest.mark.parametrize( "metrics_partitions,expected_header", - [(None, None), (0, "0"), (1, "0"), (128, "17")], + [ + # With no partitions defined, partitioning will not be performed but bucket shift will still be done. + (None, None), + # With zero partitions defined, all the buckets will be forwarded to a single partition. + (0, "0"), + # With one partition defined, all the buckets will be forwarded to that single partition. + (1, "0"), + # With more than one partition defined, each bucket will be forwarded to one of the partitions.
+ (128, "17"), + ], ) def test_metrics_partition_key(mini_sentry, relay, metrics_partitions, expected_header): forever = 100 * 365 * 24 * 60 * 60 # *almost forever @@ -267,8 +280,8 @@ def test_metrics_partition_key(mini_sentry, relay, metrics_partitions, expected_ "debounce_delay": 0, "max_secs_in_past": forever, "max_secs_in_future": forever, + "shift_key": "partition", "flush_partitions": metrics_partitions, - "shift_key": "none", }, } relay = relay(mini_sentry, options=relay_config) @@ -799,11 +812,11 @@ def test_transaction_metrics( ) if extract_metrics == "corrupted": - config["transactionMetrics"] = 42 + config["transactionMetrics"] = TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION + 1 elif extract_metrics: config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } config.setdefault("features", []).append("projects:span-metrics-extraction") @@ -940,7 +953,7 @@ def test_transaction_metrics_count_per_root_project( "span_ops": {"type": "spanOperations", "matches": ["react.mount"]} } config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } transaction = generate_transaction_item() @@ -1013,7 +1026,9 @@ def test_transaction_metrics_extraction_external_relays( project_id = 42 mini_sentry.add_full_project_config(project_id) config = mini_sentry.project_configs[project_id]["config"] - config["transactionMetrics"] = {"version": 3} + config["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } config["sampling"] = { "version": 2, "rules": [ @@ -1093,7 +1108,7 @@ def test_transaction_metrics_extraction_processing_relays( mini_sentry.add_full_project_config(project_id) config = mini_sentry.project_configs[project_id]["config"] config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } tx = generate_transaction_item() @@ -1165,6 +1180,10 @@ def test_transaction_metrics_not_extracted_on_unsupported_version( assert tx["transaction"] == "/organizations/:orgId/performance/:eventSlug/" tx_consumer.assert_empty() + if unsupported_version < TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION: + error = str(mini_sentry.test_failures.pop(0)) + assert "Processing Relay outdated" in error + metrics_consumer.assert_empty() @@ -1173,7 +1192,7 @@ def test_no_transaction_metrics_when_filtered(mini_sentry, relay): mini_sentry.add_full_project_config(project_id) config = mini_sentry.project_configs[project_id]["config"] config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } config["filterSettings"]["releases"] = {"releases": ["foo@1.2.4"]} @@ -1204,7 +1223,7 @@ def test_transaction_name_too_long( mini_sentry.add_full_project_config(project_id) config = mini_sentry.project_configs[project_id]["config"] config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } transaction = { @@ -1320,7 +1339,7 @@ def test_limit_custom_measurements( "maxCustomMeasurements": 1, } config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } transaction = generate_transaction_item() @@ -1379,7 +1398,7 @@ def test_span_metrics( mini_sentry.add_full_project_config(project_id) config = mini_sentry.project_configs[project_id]["config"] config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } config.setdefault("features", []).append("projects:span-metrics-extraction") @@ -1464,7 +1483,9 
@@ def test_generic_metric_extraction(mini_sentry, relay): } ], } - config["transactionMetrics"] = {"version": 3} + config["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } config["sampling"] = { "version": 2, "rules": [ @@ -1517,7 +1538,7 @@ def test_span_metrics_secondary_aggregator( mini_sentry.add_full_project_config(project_id) config = mini_sentry.project_configs[project_id]["config"] config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } config.setdefault("features", []).append("projects:span-metrics-extraction") @@ -1687,7 +1708,7 @@ def test_relay_forwards_events_without_extracting_metrics_on_broken_global_filte mini_sentry.add_full_project_config(project_id) config = mini_sentry.project_configs[project_id]["config"] config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } if is_processing_relay: @@ -1750,7 +1771,7 @@ def test_relay_forwards_events_without_extracting_metrics_on_unsupported_project } } config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } if is_processing_relay: @@ -1806,7 +1827,7 @@ def test_missing_global_filters_enables_metric_extraction( mini_sentry.add_full_project_config(project_id) config = mini_sentry.project_configs[project_id]["config"] config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } relay = relay_with_processing( @@ -1946,7 +1967,7 @@ def test_histogram_outliers(mini_sentry, relay): mini_sentry.global_config["metricExtraction"] = yaml.full_load(f) project_config = mini_sentry.add_full_project_config(project_id=42)["config"] project_config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } project_config["metricExtraction"] = { "version": 3, diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 4cb6bad651..706bb66b8a 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -7,6 +7,10 @@ from datetime import UTC, datetime, timedelta, timezone from pathlib import Path from queue import Empty +from .consts import ( + TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, + TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION, +) import pytest import requests @@ -793,7 +797,9 @@ def test_outcome_to_client_report(relay, mini_sentry): # Create project config project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"]["transactionMetrics"] = {"version": 1} + project_config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } project_config["config"]["sampling"] = { "version": 2, "rules": [ @@ -972,7 +978,9 @@ def test_outcomes_aggregate_dynamic_sampling(relay, mini_sentry): ], } - project_config["config"]["transactionMetrics"] = {"version": 1} + project_config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } upstream = relay( mini_sentry, @@ -1064,7 +1072,9 @@ def test_graceful_shutdown(relay, mini_sentry): # Create project config project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"]["transactionMetrics"] = {"version": 1} + project_config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } project_config["config"]["sampling"] = { "version": 2, "rules": [ @@ -1151,7 +1161,7 @@ def 
test_profile_outcomes( project_config.setdefault("features", []).append("organizations:profiling") project_config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION, } project_config["sampling"] = { "version": 2, @@ -1272,10 +1282,10 @@ def make_envelope(transaction_name): metrics = [ m for m, _ in metrics_consumer.get_metrics() - if m["name"] == "d:transactions/duration@millisecond" + if m["name"] == "c:transactions/usage@none" ] - assert len(metrics) == 2 assert all(metric["tags"]["has_profile"] == "true" for metric in metrics) + assert sum(metric["value"] for metric in metrics) == 2 assert outcomes == expected_outcomes, outcomes @@ -1299,7 +1309,7 @@ def test_profile_outcomes_invalid( project_config.setdefault("features", []).append("organizations:profiling") project_config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } config = { @@ -1385,7 +1395,7 @@ def test_profile_outcomes_too_many( project_config.setdefault("features", []).append("organizations:profiling") project_config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION, } config = { @@ -1450,9 +1460,7 @@ def make_envelope(): # Make sure one profile will not be counted as accepted metrics = metrics_by_name(metrics_consumer, 4) - assert ( - metrics["d:transactions/duration@millisecond"]["tags"]["has_profile"] == "true" - ) + assert "has_profile" not in metrics["d:transactions/duration@millisecond"]["tags"] assert metrics["c:transactions/usage@none"]["tags"]["has_profile"] == "true" @@ -1473,7 +1481,7 @@ def test_profile_outcomes_data_invalid( project_config.setdefault("features", []).append("organizations:profiling") project_config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } config = { @@ -1535,9 +1543,7 @@ def make_envelope(): # Because invalid data is detected _after_ metrics extraction, there is still a metric: metrics = metrics_by_name(metrics_consumer, 4) - assert ( - metrics["d:transactions/duration@millisecond"]["tags"]["has_profile"] == "true" - ) + assert "has_profile" not in metrics["d:transactions/duration@millisecond"]["tags"] assert metrics["c:transactions/usage@none"]["tags"]["has_profile"] == "true" @@ -1745,7 +1751,7 @@ def test_span_outcomes( ] ) project_config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } project_config["sampling"] = { "version": 2, @@ -1869,7 +1875,7 @@ def test_span_outcomes_invalid( ] ) project_config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } config = { @@ -1984,7 +1990,7 @@ def test_global_rate_limit_by_namespace( "scope": "global", "categories": ["metric_bucket"], "limit": metric_bucket_limit, - "window": int(datetime.now(UTC).timestamp()), + "window": int(datetime.now(UTC).timestamp()) - 1, "reasonCode": global_reason_code, }, { @@ -1993,13 +1999,13 @@ def test_global_rate_limit_by_namespace( "categories": ["metric_bucket"], "limit": transaction_limit, "namespace": "transactions", - "window": int(datetime.now(UTC).timestamp()), + "window": int(datetime.now(UTC).timestamp()) - 1, "reasonCode": transaction_reason_code, }, ] # Truncate the timestamp and add a slight offset to never be on the border of the rate limiting window. 
- ts = datetime.now(UTC).timestamp() // 100 * 100 + 50 + ts = datetime.now(UTC).timestamp() def send_buckets(n, name, value, ty): for i in range(n): diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 4b14769d36..a7df1f190a 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -2,6 +2,7 @@ import uuid from collections import Counter from datetime import datetime, timedelta, timezone, UTC +from .consts import TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION import pytest from opentelemetry.proto.common.v1.common_pb2 import AnyValue, KeyValue @@ -43,7 +44,7 @@ def test_span_extraction( "organizations:indexed-spans-extraction", ] project_config["config"]["transactionMetrics"] = { - "version": 3, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } if discard_transaction: @@ -199,7 +200,7 @@ def test_span_extraction_with_sampling( "organizations:indexed-spans-extraction", ] project_config["config"]["transactionMetrics"] = { - "version": 3, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } spans_consumer = spans_consumer() @@ -243,7 +244,7 @@ def test_duplicate_performance_score(mini_sentry, relay): "organizations:indexed-spans-extraction", ] project_config["config"]["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, } project_config["config"]["performanceScore"] = { "profiles": [ @@ -455,7 +456,9 @@ def test_span_ingestion( "projects:span-metrics-extraction", "projects:relay-otel-endpoint", ] - project_config["config"]["transactionMetrics"] = {"version": 1} + project_config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } if extract_transaction: project_config["config"]["features"].append( "projects:extract-transaction-from-segment-span" @@ -1534,7 +1537,9 @@ def test_rate_limit_indexed_consistent_extracted( project_config = mini_sentry.add_full_project_config(project_id) # Span metrics won't be extracted without a supported transactionMetrics config. # Without extraction, the span is treated as `Span`, not `SpanIndexed`. 
- project_config["config"]["transactionMetrics"] = {"version": 3} + project_config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } project_config["config"]["features"] = [ "projects:span-metrics-extraction", "organizations:indexed-spans-extraction", @@ -1803,7 +1808,9 @@ def test_dynamic_sampling( project_config["config"]["features"] = [ "organizations:standalone-span-ingestion", ] - project_config["config"]["transactionMetrics"] = {"version": 1} + project_config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION + } sampling_config = mini_sentry.add_basic_project_config(43) sampling_public_key = sampling_config["publicKeys"][0]["publicKey"] diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py index 36807724d4..bad0b3e076 100644 --- a/tests/integration/test_store.py +++ b/tests/integration/test_store.py @@ -7,6 +7,11 @@ from datetime import UTC, datetime, timedelta, timezone from time import sleep +from .consts import ( + TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, + TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION, +) + import pytest from flask import Response, abort from requests.exceptions import HTTPError @@ -603,7 +608,7 @@ def send_buckets(buckets): assert len(produced_buckets) == metric_bucket_limit -@pytest.mark.parametrize("violating_bucket", [[4.0, 5.0], [4.0, 5.0, 6.0]]) +@pytest.mark.parametrize("violating_bucket", [2, 3]) def test_rate_limit_metrics_buckets( mini_sentry, relay_with_processing, @@ -690,7 +695,7 @@ def send_buckets(buckets): send_buckets( [ # Duration metric, subtract 3 from quota - make_bucket("d:transactions/duration@millisecond", "d", [1, 2, 3]), + make_bucket("c:transactions/usage@none", "c", 3), ], ) send_buckets( @@ -701,9 +706,9 @@ def send_buckets(buckets): ) send_buckets( [ - # Duration metric, subtract from quota. This bucket is still accepted, but the rest + # Usage metric, subtract from quota. This bucket is still accepted, but the rest # will be exceeded. - make_bucket("d:transactions/duration@millisecond", "d", violating_bucket), + make_bucket("c:transactions/usage@none", "c", violating_bucket), ], ) send_buckets( @@ -714,8 +719,8 @@ def send_buckets(buckets): ) send_buckets( [ - # Another three for duration, won't make it into kafka. - make_bucket("d:transactions/duration@millisecond", "d", [7, 8, 9]), + # Another three for usage, won't make it into kafka. + make_bucket("c:transactions/usage@none", "c", 3), # Session metrics are still accepted. 
make_bucket("d:sessions/session@user", "s", [1254]), ], @@ -731,49 +736,49 @@ def send_buckets(buckets): assert produced_buckets == [ { - "name": "d:sessions/duration@second", + "name": "c:transactions/usage@none", "org_id": 1, - "project_id": 42, "retention_days": 90, + "project_id": 42, "tags": {}, - "type": "d", - "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + "type": "c", + "value": violating_bucket, }, { - "name": "d:sessions/session@none", + "name": "c:transactions/usage@none", "org_id": 1, "retention_days": 90, "project_id": 42, "tags": {}, "type": "c", - "value": 1.0, + "value": 3, }, { - "name": "d:sessions/session@user", + "name": "d:sessions/duration@second", "org_id": 1, - "retention_days": 90, "project_id": 42, + "retention_days": 90, "tags": {}, - "type": "s", - "value": [1254], + "type": "d", + "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], }, { - "name": "d:transactions/duration@millisecond", + "name": "d:sessions/session@none", "org_id": 1, "retention_days": 90, "project_id": 42, "tags": {}, - "type": "d", - "value": [1.0, 2.0, 3.0], + "type": "c", + "value": 1.0, }, { - "name": "d:transactions/duration@millisecond", + "name": "d:sessions/session@user", "org_id": 1, "retention_days": 90, "project_id": 42, "tags": {}, - "type": "d", - "value": violating_bucket, + "type": "s", + "value": [1254], }, { "name": "d:transactions/measurements.lcp@millisecond", @@ -803,7 +808,13 @@ def send_buckets(buckets): ) -@pytest.mark.parametrize("extraction_version", [1, 3]) +@pytest.mark.parametrize( + "extraction_version", + [ + TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, + TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION, + ], +) def test_processing_quota_transaction_indexing( mini_sentry, relay_with_processing, diff --git a/tests/integration/test_unreal.py b/tests/integration/test_unreal.py index 9910cd2f22..99f1716cba 100644 --- a/tests/integration/test_unreal.py +++ b/tests/integration/test_unreal.py @@ -1,6 +1,7 @@ import os import pytest import json +from .consts import TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION def _load_dump_file(base_file_name: str): @@ -23,7 +24,7 @@ def test_unreal_crash(mini_sentry, relay, dump_file_name, extract_metrics): if extract_metrics: # regression: we dropped unreal events in customer relays while metrics extraction was on config["transactionMetrics"] = { - "version": 1, + "version": TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION, } unreal_content = _load_dump_file(dump_file_name)