Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
archeoss committed Dec 1, 2023
1 parent 9ae3382 commit 19c5c61
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 48 deletions.
32 changes: 32 additions & 0 deletions backend/src/models/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,38 @@ pub type NodeCount = TypedMap<NodeStatusName, u64>;
// #[cfg(not(all(feature = "swagger", debug_assertions)))]
pub type DiskCount = TypedMap<DiskStatusName, u64>;

impl RPS {
#[must_use]
pub fn from_metrics(metrics: &TypedMetrics) -> Self {
let mut rps = Self::new();
rps[Operation::Get] += metrics[RawMetricEntry::ClusterGrinderGetCountRate].value;
rps[Operation::Delete] += metrics[RawMetricEntry::ClusterGrinderDeleteCountRate].value;
rps[Operation::Exist] += metrics[RawMetricEntry::ClusterGrinderExistCountRate].value;
rps[Operation::Put] += metrics[RawMetricEntry::ClusterGrinderPutCountRate].value;

rps
}
}

impl AddAssign for RPS {
fn add_assign(&mut self, rhs: Self) {
self[Operation::Get] += rhs[Operation::Get];
self[Operation::Delete] += rhs[Operation::Delete];
self[Operation::Exist] += rhs[Operation::Exist];
self[Operation::Put] += rhs[Operation::Put];
}
}

impl Add for RPS {
type Output = Self;

fn add(mut self, rhs: Self) -> Self::Output {
self += rhs;

self
}
}

