From 9a400138148963ae201c6cf2f38a19f8c035226a Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 10 Jan 2024 16:40:41 +0800 Subject: [PATCH] feat: refactor metrics table --- Cargo.lock | 1 + src/catalog/Cargo.toml | 1 + .../src/information_schema/runtime_metrics.rs | 141 +++++------------- 3 files changed, 36 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa0bbd3e1126..bae2f71c4de0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1206,6 +1206,7 @@ dependencies = [ "datatypes", "futures", "futures-util", + "itertools 0.10.5", "lazy_static", "log-store", "meta-client", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 715324e7a2db..656cc9fd1ee2 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -30,6 +30,7 @@ datafusion.workspace = true datatypes.workspace = true futures = "0.3" futures-util.workspace = true +itertools.workspace = true lazy_static.workspace = true meta-client.workspace = true moka = { workspace = true, features = ["future"] } diff --git a/src/catalog/src/information_schema/runtime_metrics.rs b/src/catalog/src/information_schema/runtime_metrics.rs index 0bbea13d8927..14ee93544ccb 100644 --- a/src/catalog/src/information_schema/runtime_metrics.rs +++ b/src/catalog/src/information_schema/runtime_metrics.rs @@ -29,7 +29,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{ ConstantVector, Float64VectorBuilder, StringVector, StringVectorBuilder, VectorRef, }; -use prometheus::proto::{LabelPair, MetricType}; +use itertools::Itertools; use snafu::ResultExt; use store_api::storage::{ScanRequest, TableId}; @@ -131,84 +131,38 @@ impl InformationSchemaMetricsBuilder { async fn make_metrics(&mut self, _request: Option) -> Result { let metric_families = prometheus::gather(); - for mf in metric_families { - let mf_type = mf.get_field_type(); - let mf_name = mf.get_name(); - - for m in mf.get_metric() { - match mf_type { - MetricType::COUNTER => self.add_metric( - mf_name, - join_labels(m.get_label(), None), - m.get_counter().get_value(), - ), - MetricType::GAUGE => self.add_metric( - mf_name, - join_labels(m.get_label(), None), - m.get_gauge().get_value(), - ), - MetricType::HISTOGRAM => { - let h = m.get_histogram(); - let mut inf_seen = false; - let metric_name = format!("{}_bucket", mf_name); - for b in h.get_bucket() { - let upper_bound = b.get_upper_bound(); - self.add_metric( - &metric_name, - join_labels(m.get_label(), Some(("le", upper_bound.to_string()))), - b.get_cumulative_count() as f64, - ); - if upper_bound.is_sign_positive() && upper_bound.is_infinite() { - inf_seen = true; - } - } - if !inf_seen { - self.add_metric( - &metric_name, - join_labels(m.get_label(), Some(("le", "+Inf".to_string()))), - h.get_sample_count() as f64, - ); - } - self.add_metric( - format!("{}_sum", mf_name).as_str(), - join_labels(m.get_label(), None), - h.get_sample_sum(), - ); - self.add_metric( - format!("{}_count", mf_name).as_str(), - join_labels(m.get_label(), None), - h.get_sample_count() as f64, - ); + let write_request = + common_telemetry::metric::convert_metric_to_write_request(metric_families, None, 0); + + for ts in write_request.timeseries { + //Safety: always has `__name__` label + let metric_name = ts + .labels + .iter() + .find_map(|label| { + if label.name == "__name__" { + Some(label.value.clone()) + } else { + None } - MetricType::SUMMARY => { - let s = m.get_summary(); - for q in s.get_quantile() { - self.add_metric( - mf_name, - join_labels( - m.get_label(), - Some(("quantile", q.get_quantile().to_string())), - ), - q.get_value(), - ); + }) + .unwrap(); + + self.add_metric( + &metric_name, + ts.labels + .into_iter() + .filter_map(|label| { + if label.name == "__name__" { + None + } else { + Some(format!("{}={}", label.name, label.value)) } - self.add_metric( - format!("{}_sum", mf_name).as_str(), - join_labels(m.get_label(), None), - s.get_sample_sum(), - ); - self.add_metric( - format!("{}_count", mf_name).as_str(), - join_labels(m.get_label(), None), - s.get_sample_count() as f64, - ); - } - MetricType::UNTYPED => { - // `TextEncoder` `MetricType::UNTYPED` unimplemented - // To keep the implementation consistent and not cause unexpected panics, we do nothing here. - } - }; - } + }) + .join(", "), + // Safety: always has a sample + ts.samples[0].value, + ); } self.finish() @@ -252,42 +206,12 @@ impl DfPartitionStream for InformationSchemaMetrics { } } -fn join_labels(pairs: &[LabelPair], addon: Option<(&'static str, String)>) -> String { - let mut labels = Vec::with_capacity(pairs.len() + 1 + if addon.is_some() { 1 } else { 0 }); - for label in pairs { - labels.push(format!("{}={}", label.get_name(), label.get_value(),)); - } - if let Some(addon) = addon { - labels.push(format!("{}={}", addon.0, addon.1)); - } - labels.sort_unstable(); - labels.join(", ") -} - #[cfg(test)] mod tests { use common_recordbatch::RecordBatches; use super::*; - fn new_pair(name: &str, value: &str) -> LabelPair { - let mut pair = LabelPair::new(); - pair.set_name(name.to_string()); - pair.set_value(value.to_string()); - pair - } - - #[test] - fn test_join_labels() { - let pairs = vec![new_pair("le", "0.999"), new_pair("host", "host1")]; - - assert_eq!("host=host1, le=0.999", join_labels(&pairs, None)); - assert_eq!( - "a=a_value, host=host1, le=0.999", - join_labels(&pairs, Some(("a", "a_value".to_string()))) - ); - } - #[tokio::test] async fn test_make_metrics() { let metrics = InformationSchemaMetrics::new(); @@ -300,5 +224,8 @@ mod tests { assert!(result_literal.contains(METRIC_NAME)); assert!(result_literal.contains(METRIC_VALUE)); + assert!(result_literal.contains(METRIC_LABELS)); + assert!(result_literal.contains(NODE)); + assert!(result_literal.contains(NODE_TYPE)); } }