Skip to content

Commit

Permalink
ref(metrics): Move associated fns away from aggregator (#3566)
Browse files Browse the repository at this point in the history
No reason for them to be associated, we have in several other places
just free standing functions. In an upcoming PR I introduce some
generics to the aggregator which makes these fns really awkward. I move
it out here to reduce the diff later.

Functions are unchanged.
  • Loading branch information
Dav1dde committed May 8, 2024
1 parent 9cef1a4 commit 34a4f62
Showing 1 changed file with 125 additions and 129 deletions.
254 changes: 125 additions & 129 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,125 +597,6 @@ impl Aggregator {
buckets
}

/// Validates the metric name and its tags are correct.
///
/// Returns `Err` if the metric should be dropped.
fn validate_bucket_key(
mut key: BucketKey,
aggregator_config: &AggregatorConfig,
) -> Result<BucketKey, AggregateMetricsError> {
key = Self::validate_metric_name(key, aggregator_config)?;
key = Self::validate_metric_tags(key, aggregator_config);
Ok(key)
}

/// Removes invalid characters from metric names.
///
/// Returns `Err` if the metric must be dropped.
fn validate_metric_name(
mut key: BucketKey,
aggregator_config: &AggregatorConfig,
) -> Result<BucketKey, AggregateMetricsError> {
let metric_name_length = key.metric_name.len();
if metric_name_length > aggregator_config.max_name_length {
relay_log::configure_scope(|scope| {
scope.set_extra(
"bucket.project_key",
key.project_key.as_str().to_owned().into(),
);
scope.set_extra(
"bucket.metric_name.length",
metric_name_length.to_string().into(),
);
scope.set_extra(
"aggregator_config.max_name_length",
aggregator_config.max_name_length.to_string().into(),
);
});
return Err(AggregateMetricsErrorKind::InvalidStringLength(key.metric_name).into());
}

if let Err(err) = Self::normalize_metric_name(&mut key) {
relay_log::configure_scope(|scope| {
scope.set_extra(
"bucket.project_key",
key.project_key.as_str().to_owned().into(),
);
scope.set_extra("bucket.metric_name", key.metric_name.to_string().into());
});
return Err(err);
}

Ok(key)
}

fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsError> {
key.metric_name = match MetricResourceIdentifier::parse(&key.metric_name) {
Ok(mri) => {
if matches!(mri.namespace, MetricNamespace::Unsupported) {
relay_log::debug!("invalid metric namespace {:?}", &key.metric_name);
return Err(
AggregateMetricsErrorKind::UnsupportedNamespace(mri.namespace).into(),
);
}

mri.to_string().into()
}
Err(_) => {
relay_log::debug!("invalid metric name {:?}", &key.metric_name);
return Err(
AggregateMetricsErrorKind::InvalidCharacters(key.metric_name.clone()).into(),
);
}
};

Ok(())
}

/// Removes tags with invalid characters in the key, and validates tag values.
///
/// Tag values are validated with `protocol::validate_tag_value`.
fn validate_metric_tags(mut key: BucketKey, aggregator_config: &AggregatorConfig) -> BucketKey {
let proj_key = key.project_key.as_str();
key.tags.retain(|tag_key, tag_value| {
if tag_key.len() > aggregator_config.max_tag_key_length {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", proj_key.to_owned().into());
scope.set_extra("bucket.metric.tag_key", tag_key.to_owned().into());
scope.set_extra(
"aggregator_config.max_tag_key_length",
aggregator_config.max_tag_key_length.to_string().into(),
);
});
relay_log::debug!("Invalid metric tag key");
return false;
}
if bytecount::num_chars(tag_value.as_bytes()) > aggregator_config.max_tag_value_length {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", proj_key.to_owned().into());
scope.set_extra("bucket.metric.tag_value", tag_value.to_owned().into());
scope.set_extra(
"aggregator_config.max_tag_value_length",
aggregator_config.max_tag_value_length.to_string().into(),
);
});
relay_log::debug!("Invalid metric tag value");
return false;
}

if protocol::is_valid_tag_key(tag_key) {
true
} else {
relay_log::debug!("invalid metric tag key {tag_key:?}");
false
}
});
for (_, tag_value) in key.tags.iter_mut() {
protocol::validate_tag_value(tag_value);
}
key
}

