From 85a2689a2998647b106f53ab22cd7d80be97e6e0 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 1 Nov 2024 15:04:44 +0100 Subject: [PATCH] fix: subscription return types --- api/bin/chainflip-lp-api/src/main.rs | 42 +++++++++++-------- state-chain/custom-rpc/src/lib.rs | 62 ++++++++++++++++++---------- 2 files changed, 65 insertions(+), 39 deletions(-) diff --git a/api/bin/chainflip-lp-api/src/main.rs b/api/bin/chainflip-lp-api/src/main.rs index b1990c528a..16283a7c6c 100644 --- a/api/bin/chainflip-lp-api/src/main.rs +++ b/api/bin/chainflip-lp-api/src/main.rs @@ -28,7 +28,7 @@ use custom_rpc::{ }; use futures::{try_join, FutureExt, StreamExt}; use jsonrpsee::{ - core::{async_trait, ClientError}, + core::{async_trait, ClientError, SubscriptionResult}, proc_macros::rpc, server::ServerBuilder, types::{ErrorCode, ErrorObject, ErrorObjectOwned}, @@ -199,7 +199,7 @@ pub trait Rpc { ) -> RpcResult; #[subscription(name = "subscribe_order_fills", item = BlockUpdate)] - fn subscribe_order_fills(&self); + async fn subscribe_order_fills(&self) -> SubscriptionResult; #[method(name = "order_fills")] async fn order_fills(&self, at: Option) -> RpcResult>; @@ -486,27 +486,33 @@ impl RpcServer for RpcServerImpl { .await?) } - fn subscribe_order_fills(&self, pending_sink: PendingSubscriptionSink) { + async fn subscribe_order_fills( + &self, + pending_sink: PendingSubscriptionSink, + ) -> SubscriptionResult { let state_chain_client = self.api.state_chain_client.clone(); + let sink = pending_sink.accept().await.map_err(|e| e.to_string())?; tokio::spawn(async move { - if let Ok(sink) = pending_sink.accept().await { - let mut finalized_block_stream = state_chain_client.finalized_block_stream().await; - while let Some(block) = finalized_block_stream.next().await { - match order_fills(state_chain_client.clone(), block).await { - Ok(order_fills) => { - if sink - .send(sc_rpc::utils::to_sub_message(&sink, &order_fills)) - .await - .is_err() - { - break; - } - }, - Err(_) => break, - } + // Note we construct the subscription here rather than relying on the custom-rpc + // subscription. This is because we want to use finalized blocks. + // TODO: allow custom rpc subscriptions to use finalized blocks. + let mut finalized_block_stream = state_chain_client.finalized_block_stream().await; + while let Some(block) = finalized_block_stream.next().await { + match order_fills(state_chain_client.clone(), block).await { + Ok(order_fills) => { + if sink + .send(sc_rpc::utils::to_sub_message(&sink, &order_fills)) + .await + .is_err() + { + break; + } + }, + Err(_) => break, } } }); + Ok(()) } async fn order_fills(&self, at: Option) -> RpcResult> { diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index c0f6fe8305..a84e888a1f 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -19,6 +19,7 @@ use cf_primitives::{ use cf_utilities::rpc::NumberOrHex; use core::ops::Range; use jsonrpsee::{ + core::async_trait, proc_macros::rpc, types::{ error::{ErrorObject, ErrorObjectOwned}, @@ -834,21 +835,26 @@ pub trait CustomApi { #[method(name = "max_swap_amount")] fn cf_max_swap_amount(&self, asset: Asset) -> RpcResult>; - #[subscription(name = "subscribe_pool_price", item = PoolPriceV1)] - fn cf_subscribe_pool_price(&self, from_asset: Asset, to_asset: Asset); + #[subscription(name = "subscribe_pool_price", item = BlockUpdate)] + async fn cf_subscribe_pool_price(&self, from_asset: Asset, to_asset: Asset); #[subscription(name = "subscribe_pool_price_v2", item = BlockUpdate)] - fn cf_subscribe_pool_price_v2(&self, base_asset: Asset, quote_asset: Asset); + async fn cf_subscribe_pool_price_v2(&self, base_asset: Asset, quote_asset: Asset); #[subscription(name = "subscribe_prewitness_swaps", item = BlockUpdate)] - fn cf_subscribe_prewitness_swaps(&self, base_asset: Asset, quote_asset: Asset, side: Side); + async fn cf_subscribe_prewitness_swaps( + &self, + base_asset: Asset, + quote_asset: Asset, + side: Side, + ); // Subscribe to a stream that on every block produces a list of all scheduled/pending // swaps in the base_asset/quote_asset pool, including any "implicit" half-swaps (as a // part of a swap involving two pools) #[subscription(name = "subscribe_scheduled_swaps", item = BlockUpdate)] - fn cf_subscribe_scheduled_swaps(&self, base_asset: Asset, quote_asset: Asset); + async fn cf_subscribe_scheduled_swaps(&self, base_asset: Asset, quote_asset: Asset); #[subscription(name = "subscribe_lp_order_fills", item = BlockUpdate)] - fn cf_subscribe_lp_order_fills(&self); + async fn cf_subscribe_lp_order_fills(&self); #[method(name = "scheduled_swaps")] fn cf_scheduled_swaps( @@ -1139,6 +1145,7 @@ where } } +#[async_trait] impl CustomApiServer for CustomRpc where B: BlockT, @@ -1505,7 +1512,7 @@ where }) } - fn cf_subscribe_pool_price( + async fn cf_subscribe_pool_price( &self, pending_sink: PendingSubscriptionSink, from_asset: Asset, @@ -1519,9 +1526,10 @@ where Ok((*client.runtime_api()).cf_pool_price(hash, from_asset, to_asset)?) }, ) + .await } - fn cf_subscribe_pool_price_v2( + async fn cf_subscribe_pool_price_v2( &self, pending_sink: PendingSubscriptionSink, base_asset: Asset, @@ -1543,9 +1551,10 @@ where }) }, ) + .await } - fn cf_subscribe_scheduled_swaps( + async fn cf_subscribe_scheduled_swaps( &self, pending_sink: PendingSubscriptionSink, base_asset: Asset, @@ -1573,7 +1582,8 @@ where .collect(), }) }, - ); + ) + .await; } fn cf_scheduled_swaps( @@ -1598,7 +1608,7 @@ where }) } - fn cf_subscribe_prewitness_swaps( + async fn cf_subscribe_prewitness_swaps( &self, pending_sink: PendingSubscriptionSink, base_asset: Asset, @@ -1622,6 +1632,7 @@ where }) }, ) + .await } fn cf_prewitness_swaps( @@ -1645,7 +1656,7 @@ where }) } - fn cf_subscribe_lp_order_fills(&self, sink: PendingSubscriptionSink) { + async fn cf_subscribe_lp_order_fills(&self, sink: PendingSubscriptionSink) { self.new_subscription_with_state( false, /* only_on_changes */ true, /* end_on_error */ @@ -1670,6 +1681,7 @@ where RpcResult::Ok((fills, pools)) }, ) + .await } fn cf_supported_assets(&self) -> RpcResult> { @@ -1743,7 +1755,7 @@ where + BlockchainEvents, C::Api: CustomRuntimeApi, { - fn new_subscription< + async fn new_subscription< T: Serialize + Send + Clone + Eq + 'static, F: Fn(&C, state_chain_runtime::Hash) -> Result + Send + Clone + 'static, >( @@ -1759,13 +1771,14 @@ where sink, move |client, hash, _state| f(client, hash).map(|res| (res, ())), ) + .await } /// The subscription will return the first value immediately and then either return new values /// only when it changes, or every new block. Note in both cases this can skip blocks. Also this /// subscription can either filter out, or end the stream if the provided async closure returns /// an error. - fn new_subscription_with_state< + async fn new_subscription_with_state< T: Serialize + Send + Clone + Eq + 'static, // State to carry forward between calls to the closure. S: 'static + Clone + Send, @@ -1787,11 +1800,8 @@ where let (initial_item, initial_state) = match f(&self.client, info.best_hash, None) { Ok(initial) => initial, Err(e) => { - self.executor.spawn( - "cf-rpc-update-subscription", - Some("rpc"), - pending_sink.reject(e).boxed(), - ); + log::warn!(target: "cf-rpc", "Error in subscription initialization: {:?}", e); + pending_sink.reject(e).await; return; }, }; @@ -1824,12 +1834,22 @@ where data: new_item, })) }, - Err(error) if end_on_error => Some(Err(ErrorObjectOwned::from(error))), + Err(error) => { + log::warn!("Subscription Error: {error}."); + if end_on_error { + log::warn!("Closing Subscription."); + Some(Err(ErrorObjectOwned::from(error))) + } else { + None + } + }, _ => None, }) } }), - ); + ) + .take_while(|item| futures::future::ready(item.is_ok())) + .map(Result::unwrap); self.executor.spawn( "cf-rpc-update-subscription",