Skip to content

Commit

Permalink
Move server error to error set (#1378)
Browse files Browse the repository at this point in the history
Closes #797

---------

Co-authored-by: Grzegorz Koszyk <112548209+numinnex@users.noreply.github.com>
  • Loading branch information
haze518 and numinnex authored Dec 22, 2024
1 parent f92b360 commit 5cfffa1
Show file tree
Hide file tree
Showing 133 changed files with 2,915 additions and 881 deletions.
358 changes: 190 additions & 168 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions integration/tests/archiver/disk.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::archiver::DiskArchiverSetup;
use server::archiver::Archiver;
use server::server_error::ServerError;
use server::streaming::utils::file;
use server::{archiver::Archiver, server_error::ArchiverError};
use std::path::Path;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

Expand Down Expand Up @@ -89,7 +88,7 @@ async fn should_fail_when_file_to_archive_does_not_exist() {

assert!(result.is_err());
let error = result.err().unwrap();
assert!(matches!(error, ServerError::FileToArchiveNotFound(_)));
assert!(matches!(error, ArchiverError::FileToArchiveNotFound { .. }));
}

async fn create_file(path: &str, content: &str) {
Expand Down
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ console-subscriber = { version = "0.4.0", optional = true }
dashmap = "6.0.1"
derive_more = "1.0.0"
dotenvy = { version = "0.15.7" }
error_set = { version = "0.6.4", features = ["tracing"] }
figlet-rs = "0.1.5"
figment = { version = "0.10.18", features = ["toml", "env"] }
flume = "0.11.0"
Expand Down
34 changes: 25 additions & 9 deletions server/src/archiver/disk.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::archiver::Archiver;
use crate::archiver::{Archiver, COMPONENT};
use crate::configs::server::DiskArchiverConfig;
use crate::server_error::ServerError;
use crate::server_error::ArchiverError;
use async_trait::async_trait;
use error_set::ResultContext;
use std::path::Path;
use tokio::fs;
use tracing::{debug, info};
Expand All @@ -19,10 +20,17 @@ impl DiskArchiver {

#[async_trait]
impl Archiver for DiskArchiver {
async fn init(&self) -> Result<(), ServerError> {
async fn init(&self) -> Result<(), ArchiverError> {
if !Path::new(&self.config.path).exists() {
info!("Creating disk archiver directory: {}", self.config.path);
fs::create_dir_all(&self.config.path).await?;
fs::create_dir_all(&self.config.path)
.await
.with_error(|err| {
format!(
"ARCHIVER - failed to create directory: {} with error: {err}",
self.config.path
)
})?;
}
Ok(())
}
Expand All @@ -31,7 +39,7 @@ impl Archiver for DiskArchiver {
&self,
file: &str,
base_directory: Option<String>,
) -> Result<bool, ServerError> {
) -> Result<bool, ArchiverError> {
debug!("Checking if file: {file} is archived on disk.");
let base_directory = base_directory.as_deref().unwrap_or_default();
let path = Path::new(&self.config.path).join(base_directory).join(file);
Expand All @@ -44,20 +52,28 @@ impl Archiver for DiskArchiver {
&self,
files: &[&str],
base_directory: Option<String>,
) -> Result<(), ServerError> {
) -> Result<(), ArchiverError> {
debug!("Archiving files on disk: {:?}", files);
for file in files {
debug!("Archiving file: {file}");
let source = Path::new(file);
if !source.exists() {
return Err(ServerError::FileToArchiveNotFound(file.to_string()));
return Err(ArchiverError::FileToArchiveNotFound {
file_path: file.to_string(),
});
}

let base_directory = base_directory.as_deref().unwrap_or_default();
let destination = Path::new(&self.config.path).join(base_directory).join(file);
let destination_path = destination.to_str().unwrap_or_default().to_owned();
fs::create_dir_all(destination.parent().unwrap()).await?;
fs::copy(source, destination).await?;
fs::create_dir_all(destination.parent().unwrap())
.await
.with_error(|err| {
format!("{COMPONENT} - failed to create file: {file} at path: {destination_path} with error: {err}",)
})?;
fs::copy(source, destination).await.with_error(|err| {
format!("{COMPONENT} - failed to copy file: {file} to destination: {destination_path} with error: {err}")
})?;
debug!("Archived file: {file} at: {destination_path}");
}

Expand Down
10 changes: 6 additions & 4 deletions server/src/archiver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
pub mod disk;
pub mod s3;

use crate::server_error::ServerError;
use crate::server_error::ArchiverError;
use async_trait::async_trait;
use derive_more::Display;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Formatter};
use std::str::FromStr;

pub const COMPONENT: &str = "ARCHIVER";

#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Display, Copy, Clone)]
#[serde(rename_all = "lowercase")]
pub enum ArchiverKind {
Expand All @@ -31,17 +33,17 @@ impl FromStr for ArchiverKind {

#[async_trait]
pub trait Archiver: Sync + Send {
async fn init(&self) -> Result<(), ServerError>;
async fn init(&self) -> Result<(), ArchiverError>;
async fn is_archived(
&self,
file: &str,
base_directory: Option<String>,
) -> Result<bool, ServerError>;
) -> Result<bool, ArchiverError>;
async fn archive(
&self,
files: &[&str],
base_directory: Option<String>,
) -> Result<(), ServerError>;
) -> Result<(), ArchiverError>;
}

impl Debug for dyn Archiver {
Expand Down
61 changes: 42 additions & 19 deletions server/src/archiver/s3.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::archiver::Archiver;
use crate::archiver::{Archiver, COMPONENT};
use crate::configs::server::S3ArchiverConfig;
use crate::server_error::ServerError;
use crate::server_error::ArchiverError;
use crate::streaming::utils::file;
use async_trait::async_trait;
use error_set::ResultContext;
use s3::creds::Credentials;
use s3::{Bucket, Region};
use std::path::Path;
Expand All @@ -16,15 +17,15 @@ pub struct S3Archiver {
}

impl S3Archiver {
pub fn new(config: S3ArchiverConfig) -> Result<Self, ServerError> {
pub fn new(config: S3ArchiverConfig) -> Result<Self, ArchiverError> {
let credentials = Credentials::new(
Some(&config.key_id),
Some(&config.key_secret),
None,
None,
None,
)
.map_err(|_| ServerError::InvalidS3Credentials)?;
.map_err(|_| ArchiverError::InvalidS3Credentials)?;

let bucket = Bucket::new(
&config.bucket,
Expand All @@ -42,14 +43,14 @@ impl S3Archiver {
},
credentials,
)
.map_err(|_| ServerError::CannotInitializeS3Archiver)?;
.map_err(|_| ArchiverError::CannotInitializeS3Archiver)?;
Ok(Self {
bucket,
tmp_upload_dir: config.tmp_upload_dir,
})
}

async fn copy_file_to_tmp(&self, path: &str) -> Result<String, ServerError> {
async fn copy_file_to_tmp(&self, path: &str) -> Result<String, ArchiverError> {
debug!(
"Copying file: {path} to temporary S3 upload directory: {}",
self.tmp_upload_dir
Expand All @@ -58,21 +59,29 @@ impl S3Archiver {
let destination = Path::new(&self.tmp_upload_dir).join(path);
let destination_path = destination.to_str().unwrap_or_default().to_owned();
debug!("Creating temporary S3 upload directory: {destination_path}");
fs::create_dir_all(destination.parent().unwrap()).await?;
fs::create_dir_all(destination.parent().unwrap())
.await
.with_error(|err| {
format!(
"{COMPONENT} - failed to create temporary S3 upload directory for path: {destination_path} with error: {err}"
)
})?;
debug!("Copying file: {path} to temporary S3 upload path: {destination_path}");
fs::copy(source, destination).await?;
fs::copy(source, &destination).await.with_error(|err| {
format!("{COMPONENT} - failed to copy file: {path} to temporary S3 upload path: {destination_path} with error: {err}")
})?;
debug!("File: {path} copied to temporary S3 upload path: {destination_path}");
Ok(destination_path)
}
}

#[async_trait]
impl Archiver for S3Archiver {
async fn init(&self) -> Result<(), ServerError> {
async fn init(&self) -> Result<(), ArchiverError> {
let response = self.bucket.list("/".to_string(), None).await;
if let Err(error) = response {
error!("Cannot initialize S3 archiver: {error}");
return Err(ServerError::CannotInitializeS3Archiver);
return Err(ArchiverError::CannotInitializeS3Archiver);
}

if Path::new(&self.tmp_upload_dir).exists() {
Expand All @@ -94,7 +103,7 @@ impl Archiver for S3Archiver {
&self,
file: &str,
base_directory: Option<String>,
) -> Result<bool, ServerError> {
) -> Result<bool, ArchiverError> {
debug!("Checking if file: {file} is archived on S3.");
let base_directory = base_directory.as_deref().unwrap_or_default();
let destination = Path::new(&base_directory).join(file);
Expand All @@ -119,15 +128,19 @@ impl Archiver for S3Archiver {
&self,
files: &[&str],
base_directory: Option<String>,
) -> Result<(), ServerError> {
) -> Result<(), ArchiverError> {
for path in files {
if !Path::new(path).exists() {
return Err(ServerError::FileToArchiveNotFound(path.to_string()));
return Err(ArchiverError::FileToArchiveNotFound {
file_path: path.to_string(),
});
}

let source = self.copy_file_to_tmp(path).await?;
debug!("Archiving file: {source} on S3.");
let mut file = file::open(&source).await?;
let mut file = file::open(&source)
.await
.with_error(|err| format!("{COMPONENT} - failed to open source file: {source} for archiving with error: {err}"))?;
let base_directory = base_directory.as_deref().unwrap_or_default();
let destination = Path::new(&base_directory).join(path);
let destination_path = destination.to_str().unwrap_or_default().to_owned();
Expand All @@ -137,21 +150,31 @@ impl Archiver for S3Archiver {
.await;
if let Err(error) = response {
error!("Cannot archive file: {path} on S3: {}", error);
fs::remove_file(&source).await?;
return Err(ServerError::CannotArchiveFile(path.to_string()));
fs::remove_file(&source).await.with_error(|err| {
format!("{COMPONENT} - failed to remove temporary file: {source} after S3 failure with error: {err}")
})?;
return Err(ArchiverError::CannotArchiveFile {
file_path: path.to_string(),
});
}

let response = response.unwrap();
let status = response.status_code();
if status == 200 {
debug!("Archived file: {path} on S3.");
fs::remove_file(&source).await?;
fs::remove_file(&source).await.with_error(|err| {
format!("{COMPONENT} - failed to remove temporary file: {source} after successful archive with error: {err}")
})?;
continue;
}

error!("Cannot archive file: {path} on S3, received an invalid status code: {status}.");
fs::remove_file(&source).await?;
return Err(ServerError::CannotArchiveFile(path.to_string()));
fs::remove_file(&source).await.with_error(|err| {
format!("{COMPONENT} - failed to remove temporary file: {source} after invalid status code with error: {err}")
})?;
return Err(ArchiverError::CannotArchiveFile {
file_path: path.to_string(),
});
}
Ok(())
}
Expand Down
10 changes: 8 additions & 2 deletions server/src/binary/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ use crate::binary::handlers::users::{
update_user_handler,
};
use crate::binary::sender::Sender;
use crate::binary::COMPONENT;
use crate::command::ServerCommand;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use error_set::ResultContext;
use iggy::error::IggyError;
use tracing::{debug, error};

Expand All @@ -38,12 +40,16 @@ pub async fn handle(
Err(error) => {
error!("Command was not handled successfully, session: {session}, error: {error}.");
if let IggyError::ClientNotFound(_) = error {
sender.send_error_response(error).await?;
sender.send_error_response(error).await.with_error(|_| {
format!("{COMPONENT} - failed to send error response, session: {session}")
})?;
debug!("TCP error response was sent to: {session}.");
error!("Session: {session} will be deleted.");
Err(IggyError::ClientNotFound(session.client_id))
} else {
sender.send_error_response(error).await?;
sender.send_error_response(error).await.with_error(|_| {
format!("{COMPONENT} - failed to send error response, session: {session}")
})?;
debug!("TCP error response was sent to: {session}.");
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::binary::handlers::consumer_groups::COMPONENT;
use crate::binary::mapper;
use crate::binary::sender::Sender;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup;
use iggy::error::IggyError;
use tracing::{debug, instrument};
Expand All @@ -27,18 +29,35 @@ pub async fn handle(
command.group_id,
&command.name,
)
.await?;
.await
.with_error(|_| {
format!(
"{COMPONENT} - failed to create consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {:?}",
command.stream_id, command.topic_id, command.group_id, session
)
})?;
let consumer_group = consumer_group.read().await;
response = mapper::map_consumer_group(&consumer_group).await;
}

let system = system.read().await;
let stream_id = command.stream_id.clone();
let topic_id = command.topic_id.clone();
let group_id = command.group_id;

system
.state
.apply(
session.get_user_id(),
EntryCommand::CreateConsumerGroup(command),
)
.await?;
.await
.with_error(|_| {
format!(
"{COMPONENT} - failed to apply create consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {}",
stream_id, topic_id, group_id, session
)
})?;
sender.send_ok_response(&response).await?;
Ok(())
}
Loading

0 comments on commit 5cfffa1

Please sign in to comment.