Skip to content

Commit

Permalink
add error handling to dht
Browse files Browse the repository at this point in the history
  • Loading branch information
zupzup committed Nov 27, 2024
1 parent 02df6f8 commit 7b70b7b
Show file tree
Hide file tree
Showing 13 changed files with 415 additions and 380 deletions.
1 change: 1 addition & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::Ipv4Addr;

// General
pub const BILLS_PREFIX: &str = "BILLS";
pub const INFO_PREFIX: &str = "INFO";
pub const BILL_PREFIX: &str = "BILL";
pub const BILL_ATTACHMENT_PREFIX: &str = "BILLATT";
pub const KEY_PREFIX: &str = "KEY";
Expand Down
519 changes: 213 additions & 306 deletions src/dht/client.rs

Large diffs are not rendered by default.

62 changes: 59 additions & 3 deletions src/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,63 @@ mod event_loop;

use crate::persistence::bill::BillStoreApi;
use crate::persistence::identity::IdentityStoreApi;
use crate::util;
use anyhow::Result;
use crate::{persistence, util};
pub use client::Client;
use libp2p::identity::Keypair;
use log::{error, info};
use std::sync::Arc;
use thiserror::Error;

/// Generic result type
pub type Result<T> = std::result::Result<T, Error>;

/// Generic error type
#[derive(Debug, Error)]
pub enum Error {
/// all errors originating from serializing, or deserializing json
#[error("unable to serialize/deserialize to/from JSON {0}")]
Json(#[from] serde_json::Error),

/// all errors originating from the persistence layer
#[error("Persistence error: {0}")]
Persistence(#[from] persistence::Error),

/// all errors originating from running into utf8-related errors
#[error("utf-8 error when parsing string {0}")]
Utf8(#[from] std::str::Utf8Error),

/// all errors originating from using a broken channel
#[error("channel error {0}")]
SendChannel(#[from] futures::channel::mpsc::SendError),

/// all errors originating from using a closed onsehot channel
#[error("oneshot channel cancel error {0}")]
ChannelCanceled(#[from] futures::channel::oneshot::Canceled),

/// error if there are no providers
#[error("No providers found: {0}")]
NoProviders(String),

/// error if a file wasn't returned from any provider
#[error("No file returned from providers: {0}")]
NoFileFromProviders(String),

/// error if an attached file wasn't found in a bill
#[error("No file found in bill: {0}")]
FileNotFoundInBill(String),

/// error if file hashes of two files did not match
#[error("File hashes did not match: {0}")]
FileHashesDidNotMatch(String),

/// error if requesting a file fails
#[error("request file error: {0}")]
RequestFile(String),

/// error if the listen url is invalid
#[error("invalid listen p2p url error")]
ListenP2pUrlInvalid,
}

pub struct Dht {
pub client: Client,
Expand Down Expand Up @@ -100,7 +151,12 @@ async fn new(

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();

swarm.listen_on(conf.p2p_listen_url()?).unwrap();
swarm
.listen_on(
conf.p2p_listen_url()
.map_err(|_| Error::ListenP2pUrlInvalid)?,
)
.unwrap();

// Wait to listen on all interfaces.
let sleep = tokio::time::sleep(std::time::Duration::from_secs(1));
Expand Down
14 changes: 8 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ async fn main() -> Result<()> {
});

let local_peer_id = db.identity_store.get_peer_id().await?;
dht_client.check_new_bills(local_peer_id.to_string()).await;
dht_client.upgrade_table(local_peer_id.to_string()).await;
dht_client.subscribe_to_all_bills_topics().await;
dht_client.put_bills_for_parties().await;
dht_client.start_provide().await;
dht_client.receive_updates_for_all_bills_topics().await;
dht_client
.check_new_bills(local_peer_id.to_string())
.await?;
dht_client.update_table(local_peer_id.to_string()).await?;
dht_client.subscribe_to_all_bills_topics().await?;
dht_client.put_bills_for_parties().await?;
dht_client.start_providing_bills().await?;
dht_client.receive_updates_for_all_bills_topics().await?;
dht_client.put_identity_public_data_in_dht().await?;

let web_server_error_shutdown_sender = dht.shutdown_sender.clone();
Expand Down
18 changes: 18 additions & 0 deletions src/persistence/bill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub trait BillStoreApi: Send + Sync {
/// Reads the keys for the given bill as bytes
async fn get_bill_keys_as_bytes(&self, bill_name: &str) -> Result<Vec<u8>>;

/// Gets all bill names
async fn get_bill_names(&self) -> Result<Vec<String>>;

/// Gets all bills
async fn get_bills(&self) -> Result<Vec<BitcreditBill>>;

Expand Down Expand Up @@ -137,6 +140,21 @@ impl BillStoreApi for FileBasedBillStore {
Ok(bytes)
}

async fn get_bill_names(&self) -> Result<Vec<String>> {
let mut res = vec![];
let mut dir = read_dir(self.get_path_for_bills()).await?;
while let Some(entry) = dir.next_entry().await? {
if is_not_hidden_or_directory_async(&entry).await {
if let Some(bill_name) = entry.path().file_stem() {
if let Some(bill_name_str) = bill_name.to_str() {
res.push(bill_name_str.to_owned());
}
}
}
}
Ok(res)
}

async fn get_bills(&self) -> Result<Vec<BitcreditBill>> {
let mut bills = Vec::new();
let mut dir = read_dir(self.get_path_for_bills()).await?;
Expand Down
31 changes: 18 additions & 13 deletions src/service/bill_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::persistence::file_upload::FileUploadStoreApi;
use crate::persistence::identity::IdentityStoreApi;
use crate::util::get_current_payee_private_key;
use crate::web::data::File;
use crate::{dht, external, persistence, util};
use crate::{dht::Client, persistence::bill::BillStoreApi};
use crate::{external, persistence, util};
use async_trait::async_trait;
use borsh_derive::{BorshDeserialize, BorshSerialize};
use chrono::Utc;
Expand Down Expand Up @@ -55,6 +55,10 @@ pub enum Error {
#[error("Blockchain error: {0}")]
Blockchain(#[from] blockchain::Error),

/// errors that stem from interacting with the Dht
#[error("Dht error: {0}")]
Dht(#[from] dht::Error),

/// all errors originating from the persistence layer
#[error("Persistence error: {0}")]
Persistence(#[from] persistence::Error),
Expand Down Expand Up @@ -90,6 +94,10 @@ impl<'r, 'o: 'r> Responder<'r, 'o> for Error {
error!("{e}");
Status::InternalServerError.respond_to(req)
}
Error::Dht(e) => {
error!("{e}");
Status::InternalServerError.respond_to(req)
}
Error::Cryptography(e) => {
error!("{e}");
Status::InternalServerError.respond_to(req)
Expand Down Expand Up @@ -510,13 +518,10 @@ impl BillServiceApi for BillService {
}

async fn find_bill_in_dht(&self, bill_name: &str) -> Result<()> {
let mut client = self.client.clone();
let bill_bytes = client.get_bill(bill_name.to_string()).await;
if !bill_bytes.is_empty() {
self.store
.write_bill_to_file(bill_name, &bill_bytes)
.await?;
}
let bill_bytes = self.client.clone().get_bill(bill_name).await?;
self.store
.write_bill_to_file(bill_name, &bill_bytes)
.await?;
Ok(())
}

Expand Down Expand Up @@ -676,15 +681,15 @@ impl BillServiceApi for BillService {
self.client
.clone()
.add_message_to_topic(message, bill_name.to_owned())
.await;
.await?;
Ok(())
}

async fn propagate_bill_for_node(&self, bill_name: &str, node_id: &str) -> Result<()> {
self.client
.clone()
.add_bill_to_dht_for_node(bill_name, node_id)
.await;
.await?;
Ok(())
}

Expand All @@ -700,13 +705,13 @@ impl BillServiceApi for BillService {
for node in [drawer_peer_id, drawee_peer_id, payee_peer_id] {
if !node.is_empty() {
info!("issue bill: add {} for node {}", bill_name, &node);
client.add_bill_to_dht_for_node(bill_name, node).await;
client.add_bill_to_dht_for_node(bill_name, node).await?;
}
}

client.subscribe_to_topic(bill_name.to_owned()).await;
client.subscribe_to_topic(bill_name.to_owned()).await?;

client.put(bill_name).await;
client.put(bill_name).await?;
Ok(())
}

Expand Down
72 changes: 59 additions & 13 deletions src/service/company_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl CompanyServiceApi for CompanyService {

async fn get_company_by_id(&self, id: &str) -> Result<CompanyToReturn> {
if !self.store.exists(id).await {
return Err(super::Error::Validation(String::from(
return Err(super::Error::Validation(format!(
"No company with id: {id} found",
)));
}
Expand Down Expand Up @@ -219,11 +219,19 @@ impl CompanyServiceApi for CompanyService {
logo_file_upload_id: Option<String>,
) -> Result<()> {
if !self.store.exists(id).await {
return Err(super::Error::Validation(String::from(
return Err(super::Error::Validation(format!(
"No company with id: {id} found",
)));
}
let peer_id = self.identity_store.get_peer_id().await?;
let mut company = self.store.get(id).await?;

if !company.signatories.contains(&peer_id.to_string()) {
return Err(super::Error::Validation(String::from(
"Caller must be signatory for company",
)));
}

if let Some(legal_name_to_set) = legal_name {
company.legal_name = legal_name_to_set;
}
Expand Down Expand Up @@ -256,7 +264,7 @@ impl CompanyServiceApi for CompanyService {

async fn add_signatory(&self, id: &str, signatory_node_id: String) -> Result<()> {
if !self.store.exists(id).await {
return Err(super::Error::Validation(String::from(
return Err(super::Error::Validation(format!(
"No company with id: {id} found.",
)));
}
Expand All @@ -265,14 +273,14 @@ impl CompanyServiceApi for CompanyService {
.iter()
.any(|(_name, identity)| identity.peer_id == signatory_node_id);
if !is_in_contacts {
return Err(super::Error::Validation(String::from(
return Err(super::Error::Validation(format!(
"Node Id {signatory_node_id} is not in the contacts.",
)));
}

let mut company = self.store.get(id).await?;
if company.signatories.contains(&signatory_node_id) {
return Err(super::Error::Validation(String::from(
return Err(super::Error::Validation(format!(
"Node Id {signatory_node_id} is already a signatory.",
)));
}
Expand All @@ -284,7 +292,7 @@ impl CompanyServiceApi for CompanyService {

async fn remove_signatory(&self, id: &str, signatory_node_id: String) -> Result<()> {
if !self.store.exists(id).await {
return Err(super::Error::Validation(String::from(
return Err(super::Error::Validation(format!(
"No company with id: {id} found.",
)));
}
Expand All @@ -296,7 +304,7 @@ impl CompanyServiceApi for CompanyService {
)));
}
if !company.signatories.contains(&signatory_node_id) {
return Err(super::Error::Validation(String::from(
return Err(super::Error::Validation(format!(
"Node id {signatory_node_id} is not a signatory.",
)));
}
Expand Down Expand Up @@ -646,16 +654,22 @@ mod test {

#[tokio::test]
async fn edit_company_baseline() {
let peer_id = PeerId::random();
let (mut storage, mut file_upload_store, mut identity_store, contact_store) =
get_storages();
storage
.expect_get()
.returning(|_| Ok(get_baseline_company_data().1 .0));
storage.expect_get().returning(move |_| {
let mut data = get_baseline_company_data().1 .0;
data.signatories = vec![peer_id.to_string()];
Ok(data)
});
storage.expect_exists().returning(|_| true);
storage.expect_update().returning(|_, _| Ok(()));
storage
.expect_save_attached_file()
.returning(|_, _, _| Ok(()));
identity_store
.expect_get_peer_id()
.returning(move || Ok(peer_id));
identity_store.expect_get().returning(|| {
let mut identity = Identity::new_empty();
identity.public_key_pem = TEST_PUB_KEY.to_string();
Expand Down Expand Up @@ -702,12 +716,41 @@ mod test {
assert!(res.is_err());
}

#[tokio::test]
async fn edit_company_fails_if_caller_is_not_signatory() {
let peer_id = PeerId::random();
let (mut storage, file_upload_store, mut identity_store, contact_store) = get_storages();
storage.expect_exists().returning(|_| false);
storage.expect_get().returning(|_| {
let mut data = get_baseline_company_data().1 .0;
data.signatories = vec!["some_other_dude".to_string()];
Ok(data)
});
identity_store
.expect_get_peer_id()
.returning(move || Ok(peer_id));
let service = get_service(storage, file_upload_store, identity_store, contact_store);
let res = service
.edit_company(
"some_id",
Some("legal_name".to_string()),
Some("some Address".to_string()),
Some("company@example.com".to_string()),
None,
)
.await;
assert!(res.is_err());
}

#[tokio::test]
async fn edit_company_propagates_persistence_errors() {
let (mut storage, file_upload_store, mut identity_store, contact_store) = get_storages();
storage
.expect_get()
.returning(|_| Ok(get_baseline_company_data().1 .0));
let peer_id = PeerId::random();
storage.expect_get().returning(move |_| {
let mut data = get_baseline_company_data().1 .0;
data.signatories = vec![peer_id.to_string()];
Ok(data)
});
storage.expect_exists().returning(|_| true);
storage.expect_update().returning(|_, _| {
Err(persistence::Error::Io(std::io::Error::new(
Expand All @@ -720,6 +763,9 @@ mod test {
identity.public_key_pem = TEST_PUB_KEY.to_string();
Ok(identity)
});
identity_store
.expect_get_peer_id()
.returning(move || Ok(peer_id));
let service = get_service(storage, file_upload_store, identity_store, contact_store);
let res = service
.edit_company(
Expand Down
4 changes: 2 additions & 2 deletions src/service/contact_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl ContactServiceApi for ContactService {
.client
.clone()
.get_identity_public_data_from_dht(identity.peer_id.clone())
.await;
.await?;

if !public.name.is_empty() && public.ne(&identity) {
self.update_identity(name, public.to_owned()).await?;
Expand Down Expand Up @@ -101,7 +101,7 @@ impl ContactServiceApi for ContactService {
.client
.clone()
.get_identity_public_data_from_dht(peer_id.to_owned())
.await;
.await?;

if public.name.is_empty() {
self.store.insert(name, default.clone()).await?;
Expand Down
Loading

0 comments on commit 7b70b7b

Please sign in to comment.