Skip to content

Commit

Permalink
post review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
archeoss committed Jan 21, 2024
1 parent 05367a0 commit 278f19f
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 107 deletions.
1 change: 1 addition & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tracing = "0.1"
file-rotate = "0.7"
tracing-appender = "0.2"
tracing-subscriber = "0.3"
# tracing-forest = { version = "0.1", features = ["tokio"] }

## Error Handling
error-stack = "0.4"
Expand Down
12 changes: 5 additions & 7 deletions backend/src/connector/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ pub enum GetConfigurationResponse {
}

/// API
#[async_trait]
pub trait Api<C: Send + Sync> {
fn poll_ready(
&self,
Expand Down Expand Up @@ -226,7 +225,6 @@ pub trait Api<C: Send + Sync> {
}

/// API where `Context` isn't passed on every API call
#[async_trait]
pub trait ApiNoContext<C: Send + Sync> {
fn poll_ready(&self, _cx: &mut Context) -> Poll<Result<(), ServiceError>>;

Expand Down Expand Up @@ -297,7 +295,6 @@ impl<T: Api<C> + Send + Sync, C: Clone + Send + Sync> ContextWrapperExt<C> for T
}
}

#[async_trait]
impl<T: Api<C> + Send + Sync, C: Clone + Send + Sync> ApiNoContext<C> for ContextWrapper<T, C> {
fn poll_ready(&self, cx: &mut Context) -> Poll<Result<(), ServiceError>> {
self.api().poll_ready(cx)
Expand Down Expand Up @@ -404,9 +401,10 @@ impl<T: Api<C> + Send + Sync, C: Clone + Send + Sync> ApiNoContext<C> for Contex

pub mod prelude {
pub use super::{
APIError, Api, GetAlienDirResponse, GetConfigurationResponse, GetDisksResponse,
GetMetricsResponse, GetNodesResponse, GetPartitionResponse, GetPartitionsResponse,
GetRecordsResponse, GetReplicasLocalDirsResponse, GetSpaceInfoResponse, GetStatusResponse,
GetVDiskResponse, GetVDisksResponse, GetVersionResponse,
APIError, Api, GetAlienDirResponse, GetAlienResponse, GetConfigurationResponse,
GetDisksResponse, GetMetricsResponse, GetNodesResponse, GetPartitionResponse,
GetPartitionsResponse, GetRecordsResponse, GetReplicasLocalDirsResponse,
GetSpaceInfoResponse, GetStatusResponse, GetVDiskResponse, GetVDisksResponse,
GetVersionResponse,
};
}
8 changes: 5 additions & 3 deletions backend/src/connector/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
unused_variables
)]

use hyper::body::to_bytes;

use super::{api::prelude::*, prelude::*};

/// Error type for failing to create a Client
Expand Down Expand Up @@ -255,8 +257,7 @@ where
body_handler: impl Fn(R) -> T + Send,
) -> Result<T, APIError> {
let body = response.into_body();
let body = body
.into_raw()
let body = to_bytes(body)
.await
.change_context(APIError::ResponseError)?;
let body = std::str::from_utf8(&body).change_context(APIError::ResponseError)?;
Expand All @@ -268,6 +269,7 @@ where
Ok(body_handler(body))
}
}

impl<S, C, Cr> Client<S, C, Cr>
where
Cr: Credentials + Clone,
Expand All @@ -286,10 +288,10 @@ where
.change_context(APIError::RequestFailed)
.attach_printable("No Response received")?
.change_context(APIError::RequestFailed)
.attach_printable("Hyper error")
}
}

#[async_trait]
impl<S, C, Cr> Api<C> for Client<S, C, Cr>
where
Cr: Credentials + Clone,
Expand Down
35 changes: 0 additions & 35 deletions backend/src/connector/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,38 +86,3 @@ where
self.inner.call(req)
}
}

/// Additional function for `hyper::Body`
pub trait BodyExt {
/// Raw body type
type Raw;

/// Error if we can't gather up the raw body
type Error;

/// Collect the body into a raw form
fn into_raw(
self,
) -> futures::future::BoxFuture<'static, std::result::Result<Self::Raw, Self::Error>>;
}

impl<T, E> BodyExt for T
where
T: Stream<Item = std::result::Result<Bytes, E>> + Unpin + Send + 'static,
{
type Raw = Vec<u8>;
type Error = E;

fn into_raw(
mut self,
) -> futures::future::BoxFuture<'static, std::result::Result<Self::Raw, Self::Error>> {
Box::pin(async {
let mut raw = Vec::new();
while let (Some(chunk), rest) = self.into_future().await {
raw.extend_from_slice(&chunk?);
self = rest;
}
Ok(raw)
})
}
}
129 changes: 129 additions & 0 deletions backend/src/connector/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use super::prelude::*;