/// Wrapper for [`AggregatorConfig::get_bucket_timestamp`].
///
/// Logs a statsd metric for invalid timestamps.
Expand Down Expand Up @@ -755,7 +636,7 @@ impl Aggregator {
metric_name: bucket.name,
tags: bucket.tags,
};
let key = Self::validate_bucket_key(key, &self.config)?;
let key = validate_bucket_key(key, &self.config)?;

// XXX: This is not a great implementation of cost enforcement.
//
Expand Down Expand Up @@ -877,6 +758,123 @@ impl fmt::Debug for Aggregator {
}
}

/// Validates the metric name and its tags are correct.
///
/// Returns `Err` if the metric should be dropped.
fn validate_bucket_key(
mut key: BucketKey,
aggregator_config: &AggregatorConfig,
) -> Result<BucketKey, AggregateMetricsError> {
key = validate_metric_name(key, aggregator_config)?;
key = validate_metric_tags(key, aggregator_config);
Ok(key)
}

/// Removes invalid characters from metric names.
///
/// Returns `Err` if the metric must be dropped.
fn validate_metric_name(
mut key: BucketKey,
aggregator_config: &AggregatorConfig,
) -> Result<BucketKey, AggregateMetricsError> {
let metric_name_length = key.metric_name.len();
if metric_name_length > aggregator_config.max_name_length {
relay_log::configure_scope(|scope| {
scope.set_extra(
"bucket.project_key",
key.project_key.as_str().to_owned().into(),
);
scope.set_extra(
"bucket.metric_name.length",
metric_name_length.to_string().into(),
);
scope.set_extra(
"aggregator_config.max_name_length",
aggregator_config.max_name_length.to_string().into(),
);
});
return Err(AggregateMetricsErrorKind::InvalidStringLength(key.metric_name).into());
}

if let Err(err) = normalize_metric_name(&mut key) {
relay_log::configure_scope(|scope| {
scope.set_extra(
"bucket.project_key",
key.project_key.as_str().to_owned().into(),
);
scope.set_extra("bucket.metric_name", key.metric_name.to_string().into());
});
return Err(err);
}

Ok(key)
}

fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsError> {
key.metric_name = match MetricResourceIdentifier::parse(&key.metric_name) {
Ok(mri) => {
if matches!(mri.namespace, MetricNamespace::Unsupported) {
relay_log::debug!("invalid metric namespace {:?}", &key.metric_name);
return Err(AggregateMetricsErrorKind::UnsupportedNamespace(mri.namespace).into());
}

mri.to_string().into()
}
Err(_) => {
relay_log::debug!("invalid metric name {:?}", &key.metric_name);
return Err(
AggregateMetricsErrorKind::InvalidCharacters(key.metric_name.clone()).into(),
);
}
};

Ok(())
}

/// Removes tags with invalid characters in the key, and validates tag values.
///
/// Tag values are validated with `protocol::validate_tag_value`.
fn validate_metric_tags(mut key: BucketKey, aggregator_config: &AggregatorConfig) -> BucketKey {
let proj_key = key.project_key.as_str();
key.tags.retain(|tag_key, tag_value| {
if tag_key.len() > aggregator_config.max_tag_key_length {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", proj_key.to_owned().into());
scope.set_extra("bucket.metric.tag_key", tag_key.to_owned().into());
scope.set_extra(
"aggregator_config.max_tag_key_length",
aggregator_config.max_tag_key_length.to_string().into(),
);
});
relay_log::debug!("Invalid metric tag key");
return false;
}
if bytecount::num_chars(tag_value.as_bytes()) > aggregator_config.max_tag_value_length {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", proj_key.to_owned().into());
scope.set_extra("bucket.metric.tag_value", tag_value.to_owned().into());
scope.set_extra(
"aggregator_config.max_tag_value_length",
aggregator_config.max_tag_value_length.to_string().into(),
);
});
relay_log::debug!("Invalid metric tag value");
return false;
}

