Commit

feat: honor enhanced metrics bool (#307)
* feat: honor enhanced metrics bool

* feat: add test

* feat: refactor to log instead of return result

* fix: clippy
astuyve authored Jul 9, 2024
1 parent d5f51ec commit f46dd48
Showing 3 changed files with 123 additions and 44 deletions.
26 changes: 7 additions & 19 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -313,7 +313,8 @@ async fn extension_loop_active(
             error!("Error starting trace agent: {e:?}");
         }
     });
-    let lambda_enhanced_metrics = enhanced_metrics::new(Arc::clone(&metrics_aggr));
+    let lambda_enhanced_metrics =
+        enhanced_metrics::new(Arc::clone(&metrics_aggr), Arc::clone(config));
     let dogstatsd_cancel_token = start_dogstatsd(event_bus.get_sender_copy(), &metrics_aggr).await;
 
     let telemetry_listener_cancel_token =
@@ -335,9 +336,7 @@ async fn extension_loop_active(
                     "[extension_next] Invoke event {}; deadline: {}, invoked_function_arn: {}",
                     request_id, deadline_ms, invoked_function_arn
                 );
-                if let Err(e) = lambda_enhanced_metrics.increment_invocation_metric() {
-                    error!("Failed to increment invocation metric: {e:?}");
-                }
+                lambda_enhanced_metrics.increment_invocation_metric();
             }
             Ok(NextEventResponse::Shutdown {
                 shutdown_reason,
@@ -375,11 +374,8 @@ async fn extension_loop_active(
                         metrics,
                     } => {
                        debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics);
-                        if let Err(e) = lambda_enhanced_metrics
-                            .set_init_duration_metric(metrics.duration_ms)
-                        {
-                            error!("Failed to set init duration metric: {e:?}");
-                        }
+                        lambda_enhanced_metrics
+                            .set_init_duration_metric(metrics.duration_ms);
                     }
                     TelemetryRecord::PlatformRuntimeDone {
                         request_id,
@@ -395,17 +391,9 @@ async fn extension_loop_active(
                         }
 
                         if status != Status::Success {
-                            if let Err(e) =
-                                lambda_enhanced_metrics.increment_errors_metric()
-                            {
-                                error!("Failed to increment error metric: {e:?}");
-                            }
+                            lambda_enhanced_metrics.increment_errors_metric();
                             if status == Status::Timeout {
-                                if let Err(e) =
-                                    lambda_enhanced_metrics.increment_timeout_metric()
-                                {
-                                    error!("Failed to increment timeout metric: {e:?}");
-                                }
+                                lambda_enhanced_metrics.increment_timeout_metric();
                             }
                         }
                         debug!(
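
Worth pausing on why these call-site diffs are so small: per the third commit bullet ("refactor to log instead of return result"), the enhanced-metrics helpers no longer return Result<(), errors::Insert>; they log insertion failures internally. A minimal, runnable sketch of the resulting pattern, where Aggregator, InsertError, and the metric name are hypothetical stand-ins for the crate's real types:

use std::sync::{Arc, Mutex};

// Hypothetical stand-ins for the crate's real Aggregator and insert-error
// types; only the shape of the API matters here.
#[derive(Debug)]
struct InsertError;

struct Aggregator;

impl Aggregator {
    fn insert(&mut self, _metric: &str) -> Result<(), InsertError> {
        Ok(())
    }
}

struct Lambda {
    aggregator: Arc<Mutex<Aggregator>>,
}

impl Lambda {
    // After the refactor: the helper logs the failure itself and returns (),
    // so every call site shrinks to a single statement.
    fn increment_metric(&self, name: &str) {
        if let Err(e) = self.aggregator.lock().expect("lock poisoned").insert(name) {
            // The real crate uses tracing::error!; eprintln! keeps this runnable.
            eprintln!("failed to insert metric: {e:?}");
        }
    }
}

fn main() {
    let lambda = Lambda {
        aggregator: Arc::new(Mutex::new(Aggregator)),
    };
    // Before the refactor this call returned Result, forcing an
    // `if let Err(e) = ... { error!(...) }` block at each call site.
    lambda.increment_metric("aws.lambda.enhanced.invocations");
}

The trade-off is deliberate: a failed metric insert is worth a log line, not worth propagating an error through the extension's event loop.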
2 changes: 2 additions & 0 deletions bottlecap/src/config/mod.rs
@@ -43,6 +43,7 @@ pub struct Config {
     pub merge_xray_traces: bool,
     pub serverless_appsec_enabled: bool,
     pub extension_version: Option<String>,
+    pub enhanced_metrics: bool,
 }
 
 impl Default for Config {
@@ -75,6 +76,7 @@ impl Default for Config {
             merge_xray_traces: false,
             serverless_appsec_enabled: false,
             extension_version: None,
+            enhanced_metrics: true,
         }
     }
 }
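
The new field defaults to true, so existing deployments keep emitting enhanced metrics unless they opt out. (In Datadog's Lambda tooling this flag is conventionally driven by a DD_ENHANCED_METRICS environment variable, though that wiring is outside this diff.) A sketch of flipping the flag with struct-update syntax, using a trimmed stand-in Config since only two fields matter here; it assumes the real struct derives Clone alongside its manual Default:

use std::sync::Arc;

// Trimmed mirror of the Config struct above; only two fields shown.
#[derive(Clone)]
struct Config {
    extension_version: Option<String>,
    enhanced_metrics: bool,
}

impl Default for Config {
    fn default() -> Self {
        Config {
            extension_version: None,
            enhanced_metrics: true, // on by default, matching the diff
        }
    }
}

fn main() {
    // Struct-update syntax flips just the one flag and keeps the rest.
    let config = Arc::new(Config {
        enhanced_metrics: false,
        ..Config::default()
    });
    assert!(!config.enhanced_metrics);
}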
139 changes: 114 additions & 25 deletions bottlecap/src/metrics/enhanced/lambda.rs
@@ -1,60 +1,81 @@
 use super::constants::{self, BASE_LAMBDA_INVOCATION_PRICE};
 use crate::metrics::aggregator::Aggregator;
-use crate::metrics::{errors, metric};
+use crate::metrics::metric;
 use crate::telemetry::events::ReportMetrics;
 use std::env::consts::ARCH;
 use std::sync::{Arc, Mutex};
+use tracing::error;
 
 pub struct Lambda {
     pub aggregator: Arc<Mutex<Aggregator<1024>>>,
+    pub config: Arc<crate::config::Config>,
 }
 
 impl Lambda {
     #[must_use]
-    pub fn new(aggregator: Arc<Mutex<Aggregator<1024>>>) -> Lambda {
-        Lambda { aggregator }
+    pub fn new(
+        aggregator: Arc<Mutex<Aggregator<1024>>>,
+        config: Arc<crate::config::Config>,
+    ) -> Lambda {
+        Lambda { aggregator, config }
     }
 
-    pub fn increment_invocation_metric(&self) -> Result<(), errors::Insert> {
-        self.increment_metric(constants::INVOCATIONS_METRIC)
+    pub fn increment_invocation_metric(&self) {
+        self.increment_metric(constants::INVOCATIONS_METRIC);
     }
 
-    pub fn increment_errors_metric(&self) -> Result<(), errors::Insert> {
-        self.increment_metric(constants::ERRORS_METRIC)
+    pub fn increment_errors_metric(&self) {
+        self.increment_metric(constants::ERRORS_METRIC);
    }
 
-    pub fn increment_timeout_metric(&self) -> Result<(), errors::Insert> {
-        self.increment_metric(constants::TIMEOUTS_METRIC)
+    pub fn increment_timeout_metric(&self) {
+        self.increment_metric(constants::TIMEOUTS_METRIC);
     }
 
-    pub fn set_init_duration_metric(&self, init_duration_ms: f64) -> Result<(), errors::Insert> {
+    pub fn set_init_duration_metric(&self, init_duration_ms: f64) {
+        if !self.config.enhanced_metrics {
+            return;
+        }
         let metric = metric::Metric::new(
             constants::INIT_DURATION_METRIC.into(),
             metric::Type::Distribution,
             (init_duration_ms * constants::MS_TO_SEC).to_string().into(),
             None,
         );
-        self.aggregator
+        if let Err(e) = self
+            .aggregator
             .lock()
             .expect("lock poisoned")
             .insert(&metric)
+        {
+            error!("failed to insert metric: {}", e);
+        }
     }
 
-    fn increment_metric(&self, metric_name: &str) -> Result<(), errors::Insert> {
+    fn increment_metric(&self, metric_name: &str) {
+        if !self.config.enhanced_metrics {
+            return;
+        }
         let metric = metric::Metric::new(
             metric_name.into(),
             metric::Type::Distribution,
             "1".into(),
             None,
         );
-        self.aggregator
+        if let Err(e) = self
+            .aggregator
             .lock()
             .expect("lock poisoned")
             .insert(&metric)
+        {
+            error!("failed to insert metric: {}", e);
+        }
     }
 
     pub fn set_runtime_duration_metric(&self, duration_ms: f64) {
+        if !self.config.enhanced_metrics {
+            return;
+        }
         let metric = metric::Metric::new(
             constants::RUNTIME_DURATION_METRIC.into(),
             metric::Type::Distribution,
@@ -73,6 +94,9 @@ impl Lambda {
     }
 
     pub fn set_post_runtime_duration_metric(&self, duration_ms: f64) {
+        if !self.config.enhanced_metrics {
+            return;
+        }
         let metric = metric::Metric::new(
             constants::POST_RUNTIME_DURATION_METRIC.into(),
             metric::Type::Distribution,
@@ -107,6 +131,9 @@ impl Lambda {
     }
 
     pub fn set_report_log_metrics(&self, metrics: &ReportMetrics) {
+        if !self.config.enhanced_metrics {
+            return;
+        }
         let mut aggr: std::sync::MutexGuard<Aggregator<1024>> =
             self.aggregator.lock().expect("lock poisoned");
         let metric = metric::Metric::new(
@@ -176,7 +203,7 @@ mod tests {
     use std::collections::hash_map::HashMap;
     use std::sync::MutexGuard;
 
-    fn setup() -> Arc<Mutex<Aggregator<1024>>> {
+    fn setup() -> (Arc<Mutex<Aggregator<1024>>>, Arc<config::Config>) {
         let config = Arc::new(config::Config {
             service: Some("test-service".to_string()),
             tags: Some("test:tags".to_string()),
@@ -187,17 +214,21 @@ mod tests {
             LAMBDA_RUNTIME_SLUG.to_string(),
             &HashMap::new(),
         ));
-        Arc::new(Mutex::new(
-            Aggregator::<1024>::new(tags_provider.clone()).expect("failed to create aggregator"),
-        ))
+        (
+            Arc::new(Mutex::new(
+                Aggregator::<1024>::new(tags_provider.clone())
+                    .expect("failed to create aggregator"),
+            )),
+            config,
+        )
     }
 
     #[test]
     #[allow(clippy::float_cmp)]
     fn test_increment_invocation_metric() {
-        let metrics_aggr = setup();
-        let lambda = Lambda::new(metrics_aggr.clone());
-        lambda.increment_invocation_metric().unwrap();
+        let (metrics_aggr, my_config) = setup();
+        let lambda = Lambda::new(metrics_aggr.clone(), my_config);
+        lambda.increment_invocation_metric();
         match metrics_aggr
             .lock()
             .expect("lock poisoned")
@@ -211,9 +242,9 @@ mod tests {
     #[test]
     #[allow(clippy::float_cmp)]
     fn test_increment_errors_metric() {
-        let metrics_aggr = setup();
-        let lambda = Lambda::new(metrics_aggr.clone());
-        lambda.increment_errors_metric().unwrap();
+        let (metrics_aggr, my_config) = setup();
+        let lambda = Lambda::new(metrics_aggr.clone(), my_config);
+        lambda.increment_errors_metric();
         match metrics_aggr
             .lock()
             .expect("lock poisoned")
@@ -224,10 +255,68 @@ mod tests {
         };
     }
 
+    #[test]
+    fn test_disabled() {
+        let (metrics_aggr, no_config) = setup();
+        let my_config = Arc::new(config::Config {
+            enhanced_metrics: false,
+            ..no_config.as_ref().clone()
+        });
+        let lambda = Lambda::new(metrics_aggr.clone(), my_config);
+        lambda.increment_invocation_metric();
+        lambda.increment_errors_metric();
+        lambda.increment_timeout_metric();
+        lambda.set_init_duration_metric(100.0);
+        lambda.set_runtime_duration_metric(100.0);
+        lambda.set_post_runtime_duration_metric(100.0);
+        lambda.set_report_log_metrics(&ReportMetrics {
+            duration_ms: 100.0,
+            billed_duration_ms: 100,
+            max_memory_used_mb: 128,
+            memory_size_mb: 256,
+            init_duration_ms: Some(50.0),
+            restore_duration_ms: None,
+        });
+        let mut aggr = metrics_aggr.lock().expect("lock poisoned");
+        assert!(aggr
+            .get_value_by_id(constants::INVOCATIONS_METRIC.into(), None)
+            .is_none());
+        assert!(aggr
+            .get_value_by_id(constants::ERRORS_METRIC.into(), None)
+            .is_none());
+        assert!(aggr
+            .get_value_by_id(constants::TIMEOUTS_METRIC.into(), None)
+            .is_none());
+        assert!(aggr
+            .get_value_by_id(constants::INIT_DURATION_METRIC.into(), None)
+            .is_none());
+        assert!(aggr
+            .get_value_by_id(constants::RUNTIME_DURATION_METRIC.into(), None)
+            .is_none());
+        assert!(aggr
+            .get_value_by_id(constants::POST_RUNTIME_DURATION_METRIC.into(), None)
+            .is_none());
+        assert!(aggr
+            .get_value_by_id(constants::DURATION_METRIC.into(), None)
+            .is_none());
+        assert!(aggr
+            .get_value_by_id(constants::BILLED_DURATION_METRIC.into(), None)
+            .is_none());
+        assert!(aggr
+            .get_value_by_id(constants::MAX_MEMORY_USED_METRIC.into(), None)
+            .is_none());
+        assert!(aggr
+            .get_value_by_id(constants::MEMORY_SIZE_METRIC.into(), None)
+            .is_none());
+        assert!(aggr
+            .get_value_by_id(constants::ESTIMATED_COST_METRIC.into(), None)
+            .is_none());
+    }
+
     #[test]
     fn test_set_report_log_metrics() {
-        let metrics_aggr = setup();
-        let lambda = Lambda::new(metrics_aggr.clone());
+        let (metrics_aggr, my_config) = setup();
+        let lambda = Lambda::new(metrics_aggr.clone(), my_config);
         let report_metrics = ReportMetrics {
             duration_ms: 100.0,
             billed_duration_ms: 100,
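
The new test_disabled case exercises the commit's core guarantee: when enhanced_metrics is false, every helper returns before touching the aggregator, so nothing is ever aggregated. A self-contained sketch of that guard using simplified stand-in types (the real Aggregator is const-generic over capacity, and the metric name lives in the constants module):

use std::sync::{Arc, Mutex};

// Simplified stand-ins so the gating behavior can run outside the crate.
#[derive(Default)]
struct Aggregator {
    values: Vec<(String, f64)>,
}

struct Config {
    enhanced_metrics: bool,
}

struct Lambda {
    aggregator: Arc<Mutex<Aggregator>>,
    config: Arc<Config>,
}

impl Lambda {
    fn increment_invocation_metric(&self) {
        // The commit's early-return guard: a disabled config means the
        // aggregator is never touched.
        if !self.config.enhanced_metrics {
            return;
        }
        self.aggregator
            .lock()
            .expect("lock poisoned")
            .values
            .push(("aws.lambda.enhanced.invocations".into(), 1.0));
    }
}

fn main() {
    let aggr = Arc::new(Mutex::new(Aggregator::default()));
    let lambda = Lambda {
        aggregator: Arc::clone(&aggr),
        config: Arc::new(Config { enhanced_metrics: false }),
    };
    lambda.increment_invocation_metric();
    // Nothing was aggregated, mirroring the assertions in test_disabled.
    assert!(aggr.lock().expect("lock poisoned").values.is_empty());
}

Note the check sits inside set_init_duration_metric and the private increment_metric rather than at each public entry point, so the three increment_* wrappers are gated through a single code path.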
