From 2bdbf32bda0bc10336a5dbb49bf468e912ad8118 Mon Sep 17 00:00:00 2001
From: Simeon Romanov
Date: Sat, 2 Dec 2023 00:20:57 +0300
Subject: [PATCH] refactoring

---
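Notes (this section sits above the diffstat, so `git am` ignores it):

The models change replaces inline struct literals with `Disk::from_metrics`
and `DetailedNodeMetrics::from_metrics` constructors. A self-contained
sketch of that pattern follows; `SpaceInfo` and `Disk` here are stub types
standing in for the crate's dto and API models, not their real definitions:

    use std::collections::HashMap;

    #[derive(Default, Debug)]
    struct SpaceInfo {
        total_disk_space_bytes: u64,
        occupied_disk_space_by_disk: HashMap<String, u64>,
    }

    #[derive(Debug)]
    struct Disk {
        name: String,
        total_space: u64,
        used_space: u64,
    }

    impl Disk {
        // One constructor instead of struct literals at every call site;
        // a missing per-disk entry falls back to zero.
        fn from_space(name: String, space: &SpaceInfo) -> Self {
            let used_space = space
                .occupied_disk_space_by_disk
                .get(&name)
                .copied()
                .unwrap_or_default();
            Self {
                name,
                total_space: space.total_disk_space_bytes,
                used_space,
            }
        }
    }

    fn main() {
        let mut space = SpaceInfo {
            total_disk_space_bytes: 1024,
            ..Default::default()
        };
        space
            .occupied_disk_space_by_disk
            .insert("disk1".into(), 512);
        println!("{:?}", Disk::from_space("disk1".into(), &space));
    }

The `unwrap_or_default()` fallbacks mirror the patch's tolerance for
partially available metrics.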
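The services change rewrites `get_detailed_node_info` to spawn every
secondary-node request up front and await the join handles afterwards, so
the calls run concurrently rather than one after another. A minimal,
self-contained sketch of that fan-out, assuming only tokio; `Client` and
its methods are illustrative stand-ins, not the crate's real API:

    use std::sync::Arc;

    struct Client; // stand-in for the per-node API handle

    impl Client {
        async fn get_status(&self) -> String { "status".into() }
        async fn get_metrics(&self) -> String { "metrics".into() }
        async fn get_space_info(&self) -> String { "space".into() }
    }

    #[tokio::main]
    async fn main() {
        let handle = Arc::new(Client);

        // Spawn every request first so they all run concurrently...
        let status = {
            let handle = handle.clone();
            tokio::spawn(async move { handle.get_status().await })
        };
        let metrics = {
            let handle = handle.clone();
            tokio::spawn(async move { handle.get_metrics().await })
        };
        let space = {
            let handle = handle.clone();
            tokio::spawn(async move { handle.get_space_info().await })
        };

        // ...then await the join handles; each yields Result<_, JoinError>,
        // so a slow or failed call degrades only its own field.
        let (status, metrics, space) = (status.await, metrics.await, space.await);
        println!("{status:?} {metrics:?} {space:?}");
    }

For the per-vdisk requests the handler collects the same kind of spawned
tasks into a `FuturesUnordered` and drains them in completion order.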
 backend/src/models/api.rs   |  57 ++++++-
 backend/src/services/api.rs | 304 +++++++-----------------------------
 backend/src/services/mod.rs |  20 +--
 3 files changed, 118 insertions(+), 263 deletions(-)

diff --git a/backend/src/models/api.rs b/backend/src/models/api.rs
index ade0c666..8a2f2e57 100644
--- a/backend/src/models/api.rs
+++ b/backend/src/models/api.rs
@@ -31,6 +31,37 @@ pub struct Disk {
     pub iops: u64,
 }
 
+impl Disk {
+    #[must_use]
+    pub fn from_metrics(
+        disk_name: String,
+        disk_path: String,
+        raw_metrics: &dto::MetricsSnapshotModel,
+        raw_space: &dto::SpaceInfo,
+    ) -> Self {
+        let status = DiskStatus::from_space_info(raw_space, &disk_name);
+        let used_space = raw_space
+            .occupied_disk_space_by_disk
+            .get(&disk_name)
+            .copied()
+            .unwrap_or_default();
+        let iops = raw_metrics
+            .metrics
+            .get(&format!("hardware.disks.{:?}_iops", disk_name))
+            .cloned()
+            .unwrap_or_default()
+            .value;
+        Self {
+            name: disk_name,
+            path: disk_path,
+            status,
+            total_space: raw_space.total_disk_space_bytes,
+            used_space,
+            iops,
+        }
+    }
+}
+
 /// Defines kind of problem on disk
 #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Serialize, Hash)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
@@ -189,6 +220,12 @@ pub enum NodeStatus {
     Offline,
 }
 
+impl Default for NodeStatus {
+    fn default() -> Self {
+        Self::Offline
+    }
+}
+
 impl NodeStatus {
     #[must_use]
     pub fn from_problems(problems: Vec<NodeProblem>) -> Self {
@@ -343,7 +380,7 @@ pub enum VDiskStatus {
     Offline,
 }
 
-#[derive(Debug, Clone, Serialize)]
+#[derive(Default, Debug, Clone, Serialize)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub struct DetailedNode {
     pub name: String,
     pub hostname: String,
     pub vdisks: Vec<VDisk>,
     pub status: NodeStatus,
     pub metrics: DetailedNodeMetrics,
@@ -360,7 +397,7 @@ pub struct DetailedNode {
     pub disks: Vec<Disk>,
 }
 
-#[derive(Debug, Clone, Serialize)]
+#[derive(Default, Debug, Clone, Serialize)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 #[serde(rename_all = "camelCase")]
 pub struct DetailedNodeMetrics {
@@ -381,6 +418,22 @@ pub struct DetailedNodeMetrics {
     pub rps: RPS,
     pub alien_count: u64,
     pub corrupted_count: u64,
     pub space: SpaceInfo,
     pub cpu_load: u64,
     pub total_ram: u64,
     pub used_ram: u64,
     pub descr_amount: u64,
 }
 
+impl DetailedNodeMetrics {
+    #[must_use]
+    pub fn from_metrics(metrics: &TypedMetrics, space: SpaceInfo) -> Self {
+        Self {
+            rps: RPS::from_metrics(metrics),
+            alien_count: metrics[RawMetricEntry::BackendAlienCount].value,
+            corrupted_count: metrics[RawMetricEntry::BackendCorruptedBlobCount].value,
+            space,
+            cpu_load: metrics[RawMetricEntry::HardwareBobCpuLoad].value,
+            total_ram: metrics[RawMetricEntry::HardwareTotalRam].value,
+            used_ram: metrics[RawMetricEntry::HardwareUsedRam].value,
+            descr_amount: metrics[RawMetricEntry::HardwareDescrAmount].value,
+        }
+    }
+}
+
 /// Types of operations on BOB cluster
 #[derive(Debug, Clone, Serialize, Hash, Eq, PartialEq, PartialOrd, Ord, EnumIter)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
diff --git a/backend/src/services/api.rs b/backend/src/services/api.rs
index 140dcdc0..3097ec72 100644
--- a/backend/src/services/api.rs
+++ b/backend/src/services/api.rs
@@ -375,140 +375,6 @@ pub async fn get_node_info(
     Ok(Json(node))
 }
 
-async fn process_replica(
-    client: &HttpBobClient,
-    replica: dto::Replica,
-    disks: &HashMap<DiskName, IsActive>,
-    nodes: &HashMap<&NodeName, &dto::Node>,
-) -> Replica {
-    let mut status = ReplicaStatus::Good;
-    if let Some(disk_state) = disks.get(&replica.disk) {
-        if !disk_state {
-            status = ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable]);
-        }
-    } else {
-        status = ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable]);
-    }
-
-    if let Some(node) = nodes.get(&replica.node) {
-        if !is_node_online(client, node).await {
-            status = match status {
-                ReplicaStatus::Good => {
-                    ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable])
-                }
-                ReplicaStatus::Offline(mut problems) => {
-                    problems.push(ReplicaProblem::NodeUnavailable);
-                    ReplicaStatus::Offline(problems)
-                }
-            }
-        }
-    } else {
-        status = match status {
-            ReplicaStatus::Good => ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable]),
-            ReplicaStatus::Offline(mut problems) => {
-                problems.push(ReplicaProblem::NodeUnavailable);
-                ReplicaStatus::Offline(problems)
-            }
-        }
-    }
-
-    Replica {
-        node: replica.node,
-        disk: replica.disk,
-        path: replica.path,
-        status,
-    }
-}
-
-async fn is_node_online(client: &HttpBobClient, node: &dto::Node) -> bool {
-    (client.probe_socket(&node.name).await).map_or(false, |code| code == StatusCode::OK)
-}
-
-fn proccess_disks(
-    disks: &[dto::DiskState],
-    space: &dto::SpaceInfo,
-    metrics: &dto::MetricsSnapshotModel,
-) -> Vec<Disk> {
-    let mut res_disks = vec![];
-    let mut visited_disks = HashSet::new();
-    for disk in disks {
-        if !visited_disks.insert(disk.name.clone()) {
-            tracing::warn!(
-                "disk {} with path {} duplicated, skipping...",
-                disk.name,
-                disk.path
-            );
-            continue;
-        }
-        res_disks.push(Disk {
-            name: disk.name.clone(),
-            path: disk.path.clone(),
-            status: DiskStatus::from_space_info(space, &disk.name),
-            total_space: space.total_disk_space_bytes,
-            used_space: space
-                .occupied_disk_space_by_disk
-                .get(&disk.name.clone())
-                .copied()
-                .unwrap_or_default(),
-            iops: metrics
-                .metrics
-                .get(&format!("hardware.disks.{:?}_iops", disk.name))
-                .cloned()
-                .unwrap_or_default()
-                .value,
-        });
-    }
-
-    res_disks
-}
-
-// Bad function, 6 args :(
-async fn process_vdisks_for_node(
-    client: HttpBobClient,
-    virt_disks: &[dto::VDisk],
-    node_name: NodeName,
-    all_disks: &HashMap<DiskName, IsActive>,
-    nodes: &HashMap<&NodeName, &dto::Node>,
-    partitions_count_on_vdisk: &HashMap<i32, usize>,
-) -> Vec<VDisk> {
-    let mut res_replicas = vec![];
-    let mut res_vdisks = vec![];
-    for (vdisk, replicas) in virt_disks
-        .iter()
-        .filter_map(|vdisk| vdisk.replicas.as_ref().map(|repl| (vdisk, repl)))
-        .filter(|(_, replicas)| replicas.iter().any(|replica| replica.node == node_name))
-    {
-        for replica in replicas {
-            res_replicas.push((
-                vdisk.id,
-                process_replica(&client, replica.clone(), all_disks, nodes).await,
-            ));
-        }
-        res_vdisks.push(VDisk {
-            id: vdisk.id as u64,
-            status: if res_replicas
-                .iter()
-                .any(|(_, replica)| matches!(replica.status, ReplicaStatus::Offline(_)))
-            {
-                VDiskStatus::Bad
-            } else {
-                VDiskStatus::Good
-            },
-            partition_count: partitions_count_on_vdisk
-                .get(&vdisk.id)
-                .copied()
-                .unwrap_or_default() as u64,
-            replicas: res_replicas
-                .iter()
-                .filter(|(id, _)| id == &vdisk.id)
-                .map(|(_, replica)| replica.clone())
-                .collect(),
-        });
-    }
-
-    res_vdisks
-}
-
 /// Get Raw Metrics from Node
 ///
 /// # Errors
@@ -596,132 +462,76 @@ pub async fn get_detailed_node_info(
     Extension(client): Extension<HttpBobClient>,
     Path(node_name): Path<NodeName>,
 ) -> AxumResult<Json<DetailedNode>> {
-    let mut all_disks: FuturesUnordered<_> = client
-        .cluster()
-        .map(move |node| {
-            let handle = node.clone();
-            tokio::spawn(async move { handle.get_disks().await })
-        })
-        .collect();
+    tracing::info!("get /nodes/{node_name}/detailed : {client:?}");
+    let handle = Arc::new(
+        client
+            .api_secondary(&node_name)
+            .cloned()
+            .ok_or(StatusCode::NOT_FOUND)?,
+    );
 
-    let node_client = get_client_by_node(&client, node_name.clone()).await?;
+    let status = {
+        let handle = handle.clone();
+        tokio::spawn(async move { handle.get_status().await })
+    };
+    let metrics = {
+        let handle = handle.clone();
+        tokio::spawn(async move { handle.clone().get_metrics().await })
+    };
+    let space_info = {
+        let handle = handle.clone();
+        tokio::spawn(async move { handle.clone().get_space_info().await })
+    };
+    let disks = {
+        let handle = handle.clone();
+        tokio::spawn(async move { handle.clone().get_disks().await })
+    };
 
-    let virt_disks = fetch_vdisks(node_client.as_ref()).await?;
+    let Ok(Ok(GetStatusResponse::AJSONWithNodeInfo(status))) = status.await else {
+        return Err(StatusCode::NOT_FOUND.into());
+    };
 
-    let mut all_partitions: FuturesUnordered<_> = virt_disks
+    let mut node = DetailedNode {
+        name: status.name,
+        hostname: status.address,
+        ..Default::default()
+    };
+    let mut virt_disks: FuturesUnordered<_> = status
+        .vdisks
         .iter()
+        .flatten()
         .map(|vdisk| {
-            let id = vdisk.id;
-            let handle = client.api_main().clone();
-            tokio::spawn(async move { (id, handle.get_partitions(id).await) })
+            let handle = client.clone();
+            let id = vdisk.id as u64;
+            tokio::spawn(async move { get_vdisk_by_id(&handle, id).await })
         })
         .collect();
 
-    let metrics = fetch_metrics(node_client.as_ref()).await?;
-    let typed_metrics: TypedMetrics = metrics.clone().into();
-
-    let space = fetch_space_info(node_client.as_ref()).await?;
-    let node_status = fetch_node_status(node_client.as_ref()).await?;
-    let disks = fetch_disks(node_client.as_ref()).await?;
-
-    let res_disks = proccess_disks(&disks, &space, &metrics);
-    let nodes = fetch_nodes(node_client.as_ref()).await?;
-
-    let nodes = nodes
-        .iter()
-        .map(|node| (&node.name, node))
-        .collect::<HashMap<_, _>>();
-
-    let mut proc_disks = HashMap::new();
-    while let Some(disks) = all_disks.next().await {
-        let Ok(Ok(GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks))) = disks else {
-            tracing::error!("couldn't get disk inforamtion from node");
-            continue;
-        };
-        for disk in disks {
-            proc_disks.insert(disk.name, disk.is_active);
+    if let (
+        Ok(Ok(GetMetricsResponse::Metrics(raw_metrics))),
+        Ok(Ok(GetSpaceInfoResponse::SpaceInfo(raw_space))),
+    ) = (metrics.await, space_info.await)
+    {
+        if let Ok(Ok(GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks))) = disks.await {
+            node.disks = disks
+                .into_iter()
+                .map(|disk| Disk::from_metrics(disk.name, disk.path, &raw_metrics, &raw_space))
+                .collect();
         }
+        let metrics = Into::<TypedMetrics>::into(raw_metrics);
+        node.status = NodeStatus::from_problems(NodeProblem::default_from_metrics(&metrics));
+        node.metrics = DetailedNodeMetrics::from_metrics(&metrics, raw_space.into());
     }
 
-    let mut res_partitions = HashMap::new();
-    while let Some(partitions) = all_partitions.next().await {
-        let Ok((id, Ok(GetPartitionsResponse::NodeInfoAndJSONArrayWithPartitionsInfo(partitions)))) =
-            partitions
-        else {
-            // tracing::error!("couldn't get Partition inforamtion from node"); // Too noisy
-            continue;
-        };
-        if let Some(partitions) = partitions.partitions {
-            res_partitions.insert(id, partitions.len());
+    while let Some(vdisk) = virt_disks.next().await {
+        if let Ok(Ok(vdisk)) = vdisk {
+            node.vdisks.push(vdisk);
+        } else {
+            tracing::warn!("couldn't receive vdisk information"); // TODO: log the failing vdisk id
         }
     }
 
-    let virtual_disks = process_vdisks_for_node(
-        client,
-        &virt_disks,
-        node_name,
-        &proc_disks,
-        &nodes,
-        &res_partitions,
-    )
-    .await;
+    tracing::trace!("send response: {node:?}");
 
-    let mut rps = RPS::new();
-
-    let status = NodeStatus::from_problems(NodeProblem::default_from_metrics(&typed_metrics));
-
-    rps[Operation::Get] = typed_metrics[RawMetricEntry::PearlGetCountRate].value;
-    rps[Operation::Put] = typed_metrics[RawMetricEntry::PearlPutCountRate].value;
-    rps[Operation::Exist] = typed_metrics[RawMetricEntry::PearlExistCountRate].value;
-    rps[Operation::Delete] = typed_metrics[RawMetricEntry::PearlDeleteCountRate].value;
-
-    let result = Json(DetailedNode {
-        name: node_status.name,
-        hostname: node_status.address,
-        vdisks: virtual_disks,
-        status,
-        metrics: DetailedNodeMetrics {
-            rps,
-            alien_count: typed_metrics[RawMetricEntry::BackendAlienCount].value,
-            corrupted_count: typed_metrics[RawMetricEntry::BackendCorruptedBlobCount].value,
-            space: SpaceInfo {
-                total_disk: space.total_disk_space_bytes,
-                free_disk: space.total_disk_space_bytes - space.used_disk_space_bytes,
-                used_disk: space.used_disk_space_bytes,
-                occupied_disk: space.occupied_disk_space_bytes,
-            },
-            cpu_load: typed_metrics[RawMetricEntry::HardwareBobCpuLoad].value,
-            total_ram: typed_metrics[RawMetricEntry::HardwareTotalRam].value,
-            used_ram: typed_metrics[RawMetricEntry::HardwareUsedRam].value,
-            descr_amount: typed_metrics[RawMetricEntry::HardwareDescrAmount].value,
-        },
-        disks: res_disks,
-    });
-    tracing::trace!("send response: {result:?}");
-
-    Ok(result)
-}
-
-async fn get_client_by_node(
-    client: &HttpBobClient,
-    node_name: NodeName,
-) -> AxumResult<Arc<HttpClient>> {
-    let nodes = fetch_nodes(client.api_main()).await?;
-
-    let node = nodes
-        .iter()
-        .find(|node| node.name == node_name)
-        .ok_or_else(|| {
-            tracing::error!("Couldn't find specified node");
-            APIError::RequestFailed
-        })?;
-
-    client
-        .cluster_with_addr()
-        .get(&node.name)
-        .ok_or_else(|| {
-            tracing::error!("Couldn't find specified node");
-            APIError::RequestFailed.into()
-        })
-        .cloned()
+    Ok(Json(node))
 }
diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs
index 10a6c0ed..47133a3f 100644
--- a/backend/src/services/mod.rs
+++ b/backend/src/services/mod.rs
@@ -1,20 +1,13 @@
 mod prelude {
-    pub use super::{
-        auth::HttpClient,
-        methods::{
-            fetch_configuration, fetch_disks, fetch_metrics, fetch_node_status, fetch_nodes,
-            fetch_space_info, fetch_vdisks, get_vdisk_by_id,
-        },
+    pub use super::methods::{
+        fetch_configuration, fetch_metrics, fetch_nodes, fetch_vdisks, get_vdisk_by_id,
     };
     pub use crate::{
         connector::{
             api::{prelude::*, ApiNoContext},
             ClientError,
         },
-        models::{
-            api::*,
-            bob::{DiskName, IsActive},
-        },
+        models::api::*,
         prelude::*,
     };
     pub use axum::{
@@ -33,14 +26,13 @@ pub mod auth;
 pub mod methods;
 
 use api::{
-    get_disks_count, get_node_info, get_nodes_count, get_nodes_list, get_rps, get_space,
-    raw_configuration_by_node, raw_metrics_by_node,
+    get_detailed_node_info, get_disks_count, get_node_info, get_nodes_count, get_nodes_list,
+    get_rps, get_space, get_vdisk_info, get_vdisks_list, raw_configuration_by_node,
+    raw_metrics_by_node,
 };
 use auth::{login, logout, require_auth, AuthState, BobUser, HttpBobClient, InMemorySessionStore};
 use prelude::*;
 
-use self::api::{get_vdisk_info, get_vdisks_list, get_detailed_node_info};
-
 type BobAuthState = AuthState<
     BobUser,
     Uuid,