Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: error handling overhaul #65

Merged
merged 4 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ url = "2.3"
warp = { version = "0.3", default-features = false }
serde_json = "1.0"
rand = "0.8.5"
futures-util = "0.3"
once_cell = "1.19"

[[example]]
name = "websocket_client"
Expand Down
6 changes: 3 additions & 3 deletions examples/websocket_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
relay_client::{
error::Error,
error::ClientError,
websocket::{Client, CloseFrame, ConnectionHandler, PublishedMessage},
ConnectionOptions,
},
Expand Down Expand Up @@ -49,11 +49,11 @@ impl ConnectionHandler for Handler {
);
}

fn inbound_error(&mut self, error: Error) {
fn inbound_error(&mut self, error: ClientError) {
println!("[{}] inbound error: {error}", self.name);
}

fn outbound_error(&mut self, error: Error) {
fn outbound_error(&mut self, error: ClientError) {
println!("[{}] outbound error: {error}", self.name);
}
}
Expand Down
62 changes: 59 additions & 3 deletions relay_client/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use relay_rpc::rpc::{self, error::ServiceError};

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Errors generated while parsing
Expand All @@ -23,7 +25,7 @@ pub enum RequestBuildError {

/// Possible Relay client errors.
#[derive(Debug, thiserror::Error)]
pub enum Error {
pub enum ClientError {
#[error("Failed to build connection request: {0}")]
RequestBuilder(#[from] RequestBuildError),

Expand All @@ -42,15 +44,69 @@ pub enum Error {
#[error("Invalid response ID")]
InvalidResponseId,

#[error("Invalid error response")]
InvalidErrorResponse,

#[error("Serialization failed: {0}")]
Serialization(serde_json::Error),

#[error("Deserialization failed: {0}")]
Deserialization(serde_json::Error),

#[error("RPC error ({code}): {message}")]
Rpc { code: i32, message: String },
#[error("RPC error: code={code} data={data:?} message={message}")]
Rpc {
code: i32,
message: String,
data: Option<String>,
Copy link

@jakubuid jakubuid Feb 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data field is not handled currently in Kotlin client. It should not break tho, would be just ignored.

},

#[error("Invalid request type")]
InvalidRequestType,
}

impl From<rpc::ErrorData> for ClientError {
fn from(err: rpc::ErrorData) -> Self {
Self::Rpc {
code: err.code,
message: err.message,
data: err.data,
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum Error<T: ServiceError> {
heilhead marked this conversation as resolved.
Show resolved Hide resolved
/// Client errors encountered while performing the request.
#[error(transparent)]
Client(ClientError),

/// Error response received from the relay.
#[error(transparent)]
Response(#[from] rpc::Error<T>),
}

impl<T: ServiceError> From<ClientError> for Error<T> {
fn from(err: ClientError) -> Self {
match err {
ClientError::Rpc {
code,
message,
data,
} => {
let err = rpc::ErrorData {
code,
message,
data,
};

match rpc::Error::try_from(err) {
Ok(err) => Error::Response(err),

Err(_) => Error::Client(ClientError::InvalidErrorResponse),
}
}

_ => Error::Client(err),
}
}
}
80 changes: 47 additions & 33 deletions relay_client/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
error::{BoxError, Error},
error::{BoxError, ClientError, Error},
ConnectionOptions,
MessageIdGenerator,
},
Expand All @@ -9,15 +9,15 @@ use {
auth::ed25519_dalek::SigningKey,
domain::{DecodedClientId, SubscriptionId, Topic},
jwt::{self, JwtError, VerifyableClaims},
rpc::{self, Receipt, RequestPayload},
rpc::{self, Receipt, ServiceRequest},
},
std::{sync::Arc, time::Duration},
url::Url,
};

pub type TransportError = reqwest::Error;
pub type Response<T> = Result<<T as RequestPayload>::Response, Error>;
pub type EmptyResponse = Result<(), Error>;
pub type Response<T> = Result<<T as ServiceRequest>::Response, Error<<T as ServiceRequest>::Error>>;
pub type EmptyResponse<T> = Result<(), Error<<T as ServiceRequest>::Error>>;

#[derive(Debug, thiserror::Error)]
pub enum RequestParamsError {
Expand All @@ -41,9 +41,6 @@ pub enum HttpClientError {

#[error("JWT error: {0}")]
Jwt(#[from] JwtError),

#[error("RPC error: code={} message={}", .0.code, .0.message)]
RpcError(rpc::ErrorData),
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -82,7 +79,7 @@ pub struct Client {
}

impl Client {
pub fn new(opts: &ConnectionOptions) -> Result<Self, Error> {
pub fn new(opts: &ConnectionOptions) -> Result<Self, ClientError> {
let mut headers = HeaderMap::new();
opts.update_request_headers(&mut headers)?;

Expand Down Expand Up @@ -111,11 +108,14 @@ impl Client {
tag: u32,
ttl: Duration,
prompt: bool,
) -> EmptyResponse {
) -> EmptyResponse<rpc::Publish> {
let ttl_secs = ttl
.as_secs()
.try_into()
.map_err(|_| HttpClientError::InvalidRequest(RequestParamsError::InvalidTtl.into()))?;
.map_err(|_| {
HttpClientError::InvalidRequest(RequestParamsError::InvalidTtl.into()).into()
})
.map_err(Error::Client)?;

self.request(rpc::Publish {
topic,
Expand Down Expand Up @@ -175,7 +175,8 @@ impl Client {
.ttl
.as_secs()
.try_into()
.map_err(|err| HttpClientError::InvalidRequest(Box::new(err)))?;
.map_err(|err| HttpClientError::InvalidRequest(Box::new(err)).into())
.map_err(Error::Client)?;
let exp = iat + ttl_sec;

let claims = rpc::WatchRegisterClaims {
Expand All @@ -194,7 +195,11 @@ impl Client {
};

let payload = rpc::WatchRegister {
register_auth: claims.encode(keypair).map_err(HttpClientError::Jwt)?,
register_auth: claims
.encode(keypair)
.map_err(HttpClientError::Jwt)
.map_err(ClientError::from)
.map_err(Error::Client)?,
};

self.request(payload).await
Expand Down Expand Up @@ -230,7 +235,11 @@ impl Client {
};

let payload = rpc::WatchUnregister {
unregister_auth: claims.encode(keypair).map_err(HttpClientError::Jwt)?,
unregister_auth: claims
.encode(keypair)
.map_err(HttpClientError::Jwt)
.map_err(ClientError::from)
.map_err(Error::Client)?,
};

self.request(payload).await
Expand Down Expand Up @@ -299,45 +308,50 @@ impl Client {

pub(crate) async fn request<T>(&self, payload: T) -> Response<T>
where
T: RequestPayload,
T: ServiceRequest,
{
let payload = rpc::Payload::Request(rpc::Request {
id: self.id_generator.next(),
jsonrpc: rpc::JSON_RPC_VERSION.clone(),
params: payload.into_params(),
});

let result = self
.client
.post(self.url.clone())
.json(&payload)
.send()
.await
.map_err(HttpClientError::Transport)?;
let response = async {
let result = self
.client
.post(self.url.clone())
.json(&payload)
.send()
.await
.map_err(HttpClientError::Transport)?;

let status = result.status();
let status = result.status();

if !status.is_success() {
let body = result.text().await;
return Err(HttpClientError::InvalidHttpCode(status, body).into());
}
if !status.is_success() {
let body = result.text().await;
return Err(HttpClientError::InvalidHttpCode(status, body));
}

let response = result
.json::<rpc::Payload>()
.await
.map_err(|_| HttpClientError::InvalidResponse)?;
result
.json::<rpc::Payload>()
.await
.map_err(|_| HttpClientError::InvalidResponse)
}
.await
.map_err(ClientError::from)
.map_err(Error::Client)?;

match response {
rpc::Payload::Response(rpc::Response::Success(response)) => {
serde_json::from_value(response.result)
.map_err(|_| HttpClientError::InvalidResponse.into())
.map_err(|_| Error::Client(HttpClientError::InvalidResponse.into()))
}

rpc::Payload::Response(rpc::Response::Error(response)) => {
Err(HttpClientError::RpcError(response.error).into())
Err(ClientError::from(response.error).into())
}

_ => Err(HttpClientError::InvalidResponse.into()),
_ => Err(Error::Client(HttpClientError::InvalidResponse.into())),
}
}
}
2 changes: 1 addition & 1 deletion relay_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::error::{Error, RequestBuildError},
crate::error::{ClientError, RequestBuildError},
::http::HeaderMap,
relay_rpc::{
auth::{SerializedAuthToken, RELAY_WEBSOCKET_ADDRESS},
Expand Down
20 changes: 10 additions & 10 deletions relay_client/src/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
self::connection::{connection_event_loop, ConnectionControl},
crate::{error::Error, ConnectionOptions},
crate::{error::ClientError, ConnectionOptions},
relay_rpc::{
domain::{MessageId, SubscriptionId, Topic},
rpc::{
Expand Down Expand Up @@ -114,11 +114,11 @@ pub trait ConnectionHandler: Send + 'static {

/// Called when an inbound error occurs, such as data deserialization
/// failure, or an unknown response message ID.
fn inbound_error(&mut self, _error: Error) {}
fn inbound_error(&mut self, _error: ClientError) {}

/// Called when an outbound error occurs, i.e. failed to write to the
/// websocket stream.
fn outbound_error(&mut self, _error: Error) {}
fn outbound_error(&mut self, _error: ClientError) {}
}

/// The Relay WebSocket RPC client.
Expand Down Expand Up @@ -291,7 +291,7 @@ impl Client {
}

/// Opens a connection to the Relay.
pub async fn connect(&self, opts: &ConnectionOptions) -> Result<(), Error> {
pub async fn connect(&self, opts: &ConnectionOptions) -> Result<(), ClientError> {
let (tx, rx) = oneshot::channel();
let request = opts.as_ws_request()?;

Expand All @@ -300,24 +300,24 @@ impl Client {
.send(ConnectionControl::Connect { request, tx })
.is_ok()
{
rx.await.map_err(|_| Error::ChannelClosed)?
rx.await.map_err(|_| ClientError::ChannelClosed)?
} else {
Err(Error::ChannelClosed)
Err(ClientError::ChannelClosed)
}
}

/// Closes the Relay connection.
pub async fn disconnect(&self) -> Result<(), Error> {
pub async fn disconnect(&self) -> Result<(), ClientError> {
let (tx, rx) = oneshot::channel();

if self
.control_tx
.send(ConnectionControl::Disconnect { tx })
.is_ok()
{
rx.await.map_err(|_| Error::ChannelClosed)?
rx.await.map_err(|_| ClientError::ChannelClosed)?
} else {
Err(Error::ChannelClosed)
Err(ClientError::ChannelClosed)
}
}

Expand All @@ -330,7 +330,7 @@ impl Client {
unreachable!();
};

request.tx.send(Err(Error::ChannelClosed)).ok();
request.tx.send(Err(ClientError::ChannelClosed)).ok();
}
}
}
Loading
Loading