From b2d873af83354307252eb4b15b9c0627b69f5d6b Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 16 Oct 2024 12:46:34 +0200 Subject: [PATCH] feat: structured error return types for rpcs --- Cargo.lock | 2 + api/bin/chainflip-broker-api/Cargo.toml | 1 + api/bin/chainflip-broker-api/src/main.rs | 64 +- .../src/witnessing/state_chain.rs | 4 +- api/bin/chainflip-lp-api/Cargo.toml | 1 + api/bin/chainflip-lp-api/src/main.rs | 136 +- api/lib/src/queries.rs | 4 +- engine/src/state_chain_observer/client.rs | 5 +- .../client/base_rpc_api.rs | 74 +- .../state_chain_observer/client/chain_api.rs | 2 +- .../signed/submission_watcher.rs | 34 +- .../client/extrinsic_api/unsigned.rs | 19 +- .../client/storage_api.rs | 3 +- state-chain/custom-rpc/src/lib.rs | 1106 ++++++----------- state-chain/custom-rpc/src/monitoring.rs | 70 +- state-chain/runtime/src/runtime_apis.rs | 17 +- 16 files changed, 646 insertions(+), 896 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1bd338aedeb..f11d9b49799 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1680,6 +1680,7 @@ dependencies = [ "sp-core 34.0.0", "sp-rpc", "substrate-build-script-utils", + "thiserror", "tokio", "tracing", "tracing-subscriber 0.3.18", @@ -1864,6 +1865,7 @@ dependencies = [ "sp-core 34.0.0", "sp-rpc", "substrate-build-script-utils", + "thiserror", "tokio", "tracing", "tracing-subscriber 0.3.18", diff --git a/api/bin/chainflip-broker-api/Cargo.toml b/api/bin/chainflip-broker-api/Cargo.toml index 153d1ec7ac5..c0a5016baaf 100644 --- a/api/bin/chainflip-broker-api/Cargo.toml +++ b/api/bin/chainflip-broker-api/Cargo.toml @@ -34,6 +34,7 @@ jsonrpsee = { version = "0.23.2", features = ["full"] } serde = { version = '1.0.197', features = ['derive'] } sp-core = { git = "https://github.com/chainflip-io/polkadot-sdk.git", tag = "chainflip-substrate-1.15.2+2" } sp-rpc = { git = "https://github.com/chainflip-io/polkadot-sdk.git", tag = "chainflip-substrate-1.15.2+2" } +thiserror = "1.0" tokio = "1.20.1" tracing = "0.1.34" tracing-subscriber = { version = "0.3.3", features = ["env-filter"] } diff --git a/api/bin/chainflip-broker-api/src/main.rs b/api/bin/chainflip-broker-api/src/main.rs index fb574e03150..91070243861 100644 --- a/api/bin/chainflip-broker-api/src/main.rs +++ b/api/bin/chainflip-broker-api/src/main.rs @@ -12,12 +12,12 @@ use chainflip_api::{ SwapDepositAddress, SwapPayload, WithdrawFeesDetail, }; use clap::Parser; -use custom_rpc::to_rpc_error; use futures::FutureExt; use jsonrpsee::{ - core::{async_trait, RpcResult}, + core::{async_trait, ClientError}, proc_macros::rpc, server::ServerBuilder, + types::{ErrorCode, ErrorObject, ErrorObjectOwned}, }; use std::{ path::PathBuf, @@ -25,6 +25,42 @@ use std::{ }; use tracing::log; +#[derive(thiserror::Error, Debug)] +pub enum BrokerApiError { + #[error(transparent)] + ErrorObject(#[from] ErrorObjectOwned), + #[error(transparent)] + ClientError(#[from] jsonrpsee::core::ClientError), + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +type RpcResult = Result; + +impl From for ErrorObjectOwned { + fn from(error: BrokerApiError) -> Self { + match error { + BrokerApiError::ErrorObject(error) => error, + BrokerApiError::ClientError(error) => match error { + ClientError::Call(obj) => obj, + internal => { + log::error!("Internal rpc client error: {internal:?}"); + ErrorObject::owned( + ErrorCode::InternalError.code(), + "Internal rpc client error", + None::<()>, + ) + }, + }, + BrokerApiError::Other(error) => jsonrpsee::types::error::ErrorObjectOwned::owned( + ErrorCode::ServerError(0xcf).code(), + error.to_string(), + None::<()>, + ), + } + } +} + #[rpc(server, client, namespace = "broker")] pub trait Rpc { #[method(name = "register_account", aliases = ["broker_registerAccount"])] @@ -90,8 +126,7 @@ impl RpcServer for RpcServerImpl { .operator_api() .register_account_role(AccountRole::Broker) .await - .map(|tx_hash| format!("{tx_hash:#x}")) - .map_err(to_rpc_error)?) + .map(|tx_hash| format!("{tx_hash:#x}"))?) } async fn request_swap_deposit_address( @@ -120,8 +155,7 @@ impl RpcServer for RpcServerImpl { refund_parameters, dca_parameters, ) - .await - .map_err(to_rpc_error)?) + .await?) } async fn withdraw_fees( @@ -129,12 +163,7 @@ impl RpcServer for RpcServerImpl { asset: Asset, destination_address: AddressString, ) -> RpcResult { - Ok(self - .api - .broker_api() - .withdraw_fees(asset, destination_address) - .await - .map_err(to_rpc_error)?) + Ok(self.api.broker_api().withdraw_fees(asset, destination_address).await?) } async fn request_swap_parameter_encoding( @@ -157,14 +186,13 @@ impl RpcServer for RpcServerImpl { destination_asset, destination_address, retry_duration, - try_parse_number_or_hex(min_output_amount).map_err(to_rpc_error)?, + try_parse_number_or_hex(min_output_amount)?, boost_fee, dca_parameters, broker_commission, affiliate_fees, ) - .await - .map_err(to_rpc_error)?) + .await?) } } @@ -222,11 +250,9 @@ async fn main() -> anyhow::Result<()> { let server = ServerBuilder::default() .max_connections(opts.max_connections) .build(format!("0.0.0.0:{}", opts.port)) - .await - .map_err(to_rpc_error)?; + .await?; let server_addr = server.local_addr()?; - let server = server - .start(RpcServerImpl::new(scope, opts).await.map_err(to_rpc_error)?.into_rpc()); + let server = server.start(RpcServerImpl::new(scope, opts).await?.into_rpc()); log::info!("🎙 Server is listening on {server_addr}."); diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/state_chain.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/state_chain.rs index 5d9c6c553c4..962dbd0b613 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/state_chain.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/state_chain.rs @@ -396,12 +396,14 @@ mod tests { BlockInfo, }; use frame_support::storage::types::QueryKindTrait; - use jsonrpsee::core::RpcResult; + use jsonrpsee::core::ClientError; use mockall::mock; use pallet_cf_ingress_egress::DepositWitness; use sp_core::{storage::StorageKey, H160}; use std::collections::HashMap; + type RpcResult = Result; + #[derive(Default)] struct MockStore { storage: HashMap, diff --git a/api/bin/chainflip-lp-api/Cargo.toml b/api/bin/chainflip-lp-api/Cargo.toml index 0a70e548957..0b82a72098c 100644 --- a/api/bin/chainflip-lp-api/Cargo.toml +++ b/api/bin/chainflip-lp-api/Cargo.toml @@ -39,6 +39,7 @@ cf-primitives = { path = "../../../state-chain/primitives" } custom-rpc = { path = "../../../state-chain/custom-rpc" } frame-system = { git = 'https://github.com/chainflip-io/polkadot-sdk.git', tag = 'chainflip-substrate-1.15.2+2' } sc-rpc = { git = 'https://github.com/chainflip-io/polkadot-sdk.git', tag = 'chainflip-substrate-1.15.2+2' } +thiserror = "1.0" # Local chainflip-api = { path = "../../lib" } diff --git a/api/bin/chainflip-lp-api/src/main.rs b/api/bin/chainflip-lp-api/src/main.rs index bc0649f1fac..b1990c528a8 100644 --- a/api/bin/chainflip-lp-api/src/main.rs +++ b/api/bin/chainflip-lp-api/src/main.rs @@ -24,13 +24,14 @@ use chainflip_api::{ use clap::Parser; use custom_rpc::{ order_fills::{order_fills_from_block_updates, OrderFills}, - to_rpc_error, CustomApiClient, + CustomApiClient, }; use futures::{try_join, FutureExt, StreamExt}; use jsonrpsee::{ - core::{async_trait, RpcResult}, + core::{async_trait, ClientError}, proc_macros::rpc, server::ServerBuilder, + types::{ErrorCode, ErrorObject, ErrorObjectOwned}, PendingSubscriptionSink, }; use pallet_cf_pools::{CloseOrder, IncreaseOrDecrease, OrderId, RangeOrderSize, MAX_ORDERS_DELETE}; @@ -228,12 +229,47 @@ impl RpcServerImpl { ) -> Result { Ok(Self { api: StateChainApi::connect(scope, StateChain { ws_endpoint, signing_key_file }) - .await - .map_err(to_rpc_error)?, + .await?, }) } } +#[derive(thiserror::Error, Debug)] +pub enum LpApiError { + #[error(transparent)] + ErrorObject(#[from] ErrorObjectOwned), + #[error(transparent)] + ClientError(#[from] jsonrpsee::core::ClientError), + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +type RpcResult = Result; + +impl From for ErrorObjectOwned { + fn from(error: LpApiError) -> Self { + match error { + LpApiError::ErrorObject(error) => error, + LpApiError::ClientError(error) => match error { + ClientError::Call(obj) => obj, + internal => { + log::error!("Internal rpc client error: {internal:?}"); + ErrorObject::owned( + ErrorCode::InternalError.code(), + "Internal rpc client error", + None::<()>, + ) + }, + }, + LpApiError::Other(error) => jsonrpsee::types::error::ErrorObjectOwned::owned( + ErrorCode::ServerError(0xcf).code(), + error.to_string(), + None::<()>, + ), + } + } +} + #[async_trait] impl RpcServer for RpcServerImpl { /// Returns a deposit address @@ -248,8 +284,7 @@ impl RpcServer for RpcServerImpl { .lp_api() .request_liquidity_deposit_address(asset, wait_for.unwrap_or_default(), boost_fee) .await - .map(|result| result.map_details(|address| address.to_string())) - .map_err(to_rpc_error)?) + .map(|result| result.map_details(|address| address.to_string()))?) } async fn register_liquidity_refund_address( @@ -257,12 +292,7 @@ impl RpcServer for RpcServerImpl { chain: ForeignChain, address: AddressString, ) -> RpcResult { - Ok(self - .api - .lp_api() - .register_liquidity_refund_address(chain, address) - .await - .map_err(to_rpc_error)?) + Ok(self.api.lp_api().register_liquidity_refund_address(chain, address).await?) } /// Returns an egress id @@ -277,13 +307,12 @@ impl RpcServer for RpcServerImpl { .api .lp_api() .withdraw_asset( - try_parse_number_or_hex(amount).map_err(to_rpc_error)?, + try_parse_number_or_hex(amount)?, asset, destination_address, wait_for.unwrap_or_default(), ) - .await - .map_err(to_rpc_error)?) + .await?) } /// Returns an egress id @@ -297,20 +326,17 @@ impl RpcServer for RpcServerImpl { .api .lp_api() .transfer_asset( - amount - .try_into() - .map_err(|_| anyhow!("Failed to convert amount to u128")) - .map_err(to_rpc_error)?, + amount.try_into().map_err(|_| anyhow!("Failed to convert amount to u128"))?, asset, destination_account, ) - .await - .map_err(to_rpc_error)?) + .await?) } /// Returns a list of all assets and their free balance in json format async fn free_balances(&self) -> RpcResult> { - self.api + Ok(self + .api .state_chain_client .base_rpc_client .raw_rpc_client @@ -318,8 +344,7 @@ impl RpcServer for RpcServerImpl { self.api.state_chain_client.account_id(), Some(self.api.state_chain_client.latest_finalized_block().hash), ) - .await - .map_err(to_rpc_error) + .await?) } async fn update_range_order( @@ -337,13 +362,12 @@ impl RpcServer for RpcServerImpl { .update_range_order( base_asset, quote_asset, - id.try_into().map_err(to_rpc_error)?, + id.try_into()?, tick_range, - size_change.try_map(|size| size.try_into()).map_err(to_rpc_error)?, + size_change.try_map(|size| size.try_into())?, wait_for.unwrap_or_default(), ) - .await - .map_err(to_rpc_error)?) + .await?) } async fn set_range_order( @@ -361,13 +385,12 @@ impl RpcServer for RpcServerImpl { .set_range_order( base_asset, quote_asset, - id.try_into().map_err(to_rpc_error)?, + id.try_into()?, tick_range, - size.try_into().map_err(to_rpc_error)?, + size.try_into()?, wait_for.unwrap_or_default(), ) - .await - .map_err(to_rpc_error)?) + .await?) } async fn update_limit_order( @@ -388,14 +411,13 @@ impl RpcServer for RpcServerImpl { base_asset, quote_asset, side, - id.try_into().map_err(to_rpc_error)?, + id.try_into()?, tick, - amount_change.try_map(try_parse_number_or_hex).map_err(to_rpc_error)?, + amount_change.try_map(try_parse_number_or_hex)?, dispatch_at, wait_for.unwrap_or_default(), ) - .await - .map_err(to_rpc_error)?) + .await?) } async fn set_limit_order( @@ -416,14 +438,13 @@ impl RpcServer for RpcServerImpl { base_asset, quote_asset, side, - id.try_into().map_err(to_rpc_error)?, + id.try_into()?, tick, - try_parse_number_or_hex(sell_amount).map_err(to_rpc_error)?, + try_parse_number_or_hex(sell_amount)?, dispatch_at, wait_for.unwrap_or_default(), ) - .await - .map_err(to_rpc_error)?) + .await?) } /// Returns the tx hash that the account role was set @@ -432,8 +453,7 @@ impl RpcServer for RpcServerImpl { .api .operator_api() .register_account_role(AccountRole::LiquidityProvider) - .await - .map_err(to_rpc_error)?) + .await?) } async fn get_open_swap_channels(&self) -> RpcResult { @@ -454,7 +474,7 @@ impl RpcServer for RpcServerImpl { executor_address: Option, ) -> RpcResult { let redeem_amount = if let Some(number_or_hex) = exact_amount { - RedemptionAmount::Exact(try_parse_number_or_hex(number_or_hex).map_err(to_rpc_error)?) + RedemptionAmount::Exact(try_parse_number_or_hex(number_or_hex)?) } else { RedemptionAmount::Max }; @@ -463,8 +483,7 @@ impl RpcServer for RpcServerImpl { .api .operator_api() .request_redemption(redeem_amount, redeem_address, executor_address) - .await - .map_err(to_rpc_error)?) + .await?) } fn subscribe_order_fills(&self, pending_sink: PendingSubscriptionSink) { @@ -494,12 +513,12 @@ impl RpcServer for RpcServerImpl { let state_chain_client = &self.api.state_chain_client; let block = if let Some(at) = at { - state_chain_client.block(at).await.map_err(to_rpc_error)? + state_chain_client.block(at).await? } else { state_chain_client.latest_finalized_block() }; - Ok(order_fills(state_chain_client.clone(), block).await.map_err(to_rpc_error)?) + Ok(order_fills(state_chain_client.clone(), block).await?) } async fn cancel_all_orders( @@ -513,8 +532,7 @@ impl RpcServer for RpcServerImpl { .base_rpc_client .raw_rpc_client .cf_available_pools(None) - .await - .map_err(to_rpc_error)?; + .await?; for pool in pool_pairs { let orders = self .api @@ -528,8 +546,7 @@ impl RpcServer for RpcServerImpl { None, None, ) - .await - .map_err(to_rpc_error)?; + .await?; for order in orders.range_orders { orders_to_delete.push(CloseOrder::Range { base_asset: pool.base, @@ -569,8 +586,7 @@ impl RpcServer for RpcServerImpl { .expect("Guaranteed by `chunk` method."), wait_for.unwrap_or_default(), ) - .await - .map_err(to_rpc_error)?, + .await?, ); } @@ -586,8 +602,7 @@ impl RpcServer for RpcServerImpl { .api .lp_api() .cancel_orders_batch(orders, wait_for.unwrap_or_default()) - .await - .map_err(to_rpc_error)?) + .await?) } } @@ -673,16 +688,11 @@ async fn main() -> anyhow::Result<()> { &opts.health_check, has_completed_initialising.clone(), ) - .await - .map_err(to_rpc_error)?; + .await?; - let server = ServerBuilder::default() - .build(format!("0.0.0.0:{}", opts.port)) - .await - .map_err(to_rpc_error)?; + let server = ServerBuilder::default().build(format!("0.0.0.0:{}", opts.port)).await?; let server_addr = server.local_addr()?; - let server = server - .start(RpcServerImpl::new(scope, opts).await.map_err(to_rpc_error)?.into_rpc()); + let server = server.start(RpcServerImpl::new(scope, opts).await?.into_rpc()); log::info!("🎙 Server is listening on {server_addr}."); diff --git a/api/lib/src/queries.rs b/api/lib/src/queries.rs index ac866199dfd..b0840c76c03 100644 --- a/api/lib/src/queries.rs +++ b/api/lib/src/queries.rs @@ -7,7 +7,7 @@ use chainflip_engine::state_chain_observer::client::{ use codec::Decode; use custom_rpc::CustomApiClient; use frame_support::sp_runtime::DigestItem; -use jsonrpsee::core::RpcResult; +use jsonrpsee::core::ClientError; use pallet_cf_ingress_egress::DepositChannelDetails; use pallet_cf_validator::RotationPhase; use serde::Deserialize; @@ -17,6 +17,8 @@ use std::{collections::BTreeMap, ops::Deref, sync::Arc}; use tracing::log; use utilities::task_scope; +type RpcResult = Result; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SwapChannelInfo { deposit_address: ::Humanreadable, diff --git a/engine/src/state_chain_observer/client.rs b/engine/src/state_chain_observer/client.rs index 453c9ae5252..0531a5b3d4a 100644 --- a/engine/src/state_chain_observer/client.rs +++ b/engine/src/state_chain_observer/client.rs @@ -16,7 +16,7 @@ use futures::{StreamExt, TryStreamExt}; use cf_primitives::CfeCompatibility; use futures_core::future::BoxFuture; use futures_util::FutureExt; -use jsonrpsee::core::RpcResult; +use jsonrpsee::core::ClientError; use sp_core::{Pair, H256}; use state_chain_runtime::AccountId; use std::{pin::Pin, sync::Arc, time::Duration}; @@ -84,6 +84,7 @@ impl From for BlockInfo { } pub type DefaultRpcClient = base_rpc_api::BaseRpcClient; +pub(crate) type RpcResult = Result; impl DefaultRpcClient { pub async fn connect(ws_endpoint: &str) -> Result { @@ -1049,6 +1050,7 @@ impl< #[cfg(test)] pub mod mocks { + use super::RpcResult; use crate::state_chain_observer::client::{ extrinsic_api::{signed::SignedExtrinsicApi, unsigned::UnsignedExtrinsicApi}, storage_api::StorageApi, @@ -1056,7 +1058,6 @@ pub mod mocks { }; use async_trait::async_trait; use frame_support::storage::types::QueryKindTrait; - use jsonrpsee::core::RpcResult; use mockall::mock; use sp_core::{storage::StorageKey, H256}; use state_chain_runtime::AccountId; diff --git a/engine/src/state_chain_observer/client/base_rpc_api.rs b/engine/src/state_chain_observer/client/base_rpc_api.rs index e105cb49940..a0e33b6f233 100644 --- a/engine/src/state_chain_observer/client/base_rpc_api.rs +++ b/engine/src/state_chain_observer/client/base_rpc_api.rs @@ -1,9 +1,6 @@ use async_trait::async_trait; -use jsonrpsee::core::{ - client::{ClientT, Subscription, SubscriptionClientT}, - RpcResult, -}; +use jsonrpsee::core::client::{ClientT, Subscription, SubscriptionClientT}; use sc_transaction_pool_api::TransactionStatus; use sp_core::{ storage::{StorageData, StorageKey}, @@ -13,7 +10,7 @@ use sp_version::RuntimeVersion; use state_chain_runtime::SignedBlock; use codec::Encode; -use custom_rpc::{to_rpc_error, CustomApiClient}; +use custom_rpc::CustomApiClient; use sc_rpc_api::{ author::AuthorApiClient, chain::ChainApiClient, @@ -26,6 +23,8 @@ use serde_json::value::RawValue; use std::sync::Arc; use subxt::backend::rpc::RawRpcSubscription; +use super::RpcResult; + #[cfg(test)] use mockall::automock; @@ -196,34 +195,28 @@ fn unwrap_value(list_or_value: sp_rpc::list::ListOrValue) -> T { #[async_trait] impl BaseRpcApi for BaseRpcClient { async fn health(&self) -> RpcResult { - self.raw_rpc_client.system_health().await.map_err(to_rpc_error) + self.raw_rpc_client.system_health().await } async fn next_account_nonce( &self, account_id: state_chain_runtime::AccountId, ) -> RpcResult { - self.raw_rpc_client.nonce(account_id).await.map_err(to_rpc_error) + self.raw_rpc_client.nonce(account_id).await } async fn submit_extrinsic( &self, extrinsic: state_chain_runtime::UncheckedExtrinsic, ) -> RpcResult { - self.raw_rpc_client - .submit_extrinsic(Bytes::from(extrinsic.encode())) - .await - .map_err(to_rpc_error) + self.raw_rpc_client.submit_extrinsic(Bytes::from(extrinsic.encode())).await } async fn submit_and_watch_extrinsic( &self, extrinsic: state_chain_runtime::UncheckedExtrinsic, ) -> RpcResult>> { - self.raw_rpc_client - .watch_extrinsic(Bytes::from(extrinsic.encode())) - .await - .map_err(to_rpc_error) + self.raw_rpc_client.watch_extrinsic(Bytes::from(extrinsic.encode())).await } async fn storage( @@ -231,10 +224,7 @@ impl BaseRpcApi for BaseRpcClient RpcResult> { - self.raw_rpc_client - .storage(storage_key, Some(block_hash)) - .await - .map_err(to_rpc_error) + self.raw_rpc_client.storage(storage_key, Some(block_hash)).await } async fn storage_pairs( @@ -242,14 +232,11 @@ impl BaseRpcApi for BaseRpcClient RpcResult> { - self.raw_rpc_client - .storage_pairs(storage_key, Some(block_hash)) - .await - .map_err(to_rpc_error) + self.raw_rpc_client.storage_pairs(storage_key, Some(block_hash)).await } async fn block(&self, block_hash: state_chain_runtime::Hash) -> RpcResult> { - self.raw_rpc_client.block(Some(block_hash)).await.map_err(to_rpc_error) + self.raw_rpc_client.block(Some(block_hash)).await } async fn block_hash( @@ -259,8 +246,7 @@ impl BaseRpcApi for BaseRpcClient BaseRpcApi for BaseRpcClient RpcResult { - Ok(self - .raw_rpc_client - .header(Some(block_hash)) - .await - .map_err(to_rpc_error)? - .expect(SUBSTRATE_BEHAVIOUR)) + Ok(self.raw_rpc_client.header(Some(block_hash)).await?.expect(SUBSTRATE_BEHAVIOUR)) } async fn latest_unfinalized_block_hash(&self) -> RpcResult { - Ok(unwrap_value(self.raw_rpc_client.block_hash(None).await.map_err(to_rpc_error)?) - .expect(SUBSTRATE_BEHAVIOUR)) + Ok(unwrap_value(self.raw_rpc_client.block_hash(None).await?).expect(SUBSTRATE_BEHAVIOUR)) } async fn latest_finalized_block_hash(&self) -> RpcResult { - self.raw_rpc_client.finalized_head().await.map_err(to_rpc_error) + self.raw_rpc_client.finalized_head().await } async fn subscribe_finalized_block_headers( &self, ) -> RpcResult> { - self.raw_rpc_client.subscribe_finalized_heads().await.map_err(to_rpc_error) + self.raw_rpc_client.subscribe_finalized_heads().await } async fn subscribe_unfinalized_block_headers( &self, ) -> RpcResult> { - self.raw_rpc_client.subscribe_new_heads().await.map_err(to_rpc_error) + self.raw_rpc_client.subscribe_new_heads().await } async fn runtime_version( &self, at: Option, ) -> RpcResult { - self.raw_rpc_client.runtime_version(at).await.map_err(to_rpc_error) + self.raw_rpc_client.runtime_version(at).await } async fn dry_run( @@ -309,10 +289,7 @@ impl BaseRpcApi for BaseRpcClient RpcResult { - self.raw_rpc_client - .dry_run(extrinsic, Some(block_hash)) - .await - .map_err(to_rpc_error) + self.raw_rpc_client.dry_run(extrinsic, Some(block_hash)).await } async fn request_raw( @@ -320,7 +297,7 @@ impl BaseRpcApi for BaseRpcClient>, ) -> RpcResult> { - self.raw_rpc_client.request(method, Params(params)).await.map_err(to_rpc_error) + self.raw_rpc_client.request(method, Params(params)).await } async fn subscribe_raw( @@ -329,10 +306,7 @@ impl BaseRpcApi for BaseRpcClient>, unsub: &str, ) -> RpcResult>> { - self.raw_rpc_client - .subscribe(sub, Params(params), unsub) - .await - .map_err(to_rpc_error) + self.raw_rpc_client.subscribe(sub, Params(params), unsub).await } async fn validate_refund_params( @@ -340,10 +314,7 @@ impl BaseRpcApi for BaseRpcClient, ) -> RpcResult<()> { - self.raw_rpc_client - .cf_validate_refund_params(retry_duration, block_hash) - .await - .map_err(to_rpc_error) + self.raw_rpc_client.cf_validate_refund_params(retry_duration, block_hash).await } async fn validate_dca_params( @@ -355,7 +326,6 @@ impl BaseRpcApi for BaseRpcClient { #[derive(Error, Debug)] pub enum DryRunError { #[error(transparent)] - RpcCallError(#[from] ErrorObjectOwned), + RpcCallError(ErrorObjectOwned), #[error("Unable to decode dry_run RPC result: {0}")] CannotDecodeReply(#[from] codec::Error), #[error("The transaction is invalid: {0}")] InvalidTransaction(#[from] TransactionValidityError), #[error("The transaction failed: {0}")] Dispatch(#[from] DispatchError), + #[error(transparent)] + RpcError(ClientError), +} + +impl From for DryRunError { + fn from(e: ClientError) -> Self { + match e { + ClientError::Call(obj) => Self::RpcCallError(obj), + e => Self::RpcError(e), + } + } } pub type ExtrinsicDetails = @@ -206,7 +217,7 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static> sp_core::blake2_256(&encoded).into() }; - match self.base_rpc_client.submit_and_watch_extrinsic(signed_extrinsic).await { + match self.base_rpc_client.submit_and_watch_extrinsic(signed_extrinsic.clone()).await { Ok(mut transaction_status_stream) => { request.pending_submissions.insert(request.next_submission_id, nonce); self.submissions_by_nonce.entry(nonce).or_default().push(Submission { @@ -240,17 +251,21 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static> // This occurs when a transaction with the same nonce is in the // transaction pool (and the priority is <= priority of that // existing tx) - obj if obj.code() == 1014 => { + ClientError::Call(obj) if obj.code() == 1014 => { debug!(target: "state_chain_client", request_id = request.id, "Submission failed as transaction with same nonce found in transaction pool: {obj:?}"); break Ok(Err(SubmissionLogicError::NonceTooLow)) }, // This occurs when the nonce has already been *consumed* i.e a // transaction with that nonce is in a block - obj if obj == invalid_err_obj(InvalidTransaction::Stale) => { + ClientError::Call(obj) + if obj == invalid_err_obj(InvalidTransaction::Stale) => + { debug!(target: "state_chain_client", request_id = request.id, "Submission failed as the transaction is stale: {obj:?}"); break Ok(Err(SubmissionLogicError::NonceTooLow)) }, - obj if obj == invalid_err_obj(InvalidTransaction::BadProof) => { + ClientError::Call(obj) + if obj == invalid_err_obj(InvalidTransaction::BadProof) => + { warn!(target: "state_chain_client", request_id = request.id, "Submission failed due to a bad proof: {obj:?}. Refetching the runtime version."); // TODO: Check if hash and block number should also be updated @@ -266,7 +281,12 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static> self.runtime_version = new_runtime_version; }, - obj => break Err(obj.into()), + err => + break Err(anyhow!( + "Unhandled error while submitting signed extrinsic {:?}: {}", + signed_extrinsic, + err + )), } }, } diff --git a/engine/src/state_chain_observer/client/extrinsic_api/unsigned.rs b/engine/src/state_chain_observer/client/extrinsic_api/unsigned.rs index 1171bce173c..61c83493a3d 100644 --- a/engine/src/state_chain_observer/client/extrinsic_api/unsigned.rs +++ b/engine/src/state_chain_observer/client/extrinsic_api/unsigned.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use async_trait::async_trait; +use jsonrpsee::core::ClientError; use sp_core::H256; use sp_runtime::{traits::Hash, transaction_validity::InvalidTransaction}; use tokio::sync::{mpsc, oneshot}; @@ -54,7 +55,7 @@ impl UnsignedExtrinsicClient { let extrinsic = state_chain_runtime::UncheckedExtrinsic::new_unsigned(call.clone()); let expected_hash = sp_runtime::traits::BlakeTwo256::hash_of(&extrinsic); - match base_rpc_client.submit_extrinsic(extrinsic).await { + match base_rpc_client.submit_extrinsic(extrinsic.clone()).await { Ok(tx_hash) => { assert_eq!(tx_hash, expected_hash, "{SUBSTRATE_BEHAVIOUR}"); Ok(tx_hash) @@ -69,7 +70,7 @@ impl UnsignedExtrinsicClient { // that this particular extrinsic has already been // submitted. And so we can ignore the error and return // the transaction hash - obj if obj.code() == 1013 => { + ClientError::Call(obj) if obj.code() == 1013 => { tracing::debug!( "Already in pool with tx_hash: {expected_hash:#x}." ); @@ -79,17 +80,25 @@ impl UnsignedExtrinsicClient { // believe it has a similiar meaning to POOL_ALREADY_IMPORTED, // but we don't know. We believe there maybe cases where we need // to resubmit if this error occurs. - obj if obj.code() == 1012 => { + ClientError::Call(obj) if obj.code() == 1012 => { tracing::debug!( "Transaction is temporarily banned with tx_hash: {expected_hash:#x}." ); Ok(expected_hash) }, - obj if obj == invalid_err_obj(InvalidTransaction::Stale) => { + ClientError::Call(obj) + if obj == invalid_err_obj(InvalidTransaction::Stale) => + { tracing::debug!("Submission failed as the transaction is stale: {obj:?}"); Err(ExtrinsicError::Stale) }, - obj => return Err(obj.into()), + err => { + return Err(anyhow::anyhow!( + "Unhandled error while submitting unsigned extrinsic {:?}: {}", + extrinsic, + err + )); + }, } }, } diff --git a/engine/src/state_chain_observer/client/storage_api.rs b/engine/src/state_chain_observer/client/storage_api.rs index 66bf87d38ef..62b73319faf 100644 --- a/engine/src/state_chain_observer/client/storage_api.rs +++ b/engine/src/state_chain_observer/client/storage_api.rs @@ -9,11 +9,10 @@ use frame_support::{ traits::{Get, StorageInstance}, ReversibleStorageHasher, StorageHasher, }; -use jsonrpsee::core::RpcResult; use sp_core::storage::StorageKey; use utilities::context; -use super::{BlockInfo, CFE_VERSION, SUBSTRATE_BEHAVIOUR}; +use super::{BlockInfo, RpcResult, CFE_VERSION, SUBSTRATE_BEHAVIOUR}; /// This trait extracts otherwise private type information about Substrate storage double maps pub trait StorageDoubleMapAssociatedTypes { diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index cc7ffc221af..c0f6fe8305b 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -1,7 +1,7 @@ use crate::boost_pool_rpc::BoostPoolFeesRpc; use boost_pool_rpc::BoostPoolDetailsRpc; use cf_amm::{ - common::{Amount, PoolPairsMap, Side, Tick}, + common::{Amount as AmmAmount, PoolPairsMap, Side, Tick}, range_orders::Liquidity, }; use cf_chains::{ @@ -19,14 +19,19 @@ use cf_primitives::{ use cf_utilities::rpc::NumberOrHex; use core::ops::Range; use jsonrpsee::{ - core::RpcResult, proc_macros::rpc, - types::error::{ErrorObject, ErrorObjectOwned}, + types::{ + error::{ErrorObject, ErrorObjectOwned}, + ErrorCode, + }, PendingSubscriptionSink, }; use order_fills::OrderFills; use pallet_cf_governance::GovCallHash; -use pallet_cf_pools::{AskBidMap, PoolInfo, PoolLiquidity, PoolPriceV1, UnidirectionalPoolDepth}; +use pallet_cf_pools::{ + AskBidMap, PoolInfo, PoolLiquidity, PoolOrderbook, PoolOrders, PoolPriceV1, + UnidirectionalPoolDepth, +}; use pallet_cf_swapping::SwapLegInfo; use sc_client_api::{BlockchainEvents, HeaderBackend}; use serde::{Deserialize, Serialize}; @@ -46,9 +51,9 @@ use state_chain_runtime::{ PendingBroadcasts, PendingTssCeremonies, RedemptionsInfo, SolanaNonces, }, runtime_apis::{ - BoostPoolDepth, BoostPoolDetails, BrokerInfo, CustomRuntimeApi, DispatchErrorWithMessage, - ElectoralRuntimeApi, FailingWitnessValidators, LiquidityProviderBoostPoolInfo, - LiquidityProviderInfo, ValidatorInfo, + AuctionState, BoostPoolDepth, BoostPoolDetails, BrokerInfo, CustomRuntimeApi, + DispatchErrorWithMessage, ElectoralRuntimeApi, FailingWitnessValidators, + LiquidityProviderBoostPoolInfo, LiquidityProviderInfo, RuntimeApiPenalty, ValidatorInfo, }, safe_mode::RuntimeSafeMode, Hash, NetworkFee, SolanaInstance, @@ -367,6 +372,19 @@ pub struct RpcAuctionState { min_active_bid: Option, } +impl From for RpcAuctionState { + fn from(auction_state: AuctionState) -> Self { + Self { + blocks_per_epoch: auction_state.blocks_per_epoch, + current_epoch_started_at: auction_state.current_epoch_started_at, + redemption_period_as_percentage: auction_state.redemption_period_as_percentage, + min_funding: auction_state.min_funding.into(), + auction_size_range: auction_state.auction_size_range, + min_active_bid: auction_state.min_active_bid.map(|bond| bond.into()), + } + } +} + #[derive(Serialize, Deserialize, Clone)] pub struct RpcSwapOutputV1 { // Intermediary amount, if there's any @@ -388,7 +406,7 @@ impl From for RpcSwapOutputV1 { pub struct RpcFee { #[serde(flatten)] pub asset: Asset, - pub amount: Amount, + pub amount: AmmAmount, } #[derive(Serialize, Deserialize, Clone)] @@ -734,7 +752,7 @@ pub trait CustomApi { quote_asset: Asset, tick_range: Range, at: Option, - ) -> RpcResult>; + ) -> RpcResult>; #[method(name = "pool_orderbook")] fn cf_pool_orderbook( &self, @@ -782,7 +800,7 @@ pub trait CustomApi { tick_range: Range, liquidity: Liquidity, at: Option, - ) -> RpcResult>; + ) -> RpcResult>; #[method(name = "funding_environment")] fn cf_funding_environment( &self, @@ -856,12 +874,14 @@ pub trait CustomApi { fn cf_failed_call_ethereum( &self, broadcast_id: BroadcastId, + at: Option, ) -> RpcResult::Transaction>>; #[method(name = "failed_call_arbitrum")] fn cf_failed_call_arbitrum( &self, broadcast_id: BroadcastId, + at: Option, ) -> RpcResult::Transaction>>; #[method(name = "witness_count")] @@ -946,6 +966,23 @@ where } } +impl CustomRpc +where + B: BlockT, + C: Send + Sync + 'static + HeaderBackend + sp_api::ProvideRuntimeApi, +{ + fn with_runtime_api( + &self, + at: Option, + f: impl FnOnce(&C::Api, Hash) -> Result, + ) -> RpcResult + where + CfApiError: From, + { + Ok(f(&*self.client.runtime_api(), self.unwrap_or_best(at))?) + } +} + pub struct StorageQueryApi<'a, C, B>(&'a C, PhantomData); impl<'a, C, B> StorageQueryApi<'a, C, B> @@ -1008,30 +1045,98 @@ where hash: ::Hash, f: impl Fn() -> R, ) -> RpcResult { - Ok(self.0.state_at(hash).map_err(to_rpc_error)?.inspect_state(f)) + Ok(self.0.state_at(hash)?.inspect_state(f)) } } -pub fn str_to_rpc_error(e: &str) -> ErrorObjectOwned { - ErrorObject::owned(jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE, e, Option::<()>::None) +#[derive(thiserror::Error, Debug)] +pub enum CfApiError { + #[error(transparent)] + ClientError(#[from] jsonrpsee::core::client::Error), + #[error("{0:?}")] + DispatchError(#[from] DispatchErrorWithMessage), + #[error("{0:?}")] + RuntimeApiError(#[from] ApiError), + #[error(transparent)] + ErrorObject(#[from] ErrorObjectOwned), } - -pub fn to_rpc_error(e: E) -> ErrorObjectOwned { - str_to_rpc_error(&format!("{}", e)[..]) +pub type RpcResult = Result; + +fn internal_error(error: impl core::fmt::Debug) -> ErrorObjectOwned { + log::error!(target: "cf_rpc", "Internal error: {:?}", error); + ErrorObject::owned( + ErrorCode::InternalError.code(), + "Internal error while processing request.", + None::<()>, + ) +} +fn call_error(error: impl Into>) -> ErrorObjectOwned { + let error = error.into(); + log::debug!(target: "cf_rpc", "Call error: {}", error); + ErrorObject::owned(ErrorCode::InternalError.code(), "{error}", None::<()>) } -fn map_dispatch_error(e: DispatchErrorWithMessage) -> ErrorObjectOwned { - str_to_rpc_error( - &(match e { - DispatchErrorWithMessage::Module(message) => match std::str::from_utf8(&message) { - Ok(message) => format!("DispatchError: {message}"), - Err(error) => - format!("DispatchError: Unable to deserialize error message: '{error}'"), +impl From for ErrorObjectOwned { + fn from(error: CfApiError) -> Self { + match error { + CfApiError::ClientError(client_error) => match client_error { + jsonrpsee::core::client::Error::Call(obj) => obj, + other => internal_error(other), }, - DispatchErrorWithMessage::Other(e) => - format!("DispatchError: {}", <&'static str>::from(e)), - })[..], - ) + CfApiError::DispatchError(dispatch_error) => match dispatch_error { + DispatchErrorWithMessage::Module(message) => match std::str::from_utf8(&message) { + Ok(message) => call_error(std::format!("DispatchError: {message}")), + Err(error) => + internal_error(format!("Unable to decode DispatchError: {error}")), + }, + DispatchErrorWithMessage::Other(error) => + internal_error(format!("Unable to decode DispatchError: {error:?}")), + }, + CfApiError::RuntimeApiError(error) => match error { + ApiError::Application(error) => call_error(format!("Application error: {error}")), + ApiError::UnknownBlock(error) => call_error(format!("Unknown block: {error}")), + other => internal_error(format!("Unexpected ApiError: {other}")), + }, + CfApiError::ErrorObject(object) => object, + } + } +} + +#[macro_export] +macro_rules! pass_through { + ($( $name:ident ( $( $arg:ident: $argt:ty ),* $(,)? ) -> $result_type:ty $([map: $mapping:expr])? ),+ $(,)?) => { + $( + fn $name(&self, $( $arg: $argt, )* at: Option,) -> RpcResult<$result_type> { + self.with_runtime_api(at, |api, hash| api.$name(hash, $($arg),* )) + $(.map($mapping))? + } + )+ + }; +} + +#[macro_export] +macro_rules! pass_through_and_flatten { + ($( $name:ident ( $( $arg:ident: $argt:ty ),* $(,)? ) -> $result_type:ty $([map: $mapping:expr])? ),+ $(,)?) => { + $( + fn $name(&self, $( $arg: $argt, )* at: Option,) -> RpcResult<$result_type> { + flatten_into_error( + self.with_runtime_api(at, |api, hash| api.$name(hash, $($arg),* )) + $(.map($mapping))? + ) + } + )+ + }; +} + +fn flatten_into_error(res: Result, E2>) -> Result +where + E2: From, +{ + match res.map(|inner| inner.map_err(Into::into)) { + Ok(Ok(r)) => Ok(r), + Ok(Err(e)) => Err(e), + Err(e) => Err(e), + } } impl CustomApiServer for CustomRpc @@ -1046,130 +1151,124 @@ where + CallApiAt, C::Api: CustomRuntimeApi + ElectoralRuntimeApi, { - fn cf_is_auction_phase(&self, at: Option<::Hash>) -> RpcResult { - self.client - .runtime_api() - .cf_is_auction_phase(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - } - fn cf_eth_flip_token_address(&self, at: Option<::Hash>) -> RpcResult { - self.client - .runtime_api() - .cf_eth_flip_token_address(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - .map(hex::encode) - } - fn cf_eth_state_chain_gateway_address( - &self, - at: Option<::Hash>, - ) -> RpcResult { - self.client - .runtime_api() - .cf_eth_state_chain_gateway_address(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - .map(hex::encode) - } - fn cf_eth_key_manager_address(&self, at: Option<::Hash>) -> RpcResult { - self.client - .runtime_api() - .cf_eth_key_manager_address(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - .map(hex::encode) - } - fn cf_eth_chain_id(&self, at: Option<::Hash>) -> RpcResult { - self.client - .runtime_api() - .cf_eth_chain_id(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - } - fn cf_eth_vault(&self, at: Option<::Hash>) -> RpcResult<(String, u32)> { - self.client - .runtime_api() - .cf_eth_vault(self.unwrap_or_best(at)) - .map(|(public_key, active_from_block)| (hex::encode(public_key), active_from_block)) - .map_err(to_rpc_error) - } - // FIXME: Respect the block hash argument here - fn cf_tx_fee_multiplier(&self, _at: Option<::Hash>) -> RpcResult { - Ok(TX_FEE_MULTIPLIER as u64) - } - fn cf_auction_parameters(&self, at: Option<::Hash>) -> RpcResult<(u32, u32)> { - self.client - .runtime_api() - .cf_auction_parameters(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - } - fn cf_min_funding(&self, at: Option<::Hash>) -> RpcResult { - self.client - .runtime_api() - .cf_min_funding(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - .map(Into::into) - } - fn cf_current_epoch(&self, at: Option<::Hash>) -> RpcResult { - self.client - .runtime_api() - .cf_current_epoch(self.unwrap_or_best(at)) - .map_err(to_rpc_error) + pass_through! { + cf_is_auction_phase() -> bool, + cf_eth_flip_token_address() -> String [map: hex::encode], + cf_eth_state_chain_gateway_address() -> String [map: hex::encode], + cf_eth_key_manager_address() -> String [map: hex::encode], + cf_eth_chain_id() -> u64, + cf_eth_vault() -> (String, u32) [map: |(public_key, active_from_block)| (hex::encode(public_key), active_from_block)], + cf_auction_parameters() -> (u32, u32), + cf_min_funding() -> NumberOrHex [map: Into::into], + cf_current_epoch() -> u32, + cf_epoch_duration() -> u32, + cf_current_epoch_started_at() -> u32, + cf_authority_emission_per_block() -> NumberOrHex [map: Into::into], + cf_backup_emission_per_block() -> NumberOrHex [map: Into::into], + cf_flip_supply() -> (NumberOrHex, NumberOrHex) [map: |(issuance, offchain_supply)| (issuance.into(), offchain_supply.into())], + cf_accounts() -> Vec<(state_chain_runtime::AccountId, String)> [map: |accounts| { + accounts + .into_iter() + .map(|(account_id, vanity_name_bytes)| { + // we can use from_utf8_lossy here because we're guaranteed utf8 when we + // save the vanity name on the chain + (account_id, String::from_utf8_lossy(&vanity_name_bytes).into_owned()) + }) + .collect() + }], + cf_free_balances(account_id: state_chain_runtime::AccountId) -> AssetMap [map: |asset_map| asset_map.map(Into::into)], + cf_lp_total_balances(account_id: state_chain_runtime::AccountId) -> any::AssetMap [map: |asset_map| asset_map.map(Into::into)], + cf_penalties() -> Vec<(Offence, RpcPenalty)> [map: |penalties| { + penalties + .into_iter() + .map(|(offence, RuntimeApiPenalty {reputation_points,suspension_duration_blocks})| ( + offence, + RpcPenalty { + reputation_points, + suspension_duration_blocks, + }) + ) + .collect() + }], + cf_suspensions() -> RpcSuspensions, + cf_generate_gov_key_call_hash(call: Vec) -> GovCallHash, + cf_auction_state() -> RpcAuctionState [map: Into::into], + cf_safe_mode_statuses() -> RuntimeSafeMode, + cf_failed_call_ethereum(broadcast_id: BroadcastId) -> Option<::Transaction>, + cf_failed_call_arbitrum(broadcast_id: BroadcastId) -> Option<::Transaction>, + cf_boost_pools_depth() -> Vec, + cf_pool_price(from_asset: Asset, to_asset: Asset) -> Option, + } + + pass_through_and_flatten! { + cf_required_asset_ratio_for_range_order(base_asset: Asset, quote_asset: Asset, tick_range: Range) -> PoolPairsMap, + cf_pool_orderbook(base_asset: Asset, quote_asset: Asset, orders: u32) -> PoolOrderbook, + cf_pool_info(base_asset: Asset, quote_asset: Asset) -> PoolInfo, + cf_pool_depth(base_asset: Asset, quote_asset: Asset, tick_range: Range) -> AskBidMap, + cf_pool_liquidity(base_asset: Asset, quote_asset: Asset) -> PoolLiquidity, + cf_pool_range_order_liquidity_value( + base_asset: Asset, + quote_asset: Asset, + tick_range: Range, + liquidity: Liquidity, + ) -> PoolPairsMap, + cf_validate_dca_params(number_of_chunks: u32, chunk_interval: u32) -> (), + cf_validate_refund_params(retry_duration: u32) -> (), } - fn cf_epoch_duration(&self, at: Option<::Hash>) -> RpcResult { - self.client - .runtime_api() - .cf_epoch_duration(self.unwrap_or_best(at)) - .map_err(to_rpc_error) + + fn cf_current_compatibility_version(&self) -> RpcResult { + #[allow(deprecated)] + self.with_runtime_api(None, |api, hash| api.cf_current_compatibility_version(hash)) } - fn cf_current_epoch_started_at(&self, at: Option<::Hash>) -> RpcResult { - self.client - .runtime_api() - .cf_current_epoch_started_at(self.unwrap_or_best(at)) - .map_err(to_rpc_error) + + fn cf_max_swap_amount(&self, asset: Asset) -> RpcResult> { + self.with_runtime_api(None, |api, hash| api.cf_max_swap_amount(hash, asset)) } - fn cf_authority_emission_per_block( - &self, - at: Option<::Hash>, - ) -> RpcResult { - self.client - .runtime_api() - .cf_authority_emission_per_block(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - .map(Into::into) + + fn cf_tx_fee_multiplier(&self, _at: Option) -> RpcResult { + Ok(TX_FEE_MULTIPLIER as u64) } - fn cf_backup_emission_per_block( + + fn cf_witness_count( &self, - at: Option<::Hash>, - ) -> RpcResult { - self.client - .runtime_api() - .cf_backup_emission_per_block(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - .map(Into::into) + call_hash: Hash, + epoch_index: Option, + at: Option, + ) -> RpcResult> { + self.with_runtime_api(at, |api, block_hash| { + api.cf_witness_count( + block_hash, + pallet_cf_witnesser::CallHash(call_hash.into()), + epoch_index, + ) + }) } - fn cf_flip_supply( + + fn cf_pool_orders( &self, - at: Option<::Hash>, - ) -> RpcResult<(NumberOrHex, NumberOrHex)> { - self.client - .runtime_api() - .cf_flip_supply(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - .map(|(issuance, offchain)| (issuance.into(), offchain.into())) + base_asset: Asset, + quote_asset: Asset, + lp: Option, + filled_orders: Option, + at: Option, + ) -> RpcResult> { + flatten_into_error(self.with_runtime_api(at, |api, hash| { + api.cf_pool_orders(hash, base_asset, quote_asset, lp, filled_orders.unwrap_or_default()) + })) } - fn cf_accounts( + fn cf_pool_price_v2( &self, - at: Option<::Hash>, - ) -> RpcResult> { - Ok(self - .client - .runtime_api() - .cf_accounts(self.unwrap_or_best(at)) - .map_err(to_rpc_error)? - .into_iter() - .map(|(account_id, vanity_name_bytes)| { - // we can use from_utf8_lossy here because we're guaranteed utf8 when we - // save the vanity name on the chain - (account_id, String::from_utf8_lossy(&vanity_name_bytes).into_owned()) + base_asset: Asset, + quote_asset: Asset, + at: Option, + ) -> RpcResult { + self.with_runtime_api(at, |api, hash| { + Ok::<_, CfApiError>(PoolPriceV2 { + base_asset, + quote_asset, + price: api.cf_pool_price_v2(hash, base_asset, quote_asset)??, }) - .collect()) + }) } fn cf_account_info( @@ -1177,41 +1276,33 @@ where account_id: state_chain_runtime::AccountId, at: Option, ) -> RpcResult { - let api = self.client.runtime_api(); - - let hash = self.unwrap_or_best(at); - - let balance = api.cf_account_flip_balance(hash, &account_id).map_err(to_rpc_error)?; - - Ok( - match api - .cf_account_role(hash, account_id.clone()) - .map_err(to_rpc_error)? - .unwrap_or(AccountRole::Unregistered) - { - AccountRole::Unregistered => RpcAccountInfo::unregistered(balance), - AccountRole::Broker => { - let info = api.cf_broker_info(hash, account_id).map_err(to_rpc_error)?; + self.with_runtime_api(at, |api, hash| { + let balance = api.cf_account_flip_balance(hash, &account_id)?; + + Ok::<_, CfApiError>( + match api + .cf_account_role(hash, account_id.clone())? + .unwrap_or(AccountRole::Unregistered) + { + AccountRole::Unregistered => RpcAccountInfo::unregistered(balance), + AccountRole::Broker => { + let info = api.cf_broker_info(hash, account_id)?; + + RpcAccountInfo::broker(balance, info) + }, + AccountRole::LiquidityProvider => { + let info = api.cf_liquidity_provider_info(hash, account_id)?; - RpcAccountInfo::broker(balance, info) - }, - AccountRole::LiquidityProvider => { - let info = - api.cf_liquidity_provider_info(hash, account_id).map_err(to_rpc_error)?; - - RpcAccountInfo::lp( - info, - api.cf_network_environment(hash).map_err(to_rpc_error)?, - balance, - ) - }, - AccountRole::Validator => { - let info = api.cf_validator_info(hash, &account_id).map_err(to_rpc_error)?; + RpcAccountInfo::lp(info, api.cf_network_environment(hash)?, balance) + }, + AccountRole::Validator => { + let info = api.cf_validator_info(hash, &account_id)?; - RpcAccountInfo::validator(info) + RpcAccountInfo::validator(info) + }, }, - }, - ) + ) + }) } fn cf_account_info_v2( @@ -1219,11 +1310,8 @@ where account_id: state_chain_runtime::AccountId, at: Option<::Hash>, ) -> RpcResult { - let account_info = self - .client - .runtime_api() - .cf_validator_info(self.unwrap_or_best(at), &account_id) - .map_err(to_rpc_error)?; + let account_info = + self.with_runtime_api(at, |api, hash| api.cf_validator_info(hash, &account_id))?; Ok(RpcAccountInfoV2 { balance: account_info.balance.into(), @@ -1242,117 +1330,6 @@ where }) } - fn cf_free_balances( - &self, - account_id: state_chain_runtime::AccountId, - at: Option, - ) -> RpcResult> { - self.client - .runtime_api() - .cf_free_balances(self.unwrap_or_best(at), account_id) - .map(|asset_map| asset_map.map(Into::into)) - .map_err(to_rpc_error) - } - - fn cf_lp_total_balances( - &self, - account_id: state_chain_runtime::AccountId, - at: Option, - ) -> RpcResult> { - self.client - .runtime_api() - .cf_lp_total_balances(self.unwrap_or_best(at), account_id) - .map(|asset_map| asset_map.map(Into::into)) - .map_err(to_rpc_error) - } - - fn cf_penalties( - &self, - at: Option<::Hash>, - ) -> RpcResult> { - Ok(self - .client - .runtime_api() - .cf_penalties(self.unwrap_or_best(at)) - .map_err(to_rpc_error)? - .iter() - .map(|(offence, runtime_api_penalty)| { - ( - *offence, - RpcPenalty { - reputation_points: runtime_api_penalty.reputation_points, - suspension_duration_blocks: runtime_api_penalty.suspension_duration_blocks, - }, - ) - }) - .collect()) - } - fn cf_suspensions(&self, at: Option<::Hash>) -> RpcResult { - self.client - .runtime_api() - .cf_suspensions(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - } - - fn cf_generate_gov_key_call_hash( - &self, - call: Vec, - at: Option<::Hash>, - ) -> RpcResult { - self.client - .runtime_api() - .cf_generate_gov_key_call_hash(self.unwrap_or_best(at), call) - .map_err(to_rpc_error) - } - - fn cf_auction_state(&self, at: Option<::Hash>) -> RpcResult { - let auction_state = self - .client - .runtime_api() - .cf_auction_state(self.unwrap_or_best(at)) - .map_err(to_rpc_error)?; - - Ok(RpcAuctionState { - blocks_per_epoch: auction_state.blocks_per_epoch, - current_epoch_started_at: auction_state.current_epoch_started_at, - redemption_period_as_percentage: auction_state.redemption_period_as_percentage, - min_funding: auction_state.min_funding.into(), - auction_size_range: auction_state.auction_size_range, - min_active_bid: auction_state.min_active_bid.map(|bond| bond.into()), - }) - } - - fn cf_pool_price( - &self, - from_asset: Asset, - to_asset: Asset, - at: Option, - ) -> RpcResult> { - self.client - .runtime_api() - .cf_pool_price(self.unwrap_or_best(at), from_asset, to_asset) - .map_err(to_rpc_error) - } - - fn cf_pool_price_v2( - &self, - base_asset: Asset, - quote_asset: Asset, - at: Option, - ) -> RpcResult { - let hash = self.unwrap_or_best(at); - Ok(PoolPriceV2 { - base_asset, - quote_asset, - price: self - .client - .runtime_api() - .cf_pool_price_v2(hash, base_asset, quote_asset) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error))?, - }) - } - fn cf_pool_swap_rate( &self, from_asset: Asset, @@ -1372,27 +1349,30 @@ where additional_orders: Option>, at: Option, ) -> RpcResult { - self.client - .runtime_api() - .cf_pool_simulate_swap( - self.unwrap_or_best(at), - from_asset, - to_asset, - amount - .try_into() - .and_then(|amount| { - if amount == 0 { - Err("Swap input amount cannot be zero.") - } else { - Ok(amount) - } - }) - .map_err(str_to_rpc_error)?, - additional_orders.map(|additional_orders| { - additional_orders - .into_iter() - .map(|additional_order| { - match additional_order { + self.with_runtime_api(at, |api, hash| { + Ok::<_, CfApiError>( + api.cf_pool_simulate_swap( + hash, + from_asset, + to_asset, + amount + .try_into() + .map_err(|_| "Swap input amount too large.") + .and_then(|amount| { + if amount == 0 { + Err("Swap input amount cannot be zero.") + } else { + Ok(amount) + } + }) + .map_err(|s| { + ErrorObject::owned(ErrorCode::InvalidParams.code(), s, None::<()>) + })?, + additional_orders.map(|additional_orders| { + additional_orders + .into_iter() + .map(|additional_order| { + match additional_order { SwapRateV2AdditionalOrder::LimitOrder { base_asset, quote_asset, @@ -1407,203 +1387,59 @@ where sell_amount: sell_amount.unique_saturated_into(), } } - }) - .collect() - }), - ) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error)) - .map(|simulated_swap_info| RpcSwapOutputV2 { - intermediary: simulated_swap_info.intermediary.map(Into::into), - output: simulated_swap_info.output.into(), - network_fee: RpcFee { - asset: cf_primitives::STABLE_ASSET, - amount: simulated_swap_info.network_fee.into(), - }, - ingress_fee: RpcFee { - asset: from_asset, - amount: simulated_swap_info.ingress_fee.into(), - }, - egress_fee: RpcFee { - asset: to_asset, - amount: simulated_swap_info.egress_fee.into(), - }, - }) - } - - fn cf_pool_info( - &self, - base_asset: Asset, - quote_asset: Asset, - at: Option, - ) -> RpcResult { - self.client - .runtime_api() - .cf_pool_info(self.unwrap_or_best(at), base_asset, quote_asset) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error)) - } - - fn cf_pool_depth( - &self, - base_asset: Asset, - quote_asset: Asset, - tick_range: Range, - at: Option, - ) -> RpcResult> { - self.client - .runtime_api() - .cf_pool_depth(self.unwrap_or_best(at), base_asset, quote_asset, tick_range) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error)) - } - - fn cf_boost_pools_depth( - &self, - at: Option, - ) -> RpcResult> { - self.client - .runtime_api() - .cf_boost_pools_depth(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - } - - fn cf_pool_liquidity( - &self, - base_asset: Asset, - quote_asset: Asset, - at: Option, - ) -> RpcResult { - self.client - .runtime_api() - .cf_pool_liquidity(self.unwrap_or_best(at), base_asset, quote_asset) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error)) - } - - fn cf_required_asset_ratio_for_range_order( - &self, - base_asset: Asset, - quote_asset: Asset, - tick_range: Range, - at: Option, - ) -> RpcResult> { - self.client - .runtime_api() - .cf_required_asset_ratio_for_range_order( - self.unwrap_or_best(at), - base_asset, - quote_asset, - tick_range, - ) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error)) - } - - fn cf_pool_orderbook( - &self, - base_asset: Asset, - quote_asset: Asset, - orders: u32, - at: Option, - ) -> RpcResult { - self.client - .runtime_api() - .cf_pool_orderbook(self.unwrap_or_best(at), base_asset, quote_asset, orders) - .map_err(to_rpc_error) - .and_then(|result| result.map(Into::into).map_err(map_dispatch_error)) - } - - fn cf_pool_orders( - &self, - base_asset: Asset, - quote_asset: Asset, - lp: Option, - filled_orders: Option, - at: Option, - ) -> RpcResult> { - self.client - .runtime_api() - .cf_pool_orders( - self.unwrap_or_best(at), - base_asset, - quote_asset, - lp, - filled_orders.unwrap_or(false), - ) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error)) - } - - fn cf_pool_range_order_liquidity_value( - &self, - base_asset: Asset, - quote_asset: Asset, - tick_range: Range, - liquidity: Liquidity, - at: Option, - ) -> RpcResult> { - self.client - .runtime_api() - .cf_pool_range_order_liquidity_value( - self.unwrap_or_best(at), - base_asset, - quote_asset, - tick_range, - liquidity, + }) + .collect() + }), + )? + .map(|simulated_swap_info| RpcSwapOutputV2 { + intermediary: simulated_swap_info.intermediary.map(Into::into), + output: simulated_swap_info.output.into(), + network_fee: RpcFee { + asset: cf_primitives::STABLE_ASSET, + amount: simulated_swap_info.network_fee.into(), + }, + ingress_fee: RpcFee { + asset: from_asset, + amount: simulated_swap_info.ingress_fee.into(), + }, + egress_fee: RpcFee { + asset: to_asset, + amount: simulated_swap_info.egress_fee.into(), + }, + })?, ) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error)) + }) } fn cf_ingress_egress_environment( &self, at: Option, ) -> RpcResult { - let runtime_api = &self.client.runtime_api(); - let hash = self.unwrap_or_best(at); - - let mut witness_safety_margins = HashMap::new(); - let mut channel_opening_fees = HashMap::new(); - - for chain in ForeignChain::iter() { - witness_safety_margins.insert( - chain, - runtime_api.cf_witness_safety_margin(hash, chain).map_err(to_rpc_error)?, - ); - channel_opening_fees.insert( - chain, - runtime_api.cf_channel_opening_fee(hash, chain).map_err(to_rpc_error)?.into(), - ); - } + self.with_runtime_api(at, |api, hash| { + let mut witness_safety_margins = HashMap::new(); + let mut channel_opening_fees = HashMap::new(); - Ok(IngressEgressEnvironment { - minimum_deposit_amounts: any::AssetMap::try_from_fn(|asset| { - runtime_api - .cf_min_deposit_amount(hash, asset) - .map_err(to_rpc_error) - .map(Into::into) - })?, - ingress_fees: any::AssetMap::try_from_fn(|asset| { - runtime_api - .cf_ingress_fee(hash, asset) - .map_err(to_rpc_error) - .map(|value| value.map(Into::into)) - })?, - egress_fees: any::AssetMap::try_from_fn(|asset| { - runtime_api - .cf_egress_fee(hash, asset) - .map_err(to_rpc_error) - .map(|value| value.map(Into::into)) - })?, - witness_safety_margins, - egress_dust_limits: any::AssetMap::try_from_fn(|asset| { - runtime_api - .cf_egress_dust_limit(hash, asset) - .map_err(to_rpc_error) - .map(Into::into) - })?, - channel_opening_fees, + for chain in ForeignChain::iter() { + witness_safety_margins.insert(chain, api.cf_witness_safety_margin(hash, chain)?); + channel_opening_fees.insert(chain, api.cf_channel_opening_fee(hash, chain)?.into()); + } + + Ok::<_, CfApiError>(IngressEgressEnvironment { + minimum_deposit_amounts: any::AssetMap::try_from_fn(|asset| { + api.cf_min_deposit_amount(hash, asset).map(Into::into) + })?, + ingress_fees: any::AssetMap::try_from_fn(|asset| { + api.cf_ingress_fee(hash, asset).map(|value| value.map(Into::into)) + })?, + egress_fees: any::AssetMap::try_from_fn(|asset| { + api.cf_egress_fee(hash, asset).map(|value| value.map(Into::into)) + })?, + witness_safety_margins, + egress_dust_limits: any::AssetMap::try_from_fn(|asset| { + api.cf_egress_dust_limit(hash, asset).map(Into::into) + })?, + channel_opening_fees, + }) }) } @@ -1611,28 +1447,20 @@ where &self, at: Option, ) -> RpcResult { - let runtime_api = &self.client.runtime_api(); - let hash = self.unwrap_or_best(at); - let swap_limits = runtime_api.cf_swap_limits(hash).map_err(to_rpc_error)?; - Ok(SwappingEnvironment { - maximum_swap_amounts: any::AssetMap::try_from_fn(|asset| { - runtime_api - .cf_max_swap_amount(hash, asset) - .map_err(to_rpc_error) - .map(|option| option.map(Into::into)) - })?, - network_fee_hundredth_pips: NetworkFee::get(), - swap_retry_delay_blocks: runtime_api - .cf_swap_retry_delay_blocks(hash) - .map_err(to_rpc_error)?, - max_swap_retry_duration_blocks: swap_limits.max_swap_retry_duration_blocks, - max_swap_request_duration_blocks: swap_limits.max_swap_request_duration_blocks, - minimum_chunk_size: any::AssetMap::try_from_fn(|asset| { - runtime_api - .cf_minimum_chunk_size(hash, asset) - .map_err(to_rpc_error) - .map(Into::into) - })?, + self.with_runtime_api(at, |api, hash| { + let swap_limits = api.cf_swap_limits(hash)?; + Ok::<_, CfApiError>(SwappingEnvironment { + maximum_swap_amounts: any::AssetMap::try_from_fn(|asset| { + api.cf_max_swap_amount(hash, asset).map(|option| option.map(Into::into)) + })?, + network_fee_hundredth_pips: NetworkFee::get(), + swap_retry_delay_blocks: api.cf_swap_retry_delay_blocks(hash)?, + max_swap_retry_duration_blocks: swap_limits.max_swap_retry_duration_blocks, + max_swap_request_duration_blocks: swap_limits.max_swap_request_duration_blocks, + minimum_chunk_size: any::AssetMap::try_from_fn(|asset| { + api.cf_minimum_chunk_size(hash, asset).map(Into::into) + })?, + }) }) } @@ -1640,12 +1468,11 @@ where &self, at: Option, ) -> RpcResult { - let runtime_api = &self.client.runtime_api(); - let hash = self.unwrap_or_best(at); - - Ok(FundingEnvironment { - redemption_tax: runtime_api.cf_redemption_tax(hash).map_err(to_rpc_error)?.into(), - minimum_funding_amount: runtime_api.cf_min_funding(hash).map_err(to_rpc_error)?.into(), + self.with_runtime_api(at, |api, hash| { + Ok::<_, CfApiError>(FundingEnvironment { + redemption_tax: api.cf_redemption_tax(hash)?.into(), + minimum_funding_amount: api.cf_min_funding(hash)?.into(), + }) }) } @@ -1653,22 +1480,19 @@ where &self, at: Option, ) -> RpcResult { - Ok(PoolsEnvironment { - fees: { - let mut map = AssetMap::default(); - self.client - .runtime_api() - .cf_pools(self.unwrap_or_best(at)) - .map_err(to_rpc_error)? - .iter() - .for_each(|asset_pair| { + self.with_runtime_api(at, |api, hash| { + Ok::<_, CfApiError>(PoolsEnvironment { + fees: { + let mut map = AssetMap::default(); + for asset_pair in api.cf_pools(hash)? { map[asset_pair.base] = self .cf_pool_info(asset_pair.base, asset_pair.quote, at) .ok() .map(Into::into); - }); - map - }, + } + map + }, + }) }) } @@ -1681,21 +1505,6 @@ where }) } - fn cf_current_compatibility_version(&self) -> RpcResult { - #[allow(deprecated)] - self.client - .runtime_api() - .cf_current_compatibility_version(self.unwrap_or_best(None)) - .map_err(to_rpc_error) - } - - fn cf_max_swap_amount(&self, asset: Asset) -> RpcResult> { - self.client - .runtime_api() - .cf_max_swap_amount(self.unwrap_or_best(None), asset) - .map_err(to_rpc_error) - } - fn cf_subscribe_pool_price( &self, pending_sink: PendingSubscriptionSink, @@ -1706,7 +1515,9 @@ where true, /* only_on_changes */ false, /* end_on_error */ pending_sink, - move |client, hash| client.runtime_api().cf_pool_price(hash, from_asset, to_asset), + move |client, hash| { + Ok((*client.runtime_api()).cf_pool_price(hash, from_asset, to_asset)?) + }, ) } @@ -1721,12 +1532,15 @@ where true, /* end_on_error */ pending_sink, move |client, hash| { - client - .runtime_api() - .cf_pool_price_v2(hash, base_asset, quote_asset) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error)) - .map(|price| PoolPriceV2 { base_asset, quote_asset, price }) + Ok(PoolPriceV2 { + base_asset, + quote_asset, + price: (*client.runtime_api()).cf_pool_price_v2( + hash, + base_asset, + quote_asset, + )??, + }) }, ) } @@ -1751,9 +1565,8 @@ where true, /* end_on_error */ pending_sink, move |client, hash| { - Ok::<_, ApiError>(SwapResponse { - swaps: client - .runtime_api() + Ok(SwapResponse { + swaps: (*client.runtime_api()) .cf_scheduled_swaps(hash, base_asset, quote_asset)? .into_iter() .map(|(swap, execute_at)| ScheduledSwap::new(swap, execute_at)) @@ -1770,20 +1583,19 @@ where at: Option, ) -> RpcResult> { // Check that the requested pool exists: - self.client - .runtime_api() - .cf_pool_info(self.client.info().best_hash, base_asset, quote_asset) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error))?; - - Ok(self - .client - .runtime_api() - .cf_scheduled_swaps(self.unwrap_or_best(at), base_asset, quote_asset) - .map_err(to_rpc_error)? - .into_iter() - .map(|(swap, execute_at)| ScheduledSwap::new(swap, execute_at)) - .collect()) + let _ = (*self.client.runtime_api()).cf_pool_info( + self.client.info().best_hash, + base_asset, + quote_asset, + )?; + + self.with_runtime_api(at, |api, hash| api.cf_scheduled_swaps(hash, base_asset, quote_asset)) + .map(|swaps| { + swaps + .into_iter() + .map(|(swap, execute_at)| ScheduledSwap::new(swap, execute_at)) + .collect() + }) } fn cf_subscribe_prewitness_swaps( @@ -1798,14 +1610,12 @@ where true, /* end_on_error */ pending_sink, move |client, hash| { - Ok::<_, ErrorObjectOwned>(RpcPrewitnessedSwap { + Ok::<_, CfApiError>(RpcPrewitnessedSwap { base_asset, quote_asset, side, - amounts: client - .runtime_api() - .cf_prewitness_swaps(hash, base_asset, quote_asset, side) - .map_err(to_rpc_error)? + amounts: (*client.runtime_api()) + .cf_prewitness_swaps(hash, base_asset, quote_asset, side)? .into_iter() .map(|s| s.into()) .collect(), @@ -1828,8 +1638,7 @@ where amounts: self .client .runtime_api() - .cf_prewitness_swaps(self.unwrap_or_best(at), base_asset, quote_asset, side) - .map_err(to_rpc_error)? + .cf_prewitness_swaps(self.unwrap_or_best(at), base_asset, quote_asset, side)? .into_iter() .map(|s| s.into()) .collect(), @@ -1847,8 +1656,7 @@ where let fills = prev_pools .map(|prev_pools| { - let pools_events = - client.runtime_api().cf_lp_events(hash).map_err(to_rpc_error)?; + let pools_events = client.runtime_api().cf_lp_events(hash)?; RpcResult::Ok(order_fills::order_fills_from_block_updates( prev_pools, @@ -1868,58 +1676,20 @@ where Ok(Asset::all().collect()) } - fn cf_failed_call_ethereum( - &self, - broadcast_id: BroadcastId, - ) -> RpcResult::Transaction>> { - self.client - .runtime_api() - .cf_failed_call_ethereum(self.unwrap_or_best(None), broadcast_id) - .map_err(to_rpc_error) - } - - fn cf_failed_call_arbitrum( - &self, - broadcast_id: BroadcastId, - ) -> RpcResult::Transaction>> { - self.client - .runtime_api() - .cf_failed_call_arbitrum(self.unwrap_or_best(None), broadcast_id) - .map_err(to_rpc_error) - } - - fn cf_witness_count( - &self, - hash: state_chain_runtime::Hash, - epoch_index: Option, - at: Option, - ) -> RpcResult> { - self.client - .runtime_api() - .cf_witness_count( - self.unwrap_or_best(at), - pallet_cf_witnesser::CallHash(hash.into()), - epoch_index, - ) - .map_err(to_rpc_error) - } - fn cf_boost_pool_details( &self, asset: Option, at: Option, ) -> RpcResult { execute_for_all_or_one_asset(asset, |asset| { - self.client - .runtime_api() - .cf_boost_pool_details(self.unwrap_or_best(at), asset) - .map(|details_for_each_pool| { + self.with_runtime_api(at, |api, hash| { + api.cf_boost_pool_details(hash, asset).map(|details_for_each_pool| { details_for_each_pool .into_iter() .map(|(tier, details)| BoostPoolDetailsRpc::new(asset, tier, details)) .collect() }) - .map_err(to_rpc_error) + }) }) } @@ -1929,34 +1699,19 @@ where at: Option, ) -> RpcResult { execute_for_all_or_one_asset(asset, |asset| { - self.client - .runtime_api() - .cf_boost_pool_details(self.unwrap_or_best(at), asset) - .map(|details_for_each_pool| { + self.with_runtime_api(at, |api, hash| { + api.cf_boost_pool_details(hash, asset).map(|details_for_each_pool| { details_for_each_pool .into_iter() .map(|(fee_tier, details)| BoostPoolFeesRpc::new(asset, fee_tier, details)) .collect() }) - .map_err(to_rpc_error) + }) }) } - fn cf_safe_mode_statuses( - &self, - at: Option, - ) -> RpcResult { - self.client - .runtime_api() - .cf_safe_mode_statuses(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - } - fn cf_available_pools(&self, at: Option) -> RpcResult>> { - self.client - .runtime_api() - .cf_pools(self.unwrap_or_best(at)) - .map_err(to_rpc_error) + self.with_runtime_api(at, |api, hash| api.cf_pools(hash)) } fn cf_solana_electoral_data( @@ -1964,13 +1719,7 @@ where validator: state_chain_runtime::AccountId, at: Option, ) -> RpcResult> { - let runtime_api = self.client.runtime_api(); - ElectoralRuntimeApi::<_, SolanaInstance>::cf_electoral_data( - &*runtime_api, - self.unwrap_or_best(at), - validator, - ) - .map_err(to_rpc_error) + self.with_runtime_api(at, |api, hash| api.cf_electoral_data(hash, validator)) } fn cf_solana_filter_votes( @@ -1979,39 +1728,7 @@ where proposed_votes: Vec, at: Option, ) -> RpcResult> { - let runtime_api = self.client.runtime_api(); - ElectoralRuntimeApi::<_, SolanaInstance>::cf_filter_votes( - &*runtime_api, - self.unwrap_or_best(at), - validator, - proposed_votes, - ) - .map_err(to_rpc_error) - } - - fn cf_validate_dca_params( - &self, - number_of_chunks: u32, - chunk_interval: u32, - at: Option, - ) -> RpcResult<()> { - self.client - .runtime_api() - .cf_validate_dca_params(self.unwrap_or_best(at), number_of_chunks, chunk_interval) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error)) - } - - fn cf_validate_refund_params( - &self, - retry_duration: u32, - at: Option, - ) -> RpcResult<()> { - self.client - .runtime_api() - .cf_validate_refund_params(self.unwrap_or_best(at), retry_duration) - .map_err(to_rpc_error) - .and_then(|result| result.map_err(map_dispatch_error)) + self.with_runtime_api(at, |api, hash| api.cf_filter_votes(hash, validator, proposed_votes)) } } @@ -2028,8 +1745,7 @@ where { fn new_subscription< T: Serialize + Send + Clone + Eq + 'static, - E: std::error::Error + Send + Sync + 'static, - F: Fn(&C, state_chain_runtime::Hash) -> Result + Send + Clone + 'static, + F: Fn(&C, state_chain_runtime::Hash) -> Result + Send + Clone + 'static, >( &self, only_on_changes: bool, @@ -2051,10 +1767,12 @@ where /// an error. fn new_subscription_with_state< T: Serialize + Send + Clone + Eq + 'static, - E: std::error::Error + Send + Sync + 'static, // State to carry forward between calls to the closure. S: 'static + Clone + Send, - F: Fn(&C, state_chain_runtime::Hash, Option<&S>) -> Result<(T, S), E> + Send + Clone + 'static, + F: Fn(&C, state_chain_runtime::Hash, Option<&S>) -> Result<(T, S), CfApiError> + + Send + + Clone + + 'static, >( &self, only_on_changes: bool, @@ -2072,9 +1790,7 @@ where self.executor.spawn( "cf-rpc-update-subscription", Some("rpc"), - pending_sink - .reject(sc_rpc_api::state::error::Error::Client(Box::new(to_rpc_error(e)))) - .boxed(), + pending_sink.reject(e).boxed(), ); return; }, @@ -2108,7 +1824,7 @@ where data: new_item, })) }, - Err(error) if end_on_error => Some(Err(to_rpc_error(error))), + Err(error) if end_on_error => Some(Err(ErrorObjectOwned::from(error))), _ => None, }) } diff --git a/state-chain/custom-rpc/src/monitoring.rs b/state-chain/custom-rpc/src/monitoring.rs index 47be9d02cab..8973af24f83 100644 --- a/state-chain/custom-rpc/src/monitoring.rs +++ b/state-chain/custom-rpc/src/monitoring.rs @@ -1,8 +1,7 @@ -use crate::{ - to_rpc_error, BlockT, CustomRpc, RpcAccountInfoV2, RpcFeeImbalance, RpcMonitoringData, -}; +use super::pass_through; +use crate::{BlockT, CustomRpc, RpcAccountInfoV2, RpcFeeImbalance, RpcMonitoringData, RpcResult}; use cf_chains::{dot::PolkadotAccountId, sol::SolAddress}; -use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use jsonrpsee::proc_macros::rpc; use sc_client_api::{BlockchainEvents, HeaderBackend}; use sp_core::{bounded_vec::BoundedVec, ConstU32}; use state_chain_runtime::{ @@ -85,20 +84,6 @@ pub trait MonitoringApi { ) -> RpcResult>; } -macro_rules! pass_through { - ($( $name:ident -> $result_type:ty ),+) => { - - $( - fn $name(&self, at: Option) -> RpcResult<$result_type> { - self.client - .runtime_api() - .$name(self.unwrap_or_best(at)) - .map_err(to_rpc_error) - } - )+ - }; -} - impl MonitoringApiServer for CustomRpc where B: BlockT, @@ -111,55 +96,46 @@ where C::Api: MonitoringRuntimeApi, { pass_through! { - cf_authorities -> AuthoritiesInfo, - cf_external_chains_block_height -> ExternalChainsBlockHeight, - cf_btc_utxos -> BtcUtxos, - cf_dot_aggkey -> PolkadotAccountId, - cf_suspended_validators -> Vec<(Offence, u32)>, - cf_epoch_state -> EpochState, - cf_redemptions -> RedemptionsInfo, - cf_pending_broadcasts_count -> PendingBroadcasts, - cf_pending_tss_ceremonies_count -> PendingTssCeremonies, - cf_pending_swaps_count -> u32, - cf_open_deposit_channels_count -> OpenDepositChannels, - cf_build_version -> LastRuntimeUpgradeInfo, - cf_rotation_broadcast_ids -> ActivateKeysBroadcastIds, - cf_sol_nonces -> SolanaNonces, - cf_sol_aggkey -> SolAddress, - cf_sol_onchain_key -> SolAddress + cf_authorities() -> AuthoritiesInfo, + cf_external_chains_block_height() -> ExternalChainsBlockHeight, + cf_btc_utxos() -> BtcUtxos, + cf_dot_aggkey() -> PolkadotAccountId, + cf_suspended_validators() -> Vec<(Offence, u32)>, + cf_epoch_state() -> EpochState, + cf_redemptions() -> RedemptionsInfo, + cf_pending_broadcasts_count() -> PendingBroadcasts, + cf_pending_tss_ceremonies_count() -> PendingTssCeremonies, + cf_pending_swaps_count() -> u32, + cf_open_deposit_channels_count() -> OpenDepositChannels, + cf_build_version() -> LastRuntimeUpgradeInfo, + cf_rotation_broadcast_ids() -> ActivateKeysBroadcastIds, + cf_sol_nonces() -> SolanaNonces, + cf_sol_aggkey() -> SolAddress, + cf_sol_onchain_key() -> SolAddress } fn cf_fee_imbalance( &self, at: Option, ) -> RpcResult { - self.client - .runtime_api() - .cf_fee_imbalance(self.unwrap_or_best(at)) + self.with_runtime_api::<_, _>(at, |api, hash| api.cf_fee_imbalance(hash)) .map(|imbalance| imbalance.map(|i| (*i).into())) - .map_err(to_rpc_error) } fn cf_monitoring_data( &self, at: Option, ) -> RpcResult { - self.client - .runtime_api() - .cf_monitoring_data(self.unwrap_or_best(at)) + self.with_runtime_api(at, |api, hash| api.cf_monitoring_data(hash)) .map(Into::into) - .map_err(to_rpc_error) } fn cf_accounts_info( &self, accounts: BoundedVec>, at: Option, ) -> RpcResult> { - let accounts_info = self - .client - .runtime_api() - .cf_accounts_info(self.unwrap_or_best(at), accounts) - .map_err(to_rpc_error)?; + let accounts_info = + self.with_runtime_api(at, |api, hash| api.cf_accounts_info(hash, accounts))?; Ok(accounts_info .into_iter() .map(|account_info| RpcAccountInfoV2 { diff --git a/state-chain/runtime/src/runtime_apis.rs b/state-chain/runtime/src/runtime_apis.rs index 5a29f5b22e2..536ebc0bfdb 100644 --- a/state-chain/runtime/src/runtime_apis.rs +++ b/state-chain/runtime/src/runtime_apis.rs @@ -12,7 +12,7 @@ use cf_primitives::{ }; use cf_traits::SwapLimits; use codec::{Decode, Encode}; -use core::ops::Range; +use core::{ops::Range, str}; use frame_support::sp_runtime::AccountId32; use pallet_cf_governance::GovCallHash; pub use pallet_cf_ingress_egress::OwedAmount; @@ -161,6 +161,21 @@ impl From for DispatchErrorWithMessage { } } } +#[cfg(feature = "std")] +impl core::fmt::Display for DispatchErrorWithMessage { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> { + match self { + DispatchErrorWithMessage::Module(message) => write!( + f, + "{}", + str::from_utf8(message).unwrap_or("") + ), + DispatchErrorWithMessage::Other(error) => write!(f, "{:?}", error), + } + } +} +#[cfg(feature = "std")] +impl std::error::Error for DispatchErrorWithMessage {} #[derive(Serialize, Deserialize, Encode, Decode, Eq, PartialEq, TypeInfo, Debug, Clone)] pub struct FailingWitnessValidators {