Skip to content

Commit

Permalink
feat: subcribe_price and depth rpc (#3978)
Browse files Browse the repository at this point in the history
* price subscription

* remove dead_code allow

* depth_between rpc

squash

squash

squash

* subscribe_price -> subscribe_pool_price

* inclusive and exclusive ranges

* comment

* depth rpc improvement
  • Loading branch information
AlastairHolmes authored Sep 22, 2023
1 parent e94f1f2 commit 342b182
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 15 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions state-chain/amm/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ impl<T> core::ops::IndexMut<Side> for SideMap<T> {
}
}
}
#[cfg(test)]
impl<T: std::ops::Add<R>, R> std::ops::Add<SideMap<R>> for SideMap<T> {
type Output = SideMap<<T as std::ops::Add<R>>::Output>;
impl<T: core::ops::Add<R>, R> core::ops::Add<SideMap<R>> for SideMap<T> {
type Output = SideMap<<T as core::ops::Add<R>>::Output>;
fn add(self, rhs: SideMap<R>) -> Self::Output {
SideMap { zero: self.zero + rhs.zero, one: self.one + rhs.one }
}
Expand Down
32 changes: 32 additions & 0 deletions state-chain/amm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,36 @@ impl<LiquidityProvider: Clone + Ord> PoolState<LiquidityProvider> {
pub fn range_order_liquidity(&self) -> Vec<(Tick, Liquidity)> {
self.range_orders.liquidity()
}

pub fn limit_order_depth(
&mut self,
range: core::ops::Range<Tick>,
) -> Result<SideMap<(Option<Price>, Amount)>, limit_orders::DepthError> {
Ok(SideMap {
zero: (
self.limit_orders.current_sqrt_price::<OneToZero>(),
self.limit_orders.depth::<OneToZero>(range.clone())?,
),
one: (
self.limit_orders.current_sqrt_price::<ZeroToOne>(),
self.limit_orders.depth::<ZeroToOne>(range)?,
),
})
}

pub fn range_order_depth(
&self,
range: core::ops::Range<Tick>,
) -> Result<SideMap<(Option<Price>, Amount)>, range_orders::DepthError> {
self.range_orders.depth(range.start, range.end).map(|assets| SideMap {
zero: (
self.range_orders.current_sqrt_price::<OneToZero>().map(sqrt_price_to_price),
assets[Side::Zero],
),
one: (
self.range_orders.current_sqrt_price::<ZeroToOne>().map(sqrt_price_to_price),
assets[Side::One],
),
})
}
}
30 changes: 29 additions & 1 deletion state-chain/amm/src/limit_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ pub enum SetFeesError {
InvalidFeeAmount,
}

#[derive(Debug)]
pub enum DepthError {
/// Invalid Price
InvalidTick,
/// Start tick must be less than or equal to the end tick
InvalidTickRange,
}

#[derive(Debug)]
pub enum MintError {
/// One of the start/end ticks of the range reached its maximum gross liquidity
Expand Down Expand Up @@ -729,11 +737,31 @@ impl<LiquidityProvider: Clone + Ord> PoolState<LiquidityProvider> {
/// Returns all the assets available for swaps in a given direction
///
/// This function never panics.
#[allow(dead_code)]
pub(super) fn liquidity<SD: SwapDirection>(&self) -> Vec<(Tick, Amount)> {
self.fixed_pools[!SD::INPUT_SIDE]
.iter()
.map(|(sqrt_price, fixed_pool)| (tick_at_sqrt_price(*sqrt_price), fixed_pool.available))
.collect()
}

/// Returns all the assets available for swaps between two prices (inclusive..exclusive)
///
/// This function never panics.
pub(super) fn depth<SD: SwapDirection>(
&self,
range: core::ops::Range<Tick>,
) -> Result<Amount, DepthError> {
let start =
Self::validate_tick::<Infallible>(range.start).map_err(|_| DepthError::InvalidTick)?;
let end =
Self::validate_tick::<Infallible>(range.end).map_err(|_| DepthError::InvalidTick)?;
if start <= end {
Ok(self.fixed_pools[!SD::INPUT_SIDE]
.range(start..end)
.map(|(_, fixed_pool)| fixed_pool.available)
.fold(Default::default(), |acc, x| acc + x))
} else {
Err(DepthError::InvalidTickRange)
}
}
}
58 changes: 55 additions & 3 deletions state-chain/amm/src/range_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ use scale_info::TypeInfo;
use sp_core::{U256, U512};

use crate::common::{
is_sqrt_price_valid, mul_div_ceil, mul_div_floor, sqrt_price_at_tick, tick_at_sqrt_price,
Amount, OneToZero, Side, SideMap, SqrtPriceQ64F96, Tick, ZeroToOne, MAX_TICK, MIN_TICK,
ONE_IN_HUNDREDTH_PIPS, SQRT_PRICE_FRACTIONAL_BITS,
is_sqrt_price_valid, is_tick_valid, mul_div_ceil, mul_div_floor, sqrt_price_at_tick,
tick_at_sqrt_price, Amount, OneToZero, Side, SideMap, SqrtPriceQ64F96, Tick, ZeroToOne,
MAX_TICK, MIN_TICK, ONE_IN_HUNDREDTH_PIPS, SQRT_PRICE_FRACTIONAL_BITS,
};

pub type Liquidity = u128;
Expand Down Expand Up @@ -345,6 +345,14 @@ pub enum RequiredAssetRatioError {
InvalidTickRange,
}

#[derive(Debug)]
pub enum DepthError {
/// Invalid Price
InvalidTick,
/// Start tick must be less than or equal to the end tick
InvalidTickRange,
}

#[derive(Debug)]
pub enum LiquidityToAmountsError {
/// Invalid Tick range
Expand Down Expand Up @@ -1013,6 +1021,50 @@ impl<LiquidityProvider: Clone + Ord> PoolState<LiquidityProvider> {
})
.collect()
}

pub(super) fn depth(
&self,
lower_tick: Tick,
upper_tick: Tick,
) -> Result<SideMap<Amount>, DepthError> {
if !is_tick_valid(lower_tick) || !is_tick_valid(upper_tick) {
return Err(DepthError::InvalidTick)
}

if lower_tick <= upper_tick {
let liquidity_at_lower_tick: Liquidity =
self.liquidity_map.range(..lower_tick).fold(0, |liquidity, (_, tick_delta)| {
liquidity.checked_add_signed(tick_delta.liquidity_delta).unwrap()
});

let (_liquidity, _tick, assets) = self
.liquidity_map
.range(lower_tick..upper_tick)
.map(|(tick, tick_delta)| (tick, tick_delta.liquidity_delta))
.chain(core::iter::once((&upper_tick, 0 /* value doesn't matter */)))
.fold(
(liquidity_at_lower_tick, lower_tick, SideMap::<Amount>::default()),
|(liquidity, previous_tick, assets), (current_tick, liquidity_delta)| {
(
// Addition is guaranteed to never overflow, see test `max_liquidity`
liquidity.checked_add_signed(liquidity_delta).unwrap(),
*current_tick,
assets +
self.inner_liquidity_to_amounts::<false>(
liquidity,
previous_tick,
*current_tick,
)
.0,
)
},
);

Ok(assets)
} else {
Err(DepthError::InvalidTickRange)
}
}
}

fn zero_amount_delta_floor(
Expand Down
3 changes: 3 additions & 0 deletions state-chain/custom-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ edition = '2021'
state-chain-runtime = { path = '../runtime' }

anyhow = "1.0"
futures = "0.3.14"
jsonrpsee = { version = "0.16.2", features = ["full"] }
serde = { version = '1.0', features = ['derive'] }
cf-chains = { path = '../chains' }
Expand All @@ -21,6 +22,8 @@ pallet-cf-pools = { path = "../pallets/cf-pools" }
hex = '0.4.3'

sp-api = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" }
sp-core = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" }
sp-rpc = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" }
sc-rpc-api = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" }
sp-runtime = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" }
sc-client-api = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" }
122 changes: 117 additions & 5 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ use cf_amm::{
use cf_chains::{btc::BitcoinNetwork, dot::PolkadotHash, eth::Address as EthereumAddress};
use cf_primitives::{Asset, AssetAmount, SemVer, SwapOutput};
use core::ops::Range;
use jsonrpsee::{core::RpcResult, proc_macros::rpc, types::error::CallError};
use jsonrpsee::{
core::RpcResult,
proc_macros::rpc,
types::error::{CallError, SubscriptionEmptyError},
SubscriptionSink,
};
use pallet_cf_governance::GovCallHash;
use pallet_cf_pools::{AssetsMap, PoolInfo, PoolLiquidity, PoolOrders};
use sc_client_api::HeaderBackend;
use pallet_cf_pools::{AssetsMap, Depth, PoolInfo, PoolLiquidity, PoolOrders};
use sc_client_api::{BlockchainEvents, HeaderBackend};
use serde::{Deserialize, Serialize};
use sp_api::BlockT;
use sp_rpc::number::NumberOrHex;
Expand Down Expand Up @@ -215,6 +220,14 @@ pub trait CustomApi {
pair_asset: Asset,
at: Option<state_chain_runtime::Hash>,
) -> RpcResult<Option<PoolInfo>>;
#[method(name = "pool_depth")]
fn cf_pool_depth(
&self,
base_asset: Asset,
pair_asset: Asset,
tick_range: Range<cf_amm::common::Tick>,
at: Option<state_chain_runtime::Hash>,
) -> RpcResult<Option<Result<AssetsMap<Depth>, DispatchError>>>;
#[method(name = "pool_liquidity")]
fn cf_pool_liquidity(
&self,
Expand Down Expand Up @@ -245,18 +258,26 @@ pub trait CustomApi {
fn cf_current_compatibility_version(&self) -> RpcResult<SemVer>;
#[method(name = "min_swap_amount")]
fn cf_min_swap_amount(&self, asset: Asset) -> RpcResult<AssetAmount>;
#[subscription(name = "subscribe_pool_price", item = Price)]
fn cf_subscribe_pool_price(&self, from: Asset, to: Asset);
}

/// An RPC extension for the state chain node.
pub struct CustomRpc<C, B> {
pub client: Arc<C>,
pub _phantom: PhantomData<B>,
pub executor: Arc<dyn sp_core::traits::SpawnNamed>,
}

impl<C, B> CustomRpc<C, B>
where
B: BlockT<Hash = state_chain_runtime::Hash>,
C: sp_api::ProvideRuntimeApi<B> + Send + Sync + 'static + HeaderBackend<B>,
C: sp_api::ProvideRuntimeApi<B>
+ Send
+ Sync
+ 'static
+ HeaderBackend<B>
+ BlockchainEvents<B>,
C::Api: CustomRuntimeApi<B>,
{
fn unwrap_or_best(&self, from_rpc: Option<<B as BlockT>::Hash>) -> B::Hash {
Expand All @@ -271,7 +292,12 @@ fn to_rpc_error<E: std::error::Error + Send + Sync + 'static>(e: E) -> jsonrpsee
impl<C, B> CustomApiServer for CustomRpc<C, B>
where
B: BlockT<Hash = state_chain_runtime::Hash>,
C: sp_api::ProvideRuntimeApi<B> + Send + Sync + 'static + HeaderBackend<B>,
C: sp_api::ProvideRuntimeApi<B>
+ Send
+ Sync
+ 'static
+ HeaderBackend<B>
+ BlockchainEvents<B>,
C::Api: CustomRuntimeApi<B>,
{
fn cf_is_auction_phase(&self, at: Option<<B as BlockT>::Hash>) -> RpcResult<bool> {
Expand Down Expand Up @@ -554,6 +580,19 @@ where
.map_err(to_rpc_error)
}

fn cf_pool_depth(
&self,
base_asset: Asset,
pair_asset: Asset,
tick_range: Range<Tick>,
at: Option<state_chain_runtime::Hash>,
) -> RpcResult<Option<Result<AssetsMap<Depth>, DispatchError>>> {
self.client
.runtime_api()
.cf_pool_depth(self.unwrap_or_best(at), base_asset, pair_asset, tick_range)
.map_err(to_rpc_error)
}

fn cf_pool_liquidity(
&self,
base_asset: Asset,
Expand Down Expand Up @@ -638,4 +677,77 @@ where
.cf_min_swap_amount(self.unwrap_or_best(None), asset)
.map_err(to_rpc_error)
}

fn cf_subscribe_pool_price(
&self,
sink: SubscriptionSink,
from: Asset,
to: Asset,
) -> Result<(), SubscriptionEmptyError> {
self.new_subscription(sink, move |api, hash| api.cf_pool_price(hash, from, to))
}
}

impl<C, B> CustomRpc<C, B>
where
B: BlockT<Hash = state_chain_runtime::Hash>,
C: sp_api::ProvideRuntimeApi<B>
+ Send
+ Sync
+ 'static
+ HeaderBackend<B>
+ BlockchainEvents<B>,
C::Api: CustomRuntimeApi<B>,
{
fn new_subscription<
T: Serialize + Send + Clone + Eq + 'static,
E: std::error::Error + Send + Sync + 'static,
F: Fn(&C::Api, state_chain_runtime::Hash) -> Result<T, E> + Send + Clone + 'static,
>(
&self,
mut sink: SubscriptionSink,
f: F,
) -> Result<(), SubscriptionEmptyError> {
use futures::{future::FutureExt, stream::StreamExt};

let client = self.client.clone();

let initial = match f(&self.client.runtime_api(), self.client.info().best_hash) {
Ok(initial) => initial,
Err(e) => {
let _ = sink.reject(jsonrpsee::core::Error::from(
sc_rpc_api::state::error::Error::Client(Box::new(e)),
));
return Ok(())
},
};

let mut previous = initial.clone();

let stream = self
.client
.import_notification_stream()
.filter(|n| futures::future::ready(n.is_new_best))
.filter_map(move |n| {
let new = f(&client.runtime_api(), n.hash);

match new {
Ok(new) if new != previous => {
previous = new.clone();
futures::future::ready(Some(new))
},
_ => futures::future::ready(None),
}
});

let stream = futures::stream::once(futures::future::ready(initial)).chain(stream);

let fut = async move {
sink.pipe_from_stream(stream).await;
};

self.executor.spawn("cf-rpc-subscription", Some("rpc"), fut.boxed());

Ok(())
}
}
2 changes: 2 additions & 0 deletions state-chain/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
let rpc_builder = {
let client = client.clone();
let pool = transaction_pool.clone();
let executor = Arc::new(task_manager.spawn_handle());

Box::new(move |deny_unsafe, subscription_executor| {
let build = || {
Expand Down Expand Up @@ -276,6 +277,7 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
module.merge(CustomApiServer::into_rpc(CustomRpc {
client: client.clone(),
_phantom: PhantomData,
executor: executor.clone(),
}))?;

Ok(module)
Expand Down
Loading

0 comments on commit 342b182

Please sign in to comment.