#[derive(Debug, Serialize, Clone)]
// #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
// #[cfg_attr(all(feature = "swagger", debug_assertions),
Expand Down
6 changes: 5 additions & 1 deletion backend/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ pub mod shared;
pub mod prelude {
pub use crate::prelude::*;
pub use hyper::Uri;
pub use std::{net::SocketAddr, time::Duration};
pub use std::{
net::SocketAddr,
ops::{Add, AddAssign},
time::Duration,
};
pub use strum::{EnumIter, IntoEnumIterator};
#[cfg(all(feature = "swagger", debug_assertions))]
pub use utoipa::openapi::{Object, ObjectBuilder};
Expand Down
66 changes: 19 additions & 47 deletions backend/src/services/api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::prelude::*;

/// Returns count of Physical Disks per status
///
#[cfg_attr(all(feature = "swagger", debug_assertions),
utoipa::path(
get,
Expand Down Expand Up @@ -31,31 +30,16 @@ pub async fn get_disks_count(Extension(client): Extension<HttpBobClient>) -> Jso
let mut count = DiskCount::new();

while let Some(res) = space.next().await {
let mut disks_visited = HashSet::new();
let (disks, space) = match res {
Ok(d) => d,
Err(err) => {
tracing::warn!("couldn't finish request: tokio task failed. Err: {err}");
continue;
}
let Ok((disks, space)) = res else {
tracing::warn!("couldn't finish request: tokio task failed. Err: {res:?}");
continue;
};
let space = match space {
Ok(GetSpaceInfoResponse::SpaceInfo(space)) => space,
Err(err) => {
tracing::warn!("couldn't finish getSpace request. Err: {err}");
continue;
}
let Ok(GetSpaceInfoResponse::SpaceInfo(space)) = space else {
tracing::warn!("couldn't finish getSpace request. {space:?}");
continue;
};
let disks = match disks {
Ok(GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks)) => {
let mut res = vec![];
for disk in disks {
if disks_visited.insert(disk.name.clone()) {
res.push(disk);
}
}
res
}
Ok(GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks)) => disks,
Ok(GetDisksResponse::PermissionDenied(err)) => {
count[DiskStatusName::Offline] += 1;
tracing::warn!("Permission Denied. Err: {err:?}");
Expand All @@ -67,22 +51,23 @@ pub async fn get_disks_count(Extension(client): Extension<HttpBobClient>) -> Jso
continue;
}
};
let active_disks = disks.iter().filter(|disk| disk.is_active);
for disk in active_disks {
let mut active = 0;
disks.iter().filter(|disk| disk.is_active).for_each(|disk| {
active += 1;
match DiskStatus::from_space_info(&space, &disk.name) {
DiskStatus::Good => count[DiskStatusName::Good] += 1,
DiskStatus::Offline => count[DiskStatusName::Offline] += 1,
DiskStatus::Bad(_) => count[DiskStatusName::Bad] += 1,
}
}
});
count[DiskStatusName::Offline] = (disks.len() - active) as u64;
}
tracing::info!("total disks count: {count:?}");

Json(count)
}

/// Get Nodes count per Status
///
#[cfg_attr(all(feature = "swagger", debug_assertions),
utoipa::path(
get,
Expand Down Expand Up @@ -111,28 +96,25 @@ pub async fn get_nodes_count(Extension(client): Extension<HttpBobClient>) -> Jso

let mut count = NodeCount::new();

let mut counter = 0;
while let Some(res) = metrics.next().await {
if let Ok(Ok(GetMetricsResponse::Metrics(metrics))) = res {
tracing::trace!("#{counter}: metrics received successfully");
tracing::trace!("metrics received successfully");
if Into::<TypedMetrics>::into(metrics).is_bad_node() {
count[NodeStatusName::Bad] += 1;
} else {
count[NodeStatusName::Good] += 1;
}
} else {
tracing::warn!("#{counter}: couldn't receive metrics from node");
tracing::warn!("couldn't receive metrics from node"); // TODO: Some better message
count[NodeStatusName::Offline] += 1;
}
counter += 1;
}
tracing::info!("total nodes per status count: {count:?}");

Json(count)
}

/// Returns Total RPS on cluster
///
#[cfg_attr(all(feature = "swagger", debug_assertions),
utoipa::path(
get,
Expand Down Expand Up @@ -160,28 +142,21 @@ pub async fn get_rps(Extension(client): Extension<HttpBobClient>) -> Json<RPS> {
.collect();

let mut rps = RPS::new();
let mut counter = 0;
while let Some(res) = metrics.next().await {
if let Ok(Ok(metrics)) = res {
tracing::info!("#{counter}: metrics received successfully");
tracing::info!("metrics received successfully");
let GetMetricsResponse::Metrics(metrics) = metrics;
let metrics = Into::<TypedMetrics>::into(metrics);
rps[Operation::Get] += metrics[RawMetricEntry::ClusterGrinderGetCountRate].value;
rps[Operation::Delete] += metrics[RawMetricEntry::ClusterGrinderDeleteCountRate].value;
rps[Operation::Exist] += metrics[RawMetricEntry::ClusterGrinderExistCountRate].value;
rps[Operation::Put] += metrics[RawMetricEntry::ClusterGrinderPutCountRate].value;
rps += RPS::from_metrics(&metrics.into());
} else {
tracing::warn!("#{counter}: couldn't receive metrics from node");
tracing::warn!("couldn't receive metrics from node"); // TODO: Some better message
}
counter += 1;
}
tracing::info!("total rps: {rps:?}");

Json(rps)
}

/// Return inforamtion about space on cluster
///
#[cfg_attr(all(feature = "swagger", debug_assertions),
utoipa::path(
get,
Expand All @@ -204,20 +179,17 @@ pub async fn get_space(Extension(client): Extension<HttpBobClient>) -> Json<Spac
.collect();

let mut total_space = SpaceInfo::default();
let mut counter = 0;
while let Some(res) = spaces.next().await {
if let Ok(Ok(space)) = res {
tracing::info!("#{counter}: space info received successfully");

tracing::info!("space info received successfully");
let GetSpaceInfoResponse::SpaceInfo(space) = space;
total_space.total_disk += space.total_disk_space_bytes;
total_space.free_disk += space.free_disk_space_bytes;
total_space.used_disk += space.total_disk_space_bytes - space.free_disk_space_bytes;
total_space.occupied_disk += space.occupied_disk_space_bytes;
} else {
tracing::warn!("#{counter}: couldn't receive space info from node");
tracing::warn!("couldn't receive space info from node"); // Some better message
}
counter += 1;
}
tracing::trace!("send response: {total_space:?}");

Expand Down
Loading

0 comments on commit 19c5c61

Please sign in to comment.