Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dandanlen committed Nov 1, 2024
1 parent 99bc666 commit bb89c41
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 27 deletions.
52 changes: 28 additions & 24 deletions api/bin/chainflip-lp-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use custom_rpc::{
order_fills::{order_fills_from_block_updates, OrderFills},
CustomApiClient,
};
use futures::{try_join, FutureExt, StreamExt};
use futures::{try_join, FutureExt, StreamExt, TryStreamExt};
use jsonrpsee::{
core::{async_trait, ClientError},
core::{async_trait, ClientError, SubscriptionResult},
proc_macros::rpc,
server::ServerBuilder,
types::{ErrorCode, ErrorObject, ErrorObjectOwned},
Expand Down Expand Up @@ -199,7 +199,7 @@ pub trait Rpc {
) -> RpcResult<Hash>;

#[subscription(name = "subscribe_order_fills", item = BlockUpdate<OrderFills>)]
fn subscribe_order_fills(&self);
async fn subscribe_order_fills(&self) -> SubscriptionResult;

#[method(name = "order_fills")]
async fn order_fills(&self, at: Option<Hash>) -> RpcResult<BlockUpdate<OrderFills>>;
Expand Down Expand Up @@ -486,27 +486,31 @@ impl RpcServer for RpcServerImpl {
.await?)
}

fn subscribe_order_fills(&self, pending_sink: PendingSubscriptionSink) {
let state_chain_client = self.api.state_chain_client.clone();
tokio::spawn(async move {
if let Ok(sink) = pending_sink.accept().await {
let mut finalized_block_stream = state_chain_client.finalized_block_stream().await;
while let Some(block) = finalized_block_stream.next().await {
match order_fills(state_chain_client.clone(), block).await {
Ok(order_fills) => {
if sink
.send(sc_rpc::utils::to_sub_message(&sink, &order_fills))
.await
.is_err()
{
break;
}
},
Err(_) => break,
}
}
}
});
async fn subscribe_order_fills(
&self,
pending_sink: PendingSubscriptionSink,
) -> SubscriptionResult {
let sub = self
.api
.state_chain_client
.base_rpc_client
.raw_rpc_client
.cf_subscribe_lp_order_fills()
.await?
.take_while(|item| {
futures::future::ready(
item.inspect_err(|err| {
tracing::error!("Subsription error: {err}. Closing subscription.")
})
.is_ok(),
)
})
.map(Result::unwrap);

// TODO:
// - Check if we still need the order_fills definitions referenced in this file.

tokio::spawn(async move { sc_rpc::utils::pipe_from_stream(pending_sink, sub).await });
}

async fn order_fills(&self, at: Option<Hash>) -> RpcResult<BlockUpdate<OrderFills>> {
Expand Down
11 changes: 8 additions & 3 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ pub trait CustomApi {

#[method(name = "max_swap_amount")]
fn cf_max_swap_amount(&self, asset: Asset) -> RpcResult<Option<AssetAmount>>;
#[subscription(name = "subscribe_pool_price", item = PoolPriceV1)]
#[subscription(name = "subscribe_pool_price", item = BlockUpdate<PoolPriceV1>)]
fn cf_subscribe_pool_price(&self, from_asset: Asset, to_asset: Asset);
#[subscription(name = "subscribe_pool_price_v2", item = BlockUpdate<PoolPriceV2>)]
fn cf_subscribe_pool_price_v2(&self, base_asset: Asset, quote_asset: Asset);
Expand Down Expand Up @@ -1824,12 +1824,17 @@ where
data: new_item,
}))
},
Err(error) if end_on_error => Some(Err(ErrorObjectOwned::from(error))),
Err(error) if end_on_error => {
log::warn!("Subscription Error: {error}. Closing subscription.");
Some(Err(ErrorObjectOwned::from(error)))
},
_ => None,
})
}
}),
);
)
.take_while(|item| futures::future::ready(item.is_ok()))
.map(Result::unwrap);

self.executor.spawn(
"cf-rpc-update-subscription",
Expand Down

0 comments on commit bb89c41

Please sign in to comment.