Skip to content

Commit

Permalink
Merge pull request #16 from Yisaer/add_metrics
Browse files Browse the repository at this point in the history
feat: add records in/out metrics
  • Loading branch information
Yisaer authored Aug 29, 2024
2 parents b0c85ea + 94c6b7c commit 8e8f2a2
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
16 changes: 10 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#[macro_use]
extern crate lazy_static;

use std::{collections::HashMap, mem, sync::Arc, time::Duration};
use std::{collections::HashMap, mem, process::id, sync::Arc, time::Duration};

use axum::{
extract::{self},
Expand All @@ -14,7 +14,7 @@ use log::{info, LevelFilter};
use prometheus::{Encoder, TextEncoder};
use rumqttc::QoS;
use serde::{Deserialize, Serialize};
use sysinfo::{CpuExt, System, SystemExt};
use sysinfo::{CpuExt, Pid, ProcessExt, System, SystemExt};
use tokio::{sync::Mutex, time};

use catalog::Catalog;
Expand Down Expand Up @@ -97,6 +97,7 @@ async fn execute_sql(
.publish("/yisa/data2", QoS::AtLeastOnce, false, data)
.await
.unwrap();
metrics::RECORDS_OUT.inc();
}
}
info!("Subscribers of /yisa/data2 is closed");
Expand Down Expand Up @@ -150,15 +151,18 @@ async fn metrics_handler() -> String {

fn start_collecting_metric() {
let mut sys = System::new();
let current_pid = id() as i32;
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
sys.refresh_all();
let cpu_usage = sys.global_cpu_info().cpu_usage() as i64;
let memory_usage = sys.used_memory() as i64;
metrics::CPU.set(cpu_usage);
metrics::MEMORY.set(memory_usage);
if let Some(process) = sys.process(Pid::from(current_pid)) {
let cpu_usage = process.cpu_usage() as i64;
let memory_usage = process.memory() as i64;
metrics::CPU.set(cpu_usage);
metrics::MEMORY.set(memory_usage);
}
}
});
}
Expand Down
6 changes: 5 additions & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use prometheus::{register_int_gauge, IntGauge};
use prometheus::{register_int_counter, register_int_gauge, IntCounter, IntGauge};

lazy_static! {
pub static ref CPU: IntGauge =
register_int_gauge!("cpu_usage", "CPU usage in percentage").unwrap();
pub static ref MEMORY: IntGauge =
register_int_gauge!("memory_usage", "Memory usage in bytes").unwrap();
pub static ref RECORDS_IN: IntCounter =
register_int_counter!("records_in", "message records in").unwrap();
pub static ref RECORDS_OUT: IntCounter =
register_int_counter!("records_out", "message records in").unwrap();
}
12 changes: 7 additions & 5 deletions src/sql/runtime/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::{
use crate::{
connector::MqttClient,
core::{tuple::Tuple, Datum, ErrorKind, SQLError, Type},
metrics,
sql::{
expression::{
aggregate::{AggregateFunction, AggregateState},
Expand Down Expand Up @@ -330,11 +331,11 @@ impl ScanExecutor {
let id = String::from("source");
let mut mqtt_client = MqttClient::new(&id);
let topic = String::from("/yisa/data");
let def = ctx
.catalog
.find_table_by_name(&*self.schema_name, &*self.table_name)
.unwrap()
.unwrap();
// let def = ctx
// .catalog
// .find_table_by_name(&*self.schema_name, &*self.table_name)
// .unwrap()
// .unwrap();
tokio::spawn(async move {
info!("ScanExecutor listening");
mqtt_client
Expand All @@ -353,6 +354,7 @@ impl ScanExecutor {
serde_json::from_str(message.as_ref()).unwrap();
let tuple = Tuple::from_hashmap(parsed);
result_tx.send(Ok(Some(tuple))).unwrap();
metrics::RECORDS_IN.inc();
}
_ => {}
}
Expand Down

0 comments on commit 8e8f2a2

Please sign in to comment.