feat: refactor metrics table
killme2008 committed Jan 10, 2024
1 parent 0a20bbc commit 9a40013
Showing 3 changed files with 36 additions and 107 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
@@ -30,6 +30,7 @@ datafusion.workspace = true
datatypes.workspace = true
futures = "0.3"
futures-util.workspace = true
+itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
moka = { workspace = true, features = ["future"] }
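The only new dependency is the itertools entry added above; it supplies the `Itertools::join` call that the rewritten `make_metrics` (further down, in runtime_metrics.rs) uses to concatenate label pairs. A minimal standalone sketch of just that call, with made-up label values:

```rust
use itertools::Itertools;

fn main() {
    // Hypothetical label pairs, formatted the same way the new code formats them.
    let labels = [("host", "host1"), ("le", "0.999")];
    let joined = labels
        .iter()
        .map(|(name, value)| format!("{}={}", name, value))
        .join(", ");
    assert_eq!(joined, "host=host1, le=0.999");
}
```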
141 changes: 34 additions & 107 deletions src/catalog/src/information_schema/runtime_metrics.rs
@@ -29,7 +29,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{
ConstantVector, Float64VectorBuilder, StringVector, StringVectorBuilder, VectorRef,
};
-use prometheus::proto::{LabelPair, MetricType};
+use itertools::Itertools;
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

@@ -131,84 +131,38 @@ impl InformationSchemaMetricsBuilder {
async fn make_metrics(&mut self, _request: Option<ScanRequest>) -> Result<RecordBatch> {
let metric_families = prometheus::gather();

-        for mf in metric_families {
-            let mf_type = mf.get_field_type();
-            let mf_name = mf.get_name();
-
-            for m in mf.get_metric() {
-                match mf_type {
-                    MetricType::COUNTER => self.add_metric(
-                        mf_name,
-                        join_labels(m.get_label(), None),
-                        m.get_counter().get_value(),
-                    ),
-                    MetricType::GAUGE => self.add_metric(
-                        mf_name,
-                        join_labels(m.get_label(), None),
-                        m.get_gauge().get_value(),
-                    ),
-                    MetricType::HISTOGRAM => {
-                        let h = m.get_histogram();
-                        let mut inf_seen = false;
-                        let metric_name = format!("{}_bucket", mf_name);
-                        for b in h.get_bucket() {
-                            let upper_bound = b.get_upper_bound();
-                            self.add_metric(
-                                &metric_name,
-                                join_labels(m.get_label(), Some(("le", upper_bound.to_string()))),
-                                b.get_cumulative_count() as f64,
-                            );
-                            if upper_bound.is_sign_positive() && upper_bound.is_infinite() {
-                                inf_seen = true;
-                            }
-                        }
-                        if !inf_seen {
-                            self.add_metric(
-                                &metric_name,
-                                join_labels(m.get_label(), Some(("le", "+Inf".to_string()))),
-                                h.get_sample_count() as f64,
-                            );
-                        }
-                        self.add_metric(
-                            format!("{}_sum", mf_name).as_str(),
-                            join_labels(m.get_label(), None),
-                            h.get_sample_sum(),
-                        );
-                        self.add_metric(
-                            format!("{}_count", mf_name).as_str(),
-                            join_labels(m.get_label(), None),
-                            h.get_sample_count() as f64,
-                        );
-                    }
-                    MetricType::SUMMARY => {
-                        let s = m.get_summary();
-                        for q in s.get_quantile() {
-                            self.add_metric(
-                                mf_name,
-                                join_labels(
-                                    m.get_label(),
-                                    Some(("quantile", q.get_quantile().to_string())),
-                                ),
-                                q.get_value(),
-                            );
-                        }
-                        self.add_metric(
-                            format!("{}_sum", mf_name).as_str(),
-                            join_labels(m.get_label(), None),
-                            s.get_sample_sum(),
-                        );
-                        self.add_metric(
-                            format!("{}_count", mf_name).as_str(),
-                            join_labels(m.get_label(), None),
-                            s.get_sample_count() as f64,
-                        );
-                    }
-                    MetricType::UNTYPED => {
-                        // `TextEncoder` `MetricType::UNTYPED` unimplemented
-                        // To keep the implementation consistent and not cause unexpected panics, we do nothing here.
-                    }
-                };
-            }
-        }
+        let write_request =
+            common_telemetry::metric::convert_metric_to_write_request(metric_families, None, 0);
+
+        for ts in write_request.timeseries {
+            //Safety: always has `__name__` label
+            let metric_name = ts
+                .labels
+                .iter()
+                .find_map(|label| {
+                    if label.name == "__name__" {
+                        Some(label.value.clone())
+                    } else {
+                        None
+                    }
+                })
+                .unwrap();
+
+            self.add_metric(
+                &metric_name,
+                ts.labels
+                    .into_iter()
+                    .filter_map(|label| {
+                        if label.name == "__name__" {
+                            None
+                        } else {
+                            Some(format!("{}={}", label.name, label.value))
+                        }
+                    })
+                    .join(", "),
+                // Safety: always has a sample
+                ts.samples[0].value,
+            );
+        }

self.finish()
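The new body leans on common_telemetry::metric::convert_metric_to_write_request to do the per-type flattening (counter and gauge samples, histogram buckets, summary quantiles) that the deleted match arms handled by hand; each resulting timeseries then collapses into a (metric_name, labels, value) row. The sketch below mirrors that reduction on stand-in types — Label, TimeSeries, and the sample data are invented for illustration and are not the real remote-write structs:

```rust
use itertools::Itertools;

// Invented stand-ins for the remote-write types; only the fields the
// reduction needs are modelled here.
struct Label {
    name: String,
    value: String,
}

struct TimeSeries {
    labels: Vec<Label>,
    samples: Vec<f64>,
}

// Reduce one timeseries to the (metric_name, labels, value) triple that
// `make_metrics` feeds into `add_metric`.
fn to_row(ts: TimeSeries) -> (String, String, f64) {
    // `__name__` carries the metric name in a remote-write label set.
    let metric_name = ts
        .labels
        .iter()
        .find_map(|l| (l.name == "__name__").then(|| l.value.clone()))
        .expect("every timeseries carries a __name__ label");

    // Every other label is rendered as `name=value` and comma-joined.
    let labels = ts
        .labels
        .iter()
        .filter(|l| l.name != "__name__")
        .map(|l| format!("{}={}", l.name, l.value))
        .join(", ");

    (metric_name, labels, ts.samples[0])
}

fn main() {
    let ts = TimeSeries {
        labels: vec![
            Label { name: "__name__".into(), value: "demo_requests_total".into() },
            Label { name: "host".into(), value: "host1".into() },
        ],
        samples: vec![42.0],
    };
    assert_eq!(
        to_row(ts),
        ("demo_requests_total".to_string(), "host=host1".to_string(), 42.0)
    );
}
```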
@@ -252,42 +206,12 @@ impl DfPartitionStream for InformationSchemaMetrics {
}
}

-fn join_labels(pairs: &[LabelPair], addon: Option<(&'static str, String)>) -> String {
-    let mut labels = Vec::with_capacity(pairs.len() + 1 + if addon.is_some() { 1 } else { 0 });
-    for label in pairs {
-        labels.push(format!("{}={}", label.get_name(), label.get_value(),));
-    }
-    if let Some(addon) = addon {
-        labels.push(format!("{}={}", addon.0, addon.1));
-    }
-    labels.sort_unstable();
-    labels.join(", ")
-}

#[cfg(test)]
mod tests {
use common_recordbatch::RecordBatches;

use super::*;

-    fn new_pair(name: &str, value: &str) -> LabelPair {
-        let mut pair = LabelPair::new();
-        pair.set_name(name.to_string());
-        pair.set_value(value.to_string());
-        pair
-    }
-
-    #[test]
-    fn test_join_labels() {
-        let pairs = vec![new_pair("le", "0.999"), new_pair("host", "host1")];
-
-        assert_eq!("host=host1, le=0.999", join_labels(&pairs, None));
-        assert_eq!(
-            "a=a_value, host=host1, le=0.999",
-            join_labels(&pairs, Some(("a", "a_value".to_string())))
-        );
-    }

#[tokio::test]
async fn test_make_metrics() {
let metrics = InformationSchemaMetrics::new();
@@ -300,5 +224,8 @@ mod tests {

assert!(result_literal.contains(METRIC_NAME));
assert!(result_literal.contains(METRIC_VALUE));
+        assert!(result_literal.contains(METRIC_LABELS));
+        assert!(result_literal.contains(NODE));
+        assert!(result_literal.contains(NODE_TYPE));
}
}
