Skip to content

Commit

Permalink
Revert "tmp(statsd): Disable high-volume timers to assess performance" (
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-auer committed Aug 28, 2024
1 parent 6198771 commit d2f6ce4
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 39 deletions.
10 changes: 5 additions & 5 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,11 +468,11 @@ fn get_flush_time(
let initial_flush = bucket_key.timestamp + config.bucket_interval() + config.initial_delay();
let backdated = initial_flush <= now;

// let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
// relay_statsd::metric!(
// histogram(MetricHistograms::BucketsDelay) = delay as f64,
// backdated = if backdated { "true" } else { "false" },
// );
let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
relay_statsd::metric!(
histogram(MetricHistograms::BucketsDelay) = delay as f64,
backdated = if backdated { "true" } else { "false" },
);

let flush_timestamp = if backdated {
// If the initial flush time has passed or can't be represented, we want to treat the
Expand Down
1 change: 0 additions & 1 deletion relay-metrics/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ pub enum MetricHistograms {
/// This metric is tagged with:
/// - `backdated`: A flag indicating whether the metric was reported within the `initial_delay`
/// time period (`false`) or after the initial delay has expired (`true`).
#[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement.
BucketsDelay,

/// Distribution of invalid bucket timestamps observed, relative to the time of observation.
Expand Down
25 changes: 17 additions & 8 deletions relay-server/src/services/metrics/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use relay_metrics::aggregator::AggregateMetricsError;
use relay_metrics::{aggregator, Bucket, UnixTimestamp};
use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown};

use crate::statsd::{RelayCounters, RelayHistograms};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};

/// Aggregator for metric buckets.
///
Expand All @@ -31,6 +31,17 @@ pub enum Aggregator {
BucketCountInquiry(BucketCountInquiry, relay_system::Sender<usize>),
}

impl Aggregator {
/// Returns the name of the message variant.
pub fn variant(&self) -> &'static str {
match self {
Aggregator::MergeBuckets(_) => "MergeBuckets",
#[cfg(test)]
Aggregator::BucketCountInquiry(_, _) => "BucketCountInquiry",
}
}
}

impl Interface for Aggregator {}

impl FromMessage<MergeBuckets> for Aggregator {
Expand Down Expand Up @@ -209,11 +220,10 @@ impl AggregatorService {
}

fn handle_message(&mut self, message: Aggregator) {
// let ty = message.variant();
// relay_statsd::metric!(
// timer(RelayTimers::AggregatorServiceDuration),
// message = ty,
{
let ty = message.variant();
relay_statsd::metric!(
timer(RelayTimers::AggregatorServiceDuration),
message = ty,
{
match message {
Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg),
Expand All @@ -223,8 +233,7 @@ impl AggregatorService {
}
}
}
}
// )
)
}

fn handle_shutdown(&mut self, message: Shutdown) {
Expand Down
13 changes: 6 additions & 7 deletions relay-server/src/services/metrics/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use relay_system::{Addr, NoResponse, Recipient, Service};
use crate::services::metrics::{
Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets,
};
use crate::statsd::RelayTimers;
use crate::utils;

/// Service that routes metrics & metric buckets to the appropriate aggregator.
Expand Down Expand Up @@ -104,20 +105,18 @@ impl StartedRouter {
}

fn handle_message(&mut self, message: Aggregator) {
// let ty = message.variant();
// relay_statsd::metric!(
// timer(RelayTimers::MetricRouterServiceDuration),
// message = ty,
{
let ty = message.variant();
relay_statsd::metric!(
timer(RelayTimers::MetricRouterServiceDuration),
message = ty,
{
match message {
Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg),
#[cfg(test)]
Aggregator::BucketCountInquiry(_, _sender) => (), // not supported
}
}
}
// )
)
}

fn handle_merge_buckets(&mut self, message: MergeBuckets) {
Expand Down
8 changes: 3 additions & 5 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2904,11 +2904,10 @@ impl EnvelopeProcessorService {
}

fn handle_message(&self, message: EnvelopeProcessor) {
// let ty = message.variant();
let ty = message.variant();
let feature_weights = self.feature_weights(&message);

// metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
{
metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

match message {
Expand All @@ -2925,8 +2924,7 @@ impl EnvelopeProcessorService {
EnvelopeProcessor::SubmitEnvelope(m) => self.handle_submit_envelope(*m),
EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
}
}
// });
});
}

fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
Expand Down
12 changes: 5 additions & 7 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1274,11 +1274,10 @@ impl ProjectCacheBroker {
}

fn handle_message(&mut self, message: ProjectCache) {
// let ty = message.variant();
// metric!(
// timer(RelayTimers::ProjectCacheMessageDuration),
// message = ty,
{
let ty = message.variant();
metric!(
timer(RelayTimers::ProjectCacheMessageDuration),
message = ty,
{
match message {
ProjectCache::RequestUpdate(message) => self.handle_request_update(message),
Expand All @@ -1303,8 +1302,7 @@ impl ProjectCacheBroker {
}
}
}
}
// )
)
}
}

Expand Down
6 changes: 0 additions & 6 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,13 +467,11 @@ pub enum RelayTimers {
/// This metric is tagged with:
///
/// - `message`: The type of message that was processed.
#[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement.
ProcessMessageDuration,
/// Timing in milliseconds for handling a project cache message.
///
/// This metric is tagged with:
/// - `message`: The type of message that was processed.
#[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement.
ProjectCacheMessageDuration,
/// Timing in milliseconds for processing a message in the buffer service.
///
Expand Down Expand Up @@ -507,13 +505,11 @@ pub enum RelayTimers {
///
/// This metric is tagged with:
/// - `message`: The type of message that was processed.
#[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement.
AggregatorServiceDuration,
/// Timing in milliseconds for processing a message in the metric router service.
///
/// This metric is tagged with:
/// - `message`: The type of message that was processed.
#[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement.
MetricRouterServiceDuration,
/// Timing in milliseconds for processing a message in the metric store service.
///
Expand Down Expand Up @@ -558,9 +554,7 @@ impl TimerMetric for RelayTimers {
RelayTimers::HealthCheckDuration => "health.message.duration",
#[cfg(feature = "processing")]
RelayTimers::RateLimitBucketsDuration => "processor.rate_limit_buckets",
#[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement.
RelayTimers::AggregatorServiceDuration => "metrics.aggregator.message.duration",
#[allow(dead_code)] // TODO: Temporarily disabled for a performance measurement.
RelayTimers::MetricRouterServiceDuration => "metrics.router.message.duration",
#[cfg(feature = "processing")]
RelayTimers::StoreServiceDuration => "store.message.duration",
Expand Down

0 comments on commit d2f6ce4

Please sign in to comment.