From bb89c41efa11bd5d7cf15a03bfb8ca3e8aff0d92 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 1 Nov 2024 11:21:12 +0100 Subject: [PATCH] WIP --- api/bin/chainflip-lp-api/src/main.rs | 52 +++++++++++++++------------- state-chain/custom-rpc/src/lib.rs | 11 ++++-- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/api/bin/chainflip-lp-api/src/main.rs b/api/bin/chainflip-lp-api/src/main.rs index b1990c528a8..71d63be454f 100644 --- a/api/bin/chainflip-lp-api/src/main.rs +++ b/api/bin/chainflip-lp-api/src/main.rs @@ -26,9 +26,9 @@ use custom_rpc::{ order_fills::{order_fills_from_block_updates, OrderFills}, CustomApiClient, }; -use futures::{try_join, FutureExt, StreamExt}; +use futures::{try_join, FutureExt, StreamExt, TryStreamExt}; use jsonrpsee::{ - core::{async_trait, ClientError}, + core::{async_trait, ClientError, SubscriptionResult}, proc_macros::rpc, server::ServerBuilder, types::{ErrorCode, ErrorObject, ErrorObjectOwned}, @@ -199,7 +199,7 @@ pub trait Rpc { ) -> RpcResult; #[subscription(name = "subscribe_order_fills", item = BlockUpdate)] - fn subscribe_order_fills(&self); + async fn subscribe_order_fills(&self) -> SubscriptionResult; #[method(name = "order_fills")] async fn order_fills(&self, at: Option) -> RpcResult>; @@ -486,27 +486,31 @@ impl RpcServer for RpcServerImpl { .await?) } - fn subscribe_order_fills(&self, pending_sink: PendingSubscriptionSink) { - let state_chain_client = self.api.state_chain_client.clone(); - tokio::spawn(async move { - if let Ok(sink) = pending_sink.accept().await { - let mut finalized_block_stream = state_chain_client.finalized_block_stream().await; - while let Some(block) = finalized_block_stream.next().await { - match order_fills(state_chain_client.clone(), block).await { - Ok(order_fills) => { - if sink - .send(sc_rpc::utils::to_sub_message(&sink, &order_fills)) - .await - .is_err() - { - break; - } - }, - Err(_) => break, - } - } - } - }); + async fn subscribe_order_fills( + &self, + pending_sink: PendingSubscriptionSink, + ) -> SubscriptionResult { + let sub = self + .api + .state_chain_client + .base_rpc_client + .raw_rpc_client + .cf_subscribe_lp_order_fills() + .await? + .take_while(|item| { + futures::future::ready( + item.inspect_err(|err| { + tracing::error!("Subsription error: {err}. Closing subscription.") + }) + .is_ok(), + ) + }) + .map(Result::unwrap); + + // TODO: + // - Check if we still need the order_fills definitions referenced in this file. + + tokio::spawn(async move { sc_rpc::utils::pipe_from_stream(pending_sink, sub).await }); } async fn order_fills(&self, at: Option) -> RpcResult> { diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index c0f6fe8305b..5dd87c6882f 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -834,7 +834,7 @@ pub trait CustomApi { #[method(name = "max_swap_amount")] fn cf_max_swap_amount(&self, asset: Asset) -> RpcResult>; - #[subscription(name = "subscribe_pool_price", item = PoolPriceV1)] + #[subscription(name = "subscribe_pool_price", item = BlockUpdate)] fn cf_subscribe_pool_price(&self, from_asset: Asset, to_asset: Asset); #[subscription(name = "subscribe_pool_price_v2", item = BlockUpdate)] fn cf_subscribe_pool_price_v2(&self, base_asset: Asset, quote_asset: Asset); @@ -1824,12 +1824,17 @@ where data: new_item, })) }, - Err(error) if end_on_error => Some(Err(ErrorObjectOwned::from(error))), + Err(error) if end_on_error => { + log::warn!("Subscription Error: {error}. Closing subscription."); + Some(Err(ErrorObjectOwned::from(error))) + }, _ => None, }) } }), - ); + ) + .take_while(|item| futures::future::ready(item.is_ok())) + .map(Result::unwrap); self.executor.spawn( "cf-rpc-update-subscription",