From 890e1d16a98617d6396742bb1d08e175808ae59a Mon Sep 17 00:00:00 2001
From: David Herberth
Date: Thu, 1 Aug 2024 09:28:01 +0200
Subject: [PATCH] ref(metrics): Remodel metric submission flow (#3867)

Remove the metric processing load from the project cache and move it to
the processor.

Over time, more and more logic moved into the project cache, which became
quite expensive to execute, especially for heavily batched metrics. To
alleviate this problem, the project cache now only enriches messages with
project information and dispatches them based on that information,
instead of also filtering, validating, and rate limiting metrics.
Validation and rate limits are now enforced in the processor.

The old flow for metrics:

```mermaid
sequenceDiagram
    Endpoint->>Processor: ProcessMetrics
    Processor->>Project Cache: AddMetricBuckets
    activate Project Cache
    Note right of Project Cache: [check_buckets]
    Project Cache->>Aggregator: MergeBuckets
    deactivate Project Cache

    Aggregator->>Project Cache: FlushBuckets
    activate Project Cache
    Note right of Project Cache: [check_buckets]
    Project Cache->>Processor: EncodeMetrics
    deactivate Project Cache
    Processor->>Store: SendRequest
```

The new flow:

```mermaid
sequenceDiagram
    participant Endpoint
    participant Processor
    participant Project Cache
    participant Aggregator
    participant Store

    Endpoint->>Project Cache: ProcessMetrics
    Project Cache->>Processor: ProcessProjectMetrics
    activate Processor
    Note right of Processor: [check_buckets]
    Processor->>Aggregator: MergeBuckets
    deactivate Processor

    Aggregator->>Project Cache: FlushBuckets
    Project Cache->>Processor: EncodeMetrics
    activate Processor
    Note right of Processor: [check_buckets]
    Processor->>Store: SendRequest
    deactivate Processor
```

Batched metrics follow a slightly adjusted flow: they go from the endpoint
to the processor for parsing and then continue with the same
`ProcessMetrics` flow.
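To make the hand-off concrete, here is a minimal, self-contained sketch of the
new dispatch step. All types below (`ProjectState`, `ProcessMetrics`,
`ProcessProjectMetrics`, `enrich_and_forward`) are simplified stand-ins for
illustration only and are not the real Relay service types or message
signatures; the actual services communicate via `relay_system::Addr` and carry
richer data, as shown in the diff.

```rust
// Sketch only: the project cache attaches project information and forwards the
// raw metric data; parsing, filtering, and rate limiting happen later in the
// processor.

#[allow(dead_code)]
#[derive(Debug, Clone)]
enum ProjectState {
    Pending,
    Enabled, // simplified: the real state carries project configuration
    Disabled,
}

/// What the endpoint sends to the project cache: raw, unparsed metric data.
#[derive(Debug)]
struct ProcessMetrics {
    project_key: String,
    raw_payload: Vec<u8>,
}

/// What the project cache forwards to the processor: the same data, enriched
/// with the cached project state and (stand-in) rate limits.
#[derive(Debug)]
struct ProcessProjectMetrics {
    project_state: ProjectState,
    rate_limits: Vec<String>, // simplified stand-in for cached `RateLimits`
    project_key: String,
    raw_payload: Vec<u8>,
}

/// The project cache no longer parses or rate limits metrics itself; it only
/// enriches the message and hands it to the processor.
fn enrich_and_forward(cached_state: ProjectState, msg: ProcessMetrics) -> ProcessProjectMetrics {
    ProcessProjectMetrics {
        project_state: cached_state,
        rate_limits: Vec::new(), // would be the currently cached limits
        project_key: msg.project_key,
        raw_payload: msg.raw_payload,
    }
}

fn main() {
    let incoming = ProcessMetrics {
        project_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(),
        raw_payload: b"endpoint.hits:1|c".to_vec(),
    };
    // If the project state is still pending, checks run again after flushing.
    let forwarded = enrich_and_forward(ProjectState::Pending, incoming);
    println!("{forwarded:?}");
}
```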
--- relay-server/src/endpoints/common.rs | 10 +- relay-server/src/service.rs | 2 +- relay-server/src/services/processor.rs | 476 ++++++++++++------ .../{project => processor}/metrics.rs | 35 +- relay-server/src/services/project.rs | 321 ++---------- relay-server/src/services/project/state.rs | 22 +- .../src/services/project/state/info.rs | 14 + relay-server/src/services/project_cache.rs | 149 +++--- relay-server/src/services/project_local.rs | 1 + relay-server/src/testutils.rs | 2 + 10 files changed, 464 insertions(+), 568 deletions(-) rename relay-server/src/services/{project => processor}/metrics.rs (94%) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 3309b7a0e4..20c8ccf211 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -11,8 +11,8 @@ use serde::Deserialize; use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items}; use crate::service::ServiceState; use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::{ProcessMetricMeta, ProcessMetrics, ProcessingGroup}; -use crate::services::project_cache::{CheckEnvelope, ValidateEnvelope}; +use crate::services::processor::{MetricData, ProcessMetricMeta, ProcessingGroup}; +use crate::services::project_cache::{CheckEnvelope, ProcessMetrics, ValidateEnvelope}; use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope, MultipartError}; @@ -273,9 +273,9 @@ fn queue_envelope( if !metric_items.is_empty() { relay_log::trace!("sending metrics into processing queue"); - state.processor().send(ProcessMetrics { - items: metric_items.into_vec(), - start_time: envelope.meta().start_time(), + state.project_cache().send(ProcessMetrics { + data: MetricData::Raw(metric_items.into_vec()), + start_time: envelope.meta().start_time().into(), sent_at: envelope.sent_at(), project_key: envelope.meta().public_key(), source: envelope.meta().into(), diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 2925ee92b8..b5655562de 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -242,6 +242,7 @@ impl ServiceState { test_store: test_store.clone(), #[cfg(feature = "processing")] store_forwarder: store.clone(), + aggregator: aggregator.clone(), }, metric_outcomes.clone(), ) @@ -263,7 +264,6 @@ impl ServiceState { MemoryChecker::new(memory_stat.clone(), config.clone()), envelope_buffer.clone(), project_cache_services, - metric_outcomes, redis_pool.clone(), ) .spawn_handler(project_cache_rx); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 6059347562..d2d95283b6 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1,5 +1,5 @@ use std::borrow::Cow; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::error::Error; use std::fmt::{Debug, Display}; use std::future::Future; @@ -36,7 +36,7 @@ use relay_metrics::{ use relay_pii::PiiConfigError; use relay_profiling::ProfileId; use relay_protocol::{Annotated, Value}; -use relay_quotas::{DataCategory, Scoping}; +use relay_quotas::{DataCategory, RateLimits, Scoping}; use relay_sampling::config::RuleId; use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision}; use relay_sampling::DynamicSamplingContext; @@ -47,7 +47,6 @@ use smallvec::{smallvec, SmallVec}; #[cfg(feature = "processing")] use { - 
crate::metrics::MetricsLimiter, crate::services::store::{Store, StoreEnvelope}, crate::utils::{sample, Enforcement, EnvelopeLimiter, ItemAction}, itertools::Itertools, @@ -57,7 +56,7 @@ use { }, relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups}, relay_metrics::RedisMetricMetaStore, - relay_quotas::{Quota, RateLimitingError, RateLimits, RedisRateLimiter}, + relay_quotas::{Quota, RateLimitingError, RedisRateLimiter}, relay_redis::RedisPool, std::iter::Chain, std::slice::Iter, @@ -66,16 +65,18 @@ use { use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType}; use crate::extractors::{PartialDsn, RequestMeta}; -use crate::metrics::{MetricOutcomes, MinimalTrackableBucket}; +use crate::http; +use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket}; use crate::metrics_extraction::transactions::types::ExtractMetricsError; use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor}; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; +use crate::services::metrics::{Aggregator, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::event::FiltersStatus; -use crate::services::project::ProjectInfo; +use crate::services::project::{ProjectInfo, ProjectState}; use crate::services::project_cache::{ - AddMetricBuckets, AddMetricMeta, BucketSource, ProjectCache, UpdateRateLimits, + AddMetricMeta, BucketSource, ProcessMetrics, ProjectCache, UpdateRateLimits, }; use crate::services::test_store::{Capture, TestStore}; use crate::services::upstream::{ @@ -86,11 +87,11 @@ use crate::utils::{ self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope, WorkerGroup, }; -use crate::{http, metrics}; mod attachment; mod dynamic_sampling; mod event; +mod metrics; mod profile; mod profile_chunk; mod replay; @@ -728,7 +729,7 @@ impl ProcessingExtractedMetrics { } } -fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, project_cache: Addr) { +fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, aggregator: &Addr) { let project_key = envelope.meta().public_key(); let ExtractedMetrics { @@ -737,7 +738,10 @@ fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, project_cache: A } = metrics; if !project_metrics.is_empty() { - project_cache.send(AddMetricBuckets::internal(project_key, project_metrics)); + aggregator.send(MergeBuckets { + project_key, + buckets: project_metrics, + }); } if !sampling_metrics.is_empty() { @@ -748,10 +752,10 @@ fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, project_cache: A // dependent_project_with_tracing -> metrics goes to root // root_project_with_tracing -> metrics goes to root == self let sampling_project_key = envelope.sampling_key().unwrap_or(project_key); - project_cache.send(AddMetricBuckets::internal( - sampling_project_key, - sampling_metrics, - )); + aggregator.send(MergeBuckets { + project_key: sampling_project_key, + buckets: sampling_metrics, + }); } } @@ -903,14 +907,23 @@ pub struct ProcessEnvelope { /// ignored independently. /// - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and /// dropped together on parsing failure. -/// - Other items will be ignored with an error message. +/// - Other envelope items will be ignored with an error message. 
/// /// Additionally, processing applies clock drift correction using the system clock of this Relay, if /// the Envelope specifies the [`sent_at`](Envelope::sent_at) header. #[derive(Debug)] -pub struct ProcessMetrics { +pub struct ProcessProjectMetrics { + /// The project state the metrics belong to. + /// + /// The project state can be pending, in which case cached rate limits + /// and other project specific operations are skipped and executed once + /// the project state becomes available. + pub project_state: ProjectState, + /// Currently active cached rate limits for this project. + pub rate_limits: RateLimits, + /// A list of metric items. - pub items: Vec, + pub data: MetricData, /// The target project. pub project_key: ProjectKey, /// Whether to keep or reset the metric metadata. @@ -922,6 +935,68 @@ pub struct ProcessMetrics { pub sent_at: Option>, } +/// Raw unparsed metric data. +#[derive(Debug)] +pub enum MetricData { + /// Raw data, unparsed envelope items. + Raw(Vec), + /// Already parsed buckets but unprocessed. + Parsed(Vec), +} + +impl MetricData { + /// Consumes the metric data and parses the contained buckets. + /// + /// If the contained data is already parsed the buckets are returned unchanged. + /// Raw buckets are parsed and created with the passed `timestamp`. + fn into_buckets(self, timestamp: UnixTimestamp) -> Vec { + let items = match self { + Self::Parsed(buckets) => return buckets, + Self::Raw(items) => items, + }; + + let mut buckets = Vec::new(); + for item in items { + let payload = item.payload(); + if item.ty() == &ItemType::Statsd { + for bucket_result in Bucket::parse_all(&payload, timestamp) { + match bucket_result { + Ok(bucket) => buckets.push(bucket), + Err(error) => relay_log::debug!( + error = &error as &dyn Error, + "failed to parse metric bucket from statsd format", + ), + } + } + } else if item.ty() == &ItemType::MetricBuckets { + match serde_json::from_slice::>(&payload) { + Ok(parsed_buckets) => { + // Re-use the allocation of `b` if possible. + if buckets.is_empty() { + buckets = parsed_buckets; + } else { + buckets.extend(parsed_buckets); + } + } + Err(error) => { + relay_log::debug!( + error = &error as &dyn Error, + "failed to parse metric bucket", + ); + metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1); + } + } + } else { + relay_log::error!( + "invalid item of type {} passed to ProcessMetrics", + item.ty() + ); + } + } + buckets + } +} + #[derive(Debug)] pub struct ProcessBatchedMetrics { /// Metrics payload in JSON format. @@ -950,6 +1025,8 @@ pub struct ProjectMetrics { pub buckets: Vec, /// Project info for extracting quotas. pub project_info: Arc, + /// Currently cached rate limits. + pub rate_limits: RateLimits, } /// Encodes metrics into an envelope ready to be sent upstream. 
@@ -990,7 +1067,7 @@ pub struct SubmitClientReports { #[derive(Debug)] pub enum EnvelopeProcessor { ProcessEnvelope(Box), - ProcessMetrics(Box), + ProcessProjectMetrics(Box), ProcessBatchedMetrics(Box), ProcessMetricMeta(Box), EncodeMetrics(Box), @@ -1004,7 +1081,7 @@ impl EnvelopeProcessor { pub fn variant(&self) -> &'static str { match self { EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope", - EnvelopeProcessor::ProcessMetrics(_) => "ProcessMetrics", + EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics", EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics", EnvelopeProcessor::ProcessMetricMeta(_) => "ProcessMetricMeta", EnvelopeProcessor::EncodeMetrics(_) => "EncodeMetrics", @@ -1025,11 +1102,11 @@ impl FromMessage for EnvelopeProcessor { } } -impl FromMessage for EnvelopeProcessor { +impl FromMessage for EnvelopeProcessor { type Response = NoResponse; - fn from_message(message: ProcessMetrics, _: ()) -> Self { - Self::ProcessMetrics(Box::new(message)) + fn from_message(message: ProcessProjectMetrics, _: ()) -> Self { + Self::ProcessProjectMetrics(Box::new(message)) } } @@ -1097,6 +1174,7 @@ pub struct Addrs { pub test_store: Addr, #[cfg(feature = "processing")] pub store_forwarder: Option>, + pub aggregator: Addr, } impl Default for Addrs { @@ -1108,6 +1186,7 @@ impl Default for Addrs { test_store: Addr::dummy(), #[cfg(feature = "processing")] store_forwarder: None, + aggregator: Addr::dummy(), } } } @@ -2004,7 +2083,7 @@ impl EnvelopeProcessorService { send_metrics( state.extracted_metrics, state.managed_envelope.envelope(), - self.inner.addrs.project_cache.clone(), + &self.inner.addrs.aggregator, ); let envelope_response = if state.managed_envelope.envelope().is_empty() { @@ -2058,63 +2137,28 @@ impl EnvelopeProcessorService { } } - fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) { - let ProcessMetrics { - items, + fn handle_process_project_metrics(&self, cogs: &mut Token, message: ProcessProjectMetrics) { + let ProcessProjectMetrics { + project_state, + rate_limits, + data, project_key, start_time, sent_at, source, } = message; - let received = relay_common::time::instant_to_date_time(start_time); let received_timestamp = UnixTimestamp::from_instant(start_time); - let clock_drift_processor = - ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); - - let mut buckets = Vec::new(); - for item in items { - let payload = item.payload(); - if item.ty() == &ItemType::Statsd { - for bucket_result in Bucket::parse_all(&payload, received_timestamp) { - match bucket_result { - Ok(bucket) => buckets.push(bucket), - Err(error) => relay_log::debug!( - error = &error as &dyn Error, - "failed to parse metric bucket from statsd format", - ), - } - } - } else if item.ty() == &ItemType::MetricBuckets { - match serde_json::from_slice::>(&payload) { - Ok(parsed_buckets) => { - // Re-use the allocation of `b` if possible. 
- if buckets.is_empty() { - buckets = parsed_buckets; - } else { - buckets.extend(parsed_buckets); - } - } - Err(error) => { - relay_log::debug!( - error = &error as &dyn Error, - "failed to parse metric bucket", - ); - metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1); - } - } - } else { - relay_log::error!( - "invalid item of type {} passed to ProcessMetrics", - item.ty() - ); - } - } - + let mut buckets = data.into_buckets(received_timestamp); if buckets.is_empty() { return; }; + cogs.update(relay_metrics::cogs::BySize(&buckets)); + + let received = relay_common::time::instant_to_date_time(start_time); + let clock_drift_processor = + ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); for bucket in &mut buckets { clock_drift_processor.process_timestamp(&mut bucket.timestamp); @@ -2123,14 +2167,22 @@ impl EnvelopeProcessorService { } } - cogs.update(relay_metrics::cogs::BySize(&buckets)); + let buckets = self::metrics::filter_namespaces(buckets, source); - relay_log::trace!("merging metric buckets into project cache"); - self.inner.addrs.project_cache.send(AddMetricBuckets { - project_key, - buckets, - source, - }); + // Best effort check to filter and rate limit buckets, if there is no project state + // available at the current time, we will check again after flushing. + let buckets = match project_state.enabled() { + Some(project_info) => { + self.check_buckets(project_key, &project_info, &rate_limits, buckets) + } + None => buckets, + }; + + relay_log::trace!("merging metric buckets into the aggregator"); + self.inner + .addrs + .aggregator + .send(MergeBuckets::new(project_key, buckets)); } fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) { @@ -2141,12 +2193,6 @@ impl EnvelopeProcessorService { sent_at, } = message; - let received = relay_common::time::instant_to_date_time(start_time); - let received_timestamp = UnixTimestamp::from_instant(start_time); - - let clock_drift_processor = - ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); - #[derive(serde::Deserialize)] struct Wrapper { buckets: HashMap>, @@ -2165,25 +2211,15 @@ impl EnvelopeProcessorService { }; let mut feature_weights = FeatureWeights::none(); - for (project_key, mut buckets) in buckets { - if buckets.is_empty() { - continue; - } - - for bucket in &mut buckets { - clock_drift_processor.process_timestamp(&mut bucket.timestamp); - if !matches!(source, BucketSource::Internal) { - bucket.metadata = BucketMetadata::new(received_timestamp); - } - } - + for (project_key, buckets) in buckets { feature_weights = feature_weights.merge(relay_metrics::cogs::BySize(&buckets).into()); - relay_log::trace!("merging metric buckets into project cache"); - self.inner.addrs.project_cache.send(AddMetricBuckets { + self.inner.addrs.project_cache.send(ProcessMetrics { + data: MetricData::Parsed(buckets), project_key, - buckets, source, + start_time: start_time.into(), + sent_at, }); } @@ -2309,6 +2345,69 @@ impl EnvelopeProcessorService { }); } + fn check_buckets( + &self, + project_key: ProjectKey, + project_info: &ProjectInfo, + rate_limits: &RateLimits, + buckets: Vec, + ) -> Vec { + let Some(scoping) = project_info.scoping(project_key) else { + relay_log::error!( + tags.project_key = project_key.as_str(), + "there is no scoping: dropping {} buckets", + buckets.len(), + ); + return Vec::new(); + }; + + let mut buckets = self::metrics::apply_project_info( + buckets, + &self.inner.metric_outcomes, + project_info, + scoping, + ); + + let 
namespaces: BTreeSet = buckets + .iter() + .filter_map(|bucket| bucket.name.try_namespace()) + .collect(); + + for namespace in namespaces { + let limits = rate_limits.check_with_quotas( + project_info.get_quotas(), + scoping.item(DataCategory::MetricBucket), + ); + + if limits.is_limited() { + let rejected; + (buckets, rejected) = utils::split_off(buckets, |bucket| { + bucket.name.try_namespace() == Some(namespace) + }); + + let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); + self.inner.metric_outcomes.track( + scoping, + &rejected, + Outcome::RateLimited(reason_code), + ); + } + } + + let quotas = project_info.config.quotas.clone(); + match MetricsLimiter::create(buckets, quotas, scoping) { + Ok(mut bucket_limiter) => { + bucket_limiter.enforce_limits( + rate_limits, + &self.inner.metric_outcomes, + &self.inner.addrs.outcome_aggregator, + ); + bucket_limiter.into_buckets() + } + Err(buckets) => buckets, + } + } + #[cfg(feature = "processing")] fn rate_limit_buckets( &self, @@ -2529,6 +2628,7 @@ impl EnvelopeProcessorService { let ProjectMetrics { buckets, project_info, + rate_limits: _, } = message; let buckets = self.rate_limit_buckets(scoping, &project_info, buckets); @@ -2589,7 +2689,7 @@ impl EnvelopeProcessorService { let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone())); let mut item = Item::new(ItemType::MetricBuckets); - item.set_source_quantities(metrics::extract_quantities(batch)); + item.set_source_quantities(crate::metrics::extract_quantities(batch)); item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap()); envelope.add_item(item); @@ -2694,7 +2794,17 @@ impl EnvelopeProcessorService { self.send_global_partition(partition_key, &mut partition); } - fn handle_encode_metrics(&self, message: EncodeMetrics) { + fn handle_encode_metrics(&self, mut message: EncodeMetrics) { + for (scoping, pm) in message.scopes.iter_mut() { + let buckets = std::mem::take(&mut pm.buckets); + pm.buckets = self.check_buckets( + scoping.project_key, + &pm.project_info, + &pm.rate_limits, + buckets, + ); + } + #[cfg(feature = "processing")] if self.inner.config.processing_enabled() { if let Some(ref store_forwarder) = self.inner.addrs.store_forwarder { @@ -2771,7 +2881,9 @@ impl EnvelopeProcessorService { match message { EnvelopeProcessor::ProcessEnvelope(m) => self.handle_process_envelope(*m), - EnvelopeProcessor::ProcessMetrics(m) => self.handle_process_metrics(&mut cogs, *m), + EnvelopeProcessor::ProcessProjectMetrics(m) => { + self.handle_process_project_metrics(&mut cogs, *m) + } EnvelopeProcessor::ProcessBatchedMetrics(m) => { self.handle_process_batched_metrics(&mut cogs, *m) } @@ -2787,7 +2899,7 @@ impl EnvelopeProcessorService { fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights { match message { EnvelopeProcessor::ProcessEnvelope(v) => AppFeature::from(v.envelope.group()).into(), - EnvelopeProcessor::ProcessMetrics(_) => AppFeature::Unattributed.into(), + EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(), EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(), EnvelopeProcessor::ProcessMetricMeta(_) => AppFeature::MetricMeta.into(), EnvelopeProcessor::EncodeMetrics(v) => v @@ -3165,6 +3277,7 @@ impl<'a> IntoIterator for CombinedQuotas<'a> { mod tests { use std::env; + use insta::assert_debug_snapshot; use relay_base_schema::metrics::{DurationUnit, MetricUnit}; use relay_common::glob2::LazyGlob; use relay_dynamic_config::{ExtrapolationConfig, 
MetricExtractionConfig, ProjectConfig}; @@ -3261,6 +3374,7 @@ mod tests { width: 10, metadata: BucketMetadata::default(), }], + rate_limits: Default::default(), project_info, }; @@ -3627,11 +3741,11 @@ mod tests { let start_time = Instant::now(); let config = Config::default(); - let (project_cache, mut project_cache_rx) = Addr::custom(); + let (aggregator, mut aggregator_rx) = Addr::custom(); let processor = create_test_processor_with_addrs( config, Addrs { - project_cache, + aggregator, ..Default::default() }, ); @@ -3648,17 +3762,19 @@ mod tests { ), (BucketSource::Internal, None), ] { - let message = ProcessMetrics { - items: vec![item.clone()], + let message = ProcessProjectMetrics { + data: MetricData::Raw(vec![item.clone()]), + project_state: ProjectState::Pending, + rate_limits: Default::default(), project_key, source, start_time, sent_at: Some(Utc::now()), }; - processor.handle_process_metrics(&mut token, message); + processor.handle_process_project_metrics(&mut token, message); - let value = project_cache_rx.recv().await.unwrap(); - let ProjectCache::AddMetricBuckets(merge_buckets) = value else { + let value = aggregator_rx.recv().await.unwrap(); + let Aggregator::MergeBuckets(merge_buckets) = value else { panic!() }; let buckets = merge_buckets.buckets; @@ -3668,7 +3784,7 @@ mod tests { } #[tokio::test] - async fn test_process_batched_metrics_bucket_metadata() { + async fn test_process_batched_metrics() { let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed); let start_time = Instant::now(); let config = Config::default(); @@ -3682,89 +3798,119 @@ mod tests { }, ); - let payload_no_metadata = r#"{ + let payload = r#"{ "buckets": { - "a94ae32be2584e0bbd7a4cbb95971fee": [ + "11111111111111111111111111111111": [ { "timestamp": 1615889440, "width": 0, "name": "d:custom/endpoint.response_time@millisecond", "type": "d", "value": [ - 36.0, - 49.0, - 57.0, 68.0 ], "tags": { "route": "user_index" } } - ] - } -} -"#; - - let payload_metadata = r#"{ - "buckets": { - "a94ae32be2584e0bbd7a4cbb95971fee": [ + ], + "22222222222222222222222222222222": [ { "timestamp": 1615889440, "width": 0, - "name": "d:custom/endpoint.response_time@millisecond", + "name": "d:custom/endpoint.cache_rate@none", "type": "d", "value": [ - 36.0, - 49.0, - 57.0, - 68.0 - ], - "tags": { - "route": "user_index" - }, - "metadata": { - "merges": 1, - "received_at": 1615889440 - } + 36.0 + ] } ] } } "#; - for (source, payload, expected_received_at) in [ - ( - BucketSource::External, - payload_no_metadata, - Some(UnixTimestamp::from_instant(start_time)), - ), - (BucketSource::Internal, payload_no_metadata, None), + let message = ProcessBatchedMetrics { + payload: Bytes::from(payload), + source: BucketSource::Internal, + start_time, + sent_at: Some(Utc::now()), + }; + processor.handle_process_batched_metrics(&mut token, message); + + let value = project_cache_rx.recv().await.unwrap(); + let ProjectCache::ProcessMetrics(pm1) = value else { + panic!() + }; + let value = project_cache_rx.recv().await.unwrap(); + let ProjectCache::ProcessMetrics(pm2) = value else { + panic!() + }; + + let mut messages = vec![pm1, pm2]; + messages.sort_by_key(|pm| pm.project_key); + + let actual = messages + .into_iter() + .map(|pm| (pm.project_key, pm.data, pm.source)) + .collect::>(); + + assert_debug_snapshot!(actual, @r###" + [ ( - BucketSource::External, - payload_metadata, - Some(UnixTimestamp::from_instant(start_time)), + ProjectKey("11111111111111111111111111111111"), + Parsed( + [ + Bucket { + timestamp: 
UnixTimestamp(1615889440), + width: 0, + name: MetricName( + "d:custom/endpoint.response_time@millisecond", + ), + value: Distribution( + [ + 68.0, + ], + ), + tags: { + "route": "user_index", + }, + metadata: BucketMetadata { + merges: 1, + received_at: None, + extracted_from_indexed: false, + }, + }, + ], + ), + Internal, ), ( - BucketSource::Internal, - payload_metadata, - Some(UnixTimestamp::from_secs(1615889440)), + ProjectKey("22222222222222222222222222222222"), + Parsed( + [ + Bucket { + timestamp: UnixTimestamp(1615889440), + width: 0, + name: MetricName( + "d:custom/endpoint.cache_rate@none", + ), + value: Distribution( + [ + 36.0, + ], + ), + tags: {}, + metadata: BucketMetadata { + merges: 1, + received_at: None, + extracted_from_indexed: false, + }, + }, + ], + ), + Internal, ), - ] { - let message = ProcessBatchedMetrics { - payload: Bytes::from(payload), - source, - start_time, - sent_at: Some(Utc::now()), - }; - processor.handle_process_batched_metrics(&mut token, message); - - let value = project_cache_rx.recv().await.unwrap(); - let ProjectCache::AddMetricBuckets(merge_buckets) = value else { - panic!() - }; - let buckets = merge_buckets.buckets; - assert_eq!(buckets.len(), 1); - assert_eq!(buckets[0].metadata.received_at, expected_received_at); - } + ] + "###); } #[test] diff --git a/relay-server/src/services/project/metrics.rs b/relay-server/src/services/processor/metrics.rs similarity index 94% rename from relay-server/src/services/project/metrics.rs rename to relay-server/src/services/processor/metrics.rs index 4fa31ab4a6..7d330a322e 100644 --- a/relay-server/src/services/project/metrics.rs +++ b/relay-server/src/services/processor/metrics.rs @@ -8,6 +8,11 @@ use crate::services::outcome::Outcome; use crate::services::project::ProjectInfo; use crate::services::project_cache::BucketSource; +/// Filters buckets based on their namespace. +/// +/// This is a noop for most namespaces except: +/// - [`MetricNamespace::Unsupported`]: Equal to invalid/unknown namespaces. +/// - [`MetricNamespace::Stats`]: Metric stats are only allowed if the `source` is [`BucketSource::Internal`]. 
pub fn filter_namespaces(mut buckets: Vec, source: BucketSource) -> Vec { buckets.retain(|bucket| match bucket.name.namespace() { MetricNamespace::Sessions => true, @@ -111,6 +116,17 @@ mod tests { use super::*; + fn create_custom_bucket_with_name(name: String) -> Bucket { + Bucket { + name: format!("d:custom/{name}@byte").into(), + value: BucketValue::Counter(1.into()), + timestamp: UnixTimestamp::now(), + tags: Default::default(), + width: 10, + metadata: Default::default(), + } + } + fn get_test_bucket(name: &str, tags: BTreeMap) -> Bucket { let json = serde_json::json!({ "timestamp": 1615889440, @@ -149,24 +165,13 @@ mod tests { assert_eq!(bucket.tags.len(), 1); } - fn create_custom_bucket_with_name(name: String) -> Bucket { - Bucket { - name: format!("d:custom/{name}@byte").into(), - value: BucketValue::Counter(1.into()), - timestamp: UnixTimestamp::now(), - tags: Default::default(), - width: 10, - metadata: Default::default(), - } - } - #[test] - fn test_apply_project_state() { + fn test_apply_project_info() { let (outcome_aggregator, _) = Addr::custom(); let (metric_stats, mut metric_stats_rx) = MetricStats::test(); let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator); - let project_state = ProjectInfo { + let project_info = ProjectInfo { config: serde_json::from_value(serde_json::json!({ "metrics": { "deniedNames": ["*cpu_time*"] }, "features": ["organizations:custom-metrics"] @@ -182,7 +187,7 @@ mod tests { let buckets = apply_project_info( buckets, &metric_outcomes, - &project_state, + &project_info, Scoping { organization_id: 42, project_id: ProjectId::new(43), @@ -207,7 +212,7 @@ mod tests { } #[test] - fn test_apply_project_state_with_disabled_custom_namespace() { + fn test_apply_project_info_with_disabled_custom_namespace() { let (outcome_aggregator, _) = Addr::custom(); let (metric_stats, mut metric_stats_rx) = MetricStats::test(); let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator); diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index c33651e826..5359773e54 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -1,11 +1,10 @@ -use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; use relay_base_schema::project::ProjectKey; use relay_config::Config; use relay_dynamic_config::{ErrorBoundary, Feature}; -use relay_metrics::{Bucket, MetaAggregator, MetricMeta, MetricNamespace}; +use relay_metrics::{Bucket, MetaAggregator, MetricMeta}; use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits, Scoping}; use relay_sampling::evaluation::ReservoirCounters; use relay_statsd::metric; @@ -14,19 +13,18 @@ use serde::{Deserialize, Serialize}; use tokio::time::Instant; use crate::envelope::ItemType; -use crate::metrics::{MetricOutcomes, MetricsLimiter}; use crate::services::metrics::{Aggregator, MergeBuckets}; -use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor}; -use crate::services::project::metrics::{apply_project_info, filter_namespaces}; +use crate::services::outcome::{DiscardReason, Outcome}; +use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor, ProcessProjectMetrics}; use crate::services::project::state::ExpiryState; -use crate::services::project_cache::{BucketSource, CheckedEnvelope, ProjectCache, RequestUpdate}; +use crate::services::project_cache::{ + CheckedEnvelope, ProcessMetrics, ProjectCache, 
RequestUpdate, +}; use crate::utils::{Enforcement, SeqCount}; use crate::statsd::RelayCounters; -use crate::utils::{self, EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; +use crate::utils::{EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; -mod metrics; pub mod state; pub use state::{ @@ -119,10 +117,16 @@ impl Project { self.reservoir_counters.clone() } - fn current_state(&self) -> ProjectState { + /// Returns the current [`ProjectState`] attached to the project. + pub fn current_state(&self) -> ProjectState { self.state.current_state(&self.config) } + /// Returns the currently active cached rate limits. + pub fn current_rate_limits(&mut self) -> &RateLimits { + self.rate_limits.current_limits() + } + /// If a reservoir rule is no longer in the sampling config, we will remove those counters. fn remove_expired_reservoir_rules(&self) { let Some(state) = self.current_state().enabled() else { @@ -160,30 +164,21 @@ impl Project { self.last_updated_at = Instant::now(); } - /// Validates and inserts given [buckets](Bucket) into the metrics aggregator. - /// - /// The buckets will be keyed underneath this project key. - pub fn merge_buckets( - &mut self, - aggregator: &Addr, - metric_outcomes: &MetricOutcomes, - outcome_aggregator: &Addr, - buckets: Vec, - source: BucketSource, - ) { - // Best effort check for rate limits and project state. Continue if there is no project state. - let buckets = match self.check_buckets(metric_outcomes, outcome_aggregator, buckets) { - CheckedBuckets::NoProject(buckets) => buckets, - CheckedBuckets::Checked { buckets, .. } => buckets, - CheckedBuckets::Dropped => return, - }; + /// Collects internal project state and assembles a [`ProcessProjectMetrics`] message. + pub fn process_metrics(&mut self, message: ProcessMetrics) -> ProcessProjectMetrics { + let project_state = self.current_state(); + let rate_limits = self.rate_limits.current_limits().clone(); - let buckets = filter_namespaces(buckets, source); + ProcessProjectMetrics { + project_state, + rate_limits, - aggregator.send(MergeBuckets::new( - self.project_key, - buckets.into_iter().collect(), - )); + data: message.data, + project_key: message.project_key, + source: message.source, + start_time: message.start_time.into(), + sent_at: message.sent_at, + } } /// Returns a list of buckets back to the aggregator. @@ -419,8 +414,9 @@ impl Project { /// point. Therefore, this method is useful to trigger an update early if it is already clear /// that the project state will be needed soon. To retrieve an updated state, use /// [`Project::get_state`] instead. - pub fn prefetch(&mut self, project_cache: Addr, no_cache: bool) { + pub fn prefetch(&mut self, project_cache: Addr, no_cache: bool) -> &mut Self { self.get_cached_state(project_cache, no_cache); + self } /// Replaces the internal project state with a new one and triggers pending actions. @@ -501,15 +497,7 @@ impl Project { /// Returns `Some` if the project state has been fetched and contains a project identifier, /// otherwise `None`. pub fn scoping(&self) -> Option { - let info = self.current_state().enabled()?; - Some(Scoping { - organization_id: info.organization_id.unwrap_or(0), - project_id: info.project_id?, - project_key: self.project_key, - key_id: info - .get_public_key_config() - .and_then(|config| config.numeric_id), - }) + self.current_state().scoping(self.project_key) } /// Runs the checks on incoming envelopes. @@ -595,78 +583,6 @@ impl Project { rate_limits, }) } - - /// Drops metrics buckets if they are not allowed for this project. 
- /// - /// Reasons for dropping can be rate limits or a disabled project. - pub fn check_buckets( - &mut self, - metric_outcomes: &MetricOutcomes, - outcome_aggregator: &Addr, - buckets: Vec, - ) -> CheckedBuckets { - let project_info = match self.current_state() { - ProjectState::Enabled(info) => info.clone(), - ProjectState::Disabled => { - relay_log::debug!("dropping {} buckets for disabled project", buckets.len()); - return CheckedBuckets::Dropped; - } - ProjectState::Pending => return CheckedBuckets::NoProject(buckets), - }; - - let Some(scoping) = self.scoping() else { - relay_log::error!( - tags.project_key = self.project_key.as_str(), - "there is no scoping: dropping {} buckets", - buckets.len(), - ); - return CheckedBuckets::Dropped; - }; - - let mut buckets = apply_project_info(buckets, metric_outcomes, &project_info, scoping); - - let namespaces: BTreeSet = buckets - .iter() - .filter_map(|bucket| bucket.name.try_namespace()) - .collect(); - - let current_limits = self.rate_limits.current_limits(); - for namespace in namespaces { - let limits = current_limits.check_with_quotas( - project_info.get_quotas(), - scoping.item(DataCategory::MetricBucket), - ); - - if limits.is_limited() { - let rejected; - (buckets, rejected) = utils::split_off(buckets, |bucket| { - bucket.name.try_namespace() == Some(namespace) - }); - - let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); - metric_outcomes.track(scoping, &rejected, Outcome::RateLimited(reason_code)); - } - } - - let quotas = project_info.config.quotas.clone(); - let buckets = match MetricsLimiter::create(buckets, quotas, scoping) { - Ok(mut bucket_limiter) => { - bucket_limiter.enforce_limits(current_limits, metric_outcomes, outcome_aggregator); - bucket_limiter.into_buckets() - } - Err(buckets) => buckets, - }; - - if buckets.is_empty() { - return CheckedBuckets::Dropped; - } - - CheckedBuckets::Checked { - scoping, - project_info, - buckets, - } - } } /// Adds category limits for the nested spans inside a transaction. @@ -712,40 +628,13 @@ fn count_nested_spans(envelope: &ManagedEnvelope) -> usize { .map_or(0, |event| event.spans.0 + 1) } -/// Return value of [`Project::check_buckets`]. -#[derive(Debug)] -pub enum CheckedBuckets { - /// There is no project state available for these metrics yet. - /// - /// The metrics should be returned to the aggregator until the project state becomes available. - NoProject(Vec), - /// The buckets have been validated and can be processed. - Checked { - /// Project scoping. - scoping: Scoping, - /// Project info. - project_info: Arc, - /// List of buckets. - buckets: Vec, - }, - /// All buckets have been dropped. - /// - /// Can happen for multiple reasons: - /// - The project is disabled or not valid. - /// - All metrics have been filtered. 
- Dropped, -} - #[cfg(test)] mod tests { use crate::envelope::{ContentType, Envelope, Item}; use crate::extractors::RequestMeta; - use crate::metrics::MetricStats; use crate::services::processor::ProcessingGroup; use relay_base_schema::project::ProjectId; - use relay_common::time::UnixTimestamp; use relay_event_schema::protocol::EventId; - use relay_metrics::BucketValue; use relay_test::mock_service; use serde_json::json; use smallvec::SmallVec; @@ -865,156 +754,6 @@ mod tests { project } - fn create_metric(name: &str) -> Bucket { - Bucket { - name: name.into(), - width: 0, - value: BucketValue::counter(1.into()), - timestamp: UnixTimestamp::from_secs(1000), - tags: Default::default(), - metadata: Default::default(), - } - } - - #[test] - fn test_check_buckets_no_project() { - let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - - let mut project = create_project(None); - project.state = ProjectFetchState::pending(); - let buckets = vec![create_metric("d:transactions/foo")]; - let cb = project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets.clone()); - - match cb { - CheckedBuckets::NoProject(b) => { - assert_eq!(b, buckets) - } - cb => panic!("{cb:?}"), - } - - drop(metric_outcomes); - assert!(metric_stats_rx.blocking_recv().is_none()); - } - - #[test] - fn test_check_buckets_rate_limit() { - let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - - let mut project = create_project(None); - let buckets = vec![create_metric("d:transactions/foo")]; - let cb = project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets.clone()); - - match cb { - CheckedBuckets::Checked { - scoping, - project_info: _, - buckets: b, - } => { - assert_eq!(scoping, project.scoping().unwrap()); - assert_eq!(b, buckets) - } - cb => panic!("{cb:?}"), - } - - drop(metric_outcomes); - assert!(metric_stats_rx.blocking_recv().is_none()); - } - - #[test] - fn test_check_buckets_rate_limit_no_quota() { - let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - - let mut project = create_project(Some(json!({ - "quotas": [{ - "id": "foo", - "categories": ["transaction"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); - let cb = project.check_buckets( - &metric_outcomes, - &outcome_aggregator, - vec![create_metric("d:transactions/foo")], - ); - - assert!(matches!(cb, CheckedBuckets::Dropped)); - - drop(metric_outcomes); - assert!(metric_stats_rx.blocking_recv().is_none()); - } - - #[test] - fn test_check_buckets_rate_limit_mixed_no_quota() { - let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - - let mut project = create_project(Some(json!({ - "quotas": [{ - "id": "foo", - "categories": ["transaction"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); - let cb = project.check_buckets( - &metric_outcomes, - &outcome_aggregator, - vec![ - create_metric("d:transactions/foo"), - create_metric("d:profiles/foo"), - ], - ); - - match cb { - CheckedBuckets::Checked { - scoping, - project_info: _, - 
buckets, - } => { - assert_eq!(scoping, project.scoping().unwrap()); - assert_eq!(buckets, vec![create_metric("d:profiles/foo")]) - } - cb => panic!("{cb:?}"), - } - - drop(metric_outcomes); - assert!(metric_stats_rx.blocking_recv().is_none()); - } - - #[test] - fn test_check_buckets_project_state_filter() { - let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - - let mut project = create_project(None); - let cb = project.check_buckets( - &metric_outcomes, - &outcome_aggregator, - vec![create_metric("d:custom/foo")], - ); - - assert!(matches!(cb, CheckedBuckets::Dropped)); - - drop(metric_outcomes); - let value = metric_stats_rx.blocking_recv().unwrap(); - let Aggregator::MergeBuckets(merge_buckets) = value else { - panic!(); - }; - assert_eq!(merge_buckets.buckets.len(), 1); - assert!(metric_stats_rx.blocking_recv().is_none()); - } - fn request_meta() -> RequestMeta { let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" .parse() diff --git a/relay-server/src/services/project/state.rs b/relay-server/src/services/project/state.rs index 9c7ea90acc..6c770b2d5b 100644 --- a/relay-server/src/services/project/state.rs +++ b/relay-server/src/services/project/state.rs @@ -3,12 +3,15 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -pub use fetch_state::{ExpiryState, ProjectFetchState}; -pub use info::{LimitedProjectInfo, ProjectInfo}; +use relay_base_schema::project::ProjectKey; +use relay_quotas::Scoping; mod fetch_state; mod info; +pub use self::fetch_state::{ExpiryState, ProjectFetchState}; +pub use self::info::{LimitedProjectInfo, ProjectInfo}; + /// Representation of a project's current state. #[derive(Clone, Debug)] pub enum ProjectState { @@ -43,12 +46,23 @@ impl ProjectState { } /// Utility function that returns the project config if enabled. - pub fn enabled(&self) -> Option> { + pub fn enabled(self) -> Option> { match self { - ProjectState::Enabled(info) => Some(Arc::clone(info)), + ProjectState::Enabled(info) => Some(info), ProjectState::Disabled | ProjectState::Pending => None, } } + + /// Creates `Scoping` for this project if the state is loaded. + /// + /// Returns `Some` if the project state has been fetched and contains a project identifier, + /// otherwise `None`. + pub fn scoping(&self, project_key: ProjectKey) -> Option { + match self { + Self::Enabled(info) => info.scoping(project_key), + _ => None, + } + } } impl From for ProjectState { diff --git a/relay-server/src/services/project/state/info.rs b/relay-server/src/services/project/state/info.rs index 9e97a73aaf..08e62b62fe 100644 --- a/relay-server/src/services/project/state/info.rs +++ b/relay-server/src/services/project/state/info.rs @@ -69,6 +69,20 @@ impl ProjectInfo { self.public_keys.first() } + /// Creates `Scoping` for this project. + /// + /// Returns `Some` if the project contains a project identifier otherwise `None`. + pub fn scoping(&self, project_key: ProjectKey) -> Option { + Some(Scoping { + organization_id: self.organization_id.unwrap_or(0), + project_id: self.project_id?, + project_key, + key_id: self + .get_public_key_config() + .and_then(|config| config.numeric_id), + }) + } + /// Returns the project config. 
pub fn config(&self) -> &ProjectConfig { &self.config diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 8d506063da..3da280feff 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -4,12 +4,15 @@ use std::sync::Arc; use std::time::Duration; use crate::extractors::RequestMeta; -use crate::metrics::MetricOutcomes; use crate::services::buffer::{EnvelopeBufferError, GuardedEnvelopeBuffer, Peek}; +use crate::services::processor::{ + EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics, +}; +use chrono::{DateTime, Utc}; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::{Config, RelayMode}; -use relay_metrics::{Bucket, MetricMeta}; +use relay_metrics::MetricMeta; use relay_quotas::RateLimits; use relay_redis::RedisPool; use relay_statsd::metric; @@ -22,12 +25,7 @@ use tokio::time::Instant; use crate::services::global_config::{self, GlobalConfigManager, Subscribe}; use crate::services::metrics::{Aggregator, FlushBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::services::processor::{ - EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProcessingGroup, ProjectMetrics, -}; -use crate::services::project::{ - CheckedBuckets, Project, ProjectFetchState, ProjectSender, ProjectState, -}; +use crate::services::project::{Project, ProjectFetchState, ProjectSender, ProjectState}; use crate::services::project_local::{LocalProjectSource, LocalProjectSourceService}; #[cfg(feature = "processing")] use crate::services::project_redis::RedisProjectSource; @@ -204,28 +202,23 @@ impl From<&RequestMeta> for BucketSource { } } -/// Add metric buckets to the project. -/// -/// Metric buckets added via the project are filtered and rate limited -/// according to the project state. +/// Starts the processing flow for received metrics. /// -/// Adding buckets directly to the aggregator bypasses all of these checks. +/// Enriches the raw data with projcet information and forwards +/// the metrics using [`ProcessProjectMetrics`](crate::services::processor::ProcessProjectMetrics). #[derive(Debug)] -pub struct AddMetricBuckets { +pub struct ProcessMetrics { + /// A list of metric items. + pub data: MetricData, + /// The target project. pub project_key: ProjectKey, - pub buckets: Vec, + /// Whether to keep or reset the metric metadata. pub source: BucketSource, -} - -impl AddMetricBuckets { - /// Convenience constructor which creates an internal [`AddMetricBuckets`] message. - pub fn internal(project_key: ProjectKey, buckets: Vec) -> Self { - Self { - project_key, - buckets, - source: BucketSource::Internal, - } - } + /// The instant at which the request was received. + pub start_time: Instant, + /// The value of the Envelope's [`sent_at`](crate::envelope::Envelope::sent_at) + /// header for clock drift correction. + pub sent_at: Option>, } /// Add metric metadata to the aggregator. @@ -272,7 +265,7 @@ pub struct RefreshIndexCache(pub HashSet); /// information. /// /// There are also higher-level operations, such as [`CheckEnvelope`] and [`ValidateEnvelope`] that -/// inspect contents of envelopes for ingestion, as well as [`AddMetricBuckets`] to aggregate metrics +/// inspect contents of envelopes for ingestion, as well as [`ProcessMetrics`] to aggregate metrics /// associated with a project. /// /// See the enumerated variants for a full list of available messages for this service. 
@@ -286,7 +279,7 @@ pub enum ProjectCache { ), ValidateEnvelope(ValidateEnvelope), UpdateRateLimits(UpdateRateLimits), - AddMetricBuckets(AddMetricBuckets), + ProcessMetrics(ProcessMetrics), AddMetricMeta(AddMetricMeta), FlushBuckets(FlushBuckets), UpdateSpoolIndex(UpdateSpoolIndex), @@ -303,7 +296,7 @@ impl ProjectCache { Self::CheckEnvelope(_, _) => "CheckEnvelope", Self::ValidateEnvelope(_) => "ValidateEnvelope", Self::UpdateRateLimits(_) => "UpdateRateLimits", - Self::AddMetricBuckets(_) => "AddMetricBuckets", + Self::ProcessMetrics(_) => "ProcessMetrics", Self::AddMetricMeta(_) => "AddMetricMeta", Self::FlushBuckets(_) => "FlushBuckets", Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", @@ -382,11 +375,11 @@ impl FromMessage for ProjectCache { } } -impl FromMessage for ProjectCache { +impl FromMessage for ProjectCache { type Response = relay_system::NoResponse; - fn from_message(message: AddMetricBuckets, _: ()) -> Self { - Self::AddMetricBuckets(message) + fn from_message(message: ProcessMetrics, _: ()) -> Self { + Self::ProcessMetrics(message) } } @@ -569,7 +562,6 @@ struct ProjectCacheBroker { // TODO: Make non-optional when spool_v1 is removed. envelope_buffer: Option>, services: Services, - metric_outcomes: MetricOutcomes, // Need hashbrown because extract_if is not stable in std yet. projects: hashbrown::HashMap, /// Utility for disposing of expired project data in a background thread. @@ -947,21 +939,15 @@ impl ProjectCacheBroker { .merge_rate_limits(message.rate_limits); } - fn handle_add_metric_buckets(&mut self, message: AddMetricBuckets) { + fn handle_process_metrics(&mut self, message: ProcessMetrics) { let project_cache = self.services.project_cache.clone(); - let aggregator = self.services.aggregator.clone(); - let metric_outcomes = self.metric_outcomes.clone(); - let outcome_aggregator = self.services.outcome_aggregator.clone(); - let project = self.get_or_create_project(message.project_key); - project.prefetch(project_cache, false); - project.merge_buckets( - &aggregator, - &metric_outcomes, - &outcome_aggregator, - message.buckets, - message.source, - ); + let message = self + .get_or_create_project(message.project_key) + .prefetch(project_cache, false) + .process_metrics(message); + + self.services.envelope_processor.send(message); } fn handle_add_metric_meta(&mut self, message: AddMetricMeta) { @@ -972,8 +958,6 @@ impl ProjectCacheBroker { } fn handle_flush_buckets(&mut self, message: FlushBuckets) { - let metric_outcomes = self.metric_outcomes.clone(); - let outcome_aggregator = self.services.outcome_aggregator.clone(); let aggregator = self.services.aggregator.clone(); let project_cache = self.services.project_cache.clone(); @@ -981,26 +965,36 @@ impl ProjectCacheBroker { let mut scoped_buckets = BTreeMap::new(); for (project_key, buckets) in message.buckets { let project = self.get_or_create_project(project_key); - match project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets) { - CheckedBuckets::NoProject(buckets) => { - no_project += 1; - // Schedule an update for the project just in case. - project.prefetch(project_cache.clone(), false); - project.return_buckets(&aggregator, buckets); - } - CheckedBuckets::Checked { - scoping, - project_info, - buckets, - } => scoped_buckets - .entry(scoping) - .or_insert(ProjectMetrics { + + let Some(project_info) = project.current_state().enabled() else { + no_project += 1; + // Schedule an update for the project just in case. 
+ project.prefetch(project_cache.clone(), false); + project.return_buckets(&aggregator, buckets); + continue; + }; + + let Some(scoping) = project.scoping() else { + relay_log::error!( + tags.project_key = project_key.as_str(), + "there is no scoping: dropping {} buckets", + buckets.len(), + ); + continue; + }; + + use std::collections::btree_map::Entry::*; + match scoped_buckets.entry(scoping) { + Vacant(entry) => { + entry.insert(ProjectMetrics { project_info, - buckets: Vec::new(), - }) - .buckets - .extend(buckets), - CheckedBuckets::Dropped => {} + rate_limits: project.current_rate_limits().clone(), + buckets, + }); + } + Occupied(entry) => { + entry.into_mut().buckets.extend(buckets); + } } } @@ -1251,9 +1245,7 @@ impl ProjectCacheBroker { self.handle_validate_envelope(message) } ProjectCache::UpdateRateLimits(message) => self.handle_rate_limits(message), - ProjectCache::AddMetricBuckets(message) => { - self.handle_add_metric_buckets(message) - } + ProjectCache::ProcessMetrics(message) => self.handle_process_metrics(message), ProjectCache::AddMetricMeta(message) => self.handle_add_metric_meta(message), ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message), ProjectCache::UpdateSpoolIndex(message) => self.handle_buffer_index(message), @@ -1274,7 +1266,6 @@ pub struct ProjectCacheService { memory_checker: MemoryChecker, envelope_buffer: Option>, services: Services, - metric_outcomes: MetricOutcomes, redis: Option, } @@ -1285,7 +1276,6 @@ impl ProjectCacheService { memory_checker: MemoryChecker, envelope_buffer: Option>, services: Services, - metric_outcomes: MetricOutcomes, redis: Option, ) -> Self { Self { @@ -1293,7 +1283,6 @@ impl ProjectCacheService { memory_checker, envelope_buffer, services, - metric_outcomes, redis, } } @@ -1308,7 +1297,6 @@ impl Service for ProjectCacheService { memory_checker, envelope_buffer, services, - metric_outcomes, redis, } = self; let project_cache = services.project_cache.clone(); @@ -1401,7 +1389,6 @@ impl Service for ProjectCacheService { spool_v1_unspool_handle: SleepHandle::idle(), spool_v1, global_config, - metric_outcomes, }; loop { @@ -1496,9 +1483,6 @@ impl FetchOptionalProjectState { #[cfg(test)] mod tests { - use crate::metrics::MetricStats; - use crate::services::global_config::GlobalConfigHandle; - use relay_dynamic_config::GlobalConfig; use relay_test::mock_service; use tokio::select; use uuid::Uuid; @@ -1567,14 +1551,6 @@ mod tests { } }; - let metric_stats = MetricStats::new( - Arc::new(Config::default()), - GlobalConfigHandle::fixed(GlobalConfig::default()), - Addr::custom().0, - ); - let metric_outcomes = - MetricOutcomes::new(metric_stats, services.outcome_aggregator.clone()); - ( ProjectCacheBroker { config: config.clone(), @@ -1593,7 +1569,6 @@ mod tests { buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)), }), global_config: GlobalConfigStatus::Pending, - metric_outcomes, }, buffer, ) diff --git a/relay-server/src/services/project_local.rs b/relay-server/src/services/project_local.rs index 3d94b9ede5..745203780a 100644 --- a/relay-server/src/services/project_local.rs +++ b/relay-server/src/services/project_local.rs @@ -233,6 +233,7 @@ mod tests { let project_info = extracted_project_state .get(&project_key) .unwrap() + .clone() .enabled() .unwrap(); diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 2a9703246f..7658a197c1 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -119,6 +119,7 @@ pub fn empty_envelope_with_dsn(dsn: &str) -> 
Box { pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {}); let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {}); + let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {}); let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {}); let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); @@ -152,6 +153,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { test_store, #[cfg(feature = "processing")] store_forwarder: None, + aggregator, }, metric_outcomes, )