Skip to content

Commit

Permalink
ref(system): Instrument spawned tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Jul 23, 2024
1 parent 3f03ac2 commit 0c3b964
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 2 deletions.
4 changes: 4 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
disallowed-methods = [
{ path = "tokio::spawn", reason = "use `relay_system::spawn` instead" },
]

2 changes: 2 additions & 0 deletions relay-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
#![allow(clippy::derive_partial_eq_without_eq)]

mod controller;
mod runtime;
mod service;
mod statsd;

pub use self::controller::*;
pub use self::runtime::*;
pub use self::service::*;
37 changes: 37 additions & 0 deletions relay-system/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::panic::Location;
use std::time::Instant;

use futures::Future;
use tokio::task::JoinHandle;

use crate::statsd::{SystemCounters, SystemTimers};

/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
///
/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`].
#[track_caller]
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let location = Location::caller();
let task_id = format!("{}:{}", location.file(), location.line());

relay_statsd::metric!(
counter(SystemCounters::RuntimeTasksCreated) += 1,
id = &task_id
);

let created = Instant::now();
tokio::spawn(async move {
let result = future.await;

relay_statsd::metric!(
timer(SystemTimers::RuntimeTasksFinished) = created.elapsed(),
id = &task_id
);

result
})
}
43 changes: 41 additions & 2 deletions relay-system/src/statsd.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,43 @@
use relay_statsd::GaugeMetric;
use relay_statsd::{CounterMetric, GaugeMetric, TimerMetric};

/// Counter metrics for Relay system components.
pub enum SystemCounters {
/// Number of runtime tasks created/spawned.
///
/// Every call to [`spawn`](`crate::spawn`) increases this counter by one.
///
/// This metric is tagged with:
/// - `id`: A unique identifier for the task, derived from its location in code.
RuntimeTasksCreated,
}

impl CounterMetric for SystemCounters {
fn name(&self) -> &'static str {
match self {
Self::RuntimeTasksCreated => "runtime.task.spawn.created",
}
}
}

/// Timer metrics for Relay system components.
pub enum SystemTimers {
/// Duration how long a runtime task was alive for.
///
/// This metric is emitted once for each task passed to [`spawn`](crate::spawn)
/// with the time it took for the passed task to terminate.
///
/// This metric is tagged with:
/// - `id`: A unique identifier for the task, derived from its location in code.
RuntimeTasksFinished,
}

impl TimerMetric for SystemTimers {
fn name(&self) -> &'static str {
match self {
Self::RuntimeTasksFinished => "runtime.task.spawn.finished",
}
}
}

/// Gauge metrics for Relay system components.
pub enum SystemGauges {
Expand All @@ -16,7 +55,7 @@ pub enum SystemGauges {
impl GaugeMetric for SystemGauges {
fn name(&self) -> &'static str {
match *self {
SystemGauges::ServiceBackPressure => "service.back_pressure",
Self::ServiceBackPressure => "service.back_pressure",
}
}
}

0 comments on commit 0c3b964

Please sign in to comment.