From afe8be8d251d809d0ccbab71a0f024cb383ae396 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Wed, 28 Aug 2024 09:37:06 +0200 Subject: [PATCH] Revert "tmp(statsd): Disable high-volume timers to assess performance (#3955)" This reverts commit 886a53e1a6b241b6b0afcf054781982618e9c84b. --- relay-metrics/src/aggregator.rs | 10 ++++---- relay-metrics/src/statsd.rs | 1 - .../src/services/metrics/aggregator.rs | 25 +++++++++++++------ relay-server/src/services/metrics/router.rs | 13 +++++----- relay-server/src/services/processor.rs | 8 +++--- relay-server/src/services/project_cache.rs | 12 ++++----- relay-server/src/statsd.rs | 6 ----- 7 files changed, 36 insertions(+), 39 deletions(-) diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index 1a593a21f8..e4a5c088d2 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -468,11 +468,11 @@ fn get_flush_time( let initial_flush = bucket_key.timestamp + config.bucket_interval() + config.initial_delay(); let backdated = initial_flush <= now; - // let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64; - // relay_statsd::metric!( - // histogram(MetricHistograms::BucketsDelay) = delay as f64, - // backdated = if backdated { "true" } else { "false" }, - // ); + let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64; + relay_statsd::metric!( + histogram(MetricHistograms::BucketsDelay) = delay as f64, + backdated = if backdated { "true" } else { "false" }, + ); let flush_timestamp = if backdated { // If the initial flush time has passed or can't be represented, we want to treat the diff --git a/relay-metrics/src/statsd.rs b/relay-metrics/src/statsd.rs index 42008d9d0c..ae7976a1fa 100644 --- a/relay-metrics/src/statsd.rs +++ b/relay-metrics/src/statsd.rs @@ -98,7 +98,6 @@ pub enum MetricHistograms { /// This metric is tagged with: /// - `backdated`: A flag indicating whether the metric was reported within the `initial_delay` /// time period (`false`) or after the initial delay has expired (`true`). - #[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement. BucketsDelay, /// Distribution of invalid bucket timestamps observed, relative to the time of observation. diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index ccbe10b77f..707d0eec72 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -9,7 +9,7 @@ use relay_metrics::aggregator::AggregateMetricsError; use relay_metrics::{aggregator, Bucket, UnixTimestamp}; use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown}; -use crate::statsd::{RelayCounters, RelayHistograms}; +use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; /// Aggregator for metric buckets. /// @@ -31,6 +31,17 @@ pub enum Aggregator { BucketCountInquiry(BucketCountInquiry, relay_system::Sender), } +impl Aggregator { + /// Returns the name of the message variant. + pub fn variant(&self) -> &'static str { + match self { + Aggregator::MergeBuckets(_) => "MergeBuckets", + #[cfg(test)] + Aggregator::BucketCountInquiry(_, _) => "BucketCountInquiry", + } + } +} + impl Interface for Aggregator {} impl FromMessage for Aggregator { @@ -209,11 +220,10 @@ impl AggregatorService { } fn handle_message(&mut self, message: Aggregator) { - // let ty = message.variant(); - // relay_statsd::metric!( - // timer(RelayTimers::AggregatorServiceDuration), - // message = ty, - { + let ty = message.variant(); + relay_statsd::metric!( + timer(RelayTimers::AggregatorServiceDuration), + message = ty, { match message { Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg), @@ -223,8 +233,7 @@ impl AggregatorService { } } } - } - // ) + ) } fn handle_shutdown(&mut self, message: Shutdown) { diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index f1ec402aba..522eb729f1 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -9,6 +9,7 @@ use relay_system::{Addr, NoResponse, Recipient, Service}; use crate::services::metrics::{ Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets, }; +use crate::statsd::RelayTimers; use crate::utils; /// Service that routes metrics & metric buckets to the appropriate aggregator. @@ -104,11 +105,10 @@ impl StartedRouter { } fn handle_message(&mut self, message: Aggregator) { - // let ty = message.variant(); - // relay_statsd::metric!( - // timer(RelayTimers::MetricRouterServiceDuration), - // message = ty, - { + let ty = message.variant(); + relay_statsd::metric!( + timer(RelayTimers::MetricRouterServiceDuration), + message = ty, { match message { Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg), @@ -116,8 +116,7 @@ impl StartedRouter { Aggregator::BucketCountInquiry(_, _sender) => (), // not supported } } - } - // ) + ) } fn handle_merge_buckets(&mut self, message: MergeBuckets) { diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 760bbe3ff4..dbc8fef904 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2904,11 +2904,10 @@ impl EnvelopeProcessorService { } fn handle_message(&self, message: EnvelopeProcessor) { - // let ty = message.variant(); + let ty = message.variant(); let feature_weights = self.feature_weights(&message); - // metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, { - { + metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, { let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights); match message { @@ -2925,8 +2924,7 @@ impl EnvelopeProcessorService { EnvelopeProcessor::SubmitEnvelope(m) => self.handle_submit_envelope(*m), EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m), } - } - // }); + }); } fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights { diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index b3528fccb6..c7739992cc 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -1274,11 +1274,10 @@ impl ProjectCacheBroker { } fn handle_message(&mut self, message: ProjectCache) { - // let ty = message.variant(); - // metric!( - // timer(RelayTimers::ProjectCacheMessageDuration), - // message = ty, - { + let ty = message.variant(); + metric!( + timer(RelayTimers::ProjectCacheMessageDuration), + message = ty, { match message { ProjectCache::RequestUpdate(message) => self.handle_request_update(message), @@ -1303,8 +1302,7 @@ impl ProjectCacheBroker { } } } - } - // ) + ) } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index f1c0ffd508..00b0773df7 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -467,13 +467,11 @@ pub enum RelayTimers { /// This metric is tagged with: /// /// - `message`: The type of message that was processed. - #[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement. ProcessMessageDuration, /// Timing in milliseconds for handling a project cache message. /// /// This metric is tagged with: /// - `message`: The type of message that was processed. - #[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement. ProjectCacheMessageDuration, /// Timing in milliseconds for processing a message in the buffer service. /// @@ -507,13 +505,11 @@ pub enum RelayTimers { /// /// This metric is tagged with: /// - `message`: The type of message that was processed. - #[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement. AggregatorServiceDuration, /// Timing in milliseconds for processing a message in the metric router service. /// /// This metric is tagged with: /// - `message`: The type of message that was processed. - #[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement. MetricRouterServiceDuration, /// Timing in milliseconds for processing a message in the metric store service. /// @@ -558,9 +554,7 @@ impl TimerMetric for RelayTimers { RelayTimers::HealthCheckDuration => "health.message.duration", #[cfg(feature = "processing")] RelayTimers::RateLimitBucketsDuration => "processor.rate_limit_buckets", - #[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement. RelayTimers::AggregatorServiceDuration => "metrics.aggregator.message.duration", - #[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement. RelayTimers::MetricRouterServiceDuration => "metrics.router.message.duration", #[cfg(feature = "processing")] RelayTimers::StoreServiceDuration => "store.message.duration",