From 83530646a2ab509eca6254c67bb6effee90ad1ef Mon Sep 17 00:00:00 2001 From: Tor Date: Wed, 6 Dec 2023 15:07:07 +0100 Subject: [PATCH] feat(server): Org rate limit per metric bucket (#2758) part of: https://github.com/getsentry/relay/issues/2716 in order to protect our kafka metric consumers, we want to have a way of rate limiting based on the amount of buckets, as that's what decides the load placed on our kafka topics. We are starting out with just the org throughput limits but will be expanded upon further as outlined in the linked epic. --- CHANGELOG.md | 1 + py/sentry_relay/consts.py | 1 + relay-base-schema/src/data_category.rs | 4 + relay-cabi/include/relay.h | 28 +-- relay-quotas/src/quota.rs | 11 +- relay-server/src/actors/processor.rs | 205 ++++++++++++++++-- relay-server/src/actors/project.rs | 2 + relay-server/src/envelope.rs | 8 + relay-server/src/utils/metrics_rate_limits.rs | 2 +- tests/integration/test_store.py | 75 +++++++ 10 files changed, 295 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 002ad8e293..b7b354a6cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ - Temporarily add metric summaries on spans and top-level transaction events to link DDM with performance monitoring. ([#2757](https://github.com/getsentry/relay/pull/2757)) - Add size limits on metric related envelope items. ([#2800](https://github.com/getsentry/relay/pull/2800)) - Include the size offending item in the size limit error message. ([#2801](https://github.com/getsentry/relay/pull/2801)) +- Org rate limit metrics per bucket. ([#2758](https://github.com/getsentry/relay/pull/2758)) ## 23.11.2 diff --git a/py/sentry_relay/consts.py b/py/sentry_relay/consts.py index 4e962a4aea..b9e10f1680 100644 --- a/py/sentry_relay/consts.py +++ b/py/sentry_relay/consts.py @@ -25,6 +25,7 @@ class DataCategory(IntEnum): SPAN = 12 MONITOR_SEAT = 13 USER_REPORT_V2 = 14 + METRIC_BUCKET = 15 UNKNOWN = -1 # end generated diff --git a/relay-base-schema/src/data_category.rs b/relay-base-schema/src/data_category.rs index ed29d684dd..6af6998945 100644 --- a/relay-base-schema/src/data_category.rs +++ b/relay-base-schema/src/data_category.rs @@ -63,6 +63,8 @@ pub enum DataCategory { /// Currently standardized on name UserReportV2 to avoid clashing with the old UserReport. /// TODO(jferg): Rename this to UserFeedback once old UserReport is deprecated. UserReportV2 = 14, + /// Metric bucket. + MetricBucket = 15, // // IMPORTANT: After adding a new entry to DataCategory, go to the `relay-cabi` subfolder and run // `make header` to regenerate the C-binding. This allows using the data category from Python. @@ -93,6 +95,7 @@ impl DataCategory { "span" => Self::Span, "monitor_seat" => Self::MonitorSeat, "feedback" => Self::UserReportV2, + "metric_bucket" => Self::MetricBucket, _ => Self::Unknown, } } @@ -116,6 +119,7 @@ impl DataCategory { Self::Span => "span", Self::MonitorSeat => "monitor_seat", Self::UserReportV2 => "feedback", + Self::MetricBucket => "metric_bucket", Self::Unknown => "unknown", } } diff --git a/relay-cabi/include/relay.h b/relay-cabi/include/relay.h index ba149a5e09..e15efe0af9 100644 --- a/relay-cabi/include/relay.h +++ b/relay-cabi/include/relay.h @@ -15,8 +15,7 @@ /** * Classifies the type of data that is being ingested. */ -enum RelayDataCategory -{ +enum RelayDataCategory { /** * Reserved and unused. */ @@ -97,6 +96,10 @@ enum RelayDataCategory * TODO(jferg): Rename this to UserFeedback once old UserReport is deprecated. */ RELAY_DATA_CATEGORY_USER_REPORT_V2 = 14, + /** + * Metric bucket. + */ + RELAY_DATA_CATEGORY_METRIC_BUCKET = 15, /** * Any other data category not known by this Relay. */ @@ -107,8 +110,7 @@ typedef int8_t RelayDataCategory; /** * Controls the globbing behaviors. */ -enum GlobFlags -{ +enum GlobFlags { /** * When enabled `**` matches over path separators and `*` does not. */ @@ -131,8 +133,7 @@ typedef uint32_t GlobFlags; /** * Represents all possible error codes. */ -enum RelayErrorCode -{ +enum RelayErrorCode { RELAY_ERROR_CODE_NO_ERROR = 0, RELAY_ERROR_CODE_PANIC = 1, RELAY_ERROR_CODE_UNKNOWN = 2, @@ -157,8 +158,7 @@ typedef uint32_t RelayErrorCode; * Values from * Mapping to HTTP from */ -enum RelaySpanStatus -{ +enum RelaySpanStatus { /** * The operation completed successfully. * @@ -283,8 +283,7 @@ typedef struct RelayStoreNormalizer RelayStoreNormalizer; * - When obtained as instance through return values, always free the string. * - When obtained as pointer through field access, never free the string. */ -typedef struct RelayStr -{ +typedef struct RelayStr { /** * Pointer to the UTF-8 encoded string data. */ @@ -308,8 +307,7 @@ typedef struct RelayStr * - When obtained as instance through return values, always free the buffer. * - When obtained as pointer through field access, never free the buffer. */ -typedef struct RelayBuf -{ +typedef struct RelayBuf { /** * Pointer to the raw data. */ @@ -327,8 +325,7 @@ typedef struct RelayBuf /** * Represents a key pair from key generation. */ -typedef struct RelayKeyPair -{ +typedef struct RelayKeyPair { /** * The public key used for verifying Relay signatures. */ @@ -342,8 +339,7 @@ typedef struct RelayKeyPair /** * A 16-byte UUID. */ -typedef struct RelayUuid -{ +typedef struct RelayUuid { /** * UUID bytes in network byte order (big endian). */ diff --git a/relay-quotas/src/quota.rs b/relay-quotas/src/quota.rs index 821b3ef79e..d587c8ca85 100644 --- a/relay-quotas/src/quota.rs +++ b/relay-quotas/src/quota.rs @@ -111,6 +111,7 @@ impl CategoryUnit { | DataCategory::Span | DataCategory::MonitorSeat | DataCategory::Monitor + | DataCategory::MetricBucket | DataCategory::UserReportV2 => Some(Self::Count), DataCategory::Attachment => Some(Self::Bytes), DataCategory::Session => Some(Self::Batched), @@ -295,16 +296,14 @@ impl Quota { // Check for a scope identifier constraint. If there is no constraint, this means that the // quota matches any scope. In case the scope is unknown, it will be coerced to the most // specific scope later. - let scope_id = match self.scope_id { - Some(ref scope_id) => scope_id, - None => return true, + let Some(scope_id) = self.scope_id.as_ref() else { + return true; }; // Check if the scope identifier in the quota is parseable. If not, this means we cannot // fulfill the constraint, so the quota does not match. - let parsed = match scope_id.parse::() { - Ok(parsed) => parsed, - Err(_) => return false, + let Ok(parsed) = scope_id.parse::() else { + return false; }; // At this stage, require that the scope is known since we have to fulfill the constraint. diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index f07ba8c849..08a30c3b54 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -27,8 +27,7 @@ use relay_metrics::{Bucket, BucketsView, MergeBuckets, MetricMeta, MetricNamespa use relay_pii::PiiConfigError; use relay_profiling::ProfileId; use relay_protocol::{Annotated, Value}; - -use relay_quotas::{DataCategory, Scoping}; +use relay_quotas::{DataCategory, RateLimits, Scoping}; use relay_sampling::evaluation::{MatchedRuleIds, ReservoirCounters, ReservoirEvaluator}; use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse, Service}; @@ -39,8 +38,9 @@ use { crate::actors::project_cache::UpdateRateLimits, crate::utils::{EnvelopeLimiter, ItemAction, MetricsLimiter}, relay_metrics::{Aggregator, RedisMetricMetaStore}, - relay_quotas::{RateLimitingError, RedisRateLimiter}, + relay_quotas::{ItemScoping, RateLimitingError, ReasonCode, RedisRateLimiter}, relay_redis::RedisPool, + std::collections::BTreeMap, symbolic_unreal::{Unreal4Error, Unreal4ErrorKind}, }; @@ -233,6 +233,23 @@ impl ExtractedMetrics { } } +fn source_quantities_from_buckets( + buckets: &BucketsView, + extraction_mode: ExtractionMode, +) -> SourceQuantities { + buckets + .iter() + .filter_map(|bucket| extract_transaction_count(&bucket, extraction_mode)) + .fold(SourceQuantities::default(), |acc, c| { + let profile_count = if c.has_profile { c.count } else { 0 }; + + SourceQuantities { + transactions: acc.transactions + c.count, + profiles: acc.profiles + profile_count, + } + }) +} + /// A state container for envelope processing. #[derive(Debug)] struct ProcessEnvelopeState<'a> { @@ -438,6 +455,10 @@ pub struct EncodeMetrics { pub scoping: Scoping, /// Transaction metrics extraction mode. pub extraction_mode: ExtractionMode, + /// Project state for extracting quotas. + pub project_state: Arc, + /// The ratelimits belonging to the project. + pub rate_limits: RateLimits, } /// Encodes metric meta into an envelope and sends it upstream. @@ -1172,8 +1193,6 @@ impl EnvelopeProcessorService { /// Check and apply rate limits to metrics buckets. #[cfg(feature = "processing")] fn handle_rate_limit_buckets(&self, message: RateLimitBuckets) { - use relay_quotas::ItemScoping; - let RateLimitBuckets { mut bucket_limiter } = message; let scoping = *bucket_limiter.scoping(); @@ -1261,11 +1280,158 @@ impl EnvelopeProcessorService { } } + /// Records the outcomes of the dropped buckets. + #[cfg(feature = "processing")] + fn drop_buckets_with_outcomes( + &self, + reason_code: Option, + total_buckets: usize, + scoping: Scoping, + bucket_partitions: &BTreeMap, Vec>, + mode: ExtractionMode, + ) { + let mut source_quantities = SourceQuantities::default(); + + for buckets in bucket_partitions.values() { + source_quantities += source_quantities_from_buckets(&BucketsView::new(buckets), mode); + } + + let timestamp = Utc::now(); + + if source_quantities.transactions > 0 { + self.inner.outcome_aggregator.send(TrackOutcome { + timestamp, + scoping, + outcome: Outcome::RateLimited(reason_code.clone()), + event_id: None, + remote_addr: None, + category: DataCategory::Transaction, + quantity: source_quantities.transactions as u32, + }); + } + if source_quantities.profiles > 0 { + self.inner.outcome_aggregator.send(TrackOutcome { + timestamp, + scoping, + outcome: Outcome::RateLimited(reason_code.clone()), + event_id: None, + remote_addr: None, + category: DataCategory::Profile, + quantity: source_quantities.profiles as u32, + }); + } + + self.inner.outcome_aggregator.send(TrackOutcome { + timestamp, + scoping, + outcome: Outcome::RateLimited(reason_code), + event_id: None, + remote_addr: None, + category: DataCategory::MetricBucket, + quantity: total_buckets as u32, + }); + } + + /// Returns `true` if the batches should be rate limited. + #[cfg(feature = "processing")] + fn rate_limit_batches( + &self, + cached_rate_limits: RateLimits, + scoping: Scoping, + bucket_partitions: &BTreeMap, Vec>, + max_batch_size_bytes: usize, + project_state: Arc, + ) -> bool { + let Some(rate_limiter) = self.inner.rate_limiter.as_ref() else { + return false; + }; + + let mode = { + let usage = match project_state.config.transaction_metrics { + Some(ErrorBoundary::Ok(ref c)) => c.usage_metric(), + _ => false, + }; + ExtractionMode::from_usage(usage) + }; + + let quotas = &project_state.config.quotas; + let item_scoping = ItemScoping { + category: DataCategory::MetricBucket, + scoping: &scoping, + }; + + // We couldn't use the bucket length directly because batching may change the amount of buckets. + let total_buckets: usize = bucket_partitions + .values() + .flat_map(|buckets| { + // Cheap operation because there's no allocations. + BucketsView::new(buckets) + .by_size(max_batch_size_bytes) + .map(|batch| batch.len()) + }) + .sum(); + + // For limiting the amount of redis calls we make, in case we already passed the limit. + if cached_rate_limits + .check_with_quotas(quotas, item_scoping) + .is_limited() + { + relay_log::info!("dropping {total_buckets} buckets due to throughput ratelimit"); + let reason_code = cached_rate_limits + .longest() + .and_then(|limit| limit.reason_code.clone()); + + self.drop_buckets_with_outcomes( + reason_code, + total_buckets, + scoping, + bucket_partitions, + mode, + ); + + return true; + } + + // Check with redis if the throughput limit has been exceeded, while also updating + // the count so that other relays will be updated too. + match rate_limiter.is_rate_limited(quotas, item_scoping, total_buckets, false) { + Ok(limits) if limits.is_limited() => { + relay_log::info!("dropping {total_buckets} buckets due to throughput ratelimit"); + + let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); + self.drop_buckets_with_outcomes( + reason_code, + total_buckets, + scoping, + bucket_partitions, + mode, + ); + + self.inner + .project_cache + .send(UpdateRateLimits::new(scoping.project_key, limits)); + + return true; + } + Ok(_) => {} // not ratelimited + Err(e) => { + relay_log::error!( + error = &e as &dyn std::error::Error, + "failed to check redis rate limits" + ); + } + } + + false + } + fn handle_encode_metrics(&self, message: EncodeMetrics) { let EncodeMetrics { buckets, scoping, extraction_mode, + project_state: _project_state, + rate_limits: _cached_rate_limits, } = message; let partitions = self.inner.config.metrics_partitions(); @@ -1274,8 +1440,20 @@ impl EnvelopeProcessorService { let upstream = self.inner.config.upstream_descriptor(); let dsn = PartialDsn::outbound(&scoping, upstream); - for (partition_key, buckets) in partition_buckets(scoping.project_key, buckets, partitions) - { + let bucket_partitions = partition_buckets(scoping.project_key, buckets, partitions); + + #[cfg(feature = "processing")] + if self.rate_limit_batches( + _cached_rate_limits, + scoping, + &bucket_partitions, + max_batch_size_bytes, + _project_state, + ) { + return; + } + + for (partition_key, buckets) in bucket_partitions { let mut num_batches = 0; for batch in BucketsView::new(&buckets).by_size(max_batch_size_bytes) { @@ -1409,18 +1587,7 @@ impl Service for EnvelopeProcessorService { } fn create_metrics_item(buckets: &BucketsView<'_>, extraction_mode: ExtractionMode) -> Item { - let source_quantities = buckets - .iter() - .filter_map(|bucket| extract_transaction_count(&bucket, extraction_mode)) - .fold(SourceQuantities::default(), |acc, c| { - let profile_count = if c.has_profile { c.count } else { 0 }; - - SourceQuantities { - transactions: acc.transactions + c.count, - profiles: acc.profiles + profile_count, - } - }); - + let source_quantities = source_quantities_from_buckets(buckets, extraction_mode); let mut item = Item::new(ItemType::MetricBuckets); item.set_source_quantities(source_quantities); item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap()); diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index c904601780..356f480511 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -1138,6 +1138,8 @@ impl Project { buckets, scoping, extraction_mode, + project_state, + rate_limits: self.rate_limits.clone(), }); } } diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index dcf0abe86c..ef1b7b614c 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -34,6 +34,7 @@ use std::borrow::Borrow; use std::collections::BTreeMap; use std::fmt; use std::io::{self, Write}; +use std::ops::AddAssign; use std::time::Instant; use uuid::Uuid; @@ -532,6 +533,13 @@ pub struct SourceQuantities { pub profiles: usize, } +impl AddAssign for SourceQuantities { + fn add_assign(&mut self, other: Self) { + self.transactions += other.transactions; + self.profiles += other.profiles; + } +} + #[derive(Clone, Debug)] pub struct Item { headers: ItemHeaders, diff --git a/relay-server/src/utils/metrics_rate_limits.rs b/relay-server/src/utils/metrics_rate_limits.rs index 31c226c01d..34960b7985 100644 --- a/relay-server/src/utils/metrics_rate_limits.rs +++ b/relay-server/src/utils/metrics_rate_limits.rs @@ -172,6 +172,7 @@ impl>> MetricsLimiter { fn drop_with_outcome(&mut self, outcome: Outcome, outcome_aggregator: Addr) { // Drop transaction buckets: let metrics = std::mem::take(&mut self.metrics); + let timestamp = Utc::now(); self.metrics = metrics .into_iter() @@ -183,7 +184,6 @@ impl>> MetricsLimiter { // Track outcome for the transaction metrics we dropped here: if self.transaction_count > 0 { - let timestamp = UnixTimestamp::now().as_datetime().unwrap_or_else(Utc::now); outcome_aggregator.send(TrackOutcome { timestamp, scoping: self.scoping, diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py index 85fe066189..1924a29bb7 100644 --- a/tests/integration/test_store.py +++ b/tests/integration/test_store.py @@ -499,6 +499,81 @@ def transform(e): assert event["logentry"]["formatted"] == f"otherkey{i}" +def test_rate_limit_metric_bucket( + mini_sentry, + relay_with_processing, + metrics_consumer, +): + metrics_consumer = metrics_consumer() + + bucket_interval = 1 # second + relay = relay_with_processing( + { + "processing": {"max_rate_limit": 2 * 86400}, + "aggregator": { + "bucket_interval": bucket_interval, + "initial_delay": 0, + "debounce_delay": 0, + }, + } + ) + + metric_bucket_limit = 5 + buckets_sent = 10 + + project_id = 42 + projectconfig = mini_sentry.add_full_project_config(project_id) + mini_sentry.add_dsn_key_to_project(project_id) + + public_keys = mini_sentry.get_dsn_public_key_configs(project_id) + key_id = public_keys[0]["numericId"] + projectconfig["config"]["quotas"] = [ + { + "id": f"test_rate_limiting_{uuid.uuid4().hex}", + "scope": "key", + "scopeId": str(key_id), + "categories": ["metric_bucket"], + "limit": metric_bucket_limit, + "window": 86400, + "reasonCode": "throughput rate limiting", + } + ] + + def generate_ticks(): + # Generate a new timestamp for every bucket, so they do not get merged by the aggregator + tick = int(datetime.utcnow().timestamp() // bucket_interval * bucket_interval) + while True: + yield tick + tick += bucket_interval + + tick = generate_ticks() + + def make_bucket(name, type_, values): + return { + "org_id": 1, + "project_id": project_id, + "timestamp": next(tick), + "name": name, + "type": type_, + "value": values, + "width": bucket_interval, + } + + def send_buckets(buckets): + relay.send_metrics_buckets(project_id, buckets) + sleep(0.2) + + for _ in range(buckets_sent): + bucket = make_bucket("d:transactions/measurements.lcp@millisecond", "d", [1.0]) + send_buckets( + [bucket], + ) + produced_buckets = [m for m, _ in metrics_consumer.get_metrics()] + + assert metric_bucket_limit < buckets_sent + assert len(produced_buckets) == metric_bucket_limit + + @pytest.mark.parametrize("violating_bucket", [[4.0, 5.0], [4.0, 5.0, 6.0]]) def test_rate_limit_metrics_buckets( mini_sentry,