From ffc7c73521ef6e6fe278f0a74a304493a63e990d Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 28 Aug 2024 09:53:03 +0200 Subject: [PATCH] ref(cogs): No longer send cogs data to Kafka (#3953) We can now track COGS via statsd metrics, so there is no longer a need to emit COGS to Kafka. This also means we have proper COGS for PoP Relays through statsd. In a follow-up we can remove the Kafka topic configuration from the deployment. --- CHANGELOG.md | 4 ++ Cargo.lock | 16 ----- Cargo.toml | 1 - relay-config/src/config.rs | 16 ----- relay-kafka/src/config.rs | 6 +- relay-server/Cargo.toml | 1 - relay-server/src/service.rs | 6 +- relay-server/src/services/cogs.rs | 78 +++--------------------- relay-server/src/services/store.rs | 33 ---------- tests/integration/conftest.py | 1 - tests/integration/fixtures/processing.py | 6 -- tests/integration/test_cogs.py | 28 --------- 12 files changed, 15 insertions(+), 181 deletions(-) delete mode 100644 tests/integration/test_cogs.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7283e9e154..70747c31a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ - Record too long discard reason for session replays. ([#3950](https://github.com/getsentry/relay/pull/3950)) - Add `EnvelopeStore` trait and implement `DiskUsage` for tracking disk usage. ([#3925](https://github.com/getsentry/relay/pull/3925)) +**Internal**: + +- No longer send COGS data to dedicated Kafka topic. 
([#3953](https://github.com/getsentry/relay/pull/3953)) + ## 24.8.0 **Bug Fixes**: diff --git a/Cargo.lock b/Cargo.lock index ab378300ab..282eb591f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -579,10 +579,8 @@ checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", - "js-sys", "num-traits", "serde", - "wasm-bindgen", "windows-targets 0.48.5", ] @@ -4149,7 +4147,6 @@ dependencies = [ "rmp-serde", "rust-embed", "semver", - "sentry_usage_accountant", "serde", "serde_bytes", "serde_json", @@ -4686,19 +4683,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "sentry_usage_accountant" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94f0a631e328036db251337a0076dc5858a155f6831110c6417fe2537502ceae" -dependencies = [ - "chrono", - "serde", - "serde_json", - "thiserror", - "tracing", -] - [[package]] name = "serde" version = "1.0.189" diff --git a/Cargo.toml b/Cargo.toml index e8f7530ce2..093da30e62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -148,7 +148,6 @@ sentry-core = "0.32.2" sentry-kafka-schemas = { version = "0.1.86", default-features = false } sentry-release-parser = { version = "1.3.2", default-features = false } sentry-types = "0.32.2" -sentry_usage_accountant = { version = "0.1.0", default-features = false } semver = "1.0.23" serde = { version = "1.0.159", features = ["derive", "rc"] } serde-transcode = "1.1.1" diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index b3b5d36bf6..5a57a2f556 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -1505,16 +1505,6 @@ pub struct Cogs { /// /// Defaults to `false`. enabled: bool, - /// Granularity of the COGS measurements. - /// - /// Measurements are aggregated based on the granularity in seconds. 
- /// - /// Aggregated measurements are always flushed at the end of their - /// aggregation window, which means the granularity also controls the flush - /// interval. - /// - /// Defaults to `60` (1 minute). - granularity_secs: u64, /// Maximium amount of COGS measurements allowed to backlog. /// /// Any additional COGS measurements recorded will be dropped. @@ -1533,7 +1523,6 @@ impl Default for Cogs { fn default() -> Self { Self { enabled: false, - granularity_secs: 60, max_queue_size: 10_000, relay_resource_id: "relay_service".to_owned(), } @@ -2511,11 +2500,6 @@ impl Config { self.values.cogs.enabled } - /// Granularity for COGS measurements. - pub fn cogs_granularity(&self) -> Duration { - Duration::from_secs(self.values.cogs.granularity_secs) - } - /// Maximum amount of COGS measurements buffered in memory. pub fn cogs_max_queue_size(&self) -> u64 { self.values.cogs.max_queue_size diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index 2958d161bf..452f20ad00 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -49,8 +49,6 @@ pub enum KafkaTopic { Spans, /// Summary for metrics collected during a span. MetricsSummaries, - /// COGS measurements topic. - Cogs, /// Feedback events topic. Feedback, } @@ -60,7 +58,7 @@ impl KafkaTopic { /// It will have to be adjusted if the new variants are added. pub fn iter() -> std::slice::Iter<'static, Self> { use KafkaTopic::*; - static TOPICS: [KafkaTopic; 15] = [ + static TOPICS: [KafkaTopic; 14] = [ Events, Attachments, Transactions, @@ -74,7 +72,6 @@ impl KafkaTopic { Monitors, Spans, MetricsSummaries, - Cogs, Feedback, ]; TOPICS.iter() @@ -137,7 +134,6 @@ define_topic_assignments! 
{ monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."), spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."), metrics_summaries: (KafkaTopic::MetricsSummaries, "snuba-metrics-summaries", "Summary for metrics collected during a span."), - cogs: (KafkaTopic::Cogs, "shared-resources-usage", "COGS measurements."), feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."), } diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index c93de36e9e..6598732de4 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -100,7 +100,6 @@ reqwest = { workspace = true, features = [ ] } rmp-serde = { workspace = true } rust-embed = { workspace = true, optional = true } -sentry_usage_accountant = { workspace = true, default-features = false } serde = { workspace = true } serde_bytes = { workspace = true } serde_json = { workspace = true } diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 26561c2d7d..46e782ab9f 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -217,11 +217,7 @@ impl ServiceState { }) .transpose()?; - let cogs = CogsService::new( - &config, - #[cfg(feature = "processing")] - store.clone(), - ); + let cogs = CogsService::new(&config); let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start())); EnvelopeProcessorService::new( diff --git a/relay-server/src/services/cogs.rs b/relay-server/src/services/cogs.rs index acbeb790ab..19daf7516d 100644 --- a/relay-server/src/services/cogs.rs +++ b/relay-server/src/services/cogs.rs @@ -2,11 +2,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId}; use relay_config::Config; -use relay_system::{Addr, Controller, FromMessage, Interface, Service}; -use sentry_usage_accountant::{Producer, UsageAccountant, UsageUnit}; +use relay_system::{Addr, FromMessage, Interface, Service}; -#[cfg(feature = "processing")] -use 
crate::services::store::{Store, StoreCogs}; use crate::statsd::RelayCounters; pub struct CogsReport(CogsMeasurement); @@ -21,52 +18,15 @@ impl FromMessage for CogsReport { } } -/// Relay [accountant producer](Producer) which produces to a dynamic -/// upstram. -enum RelayProducer { - /// Produces to Kafka via the store service. - #[cfg(feature = "processing")] - Store(Addr), - /// Discards all measurements. - Noop, -} - -impl Producer for RelayProducer { - type Error = std::convert::Infallible; - - fn send(&mut self, _message: Vec) -> Result<(), Self::Error> { - match self { - #[cfg(feature = "processing")] - Self::Store(addr) => addr.send(StoreCogs(_message)), - Self::Noop => {} - } - - Ok(()) - } -} - /// Service implementing the [`CogsReport`] interface. pub struct CogsService { relay_resource_id: String, - usage_accountant: UsageAccountant, } impl CogsService { - pub fn new(config: &Config, #[cfg(feature = "processing")] store: Option>) -> Self { - #[cfg(feature = "processing")] - let producer = store - .map(RelayProducer::Store) - .filter(|_| config.cogs_enabled()) - .unwrap_or(RelayProducer::Noop); - - #[cfg(not(feature = "processing"))] - let producer = RelayProducer::Noop; - - let granularity = chrono::Duration::from_std(config.cogs_granularity()).ok(); - + pub fn new(config: &Config) -> Self { Self { relay_resource_id: config.cogs_relay_resource_id().to_owned(), - usage_accountant: UsageAccountant::new(producer, granularity), } } @@ -77,28 +37,17 @@ impl CogsService { ResourceId::Relay => &self.relay_resource_id, }; - let (amount, unit) = match measurement.value { - relay_cogs::Value::Time(duration) => ( - duration.as_micros().try_into().unwrap_or(u64::MAX), - UsageUnit::Milliseconds, - ), + let amount = match measurement.value { + relay_cogs::Value::Time(duration) => { + duration.as_micros().try_into().unwrap_or(i64::MAX) + } }; relay_statsd::metric!( - counter(RelayCounters::CogsUsage) += amount as i64, + counter(RelayCounters::CogsUsage) += amount, 
resource_id = resource_id, app_feature = measurement.feature.as_str() ); - - self.usage_accountant - .record(resource_id, measurement.feature.as_str(), amount, unit) - .unwrap_or_else(|err| match err {}); - } - - fn handle_shutdown(&mut self) { - self.usage_accountant - .flush() - .unwrap_or_else(|err| match err {}); } } @@ -107,17 +56,8 @@ impl Service for CogsService { fn spawn_handler(mut self, mut rx: relay_system::Receiver) { tokio::spawn(async move { - let mut shutdown = Controller::shutdown_handle(); - - loop { - tokio::select! { - biased; - - Some(message) = rx.recv() => self.handle_report(message), - _ = shutdown.notified() => self.handle_shutdown(), - - else => break, - } + while let Some(message) = rx.recv().await { + self.handle_report(message); } }); } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 0988beabf7..1af723ed68 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -90,15 +90,11 @@ pub struct StoreMetrics { pub retention: u16, } -#[derive(Debug)] -pub struct StoreCogs(pub Vec); - /// Service interface for the [`StoreEnvelope`] message. #[derive(Debug)] pub enum Store { Envelope(StoreEnvelope), Metrics(StoreMetrics), - Cogs(StoreCogs), } impl Store { @@ -107,7 +103,6 @@ impl Store { match self { Store::Envelope(_) => "envelope", Store::Metrics(_) => "metrics", - Store::Cogs(_) => "cogs", } } } @@ -130,14 +125,6 @@ impl FromMessage for Store { } } -impl FromMessage for Store { - type Response = NoResponse; - - fn from_message(message: StoreCogs, _: ()) -> Self { - Self::Cogs(message) - } -} - /// Service implementing the [`Store`] interface. 
pub struct StoreService { workers: WorkerGroup, @@ -173,7 +160,6 @@ impl StoreService { match message { Store::Envelope(message) => self.handle_store_envelope(message), Store::Metrics(message) => self.handle_store_metrics(message), - Store::Cogs(message) => self.handle_store_cogs(message), } }) } @@ -444,16 +430,6 @@ impl StoreService { } } - fn handle_store_cogs(&self, StoreCogs(payload): StoreCogs) { - let message = KafkaMessage::Cogs(CogsKafkaMessage(payload)); - if let Err(error) = self.produce(KafkaTopic::Cogs, message) { - relay_log::error!( - error = &error as &dyn std::error::Error, - "failed to store cogs measurement" - ); - } - } - fn create_metric_message<'a>( &self, organization_id: u64, @@ -1449,10 +1425,6 @@ struct ProfileChunkKafkaMessage { payload: Bytes, } -#[derive(Debug, Serialize)] -#[serde(transparent)] -struct CogsKafkaMessage(Vec); - /// An enum over all possible ingest messages. #[derive(Debug, Serialize)] #[serde(tag = "type", rename_all = "snake_case")] @@ -1479,7 +1451,6 @@ enum KafkaMessage<'a> { message: SpanKafkaMessage<'a>, }, MetricsSummary(MetricsSummaryKafkaMessage<'a>), - Cogs(CogsKafkaMessage), ProfileChunk(ProfileChunkKafkaMessage), } @@ -1505,7 +1476,6 @@ impl Message for KafkaMessage<'_> { KafkaMessage::CheckIn(_) => "check_in", KafkaMessage::Span { .. } => "span", KafkaMessage::MetricsSummary(_) => "metrics_summary", - KafkaMessage::Cogs(_) => "cogs", KafkaMessage::ProfileChunk(_) => "profile_chunk", } } @@ -1530,7 +1500,6 @@ impl Message for KafkaMessage<'_> { | Self::Span { .. 
} | Self::ReplayRecordingNotChunked(_) | Self::MetricsSummary(_) - | Self::Cogs(_) | Self::ProfileChunk(_) => Uuid::nil(), // TODO(ja): Determine a partitioning key @@ -1584,8 +1553,6 @@ impl Message for KafkaMessage<'_> { .map(Cow::Owned) .map_err(ClientError::InvalidJson), - KafkaMessage::Cogs(CogsKafkaMessage(payload)) => Ok(payload.into()), - _ => rmp_serde::to_vec_named(&self) .map(Cow::Owned) .map_err(ClientError::InvalidMsgPack), diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 6d922f25f4..328c6ab9d7 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -36,7 +36,6 @@ spans_consumer, profiles_consumer, metrics_summaries_consumer, - cogs_consumer, feedback_consumer, ) diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index 46d3729d3b..24e57b71d1 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -59,7 +59,6 @@ def inner(options=None): "spans": get_topic_name("spans"), "profiles": get_topic_name("profiles"), "metrics_summaries": get_topic_name("metrics_summaries"), - "cogs": get_topic_name("cogs"), "feedback": get_topic_name("feedback"), } @@ -364,11 +363,6 @@ def metrics_summaries_consumer(consumer_fixture): yield from consumer_fixture(MetricsSummariesConsumer, "metrics_summaries") -@pytest.fixture -def cogs_consumer(consumer_fixture): - yield from consumer_fixture(CogsConsumer, "cogs") - - class MetricsConsumer(ConsumerBase): def get_metric(self, timeout=None): message = self.poll(timeout=timeout) diff --git a/tests/integration/test_cogs.py b/tests/integration/test_cogs.py deleted file mode 100644 index 9d6319f117..0000000000 --- a/tests/integration/test_cogs.py +++ /dev/null @@ -1,28 +0,0 @@ -from sentry_sdk.envelope import Envelope - -TEST_CONFIG = { - "cogs": { - "enabled": True, - "granularity_secs": 0, - } -} - - -def test_cogs_simple(mini_sentry, relay_with_processing, cogs_consumer): - relay = 
relay_with_processing(TEST_CONFIG) - cogs_consumer = cogs_consumer() - - project_id = 42 - mini_sentry.add_basic_project_config(project_id) - - envelope = Envelope() - envelope.add_event({"message": "Hello, World!"}) - relay.send_envelope(project_id, envelope) - - m = cogs_consumer.get_measurement() - assert m["shared_resource_id"] == "relay_service" - assert m["app_feature"] == "errors" - assert m["usage_unit"] == "milliseconds" - assert ( - m["amount"] < 1000 * 1000 - ) # If processing this error takes a second, we have a problem ...