Skip to content

Commit

Permalink
ref(iroh-metrics, iroh-relay): Remove the UsageStatsReporter (#2952)
Browse files Browse the repository at this point in the history
## Description

This was unused and a rather scary footgun that could easily lead to
way to many http requests being made.

## Breaking Changes

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
flub authored Nov 20, 2024
1 parent 73e7d44 commit 8b7611e
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 136 deletions.
97 changes: 1 addition & 96 deletions iroh-metrics/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
use anyhow::Error;
use erased_set::ErasedSyncSet;
use once_cell::sync::OnceCell;
#[cfg(feature = "metrics")]
use prometheus_client::{encoding::text::encode, registry::Registry};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tracing::trace;
#[cfg(not(feature = "metrics"))]
type Registry = ();

static CORE: OnceCell<Core> = OnceCell::new();

/// Core is the base metrics struct.
///
/// It manages the mapping between the metrics name and the actual metrics.
/// It also carries a single prometheus registry to be used by all metrics.
#[derive(Debug, Default)]
pub struct Core {
#[cfg(feature = "metrics")]
registry: Registry,
metrics_map: ErasedSyncSet,
#[cfg(feature = "metrics")]
usage_reporter: UsageReporter,
}
/// Open Metrics [`Counter`] to measure discrete events.
///
Expand Down Expand Up @@ -154,15 +149,10 @@ impl Core {
let mut metrics_map = ErasedSyncSet::new();
f(&mut registry, &mut metrics_map);

#[cfg(feature = "metrics")]
let usage_reporter = UsageReporter::new();

CORE.set(Core {
metrics_map,
#[cfg(feature = "metrics")]
registry,
#[cfg(feature = "metrics")]
usage_reporter,
})
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "already set"))
}
Expand Down Expand Up @@ -191,11 +181,6 @@ impl Core {
encode(&mut buf, &self.registry)?;
Ok(buf)
}

#[cfg(feature = "metrics")]
pub(crate) fn usage_reporter(&self) -> &UsageReporter {
&self.usage_reporter
}
}

/// Interface for all single value based metrics.
Expand All @@ -209,83 +194,3 @@ pub trait HistogramType {
/// Returns the name of the metric
fn name(&self) -> &'static str;
}

/// Exposes a simple API to report usage statistics.
#[derive(Debug, Default)]
pub struct UsageReporter {
pub(crate) report_endpoint: Option<String>,
pub(crate) report_token: Option<String>,
}

impl UsageReporter {
/// Creates a new usage reporter.
pub fn new() -> Self {
let report_endpoint = std::env::var("IROH_METRICS_USAGE_STATS_ENDPOINT").ok();
let report_token = std::env::var("IROH_METRICS_USAGE_STATS_TOKEN").ok();
UsageReporter {
report_endpoint,
report_token,
}
}

/// Reports usage statistics to the configured endpoint.
pub async fn report_usage_stats(&self, report: &UsageStatsReport) -> Result<(), Error> {
if let Some(report_endpoint) = &self.report_endpoint {
trace!("reporting usage stats to {}", report_endpoint);
let mut client = reqwest::Client::new().post(report_endpoint);

if let Some(report_token) = &self.report_token {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::COOKIE,
format!("token={}", report_token).parse().unwrap(),
);
client = client.headers(headers);
}
let _ = client.json(report).send().await?;
}
Ok(())
}
}

/// Usage statistics report.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UsageStatsReport {
/// The timestamp of the report.
#[serde(with = "time::serde::rfc3339")]
pub timestamp: OffsetDateTime,
/// The resource being consumed.
pub resource: String,
/// Reference to the resource reporter.
pub resource_ref: String,
/// The value of the resource being consumed.
pub value: i64,
/// Identifier of the user consuming the resource.
pub attribution_id: Option<String>,
/// Public key of the user consuming the resource.
pub attribution_key: Option<String>,
}