if protocol::is_valid_tag_key(tag_key) {
true
} else {
relay_log::debug!("invalid metric tag key {tag_key:?}");
false
}
});
for (_, tag_value) in key.tags.iter_mut() {
protocol::validate_tag_value(tag_value);
}
key
}

#[cfg(test)]
mod tests {
use similar_asserts::assert_eq;
Expand Down Expand Up @@ -1332,7 +1330,7 @@ mod tests {
},
};

let mut bucket_key = Aggregator::validate_bucket_key(bucket_key, &test_config()).unwrap();
let mut bucket_key = validate_bucket_key(bucket_key, &test_config()).unwrap();

assert_eq!(bucket_key.tags.len(), 1);
assert_eq!(
Expand All @@ -1342,7 +1340,7 @@ mod tests {
assert_eq!(bucket_key.tags.get("another\0garbage"), None);

bucket_key.metric_name = "hergus\0bergus".into();
Aggregator::validate_bucket_key(bucket_key, &test_config()).unwrap_err();
validate_bucket_key(bucket_key, &test_config()).unwrap_err();
}

#[test]
Expand All @@ -1356,15 +1354,15 @@ mod tests {
metric_name: "c:transactions/a_short_metric".into(),
tags: BTreeMap::new(),
};
assert!(Aggregator::validate_bucket_key(short_metric, &test_config()).is_ok());
assert!(validate_bucket_key(short_metric, &test_config()).is_ok());

let long_metric = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:transactions/long_name_a_very_long_name_its_super_long_really_but_like_super_long_probably_the_longest_name_youve_seen_and_even_the_longest_name_ever_its_extremly_long_i_cant_tell_how_long_it_is_because_i_dont_have_that_many_fingers_thus_i_cant_count_the_many_characters_this_long_name_is".into(),
tags: BTreeMap::new(),
};
let validation = Aggregator::validate_bucket_key(long_metric, &test_config());
let validation = validate_bucket_key(long_metric, &test_config());

assert!(matches!(
validation.unwrap_err(),
Expand All @@ -1379,8 +1377,7 @@ mod tests {
metric_name: "c:transactions/a_short_metric_with_long_tag_key".into(),
tags: BTreeMap::from([("i_run_out_of_creativity_so_here_we_go_Lorem_Ipsum_is_simply_dummy_text_of_the_printing_and_typesetting_industry_Lorem_Ipsum_has_been_the_industrys_standard_dummy_text_ever_since_the_1500s_when_an_unknown_printer_took_a_galley_of_type_and_scrambled_it_to_make_a_type_specimen_book".into(), "tag_value".into())]),
};
let validation =
Aggregator::validate_bucket_key(short_metric_long_tag_key, &test_config()).unwrap();
let validation = validate_bucket_key(short_metric_long_tag_key, &test_config()).unwrap();
assert_eq!(validation.tags.len(), 0);

let short_metric_long_tag_value = BucketKey {
Expand All @@ -1389,8 +1386,7 @@ mod tests {
metric_name: "c:transactions/a_short_metric_with_long_tag_value".into(),
tags: BTreeMap::from([("tag_key".into(), "i_run_out_of_creativity_so_here_we_go_Lorem_Ipsum_is_simply_dummy_text_of_the_printing_and_typesetting_industry_Lorem_Ipsum_has_been_the_industrys_standard_dummy_text_ever_since_the_1500s_when_an_unknown_printer_took_a_galley_of_type_and_scrambled_it_to_make_a_type_specimen_book".into())]),
};
let validation =
Aggregator::validate_bucket_key(short_metric_long_tag_value, &test_config()).unwrap();
let validation = validate_bucket_key(short_metric_long_tag_value, &test_config()).unwrap();
assert_eq!(validation.tags.len(), 0);
}

Expand All @@ -1407,7 +1403,7 @@ mod tests {
metric_name: "c:transactions/a_short_metric".into(),
tags: BTreeMap::from([("foo".into(), tag_value.clone())]),
};
let validated_bucket = Aggregator::validate_metric_tags(short_metric, &test_config());
let validated_bucket = validate_metric_tags(short_metric, &test_config());
assert_eq!(validated_bucket.tags["foo"], tag_value);
}

Expand Down

0 comments on commit 34a4f62

Please sign in to comment.