Skip to content

Commit

Permalink
chore: add variants for error handling (#317)
Browse files Browse the repository at this point in the history
* chore: add store error enum to allow higher levels to handle it

* chore: add error enum to recon crate

* chore: pr review
  • Loading branch information
dav1do authored Apr 26, 2024
1 parent 4557c5a commit 278d583
Show file tree
Hide file tree
Showing 21 changed files with 591 additions and 319 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion one/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ async fn migrate_from_database(input_ceramic_db: PathBuf, store: SqliteEventStor
Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
input_ceramic_db_filename
);
result
Ok(result?)
}

#[cfg(test)]
Expand Down
20 changes: 10 additions & 10 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ mod tests {
use futures::TryStreamExt;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use recon::{Range, Sha256a, SyncState};
use recon::{Range, Result as ReconResult, Sha256a, SyncState};
use ssh_key::private::Ed25519Keypair;

use libp2p::{identity::Keypair as Libp2pKeypair, kad::RecordKey};
Expand Down Expand Up @@ -1251,7 +1251,7 @@ mod tests {
type Key = K;
type Hash = Sha256a;

async fn insert(&self, _key: Self::Key, _value: Option<Vec<u8>>) -> Result<()> {
async fn insert(&self, _key: Self::Key, _value: Option<Vec<u8>>) -> ReconResult<()> {
unreachable!()
}

Expand All @@ -1261,45 +1261,45 @@ mod tests {
_right_fencepost: Self::Key,
_offset: usize,
_limit: usize,
) -> Result<Vec<Self::Key>> {
) -> ReconResult<Vec<Self::Key>> {
unreachable!()
}

async fn len(&self) -> Result<usize> {
async fn len(&self) -> ReconResult<usize> {
unreachable!()
}

async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
async fn value_for_key(&self, _key: Self::Key) -> ReconResult<Option<Vec<u8>>> {
Ok(None)
}
async fn keys_with_missing_values(
&self,
_range: RangeOpen<Self::Key>,
) -> Result<Vec<Self::Key>> {
) -> ReconResult<Vec<Self::Key>> {
unreachable!()
}
async fn interests(&self) -> Result<Vec<RangeOpen<Self::Key>>> {
async fn interests(&self) -> ReconResult<Vec<RangeOpen<Self::Key>>> {
unreachable!()
}

async fn process_interests(
&self,
_interests: Vec<RangeOpen<Self::Key>>,
) -> Result<Vec<RangeOpen<Self::Key>>> {
) -> ReconResult<Vec<RangeOpen<Self::Key>>> {
unreachable!()
}

async fn initial_range(
&self,
_interest: RangeOpen<Self::Key>,
) -> Result<Range<Self::Key, Self::Hash>> {
) -> ReconResult<Range<Self::Key, Self::Hash>> {
unreachable!()
}

async fn process_range(
&self,
_range: Range<Self::Key, Self::Hash>,
) -> Result<(SyncState<Self::Key, Self::Hash>, Vec<Self::Key>)> {
) -> ReconResult<(SyncState<Self::Key, Self::Hash>, Vec<Self::Key>)> {
unreachable!()
}

Expand Down
1 change: 1 addition & 0 deletions recon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ multihash.workspace = true
prometheus-client.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid = { version = "1.8.0", features = ["v4"] }
Expand Down
40 changes: 27 additions & 13 deletions recon/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::Result;
use ceramic_core::RangeOpen;
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
Expand All @@ -8,7 +7,7 @@ use tracing::warn;

use crate::{
recon::{Range, ReconItem, SyncState},
AssociativeHash, InterestProvider, Key, Metrics, Recon, Store,
AssociativeHash, Error, InterestProvider, Key, Metrics, Recon, Result, Store,
};

/// Client to a [`Recon`] [`Server`].
Expand Down Expand Up @@ -256,11 +255,12 @@ where
let val = self
.recon
.insert(&ReconItem::new(&key, value.as_deref()))
.await;
.await
.map_err(Error::from);
send(ret, val);
}
Request::Len { ret } => {
send(ret, self.recon.len().await);
send(ret, self.recon.len().await.map_err(Error::from));
}
Request::Range {
left_fencepost,
Expand All @@ -272,7 +272,8 @@ where
let keys = self
.recon
.range(&left_fencepost, &right_fencepost, offset, limit)
.await;
.await
.map_err(Error::from);
send(ret, keys);
}
Request::RangeWithValues {
Expand All @@ -285,35 +286,48 @@ where
let keys = self
.recon
.range_with_values(&left_fencepost, &right_fencepost, offset, limit)
.await;
.await
.map_err(Error::from);
send(ret, keys);
}
Request::FullRange { ret } => {
let keys = self.recon.full_range().await;
let keys = self.recon.full_range().await.map_err(Error::from);
send(ret, keys);
}
Request::ValueForKey { key, ret } => {
let value = self.recon.value_for_key(key).await;
let value = self.recon.value_for_key(key).await.map_err(Error::from);
send(ret, value);
}
Request::KeysWithMissingValues { range, ret } => {
let ok = self.recon.keys_with_missing_values(range).await;
let ok = self
.recon
.keys_with_missing_values(range)
.await
.map_err(Error::from);
send(ret, ok);
}
Request::Interests { ret } => {
let value = self.recon.interests().await;
let value = self.recon.interests().await.map_err(Error::from);
send(ret, value);
}
Request::InitialRange { interest, ret } => {
let value = self.recon.initial_range(interest).await;
let value = self
.recon
.initial_range(interest)
.await
.map_err(Error::from);
send(ret, value);
}
Request::ProcessInterests { interests, ret } => {
let value = self.recon.process_interests(&interests).await;
let value = self
.recon
.process_interests(&interests)
.await
.map_err(Error::from);
send(ret, value);
}
Request::ProcessRange { range, ret } => {
let value = self.recon.process_range(range).await;
let value = self.recon.process_range(range).await.map_err(Error::from);
send(ret, value);
}
};
Expand Down
77 changes: 77 additions & 0 deletions recon/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use anyhow::anyhow;

#[derive(Debug, thiserror::Error)]
/// The Errors that can be raised by store operations
pub enum Error {
#[error("Application error encountered: {error}")]
/// An application specific error that may be resolved with different inputs or other changes
Application {
/// The error details that may include context and other information
error: anyhow::Error,
},
#[error("Fatal error encountered: {error}")]
/// A fatal error that is unlikely to be recoverable, and may require terminating the process completely
Fatal {
/// The error details that may include context and other information
error: anyhow::Error,
},
#[error("Transient error encountered: {error}")]
/// An error that can be retried, and may resolve itself. If an error is transient repeatedly, it should be
/// considered an "application" level error and propagated upward.
Transient {
/// The error details that may include context and other information
error: anyhow::Error,
},
}

impl Error {
/// Create a transient error
pub fn new_transient(error: impl Into<anyhow::Error>) -> Self {
Self::Transient {
error: error.into(),
}
}
/// Create a fatal error
pub fn new_fatal(error: impl Into<anyhow::Error>) -> Self {
Self::Fatal {
error: error.into(),
}
}

/// Create an application error
pub fn new_app(error: impl Into<anyhow::Error>) -> Self {
Self::Application {
error: error.into(),
}
}

/// Add context to the internal error. Works identically to `anyhow::context`
pub fn context<C>(self, context: C) -> Self
where
C: std::fmt::Display + Send + Sync + 'static,
{
match self {
Error::Application { error } => Self::Application {
error: error.context(context),
},
Error::Fatal { error } => Self::Fatal {
error: error.context(context),
},
Error::Transient { error } => Self::Transient {
error: error.context(context),
},
}
}
}

impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
fn from(value: tokio::sync::mpsc::error::SendError<T>) -> Self {
Self::new_fatal(anyhow!(value.to_string()).context("Unable to send to recon server"))
}
}

impl From<tokio::sync::oneshot::error::RecvError> for Error {
fn from(value: tokio::sync::oneshot::error::RecvError) -> Self {
Self::new_fatal(anyhow!(value.to_string()).context("No response from recon server"))
}
}
5 changes: 5 additions & 0 deletions recon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

pub use crate::{
client::{Client, Server},
error::Error,
metrics::Metrics,
recon::{
btreestore::BTreeStore, AssociativeHash, EventIdStore, FullInterests, HashCount,
Expand All @@ -13,6 +14,7 @@ pub use crate::{
};

mod client;
mod error;
pub mod libp2p;
mod metrics;
pub mod protocol;
Expand All @@ -21,3 +23,6 @@ mod sha256a;

#[cfg(test)]
mod tests;

/// A result type that wraps a recon Error
pub type Result<T> = std::result::Result<T, Error>;
Loading

0 comments on commit 278d583

Please sign in to comment.