/// Type alias for a metered resource.
pub type UsageResource = String;

impl UsageStatsReport {
/// Creates a new usage stats report.
pub fn new(
resource: UsageResource,
resource_ref: String,
value: i64,
attribution_id: Option<String>,
attribution_key: Option<String>,
) -> Self {
let timestamp = OffsetDateTime::now_utc();
UsageStatsReport {
timestamp,
resource,
resource_ref,
value,
attribution_id,
attribution_key,
}
}
}
17 changes: 0 additions & 17 deletions iroh-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub mod core;
#[cfg(feature = "metrics")]
mod service;

use core::UsageStatsReport;
use std::collections::HashMap;

/// Reexport to make matching versions easier.
Expand Down Expand Up @@ -40,22 +39,6 @@ macro_rules! set {
};
}

/// Report usage statistics to the configured endpoint.
#[allow(unused_variables)]
pub async fn report_usage_stats(report: &UsageStatsReport) {
#[cfg(feature = "metrics")]
{
if let Some(core) = core::Core::get() {
core.usage_reporter()
.report_usage_stats(report)
.await
.unwrap_or_else(|e| {
tracing::error!("Failed to report usage stats: {}", e);
});
}
}
}

/// Parse Prometheus metrics from a string.
pub fn parse_prometheus_metrics(data: &str) -> anyhow::Result<HashMap<String, f64>> {
let mut metrics = HashMap::new();
Expand Down
29 changes: 6 additions & 23 deletions iroh-relay/src/server/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use std::{collections::HashMap, time::Duration};
use anyhow::{bail, Result};
use bytes::Bytes;
use iroh_base::key::NodeId;
use iroh_metrics::{core::UsageStatsReport, inc, inc_by, report_usage_stats};
use iroh_metrics::{inc, inc_by};
use time::{Date, OffsetDateTime};
use tokio::{sync::mpsc, task::JoinSet};
use tokio::sync::mpsc;
use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle};
use tracing::{info, info_span, trace, warn, Instrument};

Expand Down Expand Up @@ -114,7 +114,6 @@ impl Actor {
}

async fn run(mut self, done: CancellationToken) -> Result<()> {
let mut tasks = JoinSet::new();
loop {
tokio::select! {
biased;
Expand All @@ -126,16 +125,9 @@ impl Actor {
self.clients.shutdown().await;
return Ok(());
}
Some(res) = tasks.join_next(), if !tasks.is_empty() => {
if let Err(err) = res {
if err.is_panic() {
panic!("Task panicked: {err:?}");
}
}
}
msg = self.receiver.recv() => match msg {
Some(msg) => {
self.handle_message(msg, &mut tasks).await;
self.handle_message(msg).await;
}
None => {
warn!("unexpected actor error: receiver gone, shutting down actor loop");
Expand All @@ -147,7 +139,7 @@ impl Actor {
}
}

async fn handle_message(&mut self, msg: Message, tasks: &mut JoinSet<()>) {
async fn handle_message(&mut self, msg: Message) {
match msg {
Message::SendPacket { dst, data, src } => {
trace!(?src, ?dst, len = data.len(), "send packet");
Expand Down Expand Up @@ -198,18 +190,9 @@ impl Actor {
);
let node_id = client_builder.node_id;

// build and register client, starting up read & write loops for the client connection
// build and register client, starting up read & write loops for the client
// connection
self.clients.register(client_builder).await;
tasks.spawn(async move {
report_usage_stats(&UsageStatsReport::new(
"relay_accepts".to_string(),
"relay_server".to_string(), // TODO: other id?
1,
None, // TODO(arqu): attribute to user id; possibly with the re-introduction of request tokens or other auth
Some(node_id.to_string()),
))
.await;
});
let nc = self.client_counter.update(node_id);
inc_by!(Metrics, unique_client_keys, nc);
}
Expand Down

0 comments on commit 8b7611e

Please sign in to comment.