diff --git a/Cargo.lock b/Cargo.lock index 13d0a2ebe8..68ea096442 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2319,13 +2319,16 @@ dependencies = [ "cf-amm", "cf-chains", "cf-primitives", + "futures", "hex", "jsonrpsee 0.16.2", "pallet-cf-governance", "pallet-cf-pools", "sc-client-api", + "sc-rpc-api", "serde", "sp-api", + "sp-core", "sp-rpc", "sp-runtime", "state-chain-runtime", diff --git a/state-chain/amm/src/common.rs b/state-chain/amm/src/common.rs index 7dc5c8f706..39bb990d37 100644 --- a/state-chain/amm/src/common.rs +++ b/state-chain/amm/src/common.rs @@ -114,9 +114,8 @@ impl core::ops::IndexMut for SideMap { } } } -#[cfg(test)] -impl, R> std::ops::Add> for SideMap { - type Output = SideMap<>::Output>; +impl, R> core::ops::Add> for SideMap { + type Output = SideMap<>::Output>; fn add(self, rhs: SideMap) -> Self::Output { SideMap { zero: self.zero + rhs.zero, one: self.one + rhs.one } } diff --git a/state-chain/amm/src/lib.rs b/state-chain/amm/src/lib.rs index 4d9e746f57..020c878985 100644 --- a/state-chain/amm/src/lib.rs +++ b/state-chain/amm/src/lib.rs @@ -308,4 +308,36 @@ impl PoolState { pub fn range_order_liquidity(&self) -> Vec<(Tick, Liquidity)> { self.range_orders.liquidity() } + + pub fn limit_order_depth( + &mut self, + range: core::ops::Range, + ) -> Result, Amount)>, limit_orders::DepthError> { + Ok(SideMap { + zero: ( + self.limit_orders.current_sqrt_price::(), + self.limit_orders.depth::(range.clone())?, + ), + one: ( + self.limit_orders.current_sqrt_price::(), + self.limit_orders.depth::(range)?, + ), + }) + } + + pub fn range_order_depth( + &self, + range: core::ops::Range, + ) -> Result, Amount)>, range_orders::DepthError> { + self.range_orders.depth(range.start, range.end).map(|assets| SideMap { + zero: ( + self.range_orders.current_sqrt_price::().map(sqrt_price_to_price), + assets[Side::Zero], + ), + one: ( + self.range_orders.current_sqrt_price::().map(sqrt_price_to_price), + assets[Side::One], + ), + }) + } } diff --git a/state-chain/amm/src/limit_orders.rs b/state-chain/amm/src/limit_orders.rs index 6a943d725a..f69edff751 100644 --- a/state-chain/amm/src/limit_orders.rs +++ b/state-chain/amm/src/limit_orders.rs @@ -205,6 +205,14 @@ pub enum SetFeesError { InvalidFeeAmount, } +#[derive(Debug)] +pub enum DepthError { + /// Invalid Price + InvalidTick, + /// Start tick must be less than or equal to the end tick + InvalidTickRange, +} + #[derive(Debug)] pub enum MintError { /// One of the start/end ticks of the range reached its maximum gross liquidity @@ -729,11 +737,31 @@ impl PoolState { /// Returns all the assets available for swaps in a given direction /// /// This function never panics. - #[allow(dead_code)] pub(super) fn liquidity(&self) -> Vec<(Tick, Amount)> { self.fixed_pools[!SD::INPUT_SIDE] .iter() .map(|(sqrt_price, fixed_pool)| (tick_at_sqrt_price(*sqrt_price), fixed_pool.available)) .collect() } + + /// Returns all the assets available for swaps between two prices (inclusive..exclusive) + /// + /// This function never panics. + pub(super) fn depth( + &self, + range: core::ops::Range, + ) -> Result { + let start = + Self::validate_tick::(range.start).map_err(|_| DepthError::InvalidTick)?; + let end = + Self::validate_tick::(range.end).map_err(|_| DepthError::InvalidTick)?; + if start <= end { + Ok(self.fixed_pools[!SD::INPUT_SIDE] + .range(start..end) + .map(|(_, fixed_pool)| fixed_pool.available) + .fold(Default::default(), |acc, x| acc + x)) + } else { + Err(DepthError::InvalidTickRange) + } + } } diff --git a/state-chain/amm/src/range_orders.rs b/state-chain/amm/src/range_orders.rs index fd3e730699..4febb3bb57 100644 --- a/state-chain/amm/src/range_orders.rs +++ b/state-chain/amm/src/range_orders.rs @@ -28,9 +28,9 @@ use scale_info::TypeInfo; use sp_core::{U256, U512}; use crate::common::{ - is_sqrt_price_valid, mul_div_ceil, mul_div_floor, sqrt_price_at_tick, tick_at_sqrt_price, - Amount, OneToZero, Side, SideMap, SqrtPriceQ64F96, Tick, ZeroToOne, MAX_TICK, MIN_TICK, - ONE_IN_HUNDREDTH_PIPS, SQRT_PRICE_FRACTIONAL_BITS, + is_sqrt_price_valid, is_tick_valid, mul_div_ceil, mul_div_floor, sqrt_price_at_tick, + tick_at_sqrt_price, Amount, OneToZero, Side, SideMap, SqrtPriceQ64F96, Tick, ZeroToOne, + MAX_TICK, MIN_TICK, ONE_IN_HUNDREDTH_PIPS, SQRT_PRICE_FRACTIONAL_BITS, }; pub type Liquidity = u128; @@ -345,6 +345,14 @@ pub enum RequiredAssetRatioError { InvalidTickRange, } +#[derive(Debug)] +pub enum DepthError { + /// Invalid Price + InvalidTick, + /// Start tick must be less than or equal to the end tick + InvalidTickRange, +} + #[derive(Debug)] pub enum LiquidityToAmountsError { /// Invalid Tick range @@ -1013,6 +1021,50 @@ impl PoolState { }) .collect() } + + pub(super) fn depth( + &self, + lower_tick: Tick, + upper_tick: Tick, + ) -> Result, DepthError> { + if !is_tick_valid(lower_tick) || !is_tick_valid(upper_tick) { + return Err(DepthError::InvalidTick) + } + + if lower_tick <= upper_tick { + let liquidity_at_lower_tick: Liquidity = + self.liquidity_map.range(..lower_tick).fold(0, |liquidity, (_, tick_delta)| { + liquidity.checked_add_signed(tick_delta.liquidity_delta).unwrap() + }); + + let (_liquidity, _tick, assets) = self + .liquidity_map + .range(lower_tick..upper_tick) + .map(|(tick, tick_delta)| (tick, tick_delta.liquidity_delta)) + .chain(core::iter::once((&upper_tick, 0 /* value doesn't matter */))) + .fold( + (liquidity_at_lower_tick, lower_tick, SideMap::::default()), + |(liquidity, previous_tick, assets), (current_tick, liquidity_delta)| { + ( + // Addition is guaranteed to never overflow, see test `max_liquidity` + liquidity.checked_add_signed(liquidity_delta).unwrap(), + *current_tick, + assets + + self.inner_liquidity_to_amounts::( + liquidity, + previous_tick, + *current_tick, + ) + .0, + ) + }, + ); + + Ok(assets) + } else { + Err(DepthError::InvalidTickRange) + } + } } fn zero_amount_delta_floor( diff --git a/state-chain/custom-rpc/Cargo.toml b/state-chain/custom-rpc/Cargo.toml index 9d0711f160..07d82b42eb 100644 --- a/state-chain/custom-rpc/Cargo.toml +++ b/state-chain/custom-rpc/Cargo.toml @@ -10,6 +10,7 @@ edition = '2021' state-chain-runtime = { path = '../runtime' } anyhow = "1.0" +futures = "0.3.14" jsonrpsee = { version = "0.16.2", features = ["full"] } serde = { version = '1.0', features = ['derive'] } cf-chains = { path = '../chains' } @@ -21,6 +22,8 @@ pallet-cf-pools = { path = "../pallets/cf-pools" } hex = '0.4.3' sp-api = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" } +sp-core = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" } sp-rpc = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" } +sc-rpc-api = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" } sp-runtime = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" } sc-client-api = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" } diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index a60b59e97a..17b5ffc6c4 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -5,10 +5,15 @@ use cf_amm::{ use cf_chains::{btc::BitcoinNetwork, dot::PolkadotHash, eth::Address as EthereumAddress}; use cf_primitives::{Asset, AssetAmount, SemVer, SwapOutput}; use core::ops::Range; -use jsonrpsee::{core::RpcResult, proc_macros::rpc, types::error::CallError}; +use jsonrpsee::{ + core::RpcResult, + proc_macros::rpc, + types::error::{CallError, SubscriptionEmptyError}, + SubscriptionSink, +}; use pallet_cf_governance::GovCallHash; -use pallet_cf_pools::{AssetsMap, PoolInfo, PoolLiquidity, PoolOrders}; -use sc_client_api::HeaderBackend; +use pallet_cf_pools::{AssetsMap, Depth, PoolInfo, PoolLiquidity, PoolOrders}; +use sc_client_api::{BlockchainEvents, HeaderBackend}; use serde::{Deserialize, Serialize}; use sp_api::BlockT; use sp_rpc::number::NumberOrHex; @@ -215,6 +220,14 @@ pub trait CustomApi { pair_asset: Asset, at: Option, ) -> RpcResult>; + #[method(name = "pool_depth")] + fn cf_pool_depth( + &self, + base_asset: Asset, + pair_asset: Asset, + tick_range: Range, + at: Option, + ) -> RpcResult, DispatchError>>>; #[method(name = "pool_liquidity")] fn cf_pool_liquidity( &self, @@ -245,18 +258,26 @@ pub trait CustomApi { fn cf_current_compatibility_version(&self) -> RpcResult; #[method(name = "min_swap_amount")] fn cf_min_swap_amount(&self, asset: Asset) -> RpcResult; + #[subscription(name = "subscribe_pool_price", item = Price)] + fn cf_subscribe_pool_price(&self, from: Asset, to: Asset); } /// An RPC extension for the state chain node. pub struct CustomRpc { pub client: Arc, pub _phantom: PhantomData, + pub executor: Arc, } impl CustomRpc where B: BlockT, - C: sp_api::ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + C: sp_api::ProvideRuntimeApi + + Send + + Sync + + 'static + + HeaderBackend + + BlockchainEvents, C::Api: CustomRuntimeApi, { fn unwrap_or_best(&self, from_rpc: Option<::Hash>) -> B::Hash { @@ -271,7 +292,12 @@ fn to_rpc_error(e: E) -> jsonrpsee impl CustomApiServer for CustomRpc where B: BlockT, - C: sp_api::ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + C: sp_api::ProvideRuntimeApi + + Send + + Sync + + 'static + + HeaderBackend + + BlockchainEvents, C::Api: CustomRuntimeApi, { fn cf_is_auction_phase(&self, at: Option<::Hash>) -> RpcResult { @@ -554,6 +580,19 @@ where .map_err(to_rpc_error) } + fn cf_pool_depth( + &self, + base_asset: Asset, + pair_asset: Asset, + tick_range: Range, + at: Option, + ) -> RpcResult, DispatchError>>> { + self.client + .runtime_api() + .cf_pool_depth(self.unwrap_or_best(at), base_asset, pair_asset, tick_range) + .map_err(to_rpc_error) + } + fn cf_pool_liquidity( &self, base_asset: Asset, @@ -638,4 +677,77 @@ where .cf_min_swap_amount(self.unwrap_or_best(None), asset) .map_err(to_rpc_error) } + + fn cf_subscribe_pool_price( + &self, + sink: SubscriptionSink, + from: Asset, + to: Asset, + ) -> Result<(), SubscriptionEmptyError> { + self.new_subscription(sink, move |api, hash| api.cf_pool_price(hash, from, to)) + } +} + +impl CustomRpc +where + B: BlockT, + C: sp_api::ProvideRuntimeApi + + Send + + Sync + + 'static + + HeaderBackend + + BlockchainEvents, + C::Api: CustomRuntimeApi, +{ + fn new_subscription< + T: Serialize + Send + Clone + Eq + 'static, + E: std::error::Error + Send + Sync + 'static, + F: Fn(&C::Api, state_chain_runtime::Hash) -> Result + Send + Clone + 'static, + >( + &self, + mut sink: SubscriptionSink, + f: F, + ) -> Result<(), SubscriptionEmptyError> { + use futures::{future::FutureExt, stream::StreamExt}; + + let client = self.client.clone(); + + let initial = match f(&self.client.runtime_api(), self.client.info().best_hash) { + Ok(initial) => initial, + Err(e) => { + let _ = sink.reject(jsonrpsee::core::Error::from( + sc_rpc_api::state::error::Error::Client(Box::new(e)), + )); + return Ok(()) + }, + }; + + let mut previous = initial.clone(); + + let stream = self + .client + .import_notification_stream() + .filter(|n| futures::future::ready(n.is_new_best)) + .filter_map(move |n| { + let new = f(&client.runtime_api(), n.hash); + + match new { + Ok(new) if new != previous => { + previous = new.clone(); + futures::future::ready(Some(new)) + }, + _ => futures::future::ready(None), + } + }); + + let stream = futures::stream::once(futures::future::ready(initial)).chain(stream); + + let fut = async move { + sink.pipe_from_stream(stream).await; + }; + + self.executor.spawn("cf-rpc-subscription", Some("rpc"), fut.boxed()); + + Ok(()) + } } diff --git a/state-chain/node/src/service.rs b/state-chain/node/src/service.rs index 195953fc28..14c2b501f7 100644 --- a/state-chain/node/src/service.rs +++ b/state-chain/node/src/service.rs @@ -242,6 +242,7 @@ pub fn new_full(config: Configuration) -> Result { let rpc_builder = { let client = client.clone(); let pool = transaction_pool.clone(); + let executor = Arc::new(task_manager.spawn_handle()); Box::new(move |deny_unsafe, subscription_executor| { let build = || { @@ -276,6 +277,7 @@ pub fn new_full(config: Configuration) -> Result { module.merge(CustomApiServer::into_rpc(CustomRpc { client: client.clone(), _phantom: PhantomData, + executor: executor.clone(), }))?; Ok(module) diff --git a/state-chain/pallets/cf-pools/src/lib.rs b/state-chain/pallets/cf-pools/src/lib.rs index 20cdc7a48a..554f2ecc57 100644 --- a/state-chain/pallets/cf-pools/src/lib.rs +++ b/state-chain/pallets/cf-pools/src/lib.rs @@ -5,7 +5,7 @@ use cf_amm::{ common::{Amount, Order, Price, Side, SideMap, Tick}, limit_orders, range_orders, range_orders::Liquidity, - NewError, PoolState, + PoolState, }; use cf_primitives::{chains::assets::any, Asset, AssetAmount, SwapOutput, STABLE_ASSET}; use cf_traits::{impl_pallet_safe_mode, Chainflip, LpBalanceApi, SwappingApi}; @@ -193,6 +193,7 @@ pub mod pallet { common::Tick, limit_orders, range_orders::{self, Liquidity}, + NewError, }; use cf_traits::{AccountRoleRegistry, LpBalanceApi}; use frame_system::pallet_prelude::BlockNumberFor; @@ -956,6 +957,18 @@ pub struct PoolLiquidity { pub range_orders: Vec<(Tick, Liquidity)>, } +#[derive(Clone, Debug, Encode, Decode, TypeInfo, PartialEq, Eq, Deserialize, Serialize)] +pub struct SingleDepth { + pub price: Option, + pub depth: Amount, +} + +#[derive(Clone, Debug, Encode, Decode, TypeInfo, PartialEq, Eq, Deserialize, Serialize)] +pub struct Depth { + pub limit_orders: SingleDepth, + pub range_orders: SingleDepth, +} + impl Pallet { #[allow(clippy::too_many_arguments)] fn inner_update_limit_order( @@ -1243,6 +1256,43 @@ impl Pallet { ) } + pub fn pool_depth( + base_asset: any::Asset, + pair_asset: any::Asset, + tick_range: Range, + ) -> Option, DispatchError>> { + let asset_pair = AssetPair::::new(base_asset, pair_asset).ok()?; + let mut pool = Pools::::get(asset_pair.canonical_asset_pair)?; + + let limit_orders = pool.pool_state.limit_order_depth(tick_range.clone()).map_err(|error| { + match error { + limit_orders::DepthError::InvalidTickRange => Error::::InvalidTickRange, + limit_orders::DepthError::InvalidTick => Error::::InvalidTick, + } + .into() + }); + + let range_orders = pool.pool_state.range_order_depth(tick_range).map_err(|error| { + match error { + range_orders::DepthError::InvalidTickRange => Error::::InvalidTickRange, + range_orders::DepthError::InvalidTick => Error::::InvalidTick, + } + .into() + }); + + Some(limit_orders.and_then(|limit_orders| { + range_orders.map(|range_orders| { + asset_pair.side_map_to_assets_map(SideMap::<()>::default().map(|side, ()| { + let to_single_depth = |(price, depth)| SingleDepth { price, depth }; + Depth { + limit_orders: to_single_depth(limit_orders[side]), + range_orders: to_single_depth(range_orders[side]), + } + })) + }) + })) + } + pub fn pool_liquidity_providers( base_asset: any::Asset, pair_asset: any::Asset, diff --git a/state-chain/runtime/src/lib.rs b/state-chain/runtime/src/lib.rs index 5c3be0658e..83ab32f5a6 100644 --- a/state-chain/runtime/src/lib.rs +++ b/state-chain/runtime/src/lib.rs @@ -29,7 +29,7 @@ use cf_chains::{ use core::ops::Range; pub use frame_system::Call as SystemCall; use pallet_cf_governance::GovCallHash; -use pallet_cf_pools::{AssetsMap, PoolLiquidity}; +use pallet_cf_pools::{AssetsMap, Depth, PoolLiquidity}; use pallet_cf_reputation::ExclusionList; use pallet_transaction_payment::{ConstFeeMultiplier, Multiplier}; use sp_runtime::DispatchError; @@ -1036,6 +1036,10 @@ impl_runtime_apis! { LiquidityPools::pool_info(base_asset, pair_asset) } + fn cf_pool_depth(base_asset: Asset, pair_asset: Asset, tick_range: Range) -> Option, DispatchError>> { + LiquidityPools::pool_depth(base_asset, pair_asset, tick_range) + } + fn cf_pool_liquidity(base_asset: Asset, pair_asset: Asset) -> Option { LiquidityPools::pool_liquidity(base_asset, pair_asset) } diff --git a/state-chain/runtime/src/runtime_apis.rs b/state-chain/runtime/src/runtime_apis.rs index 98a2f32d66..da1779f94a 100644 --- a/state-chain/runtime/src/runtime_apis.rs +++ b/state-chain/runtime/src/runtime_apis.rs @@ -9,7 +9,7 @@ use codec::{Decode, Encode}; use core::ops::Range; use frame_support::sp_runtime::AccountId32; use pallet_cf_governance::GovCallHash; -use pallet_cf_pools::{AssetsMap, PoolInfo, PoolLiquidity, PoolOrders}; +use pallet_cf_pools::{AssetsMap, Depth, PoolInfo, PoolLiquidity, PoolOrders}; use scale_info::TypeInfo; use serde::{Deserialize, Serialize}; use sp_api::decl_runtime_apis; @@ -114,6 +114,11 @@ decl_runtime_apis!( fn cf_pool_simulate_swap(from: Asset, to: Asset, amount: AssetAmount) -> Option; fn cf_pool_info(base_asset: Asset, pair_asset: Asset) -> Option; + fn cf_pool_depth( + base_asset: Asset, + pair_asset: Asset, + tick_range: Range, + ) -> Option, DispatchError>>; fn cf_pool_liquidity(base_asset: Asset, pair_asset: Asset) -> Option; fn cf_required_asset_ratio_for_range_order( base_asset: Asset,