impl From<GetAlienResponse> for StatusCode {
fn from(value: GetAlienResponse) -> Self {
match value {
GetAlienResponse::AlienNodeName(_) => StatusCode::OK,
}
}
}

impl From<GetAlienDirResponse> for StatusCode {
fn from(value: GetAlienDirResponse) -> Self {
match value {
GetAlienDirResponse::Directory(_) => StatusCode::OK,
GetAlienDirResponse::PermissionDenied(_) => StatusCode::FORBIDDEN,
GetAlienDirResponse::NotAcceptableBackend(_) => StatusCode::NOT_ACCEPTABLE,
}
}
}

impl From<GetDisksResponse> for StatusCode {
fn from(value: GetDisksResponse) -> Self {
match value {
GetDisksResponse::AJSONArrayWithDisksAndTheirStates(_) => StatusCode::OK,
GetDisksResponse::PermissionDenied(_) => StatusCode::FORBIDDEN,
}
}
}

impl From<GetMetricsResponse> for StatusCode {
fn from(value: GetMetricsResponse) -> Self {
match value {
GetMetricsResponse::Metrics(_) => StatusCode::OK,
}
}
}

impl From<GetNodesResponse> for StatusCode {
fn from(value: GetNodesResponse) -> Self {
match value {
GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(_) => StatusCode::OK,
GetNodesResponse::PermissionDenied => StatusCode::FORBIDDEN,
}
}
}

impl From<GetPartitionResponse> for StatusCode {
fn from(value: GetPartitionResponse) -> Self {
match value {
GetPartitionResponse::AJSONWithPartitionInfo(_) => StatusCode::OK,
GetPartitionResponse::PermissionDenied(_) => StatusCode::FORBIDDEN,
GetPartitionResponse::NotFound(_) => StatusCode::NOT_FOUND,
}
}
}

impl From<GetRecordsResponse> for StatusCode {
fn from(value: GetRecordsResponse) -> Self {
match value {
GetRecordsResponse::RecordsCount(_) => StatusCode::OK,
GetRecordsResponse::PermissionDenied(_) => StatusCode::FORBIDDEN,
GetRecordsResponse::NotFound(_) => StatusCode::NOT_FOUND,
}
}
}

impl From<GetReplicasLocalDirsResponse> for StatusCode {
fn from(value: GetReplicasLocalDirsResponse) -> Self {
match value {
GetReplicasLocalDirsResponse::AJSONArrayWithDirs(_) => StatusCode::OK,
GetReplicasLocalDirsResponse::PermissionDenied(_) => StatusCode::FORBIDDEN,
GetReplicasLocalDirsResponse::NotFound(_) => StatusCode::NOT_FOUND,
}
}
}

impl From<GetStatusResponse> for StatusCode {
fn from(value: GetStatusResponse) -> Self {
match value {
GetStatusResponse::AJSONWithNodeInfo(_) => StatusCode::OK,
}
}
}

impl From<GetVDiskResponse> for StatusCode {
fn from(value: GetVDiskResponse) -> Self {
match value {
GetVDiskResponse::AJSONWithVdiskInfo(_) => StatusCode::OK,
GetVDiskResponse::PermissionDenied(_) => StatusCode::FORBIDDEN,
GetVDiskResponse::NotFound(_) => StatusCode::NOT_FOUND,
}
}
}

impl From<GetVDisksResponse> for StatusCode {
fn from(value: GetVDisksResponse) -> Self {
match value {
GetVDisksResponse::AJSONArrayOfVdisksInfo(_) => StatusCode::OK,
GetVDisksResponse::PermissionDenied => StatusCode::FORBIDDEN,
}
}
}

impl From<GetVersionResponse> for StatusCode {
fn from(value: GetVersionResponse) -> Self {
match value {
GetVersionResponse::VersionInfo(_) => StatusCode::OK,
}
}
}

impl From<GetConfigurationResponse> for StatusCode {
fn from(value: GetConfigurationResponse) -> Self {
match value {
GetConfigurationResponse::ConfigurationObject(_) => StatusCode::OK,
GetConfigurationResponse::PermissionDenied => StatusCode::FORBIDDEN,
}
}
}

pub trait AsApiError {
fn as_invalid_status(self) -> APIError;
}

impl<T: Into<StatusCode>> AsApiError for T {
fn as_invalid_status(self) -> APIError {
APIError::InvalidStatusCode(self.into())
}
}
59 changes: 20 additions & 39 deletions backend/src/connector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
mod prelude {
pub use super::api::prelude::*;
pub use super::{
context::{BodyExt, ContextWrapper, DropContextService, Has},
context::{ContextWrapper, DropContextService, Has},
ClientError, Connector,
};
pub use crate::{models::shared::XSpanIdString, prelude::*, services::auth::HttpClient};
pub use axum::{
headers::{authorization::Credentials, Authorization, HeaderMapExt},
http::{HeaderName, HeaderValue},
};
pub use futures::{Stream, StreamExt};
pub use hyper::{body::Bytes, service::Service, Response, Uri};
pub use futures::StreamExt;
pub use hyper::{service::Service, Response, Uri};
pub use std::{
str::FromStr,
sync::Arc,
task::{Context, Poll},
};
}

