diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 3309b7a0e4..20c8ccf211 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -11,8 +11,8 @@ use serde::Deserialize; use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items}; use crate::service::ServiceState; use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::{ProcessMetricMeta, ProcessMetrics, ProcessingGroup}; -use crate::services::project_cache::{CheckEnvelope, ValidateEnvelope}; +use crate::services::processor::{MetricData, ProcessMetricMeta, ProcessingGroup}; +use crate::services::project_cache::{CheckEnvelope, ProcessMetrics, ValidateEnvelope}; use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope, MultipartError}; @@ -273,9 +273,9 @@ fn queue_envelope( if !metric_items.is_empty() { relay_log::trace!("sending metrics into processing queue"); - state.processor().send(ProcessMetrics { - items: metric_items.into_vec(), - start_time: envelope.meta().start_time(), + state.project_cache().send(ProcessMetrics { + data: MetricData::Raw(metric_items.into_vec()), + start_time: envelope.meta().start_time().into(), sent_at: envelope.sent_at(), project_key: envelope.meta().public_key(), source: envelope.meta().into(), diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 2925ee92b8..b5655562de 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -242,6 +242,7 @@ impl ServiceState { test_store: test_store.clone(), #[cfg(feature = "processing")] store_forwarder: store.clone(), + aggregator: aggregator.clone(), }, metric_outcomes.clone(), ) @@ -263,7 +264,6 @@ impl ServiceState { MemoryChecker::new(memory_stat.clone(), config.clone()), envelope_buffer.clone(), project_cache_services, - metric_outcomes, redis_pool.clone(), ) .spawn_handler(project_cache_rx); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 6059347562..d2d95283b6 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1,5 +1,5 @@ use std::borrow::Cow; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::error::Error; use std::fmt::{Debug, Display}; use std::future::Future; @@ -36,7 +36,7 @@ use relay_metrics::{ use relay_pii::PiiConfigError; use relay_profiling::ProfileId; use relay_protocol::{Annotated, Value}; -use relay_quotas::{DataCategory, Scoping}; +use relay_quotas::{DataCategory, RateLimits, Scoping}; use relay_sampling::config::RuleId; use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision}; use relay_sampling::DynamicSamplingContext; @@ -47,7 +47,6 @@ use smallvec::{smallvec, SmallVec}; #[cfg(feature = "processing")] use { - crate::metrics::MetricsLimiter, crate::services::store::{Store, StoreEnvelope}, crate::utils::{sample, Enforcement, EnvelopeLimiter, ItemAction}, itertools::Itertools, @@ -57,7 +56,7 @@ use { }, relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups}, relay_metrics::RedisMetricMetaStore, - relay_quotas::{Quota, RateLimitingError, RateLimits, RedisRateLimiter}, + relay_quotas::{Quota, RateLimitingError, RedisRateLimiter}, relay_redis::RedisPool, std::iter::Chain, std::slice::Iter, @@ -66,16 +65,18 @@ use { use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType}; use 
crate::extractors::{PartialDsn, RequestMeta}; -use crate::metrics::{MetricOutcomes, MinimalTrackableBucket}; +use crate::http; +use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket}; use crate::metrics_extraction::transactions::types::ExtractMetricsError; use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor}; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; +use crate::services::metrics::{Aggregator, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::event::FiltersStatus; -use crate::services::project::ProjectInfo; +use crate::services::project::{ProjectInfo, ProjectState}; use crate::services::project_cache::{ - AddMetricBuckets, AddMetricMeta, BucketSource, ProjectCache, UpdateRateLimits, + AddMetricMeta, BucketSource, ProcessMetrics, ProjectCache, UpdateRateLimits, }; use crate::services::test_store::{Capture, TestStore}; use crate::services::upstream::{ @@ -86,11 +87,11 @@ use crate::utils::{ self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope, WorkerGroup, }; -use crate::{http, metrics}; mod attachment; mod dynamic_sampling; mod event; +mod metrics; mod profile; mod profile_chunk; mod replay; @@ -728,7 +729,7 @@ impl ProcessingExtractedMetrics { } } -fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, project_cache: Addr) { +fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, aggregator: &Addr) { let project_key = envelope.meta().public_key(); let ExtractedMetrics { @@ -737,7 +738,10 @@ fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, project_cache: A } = metrics; if !project_metrics.is_empty() { - project_cache.send(AddMetricBuckets::internal(project_key, project_metrics)); + aggregator.send(MergeBuckets { + project_key, + buckets: project_metrics, + }); } if !sampling_metrics.is_empty() { @@ -748,10 +752,10 @@ fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, project_cache: A // dependent_project_with_tracing -> metrics goes to root // root_project_with_tracing -> metrics goes to root == self let sampling_project_key = envelope.sampling_key().unwrap_or(project_key); - project_cache.send(AddMetricBuckets::internal( - sampling_project_key, - sampling_metrics, - )); + aggregator.send(MergeBuckets { + project_key: sampling_project_key, + buckets: sampling_metrics, + }); } } @@ -903,14 +907,23 @@ pub struct ProcessEnvelope { /// ignored independently. /// - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and /// dropped together on parsing failure. -/// - Other items will be ignored with an error message. +/// - Other envelope items will be ignored with an error message. /// /// Additionally, processing applies clock drift correction using the system clock of this Relay, if /// the Envelope specifies the [`sent_at`](Envelope::sent_at) header. #[derive(Debug)] -pub struct ProcessMetrics { +pub struct ProcessProjectMetrics { + /// The project state the metrics belong to. + /// + /// The project state can be pending, in which case cached rate limits + /// and other project specific operations are skipped and executed once + /// the project state becomes available. + pub project_state: ProjectState, + /// Currently active cached rate limits for this project. + pub rate_limits: RateLimits, + /// A list of metric items. - pub items: Vec, + pub data: MetricData, /// The target project. 
pub project_key: ProjectKey, /// Whether to keep or reset the metric metadata. @@ -922,6 +935,68 @@ pub struct ProcessMetrics { pub sent_at: Option>, } +/// Raw unparsed metric data. +#[derive(Debug)] +pub enum MetricData { + /// Raw data, unparsed envelope items. + Raw(Vec), + /// Already parsed buckets but unprocessed. + Parsed(Vec), +} + +impl MetricData { + /// Consumes the metric data and parses the contained buckets. + /// + /// If the contained data is already parsed, the buckets are returned unchanged. + /// Raw buckets are parsed and created with the passed `timestamp`. + fn into_buckets(self, timestamp: UnixTimestamp) -> Vec { + let items = match self { + Self::Parsed(buckets) => return buckets, + Self::Raw(items) => items, + }; + + let mut buckets = Vec::new(); + for item in items { + let payload = item.payload(); + if item.ty() == &ItemType::Statsd { + for bucket_result in Bucket::parse_all(&payload, timestamp) { + match bucket_result { + Ok(bucket) => buckets.push(bucket), + Err(error) => relay_log::debug!( + error = &error as &dyn Error, + "failed to parse metric bucket from statsd format", + ), + } + } + } else if item.ty() == &ItemType::MetricBuckets { + match serde_json::from_slice::>(&payload) { + Ok(parsed_buckets) => { + // Re-use the allocation of `buckets` if possible. + if buckets.is_empty() { + buckets = parsed_buckets; + } else { + buckets.extend(parsed_buckets); + } + } + Err(error) => { + relay_log::debug!( + error = &error as &dyn Error, + "failed to parse metric bucket", + ); + metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1); + } + } + } else { + relay_log::error!( + "invalid item of type {} passed to ProcessMetrics", + item.ty() + ); + } + } + buckets + } +} + #[derive(Debug)] pub struct ProcessBatchedMetrics { /// Metrics payload in JSON format. @@ -950,6 +1025,8 @@ pub struct ProjectMetrics { pub buckets: Vec, /// Project info for extracting quotas. pub project_info: Arc, + /// Currently cached rate limits. + pub rate_limits: RateLimits, } /// Encodes metrics into an envelope ready to be sent upstream. 
@@ -990,7 +1067,7 @@ pub struct SubmitClientReports { #[derive(Debug)] pub enum EnvelopeProcessor { ProcessEnvelope(Box), - ProcessMetrics(Box), + ProcessProjectMetrics(Box), ProcessBatchedMetrics(Box), ProcessMetricMeta(Box), EncodeMetrics(Box), @@ -1004,7 +1081,7 @@ impl EnvelopeProcessor { pub fn variant(&self) -> &'static str { match self { EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope", - EnvelopeProcessor::ProcessMetrics(_) => "ProcessMetrics", + EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics", EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics", EnvelopeProcessor::ProcessMetricMeta(_) => "ProcessMetricMeta", EnvelopeProcessor::EncodeMetrics(_) => "EncodeMetrics", @@ -1025,11 +1102,11 @@ impl FromMessage for EnvelopeProcessor { } } -impl FromMessage for EnvelopeProcessor { +impl FromMessage for EnvelopeProcessor { type Response = NoResponse; - fn from_message(message: ProcessMetrics, _: ()) -> Self { - Self::ProcessMetrics(Box::new(message)) + fn from_message(message: ProcessProjectMetrics, _: ()) -> Self { + Self::ProcessProjectMetrics(Box::new(message)) } } @@ -1097,6 +1174,7 @@ pub struct Addrs { pub test_store: Addr, #[cfg(feature = "processing")] pub store_forwarder: Option>, + pub aggregator: Addr, } impl Default for Addrs { @@ -1108,6 +1186,7 @@ impl Default for Addrs { test_store: Addr::dummy(), #[cfg(feature = "processing")] store_forwarder: None, + aggregator: Addr::dummy(), } } } @@ -2004,7 +2083,7 @@ impl EnvelopeProcessorService { send_metrics( state.extracted_metrics, state.managed_envelope.envelope(), - self.inner.addrs.project_cache.clone(), + &self.inner.addrs.aggregator, ); let envelope_response = if state.managed_envelope.envelope().is_empty() { @@ -2058,63 +2137,28 @@ impl EnvelopeProcessorService { } } - fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) { - let ProcessMetrics { - items, + fn handle_process_project_metrics(&self, cogs: &mut Token, message: ProcessProjectMetrics) { + let ProcessProjectMetrics { + project_state, + rate_limits, + data, project_key, start_time, sent_at, source, } = message; - let received = relay_common::time::instant_to_date_time(start_time); let received_timestamp = UnixTimestamp::from_instant(start_time); - let clock_drift_processor = - ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); - - let mut buckets = Vec::new(); - for item in items { - let payload = item.payload(); - if item.ty() == &ItemType::Statsd { - for bucket_result in Bucket::parse_all(&payload, received_timestamp) { - match bucket_result { - Ok(bucket) => buckets.push(bucket), - Err(error) => relay_log::debug!( - error = &error as &dyn Error, - "failed to parse metric bucket from statsd format", - ), - } - } - } else if item.ty() == &ItemType::MetricBuckets { - match serde_json::from_slice::>(&payload) { - Ok(parsed_buckets) => { - // Re-use the allocation of `b` if possible. 
- if buckets.is_empty() { - buckets = parsed_buckets; - } else { - buckets.extend(parsed_buckets); - } - } - Err(error) => { - relay_log::debug!( - error = &error as &dyn Error, - "failed to parse metric bucket", - ); - metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1); - } - } - } else { - relay_log::error!( - "invalid item of type {} passed to ProcessMetrics", - item.ty() - ); - } - } - + let mut buckets = data.into_buckets(received_timestamp); if buckets.is_empty() { return; }; + cogs.update(relay_metrics::cogs::BySize(&buckets)); + + let received = relay_common::time::instant_to_date_time(start_time); + let clock_drift_processor = + ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); for bucket in &mut buckets { clock_drift_processor.process_timestamp(&mut bucket.timestamp); @@ -2123,14 +2167,22 @@ impl EnvelopeProcessorService { } } - cogs.update(relay_metrics::cogs::BySize(&buckets)); + let buckets = self::metrics::filter_namespaces(buckets, source); - relay_log::trace!("merging metric buckets into project cache"); - self.inner.addrs.project_cache.send(AddMetricBuckets { - project_key, - buckets, - source, - }); + // Best-effort check to filter and rate limit buckets; if there is no project state + // available at the current time, we will check again after flushing. + let buckets = match project_state.enabled() { + Some(project_info) => { + self.check_buckets(project_key, &project_info, &rate_limits, buckets) + } + None => buckets, + }; + + relay_log::trace!("merging metric buckets into the aggregator"); + self.inner + .addrs + .aggregator + .send(MergeBuckets::new(project_key, buckets)); } fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) { @@ -2141,12 +2193,6 @@ impl EnvelopeProcessorService { sent_at, } = message; - let received = relay_common::time::instant_to_date_time(start_time); - let received_timestamp = UnixTimestamp::from_instant(start_time); - - let clock_drift_processor = - ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); - #[derive(serde::Deserialize)] struct Wrapper { buckets: HashMap>, } @@ -2165,25 +2211,15 @@ }; let mut feature_weights = FeatureWeights::none(); - for (project_key, mut buckets) in buckets { - if buckets.is_empty() { - continue; - } - - for bucket in &mut buckets { - clock_drift_processor.process_timestamp(&mut bucket.timestamp); - if !matches!(source, BucketSource::Internal) { - bucket.metadata = BucketMetadata::new(received_timestamp); - } - } - + for (project_key, buckets) in buckets { feature_weights = feature_weights.merge(relay_metrics::cogs::BySize(&buckets).into()); - relay_log::trace!("merging metric buckets into project cache"); - self.inner.addrs.project_cache.send(AddMetricBuckets { + self.inner.addrs.project_cache.send(ProcessMetrics { + data: MetricData::Parsed(buckets), project_key, - buckets, source, + start_time: start_time.into(), + sent_at, }); } @@ -2309,6 +2345,69 @@ impl EnvelopeProcessorService { }); } + fn check_buckets( + &self, + project_key: ProjectKey, + project_info: &ProjectInfo, + rate_limits: &RateLimits, + buckets: Vec, + ) -> Vec { + let Some(scoping) = project_info.scoping(project_key) else { + relay_log::error!( + tags.project_key = project_key.as_str(), + "there is no scoping: dropping {} buckets", + buckets.len(), + ); + return Vec::new(); + }; + + let mut buckets = self::metrics::apply_project_info( + buckets, + &self.inner.metric_outcomes, + project_info, + scoping, + ); + + let 
namespaces: BTreeSet = buckets + .iter() + .filter_map(|bucket| bucket.name.try_namespace()) + .collect(); + + for namespace in namespaces { + let limits = rate_limits.check_with_quotas( + project_info.get_quotas(), + scoping.item(DataCategory::MetricBucket), + ); + + if limits.is_limited() { + let rejected; + (buckets, rejected) = utils::split_off(buckets, |bucket| { + bucket.name.try_namespace() == Some(namespace) + }); + + let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); + self.inner.metric_outcomes.track( + scoping, + &rejected, + Outcome::RateLimited(reason_code), + ); + } + } + + let quotas = project_info.config.quotas.clone(); + match MetricsLimiter::create(buckets, quotas, scoping) { + Ok(mut bucket_limiter) => { + bucket_limiter.enforce_limits( + rate_limits, + &self.inner.metric_outcomes, + &self.inner.addrs.outcome_aggregator, + ); + bucket_limiter.into_buckets() + } + Err(buckets) => buckets, + } + } + #[cfg(feature = "processing")] fn rate_limit_buckets( &self, @@ -2529,6 +2628,7 @@ impl EnvelopeProcessorService { let ProjectMetrics { buckets, project_info, + rate_limits: _, } = message; let buckets = self.rate_limit_buckets(scoping, &project_info, buckets); @@ -2589,7 +2689,7 @@ impl EnvelopeProcessorService { let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone())); let mut item = Item::new(ItemType::MetricBuckets); - item.set_source_quantities(metrics::extract_quantities(batch)); + item.set_source_quantities(crate::metrics::extract_quantities(batch)); item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap()); envelope.add_item(item); @@ -2694,7 +2794,17 @@ impl EnvelopeProcessorService { self.send_global_partition(partition_key, &mut partition); } - fn handle_encode_metrics(&self, message: EncodeMetrics) { + fn handle_encode_metrics(&self, mut message: EncodeMetrics) { + for (scoping, pm) in message.scopes.iter_mut() { + let buckets = std::mem::take(&mut pm.buckets); + pm.buckets = self.check_buckets( + scoping.project_key, + &pm.project_info, + &pm.rate_limits, + buckets, + ); + } + #[cfg(feature = "processing")] if self.inner.config.processing_enabled() { if let Some(ref store_forwarder) = self.inner.addrs.store_forwarder { @@ -2771,7 +2881,9 @@ impl EnvelopeProcessorService { match message { EnvelopeProcessor::ProcessEnvelope(m) => self.handle_process_envelope(*m), - EnvelopeProcessor::ProcessMetrics(m) => self.handle_process_metrics(&mut cogs, *m), + EnvelopeProcessor::ProcessProjectMetrics(m) => { + self.handle_process_project_metrics(&mut cogs, *m) + } EnvelopeProcessor::ProcessBatchedMetrics(m) => { self.handle_process_batched_metrics(&mut cogs, *m) } @@ -2787,7 +2899,7 @@ impl EnvelopeProcessorService { fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights { match message { EnvelopeProcessor::ProcessEnvelope(v) => AppFeature::from(v.envelope.group()).into(), - EnvelopeProcessor::ProcessMetrics(_) => AppFeature::Unattributed.into(), + EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(), EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(), EnvelopeProcessor::ProcessMetricMeta(_) => AppFeature::MetricMeta.into(), EnvelopeProcessor::EncodeMetrics(v) => v @@ -3165,6 +3277,7 @@ impl<'a> IntoIterator for CombinedQuotas<'a> { mod tests { use std::env; + use insta::assert_debug_snapshot; use relay_base_schema::metrics::{DurationUnit, MetricUnit}; use relay_common::glob2::LazyGlob; use relay_dynamic_config::{ExtrapolationConfig, 
MetricExtractionConfig, ProjectConfig}; @@ -3261,6 +3374,7 @@ mod tests { width: 10, metadata: BucketMetadata::default(), }], + rate_limits: Default::default(), project_info, }; @@ -3627,11 +3741,11 @@ mod tests { let start_time = Instant::now(); let config = Config::default(); - let (project_cache, mut project_cache_rx) = Addr::custom(); + let (aggregator, mut aggregator_rx) = Addr::custom(); let processor = create_test_processor_with_addrs( config, Addrs { - project_cache, + aggregator, ..Default::default() }, ); @@ -3648,17 +3762,19 @@ mod tests { ), (BucketSource::Internal, None), ] { - let message = ProcessMetrics { - items: vec![item.clone()], + let message = ProcessProjectMetrics { + data: MetricData::Raw(vec![item.clone()]), + project_state: ProjectState::Pending, + rate_limits: Default::default(), project_key, source, start_time, sent_at: Some(Utc::now()), }; - processor.handle_process_metrics(&mut token, message); + processor.handle_process_project_metrics(&mut token, message); - let value = project_cache_rx.recv().await.unwrap(); - let ProjectCache::AddMetricBuckets(merge_buckets) = value else { + let value = aggregator_rx.recv().await.unwrap(); + let Aggregator::MergeBuckets(merge_buckets) = value else { panic!() }; let buckets = merge_buckets.buckets; @@ -3668,7 +3784,7 @@ mod tests { } #[tokio::test] - async fn test_process_batched_metrics_bucket_metadata() { + async fn test_process_batched_metrics() { let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed); let start_time = Instant::now(); let config = Config::default(); @@ -3682,89 +3798,119 @@ mod tests { }, ); - let payload_no_metadata = r#"{ + let payload = r#"{ "buckets": { - "a94ae32be2584e0bbd7a4cbb95971fee": [ + "11111111111111111111111111111111": [ { "timestamp": 1615889440, "width": 0, "name": "d:custom/endpoint.response_time@millisecond", "type": "d", "value": [ - 36.0, - 49.0, - 57.0, 68.0 ], "tags": { "route": "user_index" } } - ] - } -} -"#; - - let payload_metadata = r#"{ - "buckets": { - "a94ae32be2584e0bbd7a4cbb95971fee": [ + ], + "22222222222222222222222222222222": [ { "timestamp": 1615889440, "width": 0, - "name": "d:custom/endpoint.response_time@millisecond", + "name": "d:custom/endpoint.cache_rate@none", "type": "d", "value": [ - 36.0, - 49.0, - 57.0, - 68.0 - ], - "tags": { - "route": "user_index" - }, - "metadata": { - "merges": 1, - "received_at": 1615889440 - } + 36.0 + ] } ] } } "#; - for (source, payload, expected_received_at) in [ - ( - BucketSource::External, - payload_no_metadata, - Some(UnixTimestamp::from_instant(start_time)), - ), - (BucketSource::Internal, payload_no_metadata, None), + let message = ProcessBatchedMetrics { + payload: Bytes::from(payload), + source: BucketSource::Internal, + start_time, + sent_at: Some(Utc::now()), + }; + processor.handle_process_batched_metrics(&mut token, message); + + let value = project_cache_rx.recv().await.unwrap(); + let ProjectCache::ProcessMetrics(pm1) = value else { + panic!() + }; + let value = project_cache_rx.recv().await.unwrap(); + let ProjectCache::ProcessMetrics(pm2) = value else { + panic!() + }; + + let mut messages = vec![pm1, pm2]; + messages.sort_by_key(|pm| pm.project_key); + + let actual = messages + .into_iter() + .map(|pm| (pm.project_key, pm.data, pm.source)) + .collect::>(); + + assert_debug_snapshot!(actual, @r###" + [ ( - BucketSource::External, - payload_metadata, - Some(UnixTimestamp::from_instant(start_time)), + ProjectKey("11111111111111111111111111111111"), + Parsed( + [ + Bucket { + timestamp: 
UnixTimestamp(1615889440), + width: 0, + name: MetricName( + "d:custom/endpoint.response_time@millisecond", + ), + value: Distribution( + [ + 68.0, + ], + ), + tags: { + "route": "user_index", + }, + metadata: BucketMetadata { + merges: 1, + received_at: None, + extracted_from_indexed: false, + }, + }, + ], + ), + Internal, ), ( - BucketSource::Internal, - payload_metadata, - Some(UnixTimestamp::from_secs(1615889440)), + ProjectKey("22222222222222222222222222222222"), + Parsed( + [ + Bucket { + timestamp: UnixTimestamp(1615889440), + width: 0, + name: MetricName( + "d:custom/endpoint.cache_rate@none", + ), + value: Distribution( + [ + 36.0, + ], + ), + tags: {}, + metadata: BucketMetadata { + merges: 1, + received_at: None, + extracted_from_indexed: false, + }, + }, + ], + ), + Internal, ), - ] { - let message = ProcessBatchedMetrics { - payload: Bytes::from(payload), - source, - start_time, - sent_at: Some(Utc::now()), - }; - processor.handle_process_batched_metrics(&mut token, message); - - let value = project_cache_rx.recv().await.unwrap(); - let ProjectCache::AddMetricBuckets(merge_buckets) = value else { - panic!() - }; - let buckets = merge_buckets.buckets; - assert_eq!(buckets.len(), 1); - assert_eq!(buckets[0].metadata.received_at, expected_received_at); - } + ] + "###); } #[test] diff --git a/relay-server/src/services/project/metrics.rs b/relay-server/src/services/processor/metrics.rs similarity index 94% rename from relay-server/src/services/project/metrics.rs rename to relay-server/src/services/processor/metrics.rs index 4fa31ab4a6..7d330a322e 100644 --- a/relay-server/src/services/project/metrics.rs +++ b/relay-server/src/services/processor/metrics.rs @@ -8,6 +8,11 @@ use crate::services::outcome::Outcome; use crate::services::project::ProjectInfo; use crate::services::project_cache::BucketSource; +/// Filters buckets based on their namespace. +/// +/// This is a noop for most namespaces except: +/// - [`MetricNamespace::Unsupported`]: Equal to invalid/unknown namespaces. +/// - [`MetricNamespace::Stats`]: Metric stats are only allowed if the `source` is [`BucketSource::Internal`]. 
pub fn filter_namespaces(mut buckets: Vec, source: BucketSource) -> Vec { buckets.retain(|bucket| match bucket.name.namespace() { MetricNamespace::Sessions => true, @@ -111,6 +116,17 @@ mod tests { use super::*; + fn create_custom_bucket_with_name(name: String) -> Bucket { + Bucket { + name: format!("d:custom/{name}@byte").into(), + value: BucketValue::Counter(1.into()), + timestamp: UnixTimestamp::now(), + tags: Default::default(), + width: 10, + metadata: Default::default(), + } + } + fn get_test_bucket(name: &str, tags: BTreeMap) -> Bucket { let json = serde_json::json!({ "timestamp": 1615889440, @@ -149,24 +165,13 @@ mod tests { assert_eq!(bucket.tags.len(), 1); } - fn create_custom_bucket_with_name(name: String) -> Bucket { - Bucket { - name: format!("d:custom/{name}@byte").into(), - value: BucketValue::Counter(1.into()), - timestamp: UnixTimestamp::now(), - tags: Default::default(), - width: 10, - metadata: Default::default(), - } - } - #[test] - fn test_apply_project_state() { + fn test_apply_project_info() { let (outcome_aggregator, _) = Addr::custom(); let (metric_stats, mut metric_stats_rx) = MetricStats::test(); let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator); - let project_state = ProjectInfo { + let project_info = ProjectInfo { config: serde_json::from_value(serde_json::json!({ "metrics": { "deniedNames": ["*cpu_time*"] }, "features": ["organizations:custom-metrics"] @@ -182,7 +187,7 @@ mod tests { let buckets = apply_project_info( buckets, &metric_outcomes, - &project_state, + &project_info, Scoping { organization_id: 42, project_id: ProjectId::new(43), @@ -207,7 +212,7 @@ mod tests { } #[test] - fn test_apply_project_state_with_disabled_custom_namespace() { + fn test_apply_project_info_with_disabled_custom_namespace() { let (outcome_aggregator, _) = Addr::custom(); let (metric_stats, mut metric_stats_rx) = MetricStats::test(); let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator); diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index c33651e826..5359773e54 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -1,11 +1,10 @@ -use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; use relay_base_schema::project::ProjectKey; use relay_config::Config; use relay_dynamic_config::{ErrorBoundary, Feature}; -use relay_metrics::{Bucket, MetaAggregator, MetricMeta, MetricNamespace}; +use relay_metrics::{Bucket, MetaAggregator, MetricMeta}; use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits, Scoping}; use relay_sampling::evaluation::ReservoirCounters; use relay_statsd::metric; @@ -14,19 +13,18 @@ use serde::{Deserialize, Serialize}; use tokio::time::Instant; use crate::envelope::ItemType; -use crate::metrics::{MetricOutcomes, MetricsLimiter}; use crate::services::metrics::{Aggregator, MergeBuckets}; -use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor}; -use crate::services::project::metrics::{apply_project_info, filter_namespaces}; +use crate::services::outcome::{DiscardReason, Outcome}; +use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor, ProcessProjectMetrics}; use crate::services::project::state::ExpiryState; -use crate::services::project_cache::{BucketSource, CheckedEnvelope, ProjectCache, RequestUpdate}; +use crate::services::project_cache::{ + CheckedEnvelope, ProcessMetrics, ProjectCache, 
RequestUpdate, +}; use crate::utils::{Enforcement, SeqCount}; use crate::statsd::RelayCounters; -use crate::utils::{self, EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; +use crate::utils::{EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; -mod metrics; pub mod state; pub use state::{ @@ -119,10 +117,16 @@ impl Project { self.reservoir_counters.clone() } - fn current_state(&self) -> ProjectState { + /// Returns the current [`ProjectState`] attached to the project. + pub fn current_state(&self) -> ProjectState { self.state.current_state(&self.config) } + /// Returns the currently active cached rate limits. + pub fn current_rate_limits(&mut self) -> &RateLimits { + self.rate_limits.current_limits() + } + /// If a reservoir rule is no longer in the sampling config, we will remove those counters. fn remove_expired_reservoir_rules(&self) { let Some(state) = self.current_state().enabled() else { @@ -160,30 +164,21 @@ impl Project { self.last_updated_at = Instant::now(); } - /// Validates and inserts given [buckets](Bucket) into the metrics aggregator. - /// - /// The buckets will be keyed underneath this project key. - pub fn merge_buckets( - &mut self, - aggregator: &Addr, - metric_outcomes: &MetricOutcomes, - outcome_aggregator: &Addr, - buckets: Vec, - source: BucketSource, - ) { - // Best effort check for rate limits and project state. Continue if there is no project state. - let buckets = match self.check_buckets(metric_outcomes, outcome_aggregator, buckets) { - CheckedBuckets::NoProject(buckets) => buckets, - CheckedBuckets::Checked { buckets, .. } => buckets, - CheckedBuckets::Dropped => return, - }; + /// Collects internal project state and assembles a [`ProcessProjectMetrics`] message. + pub fn process_metrics(&mut self, message: ProcessMetrics) -> ProcessProjectMetrics { + let project_state = self.current_state(); + let rate_limits = self.rate_limits.current_limits().clone(); - let buckets = filter_namespaces(buckets, source); + ProcessProjectMetrics { + project_state, + rate_limits, - aggregator.send(MergeBuckets::new( - self.project_key, - buckets.into_iter().collect(), - )); + data: message.data, + project_key: message.project_key, + source: message.source, + start_time: message.start_time.into(), + sent_at: message.sent_at, + } } /// Returns a list of buckets back to the aggregator. @@ -419,8 +414,9 @@ impl Project { /// point. Therefore, this method is useful to trigger an update early if it is already clear /// that the project state will be needed soon. To retrieve an updated state, use /// [`Project::get_state`] instead. - pub fn prefetch(&mut self, project_cache: Addr, no_cache: bool) { + pub fn prefetch(&mut self, project_cache: Addr, no_cache: bool) -> &mut Self { self.get_cached_state(project_cache, no_cache); + self } /// Replaces the internal project state with a new one and triggers pending actions. @@ -501,15 +497,7 @@ impl Project { /// Returns `Some` if the project state has been fetched and contains a project identifier, /// otherwise `None`. pub fn scoping(&self) -> Option { - let info = self.current_state().enabled()?; - Some(Scoping { - organization_id: info.organization_id.unwrap_or(0), - project_id: info.project_id?, - project_key: self.project_key, - key_id: info - .get_public_key_config() - .and_then(|config| config.numeric_id), - }) + self.current_state().scoping(self.project_key) } /// Runs the checks on incoming envelopes. @@ -595,78 +583,6 @@ impl Project { rate_limits, }) } - - /// Drops metrics buckets if they are not allowed for this project. 
- /// - /// Reasons for dropping can be rate limits or a disabled project. - pub fn check_buckets( - &mut self, - metric_outcomes: &MetricOutcomes, - outcome_aggregator: &Addr, - buckets: Vec, - ) -> CheckedBuckets { - let project_info = match self.current_state() { - ProjectState::Enabled(info) => info.clone(), - ProjectState::Disabled => { - relay_log::debug!("dropping {} buckets for disabled project", buckets.len()); - return CheckedBuckets::Dropped; - } - ProjectState::Pending => return CheckedBuckets::NoProject(buckets), - }; - - let Some(scoping) = self.scoping() else { - relay_log::error!( - tags.project_key = self.project_key.as_str(), - "there is no scoping: dropping {} buckets", - buckets.len(), - ); - return CheckedBuckets::Dropped; - }; - - let mut buckets = apply_project_info(buckets, metric_outcomes, &project_info, scoping); - - let namespaces: BTreeSet = buckets - .iter() - .filter_map(|bucket| bucket.name.try_namespace()) - .collect(); - - let current_limits = self.rate_limits.current_limits(); - for namespace in namespaces { - let limits = current_limits.check_with_quotas( - project_info.get_quotas(), - scoping.item(DataCategory::MetricBucket), - ); - - if limits.is_limited() { - let rejected; - (buckets, rejected) = utils::split_off(buckets, |bucket| { - bucket.name.try_namespace() == Some(namespace) - }); - - let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); - metric_outcomes.track(scoping, &rejected, Outcome::RateLimited(reason_code)); - } - } - - let quotas = project_info.config.quotas.clone(); - let buckets = match MetricsLimiter::create(buckets, quotas, scoping) { - Ok(mut bucket_limiter) => { - bucket_limiter.enforce_limits(current_limits, metric_outcomes, outcome_aggregator); - bucket_limiter.into_buckets() - } - Err(buckets) => buckets, - }; - - if buckets.is_empty() { - return CheckedBuckets::Dropped; - } - - CheckedBuckets::Checked { - scoping, - project_info, - buckets, - } - } } /// Adds category limits for the nested spans inside a transaction. @@ -712,40 +628,13 @@ fn count_nested_spans(envelope: &ManagedEnvelope) -> usize { .map_or(0, |event| event.spans.0 + 1) } -/// Return value of [`Project::check_buckets`]. -#[derive(Debug)] -pub enum CheckedBuckets { - /// There is no project state available for these metrics yet. - /// - /// The metrics should be returned to the aggregator until the project state becomes available. - NoProject(Vec), - /// The buckets have been validated and can be processed. - Checked { - /// Project scoping. - scoping: Scoping, - /// Project info. - project_info: Arc, - /// List of buckets. - buckets: Vec, - }, - /// All buckets have been dropped. - /// - /// Can happen for multiple reasons: - /// - The project is disabled or not valid. - /// - All metrics have been filtered. 
- Dropped, -} - #[cfg(test)] mod tests { use crate::envelope::{ContentType, Envelope, Item}; use crate::extractors::RequestMeta; - use crate::metrics::MetricStats; use crate::services::processor::ProcessingGroup; use relay_base_schema::project::ProjectId; - use relay_common::time::UnixTimestamp; use relay_event_schema::protocol::EventId; - use relay_metrics::BucketValue; use relay_test::mock_service; use serde_json::json; use smallvec::SmallVec; @@ -865,156 +754,6 @@ mod tests { project } - fn create_metric(name: &str) -> Bucket { - Bucket { - name: name.into(), - width: 0, - value: BucketValue::counter(1.into()), - timestamp: UnixTimestamp::from_secs(1000), - tags: Default::default(), - metadata: Default::default(), - } - } - - #[test] - fn test_check_buckets_no_project() { - let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - - let mut project = create_project(None); - project.state = ProjectFetchState::pending(); - let buckets = vec![create_metric("d:transactions/foo")]; - let cb = project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets.clone()); - - match cb { - CheckedBuckets::NoProject(b) => { - assert_eq!(b, buckets) - } - cb => panic!("{cb:?}"), - } - - drop(metric_outcomes); - assert!(metric_stats_rx.blocking_recv().is_none()); - } - - #[test] - fn test_check_buckets_rate_limit() { - let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - - let mut project = create_project(None); - let buckets = vec![create_metric("d:transactions/foo")]; - let cb = project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets.clone()); - - match cb { - CheckedBuckets::Checked { - scoping, - project_info: _, - buckets: b, - } => { - assert_eq!(scoping, project.scoping().unwrap()); - assert_eq!(b, buckets) - } - cb => panic!("{cb:?}"), - } - - drop(metric_outcomes); - assert!(metric_stats_rx.blocking_recv().is_none()); - } - - #[test] - fn test_check_buckets_rate_limit_no_quota() { - let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - - let mut project = create_project(Some(json!({ - "quotas": [{ - "id": "foo", - "categories": ["transaction"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); - let cb = project.check_buckets( - &metric_outcomes, - &outcome_aggregator, - vec![create_metric("d:transactions/foo")], - ); - - assert!(matches!(cb, CheckedBuckets::Dropped)); - - drop(metric_outcomes); - assert!(metric_stats_rx.blocking_recv().is_none()); - } - - #[test] - fn test_check_buckets_rate_limit_mixed_no_quota() { - let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - - let mut project = create_project(Some(json!({ - "quotas": [{ - "id": "foo", - "categories": ["transaction"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); - let cb = project.check_buckets( - &metric_outcomes, - &outcome_aggregator, - vec![ - create_metric("d:transactions/foo"), - create_metric("d:profiles/foo"), - ], - ); - - match cb { - CheckedBuckets::Checked { - scoping, - project_info: _, - 
buckets, - } => { - assert_eq!(scoping, project.scoping().unwrap()); - assert_eq!(buckets, vec![create_metric("d:profiles/foo")]) - } - cb => panic!("{cb:?}"), - } - - drop(metric_outcomes); - assert!(metric_stats_rx.blocking_recv().is_none()); - } - - #[test] - fn test_check_buckets_project_state_filter() { - let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - - let mut project = create_project(None); - let cb = project.check_buckets( - &metric_outcomes, - &outcome_aggregator, - vec![create_metric("d:custom/foo")], - ); - - assert!(matches!(cb, CheckedBuckets::Dropped)); - - drop(metric_outcomes); - let value = metric_stats_rx.blocking_recv().unwrap(); - let Aggregator::MergeBuckets(merge_buckets) = value else { - panic!(); - }; - assert_eq!(merge_buckets.buckets.len(), 1); - assert!(metric_stats_rx.blocking_recv().is_none()); - } - fn request_meta() -> RequestMeta { let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" .parse() diff --git a/relay-server/src/services/project/state.rs b/relay-server/src/services/project/state.rs index 9c7ea90acc..6c770b2d5b 100644 --- a/relay-server/src/services/project/state.rs +++ b/relay-server/src/services/project/state.rs @@ -3,12 +3,15 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -pub use fetch_state::{ExpiryState, ProjectFetchState}; -pub use info::{LimitedProjectInfo, ProjectInfo}; +use relay_base_schema::project::ProjectKey; +use relay_quotas::Scoping; mod fetch_state; mod info; +pub use self::fetch_state::{ExpiryState, ProjectFetchState}; +pub use self::info::{LimitedProjectInfo, ProjectInfo}; + /// Representation of a project's current state. #[derive(Clone, Debug)] pub enum ProjectState { @@ -43,12 +46,23 @@ impl ProjectState { } /// Utility function that returns the project config if enabled. - pub fn enabled(&self) -> Option> { + pub fn enabled(self) -> Option> { match self { - ProjectState::Enabled(info) => Some(Arc::clone(info)), + ProjectState::Enabled(info) => Some(info), ProjectState::Disabled | ProjectState::Pending => None, } } + + /// Creates `Scoping` for this project if the state is loaded. + /// + /// Returns `Some` if the project state has been fetched and contains a project identifier, + /// otherwise `None`. + pub fn scoping(&self, project_key: ProjectKey) -> Option { + match self { + Self::Enabled(info) => info.scoping(project_key), + _ => None, + } + } } impl From for ProjectState { diff --git a/relay-server/src/services/project/state/info.rs b/relay-server/src/services/project/state/info.rs index 9e97a73aaf..08e62b62fe 100644 --- a/relay-server/src/services/project/state/info.rs +++ b/relay-server/src/services/project/state/info.rs @@ -69,6 +69,20 @@ impl ProjectInfo { self.public_keys.first() } + /// Creates `Scoping` for this project. + /// + /// Returns `Some` if the project contains a project identifier otherwise `None`. + pub fn scoping(&self, project_key: ProjectKey) -> Option { + Some(Scoping { + organization_id: self.organization_id.unwrap_or(0), + project_id: self.project_id?, + project_key, + key_id: self + .get_public_key_config() + .and_then(|config| config.numeric_id), + }) + } + /// Returns the project config. 
pub fn config(&self) -> &ProjectConfig { self.config diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 8d506063da..3da280feff 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -4,12 +4,15 @@ use std::sync::Arc; use std::time::Duration; use crate::extractors::RequestMeta; -use crate::metrics::MetricOutcomes; use crate::services::buffer::{EnvelopeBufferError, GuardedEnvelopeBuffer, Peek}; +use crate::services::processor::{ + EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics, +}; +use chrono::{DateTime, Utc}; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::{Config, RelayMode}; -use relay_metrics::{Bucket, MetricMeta}; +use relay_metrics::MetricMeta; use relay_quotas::RateLimits; use relay_redis::RedisPool; use relay_statsd::metric; @@ -22,12 +25,7 @@ use tokio::time::Instant; use crate::services::global_config::{self, GlobalConfigManager, Subscribe}; use crate::services::metrics::{Aggregator, FlushBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::services::processor::{ - EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProcessingGroup, ProjectMetrics, -}; -use crate::services::project::{ - CheckedBuckets, Project, ProjectFetchState, ProjectSender, ProjectState, -}; +use crate::services::project::{Project, ProjectFetchState, ProjectSender, ProjectState}; use crate::services::project_local::{LocalProjectSource, LocalProjectSourceService}; #[cfg(feature = "processing")] use crate::services::project_redis::RedisProjectSource; @@ -204,28 +202,23 @@ impl From<&RequestMeta> for BucketSource { } } -/// Add metric buckets to the project. -/// -/// Metric buckets added via the project are filtered and rate limited -/// according to the project state. +/// Starts the processing flow for received metrics. /// -/// Adding buckets directly to the aggregator bypasses all of these checks. +/// Enriches the raw data with project information and forwards +/// the metrics using [`ProcessProjectMetrics`](crate::services::processor::ProcessProjectMetrics). #[derive(Debug)] -pub struct AddMetricBuckets { +pub struct ProcessMetrics { + /// A list of metric items. + pub data: MetricData, + /// The target project. pub project_key: ProjectKey, - pub buckets: Vec, + /// Whether to keep or reset the metric metadata. pub source: BucketSource, -} - -impl AddMetricBuckets { - /// Convenience constructor which creates an internal [`AddMetricBuckets`] message. - pub fn internal(project_key: ProjectKey, buckets: Vec) -> Self { - Self { - project_key, - buckets, - source: BucketSource::Internal, - } - } + /// The instant at which the request was received. + pub start_time: Instant, + /// The value of the Envelope's [`sent_at`](crate::envelope::Envelope::sent_at) + /// header for clock drift correction. + pub sent_at: Option>, } /// Add metric metadata to the aggregator. @@ -272,7 +265,7 @@ pub struct RefreshIndexCache(pub HashSet); /// information. /// /// There are also higher-level operations, such as [`CheckEnvelope`] and [`ValidateEnvelope`] that -/// inspect contents of envelopes for ingestion, as well as [`AddMetricBuckets`] to aggregate metrics +/// inspect contents of envelopes for ingestion, as well as [`ProcessMetrics`] to aggregate metrics /// associated with a project. /// /// See the enumerated variants for a full list of available messages for this service. 
@@ -286,7 +279,7 @@ pub enum ProjectCache { ), ValidateEnvelope(ValidateEnvelope), UpdateRateLimits(UpdateRateLimits), - AddMetricBuckets(AddMetricBuckets), + ProcessMetrics(ProcessMetrics), AddMetricMeta(AddMetricMeta), FlushBuckets(FlushBuckets), UpdateSpoolIndex(UpdateSpoolIndex), @@ -303,7 +296,7 @@ impl ProjectCache { Self::CheckEnvelope(_, _) => "CheckEnvelope", Self::ValidateEnvelope(_) => "ValidateEnvelope", Self::UpdateRateLimits(_) => "UpdateRateLimits", - Self::AddMetricBuckets(_) => "AddMetricBuckets", + Self::ProcessMetrics(_) => "ProcessMetrics", Self::AddMetricMeta(_) => "AddMetricMeta", Self::FlushBuckets(_) => "FlushBuckets", Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", @@ -382,11 +375,11 @@ impl FromMessage for ProjectCache { } } -impl FromMessage for ProjectCache { +impl FromMessage for ProjectCache { type Response = relay_system::NoResponse; - fn from_message(message: AddMetricBuckets, _: ()) -> Self { - Self::AddMetricBuckets(message) + fn from_message(message: ProcessMetrics, _: ()) -> Self { + Self::ProcessMetrics(message) } } @@ -569,7 +562,6 @@ struct ProjectCacheBroker { // TODO: Make non-optional when spool_v1 is removed. envelope_buffer: Option>, services: Services, - metric_outcomes: MetricOutcomes, // Need hashbrown because extract_if is not stable in std yet. projects: hashbrown::HashMap, /// Utility for disposing of expired project data in a background thread. @@ -947,21 +939,15 @@ impl ProjectCacheBroker { .merge_rate_limits(message.rate_limits); } - fn handle_add_metric_buckets(&mut self, message: AddMetricBuckets) { + fn handle_process_metrics(&mut self, message: ProcessMetrics) { let project_cache = self.services.project_cache.clone(); - let aggregator = self.services.aggregator.clone(); - let metric_outcomes = self.metric_outcomes.clone(); - let outcome_aggregator = self.services.outcome_aggregator.clone(); - let project = self.get_or_create_project(message.project_key); - project.prefetch(project_cache, false); - project.merge_buckets( - &aggregator, - &metric_outcomes, - &outcome_aggregator, - message.buckets, - message.source, - ); + let message = self + .get_or_create_project(message.project_key) + .prefetch(project_cache, false) + .process_metrics(message); + + self.services.envelope_processor.send(message); } fn handle_add_metric_meta(&mut self, message: AddMetricMeta) { @@ -972,8 +958,6 @@ impl ProjectCacheBroker { } fn handle_flush_buckets(&mut self, message: FlushBuckets) { - let metric_outcomes = self.metric_outcomes.clone(); - let outcome_aggregator = self.services.outcome_aggregator.clone(); let aggregator = self.services.aggregator.clone(); let project_cache = self.services.project_cache.clone(); @@ -981,26 +965,36 @@ impl ProjectCacheBroker { let mut scoped_buckets = BTreeMap::new(); for (project_key, buckets) in message.buckets { let project = self.get_or_create_project(project_key); - match project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets) { - CheckedBuckets::NoProject(buckets) => { - no_project += 1; - // Schedule an update for the project just in case. - project.prefetch(project_cache.clone(), false); - project.return_buckets(&aggregator, buckets); - } - CheckedBuckets::Checked { - scoping, - project_info, - buckets, - } => scoped_buckets - .entry(scoping) - .or_insert(ProjectMetrics { + + let Some(project_info) = project.current_state().enabled() else { + no_project += 1; + // Schedule an update for the project just in case. 
+ project.prefetch(project_cache.clone(), false); + project.return_buckets(&aggregator, buckets); + continue; + }; + + let Some(scoping) = project.scoping() else { + relay_log::error!( + tags.project_key = project_key.as_str(), + "there is no scoping: dropping {} buckets", + buckets.len(), + ); + continue; + }; + + use std::collections::btree_map::Entry::*; + match scoped_buckets.entry(scoping) { + Vacant(entry) => { + entry.insert(ProjectMetrics { project_info, - buckets: Vec::new(), - }) - .buckets - .extend(buckets), - CheckedBuckets::Dropped => {} + rate_limits: project.current_rate_limits().clone(), + buckets, + }); + } + Occupied(entry) => { + entry.into_mut().buckets.extend(buckets); + } } } @@ -1251,9 +1245,7 @@ impl ProjectCacheBroker { self.handle_validate_envelope(message) } ProjectCache::UpdateRateLimits(message) => self.handle_rate_limits(message), - ProjectCache::AddMetricBuckets(message) => { - self.handle_add_metric_buckets(message) - } + ProjectCache::ProcessMetrics(message) => self.handle_process_metrics(message), ProjectCache::AddMetricMeta(message) => self.handle_add_metric_meta(message), ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message), ProjectCache::UpdateSpoolIndex(message) => self.handle_buffer_index(message), @@ -1274,7 +1266,6 @@ pub struct ProjectCacheService { memory_checker: MemoryChecker, envelope_buffer: Option>, services: Services, - metric_outcomes: MetricOutcomes, redis: Option, } @@ -1285,7 +1276,6 @@ impl ProjectCacheService { memory_checker: MemoryChecker, envelope_buffer: Option>, services: Services, - metric_outcomes: MetricOutcomes, redis: Option, ) -> Self { Self { @@ -1293,7 +1283,6 @@ impl ProjectCacheService { memory_checker, envelope_buffer, services, - metric_outcomes, redis, } } @@ -1308,7 +1297,6 @@ impl Service for ProjectCacheService { memory_checker, envelope_buffer, services, - metric_outcomes, redis, } = self; let project_cache = services.project_cache.clone(); @@ -1401,7 +1389,6 @@ impl Service for ProjectCacheService { spool_v1_unspool_handle: SleepHandle::idle(), spool_v1, global_config, - metric_outcomes, }; loop { @@ -1496,9 +1483,6 @@ impl FetchOptionalProjectState { #[cfg(test)] mod tests { - use crate::metrics::MetricStats; - use crate::services::global_config::GlobalConfigHandle; - use relay_dynamic_config::GlobalConfig; use relay_test::mock_service; use tokio::select; use uuid::Uuid; @@ -1567,14 +1551,6 @@ mod tests { } }; - let metric_stats = MetricStats::new( - Arc::new(Config::default()), - GlobalConfigHandle::fixed(GlobalConfig::default()), - Addr::custom().0, - ); - let metric_outcomes = - MetricOutcomes::new(metric_stats, services.outcome_aggregator.clone()); - ( ProjectCacheBroker { config: config.clone(), @@ -1593,7 +1569,6 @@ mod tests { buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)), }), global_config: GlobalConfigStatus::Pending, - metric_outcomes, }, buffer, ) diff --git a/relay-server/src/services/project_local.rs b/relay-server/src/services/project_local.rs index 3d94b9ede5..745203780a 100644 --- a/relay-server/src/services/project_local.rs +++ b/relay-server/src/services/project_local.rs @@ -233,6 +233,7 @@ mod tests { let project_info = extracted_project_state .get(&project_key) .unwrap() + .clone() .enabled() .unwrap(); diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 2a9703246f..7658a197c1 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -119,6 +119,7 @@ pub fn empty_envelope_with_dsn(dsn: &str) -> 
Box { pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {}); let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {}); + let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {}); let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {}); let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); @@ -152,6 +153,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { test_store, #[cfg(feature = "processing")] store_forwarder: None, + aggregator, }, metric_outcomes, )
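For illustration, here is a minimal, self-contained Rust sketch of the per-namespace rate-limit split that the new `EnvelopeProcessorService::check_buckets` performs: buckets in a rate-limited namespace are split off and tracked as outcomes, while the remaining buckets continue on to the aggregator. `Namespace`, `Bucket`, and `split_off` below are simplified stand-ins for the Relay types (`relay_metrics::Bucket`, `MetricNamespace`, `utils::split_off`), not the crate's actual API.

// Simplified stand-in for `MetricNamespace`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Namespace {
    Transactions,
    Custom,
}

// Simplified stand-in for `relay_metrics::Bucket`.
#[derive(Debug)]
struct Bucket {
    namespace: Namespace,
    name: &'static str,
}

/// Splits `items` into (retained, rejected) by the predicate, mirroring `utils::split_off`.
fn split_off<T>(items: Vec<T>, mut is_rejected: impl FnMut(&T) -> bool) -> (Vec<T>, Vec<T>) {
    let mut retained = Vec::new();
    let mut rejected = Vec::new();
    for item in items {
        if is_rejected(&item) {
            rejected.push(item);
        } else {
            retained.push(item);
        }
    }
    (retained, rejected)
}

fn main() {
    let mut buckets = vec![
        Bucket { namespace: Namespace::Transactions, name: "d:transactions/duration@millisecond" },
        Bucket { namespace: Namespace::Custom, name: "d:custom/foo@byte" },
    ];

    // Assume the `Custom` namespace is rate limited for this project.
    let limited = [Namespace::Custom];

    for namespace in limited {
        let rejected;
        (buckets, rejected) = split_off(buckets, |bucket| bucket.namespace == namespace);
        // In Relay, the rejected buckets are reported via
        // `metric_outcomes.track(scoping, &rejected, Outcome::RateLimited(reason_code))`.
        for bucket in &rejected {
            println!("rate limited: {} ({namespace:?})", bucket.name);
        }
    }

    println!("forwarding {} bucket(s) to the aggregator", buckets.len());
}

The same split-then-track pattern previously lived in `Project::check_buckets`; after this patch it runs on the processor, both on the ingest path (`handle_process_project_metrics`) and again on flush (`handle_encode_metrics`), so buckets that arrive before the project state is available are still checked once it is.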