From 5445632741048c55a50d57a84bcf58222979f038 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 25 Jul 2024 18:38:17 +0200 Subject: [PATCH] ref(metrics): Remodel metric submission flow --- relay-server/src/endpoints/common.rs | 12 +- relay-server/src/service.rs | 1 + relay-server/src/services/processor.rs | 269 ++++++++++++------ .../{project => processor}/metrics.rs | 30 +- relay-server/src/services/project.rs | 159 ++--------- relay-server/src/services/project/state.rs | 17 +- .../src/services/project/state/info.rs | 14 + relay-server/src/services/project_cache.rs | 90 +++--- relay-server/src/services/project_local.rs | 1 + 9 files changed, 309 insertions(+), 284 deletions(-) rename relay-server/src/services/{project => processor}/metrics.rs (97%) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 798867f6e7e..88a8c0136cd 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -11,8 +11,10 @@ use serde::Deserialize; use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items}; use crate::service::ServiceState; use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::{ProcessMetricMeta, ProcessMetrics, ProcessingGroup}; -use crate::services::project_cache::{CheckEnvelope, ValidateEnvelope}; +use crate::services::processor::{ + MetricData, ProcessMetricMeta, ProcessProjectMetrics, ProcessingGroup, +}; +use crate::services::project_cache::{CheckEnvelope, ProcessMetrics, ValidateEnvelope}; use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope, MultipartError}; @@ -273,9 +275,9 @@ fn queue_envelope( if !metric_items.is_empty() { relay_log::trace!("sending metrics into processing queue"); - state.processor().send(ProcessMetrics { - items: metric_items.into_vec(), - start_time: envelope.meta().start_time(), + state.project_cache().send(ProcessMetrics { + data: MetricData::Raw(metric_items.into_vec()), + start_time: envelope.meta().start_time().into(), sent_at: envelope.sent_at(), project_key: envelope.meta().public_key(), source: envelope.meta().into(), diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 4f40ce73d0a..23dbea73220 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -240,6 +240,7 @@ impl ServiceState { test_store: test_store.clone(), #[cfg(feature = "processing")] store_forwarder: store.clone(), + aggregator: aggregator.clone(), }, metric_outcomes.clone(), ) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 60593475620..7d62ee9187a 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1,5 +1,5 @@ use std::borrow::Cow; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::error::Error; use std::fmt::{Debug, Display}; use std::future::Future; @@ -66,16 +66,18 @@ use { use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType}; use crate::extractors::{PartialDsn, RequestMeta}; +use crate::http; use crate::metrics::{MetricOutcomes, MinimalTrackableBucket}; use crate::metrics_extraction::transactions::types::ExtractMetricsError; use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor}; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; +use crate::services::metrics::{Aggregator, 
MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::event::FiltersStatus; -use crate::services::project::ProjectInfo; +use crate::services::project::{ProjectInfo, ProjectState}; use crate::services::project_cache::{ - AddMetricBuckets, AddMetricMeta, BucketSource, ProjectCache, UpdateRateLimits, + AddMetricMeta, BucketSource, ProcessMetrics, ProjectCache, UpdateRateLimits, }; use crate::services::test_store::{Capture, TestStore}; use crate::services::upstream::{ @@ -86,11 +88,11 @@ use crate::utils::{ self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope, WorkerGroup, }; -use crate::{http, metrics}; mod attachment; mod dynamic_sampling; mod event; +mod metrics; mod profile; mod profile_chunk; mod replay; @@ -908,9 +910,18 @@ pub struct ProcessEnvelope { /// Additionally, processing applies clock drift correction using the system clock of this Relay, if /// the Envelope specifies the [`sent_at`](Envelope::sent_at) header. #[derive(Debug)] -pub struct ProcessMetrics { +pub struct ProcessProjectMetrics { + /// The project state the metrics belong to. + /// + /// The project state can be pending, in which case cached rate limits + /// and other project specific operations are skipped and executed once + /// the project state becomes available. + pub project_state: ProjectState, + /// Currently active cached rate limits for this project. + pub rate_limits: RateLimits, + /// A list of metric items. - pub items: Vec, + pub data: MetricData, /// The target project. pub project_key: ProjectKey, /// Whether to keep or reset the metric metadata. @@ -922,6 +933,15 @@ pub struct ProcessMetrics { pub sent_at: Option>, } +/// Raw unparsed metric data. +#[derive(Debug)] +pub enum MetricData { + /// Raw data, unparsed envelope items. + Raw(Vec), + /// Already parsed buckets but unprocessed. + Parsed(Vec), +} + #[derive(Debug)] pub struct ProcessBatchedMetrics { /// Metrics payload in JSON format. @@ -950,6 +970,8 @@ pub struct ProjectMetrics { pub buckets: Vec, /// Project info for extracting quotas. pub project_info: Arc, + /// Currently cached rate limits. + pub rate_limits: RateLimits, } /// Encodes metrics into an envelope ready to be sent upstream. 
@@ -990,7 +1012,7 @@ pub struct SubmitClientReports { #[derive(Debug)] pub enum EnvelopeProcessor { ProcessEnvelope(Box), - ProcessMetrics(Box), + ProcessProjectMetrics(Box), ProcessBatchedMetrics(Box), ProcessMetricMeta(Box), EncodeMetrics(Box), @@ -1004,7 +1026,7 @@ impl EnvelopeProcessor { pub fn variant(&self) -> &'static str { match self { EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope", - EnvelopeProcessor::ProcessMetrics(_) => "ProcessMetrics", + EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics", EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics", EnvelopeProcessor::ProcessMetricMeta(_) => "ProcessMetricMeta", EnvelopeProcessor::EncodeMetrics(_) => "EncodeMetrics", @@ -1025,11 +1047,11 @@ impl FromMessage for EnvelopeProcessor { } } -impl FromMessage for EnvelopeProcessor { +impl FromMessage for EnvelopeProcessor { type Response = NoResponse; - fn from_message(message: ProcessMetrics, _: ()) -> Self { - Self::ProcessMetrics(Box::new(message)) + fn from_message(message: ProcessProjectMetrics, _: ()) -> Self { + Self::ProcessProjectMetrics(Box::new(message)) } } @@ -1097,6 +1119,7 @@ pub struct Addrs { pub test_store: Addr, #[cfg(feature = "processing")] pub store_forwarder: Option>, + pub aggregator: Addr, } impl Default for Addrs { @@ -1108,6 +1131,7 @@ impl Default for Addrs { test_store: Addr::dummy(), #[cfg(feature = "processing")] store_forwarder: None, + aggregator: Addr::dummy(), } } } @@ -2058,9 +2082,11 @@ impl EnvelopeProcessorService { } } - fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) { - let ProcessMetrics { - items, + fn handle_process_project_metrics(&self, cogs: &mut Token, message: ProcessProjectMetrics) { + let ProcessProjectMetrics { + project_state, + rate_limits, + data, project_key, start_time, sent_at, @@ -2070,51 +2096,58 @@ impl EnvelopeProcessorService { let received = relay_common::time::instant_to_date_time(start_time); let received_timestamp = UnixTimestamp::from_instant(start_time); - let clock_drift_processor = - ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); - - let mut buckets = Vec::new(); - for item in items { - let payload = item.payload(); - if item.ty() == &ItemType::Statsd { - for bucket_result in Bucket::parse_all(&payload, received_timestamp) { - match bucket_result { - Ok(bucket) => buckets.push(bucket), - Err(error) => relay_log::debug!( - error = &error as &dyn Error, - "failed to parse metric bucket from statsd format", - ), - } - } - } else if item.ty() == &ItemType::MetricBuckets { - match serde_json::from_slice::>(&payload) { - Ok(parsed_buckets) => { - // Re-use the allocation of `b` if possible. 
- if buckets.is_empty() { - buckets = parsed_buckets; - } else { - buckets.extend(parsed_buckets); + let mut buckets = match data { + MetricData::Parsed(buckets) => buckets, + MetricData::Raw(items) => { + let mut buckets = Vec::new(); + for item in items { + let payload = item.payload(); + if item.ty() == &ItemType::Statsd { + for bucket_result in Bucket::parse_all(&payload, received_timestamp) { + match bucket_result { + Ok(bucket) => buckets.push(bucket), + Err(error) => relay_log::debug!( + error = &error as &dyn Error, + "failed to parse metric bucket from statsd format", + ), + } } - } - Err(error) => { - relay_log::debug!( - error = &error as &dyn Error, - "failed to parse metric bucket", + } else if item.ty() == &ItemType::MetricBuckets { + match serde_json::from_slice::>(&payload) { + Ok(parsed_buckets) => { + // Re-use the allocation of `b` if possible. + if buckets.is_empty() { + buckets = parsed_buckets; + } else { + buckets.extend(parsed_buckets); + } + } + Err(error) => { + relay_log::debug!( + error = &error as &dyn Error, + "failed to parse metric bucket", + ); + metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1); + } + } + } else { + relay_log::error!( + "invalid item of type {} passed to ProcessMetrics", + item.ty() ); - metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1); } } - } else { - relay_log::error!( - "invalid item of type {} passed to ProcessMetrics", - item.ty() - ); + buckets } - } + }; if buckets.is_empty() { return; }; + cogs.update(relay_metrics::cogs::BySize(&buckets)); + + let clock_drift_processor = + ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); for bucket in &mut buckets { clock_drift_processor.process_timestamp(&mut bucket.timestamp); @@ -2123,14 +2156,22 @@ impl EnvelopeProcessorService { } } - cogs.update(relay_metrics::cogs::BySize(&buckets)); + let buckets = self::metrics::filter_namespaces(buckets, source); - relay_log::trace!("merging metric buckets into project cache"); - self.inner.addrs.project_cache.send(AddMetricBuckets { - project_key, - buckets, - source, - }); + // Best effort check to filter and rate limit buckets, if there is no project state + // available at the current time, we will check again after flushing. 
+ let buckets = match project_state.enabled() { + Some(project_info) => { + self.check_buckets(project_key, &project_info, &rate_limits, buckets) + } + None => buckets, + }; + + relay_log::trace!("merging metric buckets into the aggregator"); + self.inner + .addrs + .aggregator + .send(MergeBuckets::new(project_key, buckets)); } fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) { @@ -2141,12 +2182,6 @@ impl EnvelopeProcessorService { sent_at, } = message; - let received = relay_common::time::instant_to_date_time(start_time); - let received_timestamp = UnixTimestamp::from_instant(start_time); - - let clock_drift_processor = - ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); - #[derive(serde::Deserialize)] struct Wrapper { buckets: HashMap>, @@ -2165,25 +2200,15 @@ impl EnvelopeProcessorService { }; let mut feature_weights = FeatureWeights::none(); - for (project_key, mut buckets) in buckets { - if buckets.is_empty() { - continue; - } - - for bucket in &mut buckets { - clock_drift_processor.process_timestamp(&mut bucket.timestamp); - if !matches!(source, BucketSource::Internal) { - bucket.metadata = BucketMetadata::new(received_timestamp); - } - } - + for (project_key, buckets) in buckets { feature_weights = feature_weights.merge(relay_metrics::cogs::BySize(&buckets).into()); - relay_log::trace!("merging metric buckets into project cache"); - self.inner.addrs.project_cache.send(AddMetricBuckets { + self.inner.addrs.project_cache.send(ProcessMetrics { + data: MetricData::Parsed(buckets), project_key, - buckets, source, + start_time: start_time.into(), + sent_at, }); } @@ -2309,6 +2334,69 @@ impl EnvelopeProcessorService { }); } + fn check_buckets( + &self, + project_key: ProjectKey, + project_info: &ProjectInfo, + rate_limits: &RateLimits, + buckets: Vec, + ) -> Vec { + let Some(scoping) = project_info.scoping(project_key) else { + relay_log::error!( + tags.project_key = project_key.as_str(), + "there is no scoping: dropping {} buckets", + buckets.len(), + ); + return Vec::new(); + }; + + let mut buckets = self::metrics::apply_project_info( + buckets, + &self.inner.metric_outcomes, + project_info, + scoping, + ); + + let namespaces: BTreeSet = buckets + .iter() + .filter_map(|bucket| bucket.name.try_namespace()) + .collect(); + + for namespace in namespaces { + let limits = rate_limits.check_with_quotas( + project_info.get_quotas(), + scoping.item(DataCategory::MetricBucket), + ); + + if limits.is_limited() { + let rejected; + (buckets, rejected) = utils::split_off(buckets, |bucket| { + bucket.name.try_namespace() == Some(namespace) + }); + + let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); + self.inner.metric_outcomes.track( + scoping, + &rejected, + Outcome::RateLimited(reason_code), + ); + } + } + + let quotas = project_info.config.quotas.clone(); + match MetricsLimiter::create(buckets, quotas, scoping) { + Ok(mut bucket_limiter) => { + bucket_limiter.enforce_limits( + &rate_limits, + &self.inner.metric_outcomes, + &self.inner.addrs.outcome_aggregator, + ); + bucket_limiter.into_buckets() + } + Err(buckets) => buckets, + } + } + #[cfg(feature = "processing")] fn rate_limit_buckets( &self, @@ -2529,6 +2617,7 @@ impl EnvelopeProcessorService { let ProjectMetrics { buckets, project_info, + rate_limits: _, } = message; let buckets = self.rate_limit_buckets(scoping, &project_info, buckets); @@ -2589,7 +2678,7 @@ impl EnvelopeProcessorService { let mut envelope = Envelope::from_request(None, 
RequestMeta::outbound(dsn.clone())); let mut item = Item::new(ItemType::MetricBuckets); - item.set_source_quantities(metrics::extract_quantities(batch)); + item.set_source_quantities(crate::metrics::extract_quantities(batch)); item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap()); envelope.add_item(item); @@ -2694,7 +2783,17 @@ impl EnvelopeProcessorService { self.send_global_partition(partition_key, &mut partition); } - fn handle_encode_metrics(&self, message: EncodeMetrics) { + fn handle_encode_metrics(&self, mut message: EncodeMetrics) { + for (scoping, pm) in message.scopes.iter_mut() { + let buckets = std::mem::take(&mut pm.buckets); + pm.buckets = self.check_buckets( + scoping.project_key, + &pm.project_info, + &pm.rate_limits, + buckets, + ); + } + #[cfg(feature = "processing")] if self.inner.config.processing_enabled() { if let Some(ref store_forwarder) = self.inner.addrs.store_forwarder { @@ -2771,7 +2870,9 @@ impl EnvelopeProcessorService { match message { EnvelopeProcessor::ProcessEnvelope(m) => self.handle_process_envelope(*m), - EnvelopeProcessor::ProcessMetrics(m) => self.handle_process_metrics(&mut cogs, *m), + EnvelopeProcessor::ProcessProjectMetrics(m) => { + self.handle_process_project_metrics(&mut cogs, *m) + } EnvelopeProcessor::ProcessBatchedMetrics(m) => { self.handle_process_batched_metrics(&mut cogs, *m) } @@ -2787,7 +2888,7 @@ impl EnvelopeProcessorService { fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights { match message { EnvelopeProcessor::ProcessEnvelope(v) => AppFeature::from(v.envelope.group()).into(), - EnvelopeProcessor::ProcessMetrics(_) => AppFeature::Unattributed.into(), + EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(), EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(), EnvelopeProcessor::ProcessMetricMeta(_) => AppFeature::MetricMeta.into(), EnvelopeProcessor::EncodeMetrics(v) => v @@ -3648,14 +3749,14 @@ mod tests { ), (BucketSource::Internal, None), ] { - let message = ProcessMetrics { + let message = ProcessProjectMetrics { items: vec![item.clone()], project_key, source, start_time, sent_at: Some(Utc::now()), }; - processor.handle_process_metrics(&mut token, message); + processor.handle_process_project_metrics(&mut token, message); let value = project_cache_rx.recv().await.unwrap(); let ProjectCache::AddMetricBuckets(merge_buckets) = value else { diff --git a/relay-server/src/services/project/metrics.rs b/relay-server/src/services/processor/metrics.rs similarity index 97% rename from relay-server/src/services/project/metrics.rs rename to relay-server/src/services/processor/metrics.rs index 4fa31ab4a6a..9195f8e9a96 100644 --- a/relay-server/src/services/project/metrics.rs +++ b/relay-server/src/services/processor/metrics.rs @@ -111,6 +111,17 @@ mod tests { use super::*; + fn create_custom_bucket_with_name(name: String) -> Bucket { + Bucket { + name: format!("d:custom/{name}@byte").into(), + value: BucketValue::Counter(1.into()), + timestamp: UnixTimestamp::now(), + tags: Default::default(), + width: 10, + metadata: Default::default(), + } + } + fn get_test_bucket(name: &str, tags: BTreeMap) -> Bucket { let json = serde_json::json!({ "timestamp": 1615889440, @@ -149,24 +160,13 @@ mod tests { assert_eq!(bucket.tags.len(), 1); } - fn create_custom_bucket_with_name(name: String) -> Bucket { - Bucket { - name: format!("d:custom/{name}@byte").into(), - value: BucketValue::Counter(1.into()), - timestamp: UnixTimestamp::now(), - tags: Default::default(), 
- width: 10, - metadata: Default::default(), - } - } - #[test] - fn test_apply_project_state() { + fn test_apply_project_info() { let (outcome_aggregator, _) = Addr::custom(); let (metric_stats, mut metric_stats_rx) = MetricStats::test(); let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator); - let project_state = ProjectInfo { + let project_info = ProjectInfo { config: serde_json::from_value(serde_json::json!({ "metrics": { "deniedNames": ["*cpu_time*"] }, "features": ["organizations:custom-metrics"] @@ -182,7 +182,7 @@ mod tests { let buckets = apply_project_info( buckets, &metric_outcomes, - &project_state, + &project_info, Scoping { organization_id: 42, project_id: ProjectId::new(43), @@ -207,7 +207,7 @@ mod tests { } #[test] - fn test_apply_project_state_with_disabled_custom_namespace() { + fn test_apply_project_info_with_disabled_custom_namespace() { let (outcome_aggregator, _) = Addr::custom(); let (metric_stats, mut metric_stats_rx) = MetricStats::test(); let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator); diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index c33651e826e..7bfe8bf340f 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; @@ -14,19 +13,19 @@ use serde::{Deserialize, Serialize}; use tokio::time::Instant; use crate::envelope::ItemType; -use crate::metrics::{MetricOutcomes, MetricsLimiter}; +use crate::metrics::MetricOutcomes; use crate::services::metrics::{Aggregator, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor}; -use crate::services::project::metrics::{apply_project_info, filter_namespaces}; +use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor, ProcessProjectMetrics}; use crate::services::project::state::ExpiryState; -use crate::services::project_cache::{BucketSource, CheckedEnvelope, ProjectCache, RequestUpdate}; +use crate::services::project_cache::{ + BucketSource, CheckedEnvelope, ProcessMetrics, ProjectCache, RequestUpdate, +}; use crate::utils::{Enforcement, SeqCount}; use crate::statsd::RelayCounters; -use crate::utils::{self, EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; +use crate::utils::{EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; -mod metrics; pub mod state; pub use state::{ @@ -119,7 +118,8 @@ impl Project { self.reservoir_counters.clone() } - fn current_state(&self) -> ProjectState { + /// Returns the current [`ProjectState`] attached to the project. + pub fn current_state(&self) -> ProjectState { self.state.current_state(&self.config) } @@ -160,30 +160,21 @@ impl Project { self.last_updated_at = Instant::now(); } - /// Validates and inserts given [buckets](Bucket) into the metrics aggregator. - /// - /// The buckets will be keyed underneath this project key. - pub fn merge_buckets( - &mut self, - aggregator: &Addr, - metric_outcomes: &MetricOutcomes, - outcome_aggregator: &Addr, - buckets: Vec, - source: BucketSource, - ) { - // Best effort check for rate limits and project state. Continue if there is no project state. - let buckets = match self.check_buckets(metric_outcomes, outcome_aggregator, buckets) { - CheckedBuckets::NoProject(buckets) => buckets, - CheckedBuckets::Checked { buckets, .. 
} => buckets, - CheckedBuckets::Dropped => return, - }; + /// Collects internal project state and assembles a [`ProcessProjectMetrics`] message. + pub fn process_metrics(&self, message: ProcessMetrics) -> ProcessProjectMetrics { + let project_state = self.current_state(); + let rate_limits = self.rate_limits.current_limits().clone(); - let buckets = filter_namespaces(buckets, source); + ProcessProjectMetrics { + project_state, + rate_limits, - aggregator.send(MergeBuckets::new( - self.project_key, - buckets.into_iter().collect(), - )); + data: message.data, + project_key: message.project_key, + source: message.source, + start_time: message.start_time, + sent_at: message.sent_at, + } } /// Returns a list of buckets back to the aggregator. @@ -419,7 +410,7 @@ impl Project { /// point. Therefore, this method is useful to trigger an update early if it is already clear /// that the project state will be needed soon. To retrieve an updated state, use /// [`Project::get_state`] instead. - pub fn prefetch(&mut self, project_cache: Addr, no_cache: bool) { + pub fn prefetch(&mut self, project_cache: Addr, no_cache: bool) -> &mut Self { self.get_cached_state(project_cache, no_cache); } @@ -501,15 +492,7 @@ impl Project { /// Returns `Some` if the project state has been fetched and contains a project identifier, /// otherwise `None`. pub fn scoping(&self) -> Option { - let info = self.current_state().enabled()?; - Some(Scoping { - organization_id: info.organization_id.unwrap_or(0), - project_id: info.project_id?, - project_key: self.project_key, - key_id: info - .get_public_key_config() - .and_then(|config| config.numeric_id), - }) + self.current_state().scoping(self.project_key) } /// Runs the checks on incoming envelopes. @@ -595,78 +578,6 @@ impl Project { rate_limits, }) } - - /// Drops metrics buckets if they are not allowed for this project. - /// - /// Reasons for dropping can be rate limits or a disabled project. 
- pub fn check_buckets( - &mut self, - metric_outcomes: &MetricOutcomes, - outcome_aggregator: &Addr, - buckets: Vec, - ) -> CheckedBuckets { - let project_info = match self.current_state() { - ProjectState::Enabled(info) => info.clone(), - ProjectState::Disabled => { - relay_log::debug!("dropping {} buckets for disabled project", buckets.len()); - return CheckedBuckets::Dropped; - } - ProjectState::Pending => return CheckedBuckets::NoProject(buckets), - }; - - let Some(scoping) = self.scoping() else { - relay_log::error!( - tags.project_key = self.project_key.as_str(), - "there is no scoping: dropping {} buckets", - buckets.len(), - ); - return CheckedBuckets::Dropped; - }; - - let mut buckets = apply_project_info(buckets, metric_outcomes, &project_info, scoping); - - let namespaces: BTreeSet = buckets - .iter() - .filter_map(|bucket| bucket.name.try_namespace()) - .collect(); - - let current_limits = self.rate_limits.current_limits(); - for namespace in namespaces { - let limits = current_limits.check_with_quotas( - project_info.get_quotas(), - scoping.item(DataCategory::MetricBucket), - ); - - if limits.is_limited() { - let rejected; - (buckets, rejected) = utils::split_off(buckets, |bucket| { - bucket.name.try_namespace() == Some(namespace) - }); - - let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); - metric_outcomes.track(scoping, &rejected, Outcome::RateLimited(reason_code)); - } - } - - let quotas = project_info.config.quotas.clone(); - let buckets = match MetricsLimiter::create(buckets, quotas, scoping) { - Ok(mut bucket_limiter) => { - bucket_limiter.enforce_limits(current_limits, metric_outcomes, outcome_aggregator); - bucket_limiter.into_buckets() - } - Err(buckets) => buckets, - }; - - if buckets.is_empty() { - return CheckedBuckets::Dropped; - } - - CheckedBuckets::Checked { - scoping, - project_info, - buckets, - } - } } /// Adds category limits for the nested spans inside a transaction. @@ -712,30 +623,6 @@ fn count_nested_spans(envelope: &ManagedEnvelope) -> usize { .map_or(0, |event| event.spans.0 + 1) } -/// Return value of [`Project::check_buckets`]. -#[derive(Debug)] -pub enum CheckedBuckets { - /// There is no project state available for these metrics yet. - /// - /// The metrics should be returned to the aggregator until the project state becomes available. - NoProject(Vec), - /// The buckets have been validated and can be processed. - Checked { - /// Project scoping. - scoping: Scoping, - /// Project info. - project_info: Arc, - /// List of buckets. - buckets: Vec, - }, - /// All buckets have been dropped. - /// - /// Can happen for multiple reasons: - /// - The project is disabled or not valid. - /// - All metrics have been filtered. - Dropped, -} - #[cfg(test)] mod tests { use crate::envelope::{ContentType, Envelope, Item}; diff --git a/relay-server/src/services/project/state.rs b/relay-server/src/services/project/state.rs index 4788f51e10c..4a5606fd604 100644 --- a/relay-server/src/services/project/state.rs +++ b/relay-server/src/services/project/state.rs @@ -6,6 +6,8 @@ mod info; pub use fetch_state::{ExpiryState, ProjectFetchState}; pub use info::{LimitedProjectInfo, ProjectInfo}; +use relay_base_schema::project::ProjectKey; +use relay_quotas::Scoping; use serde::{Deserialize, Serialize}; /// Representation of a project's current state. @@ -42,12 +44,23 @@ impl ProjectState { } /// Utility function that returns the project config if enabled. 
- pub fn enabled(&self) -> Option> { + pub fn enabled(self) -> Option> { match self { - ProjectState::Enabled(info) => Some(Arc::clone(info)), + ProjectState::Enabled(info) => Some(info), ProjectState::Disabled | ProjectState::Pending => None, } } + + /// Creates `Scoping` for this project if the state is loaded. + /// + /// Returns `Some` if the project state has been fetched and contains a project identifier, + /// otherwise `None`. + pub fn scoping(&self, project_key: ProjectKey) -> Option { + match self { + Self::Enabled(info) => info.scoping(project_key), + _ => None, + } + } } impl From for ProjectState { diff --git a/relay-server/src/services/project/state/info.rs b/relay-server/src/services/project/state/info.rs index 0f21d2e015f..ff54d791e53 100644 --- a/relay-server/src/services/project/state/info.rs +++ b/relay-server/src/services/project/state/info.rs @@ -68,6 +68,20 @@ impl ProjectInfo { self.public_keys.first() } + /// Creates `Scoping` for this project. + /// + /// Returns `Some` if the project contains a project identifier otherwise `None`. + pub fn scoping(&self, project_key: ProjectKey) -> Option { + Some(Scoping { + organization_id: self.organization_id.unwrap_or(0), + project_id: self.project_id?, + project_key, + key_id: self + .get_public_key_config() + .and_then(|config| config.numeric_id), + }) + } + /// Returns the project config. pub fn config(&self) -> &ProjectConfig { &self.config diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index dd7440deced..3b5f97028de 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -3,8 +3,11 @@ use std::error::Error; use std::sync::Arc; use std::time::Duration; +use crate::envelope::Item; use crate::extractors::RequestMeta; use crate::metrics::MetricOutcomes; +use crate::services::project_cache; +use chrono::{DateTime, Utc}; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::{Config, RelayMode}; @@ -22,7 +25,7 @@ use crate::services::global_config::{self, GlobalConfigManager, Subscribe}; use crate::services::metrics::{Aggregator, FlushBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::{ - EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProjectMetrics, + EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProjectMetrics, }; use crate::services::project::{ CheckedBuckets, Project, ProjectFetchState, ProjectSender, ProjectState, @@ -203,28 +206,30 @@ impl From<&RequestMeta> for BucketSource { } } -/// Add metric buckets to the project. +/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator. /// -/// Metric buckets added via the project are filtered and rate limited -/// according to the project state. +/// This parses and validates the metrics: +/// - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are +/// ignored independently. +/// - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and +/// dropped together on parsing failure. +/// - Other items will be ignored with an error message. /// -/// Adding buckets directly to the aggregator bypasses all of these checks. +/// Additionally, processing applies clock drift correction using the system clock of this Relay, if +/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header. 
#[derive(Debug)] -pub struct AddMetricBuckets { +pub struct ProcessMetrics { + /// A list of metric items. + pub data: MetricData, + /// The target project. pub project_key: ProjectKey, - pub buckets: Vec, + /// Whether to keep or reset the metric metadata. pub source: BucketSource, -} - -impl AddMetricBuckets { - /// Convenience constructor which creates an internal [`AddMetricBuckets`] message. - pub fn internal(project_key: ProjectKey, buckets: Vec) -> Self { - Self { - project_key, - buckets, - source: BucketSource::Internal, - } - } + /// The instant at which the request was received. + pub start_time: Instant, + /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift + /// correction. + pub sent_at: Option>, } /// Add metric metadata to the aggregator. @@ -271,7 +276,7 @@ pub struct RefreshIndexCache(pub HashSet); /// information. /// /// There are also higher-level operations, such as [`CheckEnvelope`] and [`ValidateEnvelope`] that -/// inspect contents of envelopes for ingestion, as well as [`AddMetricBuckets`] to aggregate metrics +/// inspect contents of envelopes for ingestion, as well as [`ProcessMetrics`] to aggregate metrics /// associated with a project. /// /// See the enumerated variants for a full list of available messages for this service. @@ -285,7 +290,7 @@ pub enum ProjectCache { ), ValidateEnvelope(ValidateEnvelope), UpdateRateLimits(UpdateRateLimits), - AddMetricBuckets(AddMetricBuckets), + ProcessMetrics(ProcessMetrics), AddMetricMeta(AddMetricMeta), FlushBuckets(FlushBuckets), UpdateSpoolIndex(UpdateSpoolIndex), @@ -302,7 +307,7 @@ impl ProjectCache { Self::CheckEnvelope(_, _) => "CheckEnvelope", Self::ValidateEnvelope(_) => "ValidateEnvelope", Self::UpdateRateLimits(_) => "UpdateRateLimits", - Self::AddMetricBuckets(_) => "AddMetricBuckets", + Self::ProcessMetrics(_) => "ProcessMetrics", Self::AddMetricMeta(_) => "AddMetricMeta", Self::FlushBuckets(_) => "FlushBuckets", Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", @@ -381,11 +386,11 @@ impl FromMessage for ProjectCache { } } -impl FromMessage for ProjectCache { +impl FromMessage for ProjectCache { type Response = relay_system::NoResponse; - fn from_message(message: AddMetricBuckets, _: ()) -> Self { - Self::AddMetricBuckets(message) + fn from_message(message: ProcessMetrics, _: ()) -> Self { + Self::ProcessMetrics(message) } } @@ -920,21 +925,15 @@ impl ProjectCacheBroker { .merge_rate_limits(message.rate_limits); } - fn handle_add_metric_buckets(&mut self, message: AddMetricBuckets) { + fn handle_process_metrics(&mut self, message: ProcessMetrics) { let project_cache = self.services.project_cache.clone(); - let aggregator = self.services.aggregator.clone(); - let metric_outcomes = self.metric_outcomes.clone(); - let outcome_aggregator = self.services.outcome_aggregator.clone(); - let project = self.get_or_create_project(message.project_key); - project.prefetch(project_cache, false); - project.merge_buckets( - &aggregator, - &metric_outcomes, - &outcome_aggregator, - message.buckets, - message.source, - ); + let message = self + .get_or_create_project(message.project_key) + .prefetch(project_cache, false) + .process_metrics(message); + + self.services.envelope_processor.send(message); } fn handle_add_metric_meta(&mut self, message: AddMetricMeta) { @@ -954,13 +953,22 @@ impl ProjectCacheBroker { let mut scoped_buckets = BTreeMap::new(); for (project_key, buckets) in message.buckets { let project = self.get_or_create_project(project_key); - match 
project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets) { - CheckedBuckets::NoProject(buckets) => { + + let state = project.current_state(); + let scoping = project.scoping(); + + match (state.enabled(), scoping) { + (Some(project_info), Some(scoping)) => {} + (None, _) => { no_project += 1; // Schedule an update for the project just in case. project.prefetch(project_cache.clone(), false); project.return_buckets(&aggregator, buckets); } + } + + match project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets) { + CheckedBuckets::NoProject(buckets) => {} CheckedBuckets::Checked { scoping, project_info, @@ -1109,9 +1117,7 @@ impl ProjectCacheBroker { self.handle_validate_envelope(message) } ProjectCache::UpdateRateLimits(message) => self.handle_rate_limits(message), - ProjectCache::AddMetricBuckets(message) => { - self.handle_add_metric_buckets(message) - } + ProjectCache::ProcessMetrics(message) => self.handle_process_metrics(message), ProjectCache::AddMetricMeta(message) => self.handle_add_metric_meta(message), ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message), ProjectCache::UpdateSpoolIndex(message) => self.handle_buffer_index(message), diff --git a/relay-server/src/services/project_local.rs b/relay-server/src/services/project_local.rs index 3d94b9ede5b..745203780a9 100644 --- a/relay-server/src/services/project_local.rs +++ b/relay-server/src/services/project_local.rs @@ -233,6 +233,7 @@ mod tests { let project_info = extracted_project_state .get(&project_key) .unwrap() + .clone() .enabled() .unwrap();
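
Reviewer note: to summarize the remodeled flow end to end, endpoints now send ProcessMetrics to the project cache rather than the processor; the cache attaches the current ProjectState and cached RateLimits and forwards a ProcessProjectMetrics message to the processor; the processor parses raw items, applies clock-drift correction, filters namespaces, performs a best-effort check when the project state is already available, and merges the buckets directly into the aggregator (EncodeMetrics re-checks them at flush time). The sketch below is a minimal, self-contained model of that pipeline in plain Rust. Every name and type in it is a simplified stand-in (no Relay crates, no actors, no clock-drift or rate-limit handling); it is illustrative only and not the actual Relay API.

// Minimal, self-contained sketch (plain Rust, no Relay crates) of the flow this
// patch introduces. All names and types below are simplified stand-ins.

#[derive(Debug, Clone)]
struct Bucket {
    name: String,
    value: f64,
}

/// Stand-in for the `MetricData` enum added by the patch.
#[derive(Debug)]
enum MetricData {
    /// Unparsed statsd-style lines (stand-in for raw envelope items).
    Raw(Vec<String>),
    /// Already parsed, but not yet processed, buckets.
    Parsed(Vec<Bucket>),
}

/// Stand-in for `ProjectState`: pending, or enabled with some project config.
#[derive(Debug, Clone)]
enum ProjectState {
    Pending,
    Enabled { denied_prefix: Option<String> },
}

/// Endpoints now send this to the project cache (`ProcessMetrics`).
struct ProcessMetrics {
    data: MetricData,
    project_key: String,
}

/// The project cache attaches cached project data and forwards this to the
/// processor (`ProcessProjectMetrics`).
struct ProcessProjectMetrics {
    project_state: ProjectState,
    data: MetricData,
    project_key: String,
}

/// Project cache step: attach the current project state and forward.
fn project_cache_process(state: &ProjectState, msg: ProcessMetrics) -> ProcessProjectMetrics {
    ProcessProjectMetrics {
        project_state: state.clone(),
        data: msg.data,
        project_key: msg.project_key,
    }
}

/// Processor step: parse raw data, apply a best-effort check, then hand the
/// buckets to the aggregator (modeled here as a plain Vec).
fn processor_handle(msg: ProcessProjectMetrics, aggregator: &mut Vec<(String, Bucket)>) {
    let buckets = match msg.data {
        MetricData::Parsed(buckets) => buckets,
        MetricData::Raw(items) => items
            .iter()
            .filter_map(|line| {
                // Toy "name:value" parsing in place of statsd parsing.
                let (name, value) = line.split_once(':')?;
                Some(Bucket { name: name.to_owned(), value: value.parse().ok()? })
            })
            .collect(),
    };

    // Best-effort filtering: only possible if the project state is available now;
    // otherwise the buckets are checked again when the aggregator flushes.
    let buckets: Vec<_> = match &msg.project_state {
        ProjectState::Enabled { denied_prefix: Some(prefix) } => buckets
            .into_iter()
            .filter(|b| !b.name.starts_with(prefix.as_str()))
            .collect(),
        _ => buckets,
    };

    for bucket in buckets {
        aggregator.push((msg.project_key.clone(), bucket));
    }
}

fn main() {
    let mut aggregator = Vec::new();
    let enabled = ProjectState::Enabled { denied_prefix: Some("denied.".to_owned()) };

    // Raw items with an available project state: parsed, filtered, aggregated.
    let msg = ProcessMetrics {
        data: MetricData::Raw(vec!["endpoint.hits:3".to_owned(), "denied.cpu:1".to_owned()]),
        project_key: "abc123".to_owned(),
    };
    processor_handle(project_cache_process(&enabled, msg), &mut aggregator);
    assert_eq!(aggregator.len(), 1);
    assert_eq!(aggregator[0].1.value, 3.0);

    // Pending project state: buckets pass through unchecked for now.
    let msg = ProcessMetrics {
        data: MetricData::Parsed(vec![Bucket { name: "denied.cpu".into(), value: 1.0 }]),
        project_key: "abc123".to_owned(),
    };
    processor_handle(project_cache_process(&ProjectState::Pending, msg), &mut aggregator);
    assert_eq!(aggregator.len(), 2);
    println!("{aggregator:?}");
}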