use api::{APIError, ApiNoContext, ContextWrapperExt};
use api::{ApiNoContext, ContextWrapperExt};
use client::Client;
use context::ClientContext;
use prelude::*;

use self::error::AsApiError;

pub mod api;
pub mod client;
pub mod context;
pub mod dto;
pub mod error;

pub type ApiInterface = dyn ApiNoContext<ClientContext> + Send + Sync;

Expand Down Expand Up @@ -122,7 +126,6 @@ impl<Client: ApiNoContext<ClientContext> + Send + Sync + Clone> std::fmt::Debug
}

impl<ApiInterface: ApiNoContext<ClientContext> + Send + Sync> BobClient<ApiInterface> {
// impl BobClient<C> {
/// Creates new [`BobClient`] from [`BobConnectionData`]
///
/// # Errors
Expand All @@ -140,19 +143,22 @@ impl<ApiInterface: ApiNoContext<ClientContext> + Send + Sync> BobClient<ApiInter
let context: ClientContext = ClientContext {
timeout,
auth_data: auth,
xspan: XSpanIdString::default(),
xspan: XSpanIdString::gen(),
};
let client =
Client::try_new_http(&hostname.to_string()).change_context(ClientError::InitClient)?;
let nodes = client
let client = Client::try_new_http(&hostname.to_string())
.change_context(ClientError::InitClient)
.attach_printable(format!("Hostname: {}", hostname.to_string()))?;
let nodes_resp = client
.clone()
.with_context(context.clone())
.get_nodes()
.await
.change_context(ClientError::Inaccessible)?;
let api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(nodes) = nodes else {
Err(APIError::InvalidStatusCode(StatusCode::FORBIDDEN))
.change_context(ClientError::PermissionDenied)?
.change_context(ClientError::Inaccessible)
.attach_printable(format!("Hostname: {}", hostname.to_string()))?;
let api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(nodes) = nodes_resp else {
Err(nodes_resp.as_invalid_status())
.change_context(ClientError::Inaccessible)
.attach_printable(format!("Hostname: {}", hostname.to_string()))?
};

let cluster: HashMap<NodeName, Arc<_>> = nodes
Expand Down Expand Up @@ -212,7 +218,7 @@ impl<ApiInterface: ApiNoContext<ClientContext> + Send + Sync> BobClient<ApiInter
}
}

/// Probes connection to the Bob's main connected node
/// Probes connection to all Bob's connected nodes
///
/// Returns `StatusCode::OK` on success
///
Expand Down Expand Up @@ -242,31 +248,6 @@ impl<ApiInterface: ApiNoContext<ClientContext> + Send + Sync> BobClient<ApiInter
futures::future::join_all(v).await
}

/// Probes Node's connection
///
/// # Errors
///
/// This function will return an error if no client present for the specified Node's name,
/// the client was unable to receive response or the client doesn't have authority to do request.
pub async fn probe_socket(&self, name: &NodeName) -> Result<StatusCode, ClientError> {
if let Some(client) = self.cluster.get(name) {
match client
.get_nodes()
.await
.change_context(ClientError::Inaccessible)?
{
api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(_) => {
Ok(StatusCode::OK)
}
api::GetNodesResponse::PermissionDenied => {
Err(ClientError::PermissionDenied.into())
}
}
} else {
Err(ClientError::NoClient.into())
}
}

#[must_use]
pub fn context(&self) -> &ClientContext {
self.main.context()
Expand Down
6 changes: 5 additions & 1 deletion backend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#![allow(clippy::multiple_crate_versions, clippy::module_name_repetitions)]
#![allow(
async_fn_in_trait,
clippy::multiple_crate_versions,
clippy::module_name_repetitions
)]

#[cfg(all(feature = "swagger", debug_assertions))]
use axum::{routing::get, Router};
Expand Down
3 changes: 2 additions & 1 deletion backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ async fn main() -> Result<(), AppError> {
fn router(cors: CorsLayer) -> Router {
let session_store = MemoryStore::default();
let session_service = ServiceBuilder::new()
.layer(HandleErrorLayer::new(|_: BoxError| async {
.layer(HandleErrorLayer::new(|err: BoxError| async move {
tracing::error!(err);
StatusCode::BAD_REQUEST
}))
.layer(
Expand Down
Loading

0 comments on commit 278f19f

Please sign in to comment.