Skip to content

Commit

Permalink
fix: client rpc error types
Browse files Browse the repository at this point in the history
  • Loading branch information
dandanlen committed Oct 18, 2024
1 parent b4aaae5 commit a3126b4
Show file tree
Hide file tree
Showing 13 changed files with 601 additions and 833 deletions.
22 changes: 14 additions & 8 deletions api/bin/chainflip-broker-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,27 @@ use chainflip_api::{
SwapDepositAddress, WithdrawFeesDetail,
};
use clap::Parser;
use custom_rpc::to_rpc_error;
use futures::FutureExt;
use jsonrpsee::{
core::{async_trait, RpcResult},
proc_macros::rpc,
server::ServerBuilder,
types::ErrorCode,
};
use std::{
path::PathBuf,
sync::{atomic::AtomicBool, Arc},
};
use tracing::log;

fn to_error_object(error: impl core::fmt::Display) -> jsonrpsee::types::error::ErrorObjectOwned {
jsonrpsee::types::error::ErrorObjectOwned::owned(
ErrorCode::ServerError(0xcf).code(),
error.to_string(),
None::<()>,
)
}

#[rpc(server, client, namespace = "broker")]
pub trait Rpc {
#[method(name = "register_account", aliases = ["broker_registerAccount"])]
Expand Down Expand Up @@ -75,7 +83,7 @@ impl RpcServer for RpcServerImpl {
.register_account_role(AccountRole::Broker)
.await
.map(|tx_hash| format!("{tx_hash:#x}"))
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

async fn request_swap_deposit_address(
Expand Down Expand Up @@ -105,7 +113,7 @@ impl RpcServer for RpcServerImpl {
dca_parameters,
)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

async fn withdraw_fees(
Expand All @@ -118,7 +126,7 @@ impl RpcServer for RpcServerImpl {
.broker_api()
.withdraw_fees(asset, destination_address)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}
}

Expand Down Expand Up @@ -176,11 +184,9 @@ async fn main() -> anyhow::Result<()> {
let server = ServerBuilder::default()
.max_connections(opts.max_connections)
.build(format!("0.0.0.0:{}", opts.port))
.await
.map_err(to_rpc_error)?;
.await?;
let server_addr = server.local_addr()?;
let server = server
.start(RpcServerImpl::new(scope, opts).await.map_err(to_rpc_error)?.into_rpc());
let server = server.start(RpcServerImpl::new(scope, opts).await?.into_rpc());

log::info!("🎙 Server is listening on {server_addr}.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,14 @@ mod tests {
BlockInfo,
};
use frame_support::storage::types::QueryKindTrait;
use jsonrpsee::core::RpcResult;
use jsonrpsee::core::ClientError;
use mockall::mock;
use pallet_cf_ingress_egress::DepositWitness;
use sp_core::{storage::StorageKey, H160};
use std::collections::HashMap;

type RpcResult<T> = Result<T, ClientError>;

#[derive(Default)]
struct MockStore {
storage: HashMap<String, serde_json::Value>,
Expand Down
99 changes: 63 additions & 36 deletions api/bin/chainflip-lp-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ use chainflip_api::{
use clap::Parser;
use custom_rpc::{
order_fills::{order_fills_from_block_updates, OrderFills},
to_rpc_error, CustomApiClient,
CustomApiClient,
};
use futures::{try_join, FutureExt, StreamExt};
use jsonrpsee::{
core::{async_trait, RpcResult},
core::{async_trait, ClientError, RpcResult},
proc_macros::rpc,
server::ServerBuilder,
types::{ErrorCode, ErrorObject, ErrorObjectOwned},
PendingSubscriptionSink,
};
use pallet_cf_pools::{CloseOrder, IncreaseOrDecrease, OrderId, RangeOrderSize, MAX_ORDERS_DELETE};
Expand Down Expand Up @@ -229,11 +230,33 @@ impl RpcServerImpl {
Ok(Self {
api: StateChainApi::connect(scope, StateChain { ws_endpoint, signing_key_file })
.await
.map_err(to_rpc_error)?,
.map_err(to_error_object)?,
})
}
}

fn unwrap_client_error(err: ClientError) -> ErrorObjectOwned {
match err {
ClientError::Call(obj) => obj,
internal => {
log::error!("Internal rpc client error: {internal:?}");
ErrorObject::owned(
ErrorCode::InternalError.code(),
"Internal rpc client error",
None::<()>,
)
},
}
}

fn to_error_object(error: impl core::fmt::Display) -> jsonrpsee::types::error::ErrorObjectOwned {
jsonrpsee::types::error::ErrorObjectOwned::owned(
ErrorCode::ServerError(0xcf).code(),
error.to_string(),
None::<()>,
)
}

#[async_trait]
impl RpcServer for RpcServerImpl {
/// Returns a deposit address
Expand All @@ -249,7 +272,7 @@ impl RpcServer for RpcServerImpl {
.request_liquidity_deposit_address(asset, wait_for.unwrap_or_default(), boost_fee)
.await
.map(|result| result.map_details(|address| address.to_string()))
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

async fn register_liquidity_refund_address(
Expand All @@ -262,7 +285,7 @@ impl RpcServer for RpcServerImpl {
.lp_api()
.register_liquidity_refund_address(chain, address)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

/// Returns an egress id
Expand All @@ -277,13 +300,13 @@ impl RpcServer for RpcServerImpl {
.api
.lp_api()
.withdraw_asset(
try_parse_number_or_hex(amount).map_err(to_rpc_error)?,
try_parse_number_or_hex(amount).map_err(to_error_object)?,
asset,
destination_address,
wait_for.unwrap_or_default(),
)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

/// Returns an egress id
Expand All @@ -300,12 +323,12 @@ impl RpcServer for RpcServerImpl {
amount
.try_into()
.map_err(|_| anyhow!("Failed to convert amount to u128"))
.map_err(to_rpc_error)?,
.map_err(to_error_object)?,
asset,
destination_account,
)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

/// Returns a list of all assets and their free balance in json format
Expand All @@ -319,7 +342,7 @@ impl RpcServer for RpcServerImpl {
Some(self.api.state_chain_client.latest_finalized_block().hash),
)
.await
.map_err(to_rpc_error)
.map_err(to_error_object)
}

async fn update_range_order(
Expand All @@ -337,13 +360,13 @@ impl RpcServer for RpcServerImpl {
.update_range_order(
base_asset,
quote_asset,
id.try_into().map_err(to_rpc_error)?,
id.try_into().map_err(to_error_object)?,
tick_range,
size_change.try_map(|size| size.try_into()).map_err(to_rpc_error)?,
size_change.try_map(|size| size.try_into()).map_err(to_error_object)?,
wait_for.unwrap_or_default(),
)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

async fn set_range_order(
Expand All @@ -361,13 +384,13 @@ impl RpcServer for RpcServerImpl {
.set_range_order(
base_asset,
quote_asset,
id.try_into().map_err(to_rpc_error)?,
id.try_into().map_err(to_error_object)?,
tick_range,
size.try_into().map_err(to_rpc_error)?,
size.try_into().map_err(to_error_object)?,
wait_for.unwrap_or_default(),
)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

async fn update_limit_order(
Expand All @@ -388,14 +411,14 @@ impl RpcServer for RpcServerImpl {
base_asset,
quote_asset,
side,
id.try_into().map_err(to_rpc_error)?,
id.try_into().map_err(to_error_object)?,
tick,
amount_change.try_map(try_parse_number_or_hex).map_err(to_rpc_error)?,
amount_change.try_map(try_parse_number_or_hex).map_err(to_error_object)?,
dispatch_at,
wait_for.unwrap_or_default(),
)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

async fn set_limit_order(
Expand All @@ -416,14 +439,14 @@ impl RpcServer for RpcServerImpl {
base_asset,
quote_asset,
side,
id.try_into().map_err(to_rpc_error)?,
id.try_into().map_err(to_error_object)?,
tick,
try_parse_number_or_hex(sell_amount).map_err(to_rpc_error)?,
try_parse_number_or_hex(sell_amount).map_err(to_error_object)?,
dispatch_at,
wait_for.unwrap_or_default(),
)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

/// Returns the tx hash that the account role was set
Expand All @@ -433,7 +456,7 @@ impl RpcServer for RpcServerImpl {
.operator_api()
.register_account_role(AccountRole::LiquidityProvider)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

async fn get_open_swap_channels(&self) -> RpcResult<OpenSwapChannels> {
Expand All @@ -443,7 +466,8 @@ impl RpcServer for RpcServerImpl {
api.get_open_swap_channels::<Ethereum>(None),
api.get_open_swap_channels::<Bitcoin>(None),
api.get_open_swap_channels::<Polkadot>(None),
)?;
)
.map_err(unwrap_client_error)?;
Ok(OpenSwapChannels { ethereum, bitcoin, polkadot })
}

Expand All @@ -454,7 +478,9 @@ impl RpcServer for RpcServerImpl {
executor_address: Option<EthereumAddress>,
) -> RpcResult<Hash> {
let redeem_amount = if let Some(number_or_hex) = exact_amount {
RedemptionAmount::Exact(try_parse_number_or_hex(number_or_hex).map_err(to_rpc_error)?)
RedemptionAmount::Exact(
try_parse_number_or_hex(number_or_hex).map_err(to_error_object)?,
)
} else {
RedemptionAmount::Max
};
Expand All @@ -464,7 +490,7 @@ impl RpcServer for RpcServerImpl {
.operator_api()
.request_redemption(redeem_amount, redeem_address, executor_address)
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}

fn subscribe_order_fills(&self, pending_sink: PendingSubscriptionSink) {
Expand Down Expand Up @@ -494,12 +520,12 @@ impl RpcServer for RpcServerImpl {
let state_chain_client = &self.api.state_chain_client;

let block = if let Some(at) = at {
state_chain_client.block(at).await.map_err(to_rpc_error)?
state_chain_client.block(at).await.map_err(to_error_object)?
} else {
state_chain_client.latest_finalized_block()
};

Ok(order_fills(state_chain_client.clone(), block).await.map_err(to_rpc_error)?)
Ok(order_fills(state_chain_client.clone(), block).await.map_err(to_error_object)?)
}

async fn cancel_all_orders(
Expand All @@ -514,7 +540,7 @@ impl RpcServer for RpcServerImpl {
.raw_rpc_client
.cf_available_pools(None)
.await
.map_err(to_rpc_error)?;
.map_err(to_error_object)?;
for pool in pool_pairs {
let orders = self
.api
Expand All @@ -529,7 +555,7 @@ impl RpcServer for RpcServerImpl {
None,
)
.await
.map_err(to_rpc_error)?;
.map_err(to_error_object)?;
for order in orders.range_orders {
orders_to_delete.push(CloseOrder::Range {
base_asset: pool.base,
Expand Down Expand Up @@ -570,7 +596,7 @@ impl RpcServer for RpcServerImpl {
wait_for.unwrap_or_default(),
)
.await
.map_err(to_rpc_error)?,
.map_err(to_error_object)?,
);
}

Expand All @@ -587,7 +613,7 @@ impl RpcServer for RpcServerImpl {
.lp_api()
.cancel_orders_batch(orders, wait_for.unwrap_or_default())
.await
.map_err(to_rpc_error)?)
.map_err(to_error_object)?)
}
}

Expand All @@ -604,7 +630,8 @@ where
state_chain_client
.storage_map::<pallet_cf_pools::Pools<Runtime>, BTreeMap<_, _>>(block.hash),
state_chain_client.storage_value::<frame_system::Events<Runtime>>(block.hash)
)?;
)
.map_err(unwrap_client_error)?;

let lp_events = events
.into_iter()
Expand Down Expand Up @@ -674,15 +701,15 @@ async fn main() -> anyhow::Result<()> {
has_completed_initialising.clone(),
)
.await
.map_err(to_rpc_error)?;
.map_err(to_error_object)?;

let server = ServerBuilder::default()
.build(format!("0.0.0.0:{}", opts.port))
.await
.map_err(to_rpc_error)?;
.map_err(to_error_object)?;
let server_addr = server.local_addr()?;
let server = server
.start(RpcServerImpl::new(scope, opts).await.map_err(to_rpc_error)?.into_rpc());
.start(RpcServerImpl::new(scope, opts).await.map_err(to_error_object)?.into_rpc());

log::info!("🎙 Server is listening on {server_addr}.");

Expand Down
Loading

0 comments on commit a3126b4

Please sign in to comment.