Skip to content

Commit

Permalink
ref(metrics): Normalize metrics in the processor instead of aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Aug 7, 2024
1 parent 9a938e9 commit 37e53c5
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 59 deletions.
50 changes: 6 additions & 44 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,14 @@ use thiserror::Error;
use tokio::time::Instant;

use crate::bucket::{Bucket, BucketValue};
use crate::protocol::{self, MetricNamespace, MetricResourceIdentifier};
use crate::statsd::{MetricCounters, MetricGauges, MetricHistograms, MetricSets, MetricTimers};
use crate::{BucketMetadata, FiniteF64, MetricName};
use crate::{BucketMetadata, FiniteF64, MetricName, MetricNamespace};

use hashbrown::HashMap;

/// Any error that may occur during aggregation.
#[derive(Debug, Error, PartialEq)]
pub enum AggregateMetricsError {
/// A metric bucket had invalid characters in the metric name.
#[error("found invalid characters: {0}")]
InvalidCharacters(MetricName),
/// A metric bucket had an unknown namespace in the metric name.
#[error("found unsupported namespace: {0}")]
UnsupportedNamespace(MetricNamespace),
/// A metric bucket's timestamp was out of the configured acceptable range.
#[error("found invalid timestamp: {0}")]
InvalidTimestamp(UnixTimestamp),
Expand Down Expand Up @@ -821,11 +814,11 @@ fn validate_bucket_key(
Ok(key)
}

/// Removes invalid characters from metric names.
/// Validates metric name against [`AggregatorConfig`].
///
/// Returns `Err` if the metric must be dropped.
fn validate_metric_name(
mut key: BucketKey,
key: BucketKey,
aggregator_config: &AggregatorConfig,
) -> Result<BucketKey, AggregateMetricsError> {
let metric_name_length = key.metric_name.len();
Expand All @@ -838,35 +831,12 @@ fn validate_metric_name(
return Err(AggregateMetricsError::InvalidStringLength(key.metric_name));
}

normalize_metric_name(&mut key)?;

Ok(key)
}

fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsError> {
key.metric_name = match MetricResourceIdentifier::parse(&key.metric_name) {
Ok(mri) => {
if matches!(mri.namespace, MetricNamespace::Unsupported) {
relay_log::debug!("invalid metric namespace {:?}", &key.metric_name);
return Err(AggregateMetricsError::UnsupportedNamespace(mri.namespace));
}

mri.to_string().into()
}
Err(_) => {
relay_log::debug!("invalid metric name {:?}", &key.metric_name);
return Err(AggregateMetricsError::InvalidCharacters(
key.metric_name.clone(),
));
}
};

Ok(())
}

/// Removes tags with invalid characters in the key, and validates tag values.
/// Validates metric tags against [`AggregatorConfig`].
///
/// Tag values are validated with `protocol::validate_tag_value`.
/// Invalid tags are removed.
fn validate_metric_tags(mut key: BucketKey, aggregator_config: &AggregatorConfig) -> BucketKey {
key.tags.retain(|tag_key, tag_value| {
if tag_key.len() > aggregator_config.max_tag_key_length {
Expand All @@ -878,16 +848,8 @@ fn validate_metric_tags(mut key: BucketKey, aggregator_config: &AggregatorConfig
return false;
}

if protocol::is_valid_tag_key(tag_key) {
true
} else {
relay_log::debug!("invalid metric tag key {tag_key:?}");
false
}
true
});
for (_, tag_value) in key.tags.iter_mut() {
protocol::validate_tag_value(tag_value);
}
key
}

Expand Down
11 changes: 7 additions & 4 deletions relay-metrics/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use crate::{FiniteF64, MetricNamespace, ParseMetricError};

const VALUE_SEPARATOR: char = ':';

/// Type of [`Bucket::tags`].
pub type MetricTags = BTreeMap<String, String>;

/// A snapshot of values within a [`Bucket`].
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct GaugeValue {
Expand Down Expand Up @@ -401,8 +404,8 @@ fn parse_gauge(string: &str) -> Option<GaugeValue> {
/// Parses tags in the format `tag1,tag2:value`.
///
/// Tag values are optional. For tags with missing values, an empty `""` value is assumed.
fn parse_tags(string: &str) -> Option<BTreeMap<String, String>> {
let mut map = BTreeMap::new();
fn parse_tags(string: &str) -> Option<MetricTags> {
let mut map = MetricTags::new();

for pair in string.split(',') {
let mut name_value = pair.splitn(2, ':');
Expand Down Expand Up @@ -606,8 +609,8 @@ pub struct Bucket {
/// ```text
/// endpoint.hits:1|c|#route:user_index,environment:production,release:1.4.0
/// ```
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub tags: BTreeMap<String, String>,
#[serde(default, skip_serializing_if = "MetricTags::is_empty")]
pub tags: MetricTags,

/// Relay internal metadata for a metric bucket.
///
Expand Down
68 changes: 67 additions & 1 deletion relay-metrics/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use relay_common::time::UnixTimestamp;
#[doc(inline)]
pub use unescaper::Error as UnescapeError;

use crate::FiniteF64;
use crate::{Bucket, FiniteF64, MetricTags};

/// Type used for Counter metric
pub type CounterType = FiniteF64;
Expand All @@ -26,6 +26,72 @@ pub type SetType = u32;
/// Type used for Gauge entries
pub type GaugeType = FiniteF64;

/// Error returned from [`normalize_bucket`].
#[derive(Debug, thiserror::Error)]
pub enum NormalizationError {
/// The metric name includes an invalid or unsupported metric namespace.
#[error("unsupported metric namespace")]
UnsupportedNamespace,
/// The metric name cannot be parsed and is invalid.
#[error("invalid metric name: {0:?}")]
InvalidMetricName(MetricName),
}

/// Normalizes a bucket.
///
/// The passed metric will have its name and tags normalized and tested for validity.
/// Invalid characters in the metric name may be replaced,
/// see [`relay_base_schema::metrics::try_normalize_metric_name`].
///
/// Invalid tags are removed and tag keys are normalized, for example control characters are
/// removed from tag keys.
pub fn normalize_bucket(bucket: &mut Bucket) -> Result<(), NormalizationError> {
normalize_metric_name(&mut bucket.name)?;
normalize_metric_tags(&mut bucket.tags);
Ok(())
}

/// Normalizes a metric name.
///
/// Normalization includes expanding valid metric names without a namespace to the default
/// namespace.
///
/// Invalid metric names are rejected with [`Error`].
fn normalize_metric_name(name: &mut MetricName) -> Result<(), NormalizationError> {
*name = match MetricResourceIdentifier::parse(name) {
Ok(mri) => {
if matches!(mri.namespace, MetricNamespace::Unsupported) {
return Err(NormalizationError::UnsupportedNamespace);
}

// We can improve this code part, by not always re-creating the name, if the name is
// already a valid MRI with namespace we can use the original name instead.
mri.to_string().into()
}
Err(_) => {
return Err(NormalizationError::InvalidMetricName(name.clone()));
}
};

Ok(())
}

/// Removes tags with invalid characters in the key, and validates tag values.
///
/// Tag values are validated with [`validate_tag_value`].
fn normalize_metric_tags(tags: &mut MetricTags) {
tags.retain(|tag_key, tag_value| {
if !is_valid_tag_key(tag_key) {
relay_log::debug!("invalid metric tag key {tag_key:?}");
return false;
}

validate_tag_value(tag_value);

true
});
}

/// Validates a tag key.
///
/// Tag keys currently only need to not contain ASCII control characters. This might change.
Expand Down
16 changes: 13 additions & 3 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2181,14 +2181,24 @@ impl EnvelopeProcessorService {
let clock_drift_processor =
ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT);

for bucket in &mut buckets {
buckets.retain_mut(|bucket| {
if let Err(error) = relay_metrics::normalize_bucket(bucket) {
relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
return false;
}

if !self::metrics::is_valid_namespace(bucket, source) {
return false;
}

clock_drift_processor.process_timestamp(&mut bucket.timestamp);

if !matches!(source, BucketSource::Internal) {
bucket.metadata = BucketMetadata::new(received_timestamp);
}
}

let buckets = self::metrics::filter_namespaces(buckets, source);
true
});

// Best effort check to filter and rate limit buckets, if there is no project state
// available at the current time, we will check again after flushing.
Expand Down
12 changes: 5 additions & 7 deletions relay-server/src/services/processor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@ use crate::services::outcome::Outcome;
use crate::services::project::ProjectInfo;
use crate::services::project_cache::BucketSource;

/// Filters buckets based on their namespace.
/// Checks if the namespace of the passed bucket is valid.
///
/// This is a noop for most namespaces except:
/// This is returns `true` for most namespaces except:
/// - [`MetricNamespace::Unsupported`]: Equal to invalid/unknown namespaces.
/// - [`MetricNamespace::Stats`]: Metric stats are only allowed if the `source` is [`BucketSource::Internal`].
pub fn filter_namespaces(mut buckets: Vec<Bucket>, source: BucketSource) -> Vec<Bucket> {
buckets.retain(|bucket| match bucket.name.namespace() {
pub fn is_valid_namespace(bucket: &Bucket, source: BucketSource) -> bool {
match bucket.name.namespace() {
MetricNamespace::Sessions => true,
MetricNamespace::Transactions => true,
MetricNamespace::Spans => true,
MetricNamespace::Profiles => true,
MetricNamespace::Custom => true,
MetricNamespace::Stats => source == BucketSource::Internal,
MetricNamespace::Unsupported => false,
});

buckets
}
}

pub fn apply_project_info(
Expand Down

0 comments on commit 37e53c5

Please sign in to comment.