Skip to content

Commit

Permalink
fix: rpc subscription return types (#5374)
Browse files Browse the repository at this point in the history
fix: subscription return types
  • Loading branch information
dandanlen authored Nov 1, 2024
1 parent ade8c30 commit 31e8fa8
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 39 deletions.
42 changes: 24 additions & 18 deletions api/bin/chainflip-lp-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use custom_rpc::{
};
use futures::{try_join, FutureExt, StreamExt};
use jsonrpsee::{
core::{async_trait, ClientError},
core::{async_trait, ClientError, SubscriptionResult},
proc_macros::rpc,
server::ServerBuilder,
types::{ErrorCode, ErrorObject, ErrorObjectOwned},
Expand Down Expand Up @@ -199,7 +199,7 @@ pub trait Rpc {
) -> RpcResult<Hash>;

#[subscription(name = "subscribe_order_fills", item = BlockUpdate<OrderFills>)]
fn subscribe_order_fills(&self);
async fn subscribe_order_fills(&self) -> SubscriptionResult;

#[method(name = "order_fills")]
async fn order_fills(&self, at: Option<Hash>) -> RpcResult<BlockUpdate<OrderFills>>;
Expand Down Expand Up @@ -486,27 +486,33 @@ impl RpcServer for RpcServerImpl {
.await?)
}

fn subscribe_order_fills(&self, pending_sink: PendingSubscriptionSink) {
async fn subscribe_order_fills(
&self,
pending_sink: PendingSubscriptionSink,
) -> SubscriptionResult {
let state_chain_client = self.api.state_chain_client.clone();
let sink = pending_sink.accept().await.map_err(|e| e.to_string())?;
tokio::spawn(async move {
if let Ok(sink) = pending_sink.accept().await {
let mut finalized_block_stream = state_chain_client.finalized_block_stream().await;
while let Some(block) = finalized_block_stream.next().await {
match order_fills(state_chain_client.clone(), block).await {
Ok(order_fills) => {
if sink
.send(sc_rpc::utils::to_sub_message(&sink, &order_fills))
.await
.is_err()
{
break;
}
},
Err(_) => break,
}
// Note we construct the subscription here rather than relying on the custom-rpc
// subscription. This is because we want to use finalized blocks.
// TODO: allow custom rpc subscriptions to use finalized blocks.
let mut finalized_block_stream = state_chain_client.finalized_block_stream().await;
while let Some(block) = finalized_block_stream.next().await {
match order_fills(state_chain_client.clone(), block).await {
Ok(order_fills) => {
if sink
.send(sc_rpc::utils::to_sub_message(&sink, &order_fills))
.await
.is_err()
{
break;
}
},
Err(_) => break,
}
}
});
Ok(())
}

async fn order_fills(&self, at: Option<Hash>) -> RpcResult<BlockUpdate<OrderFills>> {
Expand Down
62 changes: 41 additions & 21 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use cf_primitives::{
use cf_utilities::rpc::NumberOrHex;
use core::ops::Range;
use jsonrpsee::{
core::async_trait,
proc_macros::rpc,
types::{
error::{ErrorObject, ErrorObjectOwned},
Expand Down Expand Up @@ -834,21 +835,26 @@ pub trait CustomApi {

#[method(name = "max_swap_amount")]
fn cf_max_swap_amount(&self, asset: Asset) -> RpcResult<Option<AssetAmount>>;
#[subscription(name = "subscribe_pool_price", item = PoolPriceV1)]
fn cf_subscribe_pool_price(&self, from_asset: Asset, to_asset: Asset);
#[subscription(name = "subscribe_pool_price", item = BlockUpdate<PoolPriceV1>)]
async fn cf_subscribe_pool_price(&self, from_asset: Asset, to_asset: Asset);
#[subscription(name = "subscribe_pool_price_v2", item = BlockUpdate<PoolPriceV2>)]
fn cf_subscribe_pool_price_v2(&self, base_asset: Asset, quote_asset: Asset);
async fn cf_subscribe_pool_price_v2(&self, base_asset: Asset, quote_asset: Asset);
#[subscription(name = "subscribe_prewitness_swaps", item = BlockUpdate<RpcPrewitnessedSwap>)]
fn cf_subscribe_prewitness_swaps(&self, base_asset: Asset, quote_asset: Asset, side: Side);
async fn cf_subscribe_prewitness_swaps(
&self,
base_asset: Asset,
quote_asset: Asset,
side: Side,
);

// Subscribe to a stream that on every block produces a list of all scheduled/pending
// swaps in the base_asset/quote_asset pool, including any "implicit" half-swaps (as a
// part of a swap involving two pools)
#[subscription(name = "subscribe_scheduled_swaps", item = BlockUpdate<SwapResponse>)]
fn cf_subscribe_scheduled_swaps(&self, base_asset: Asset, quote_asset: Asset);
async fn cf_subscribe_scheduled_swaps(&self, base_asset: Asset, quote_asset: Asset);

#[subscription(name = "subscribe_lp_order_fills", item = BlockUpdate<OrderFills>)]
fn cf_subscribe_lp_order_fills(&self);
async fn cf_subscribe_lp_order_fills(&self);

#[method(name = "scheduled_swaps")]
fn cf_scheduled_swaps(
Expand Down Expand Up @@ -1139,6 +1145,7 @@ where
}
}

#[async_trait]
impl<C, B> CustomApiServer for CustomRpc<C, B>
where
B: BlockT<Hash = state_chain_runtime::Hash, Header = state_chain_runtime::Header>,
Expand Down Expand Up @@ -1505,7 +1512,7 @@ where
})
}

fn cf_subscribe_pool_price(
async fn cf_subscribe_pool_price(
&self,
pending_sink: PendingSubscriptionSink,
from_asset: Asset,
Expand All @@ -1519,9 +1526,10 @@ where
Ok((*client.runtime_api()).cf_pool_price(hash, from_asset, to_asset)?)
},
)
.await
}

fn cf_subscribe_pool_price_v2(
async fn cf_subscribe_pool_price_v2(
&self,
pending_sink: PendingSubscriptionSink,
base_asset: Asset,
Expand All @@ -1543,9 +1551,10 @@ where
})
},
)
.await
}

fn cf_subscribe_scheduled_swaps(
async fn cf_subscribe_scheduled_swaps(
&self,
pending_sink: PendingSubscriptionSink,
base_asset: Asset,
Expand Down Expand Up @@ -1573,7 +1582,8 @@ where
.collect(),
})
},
);
)
.await;
}

fn cf_scheduled_swaps(
Expand All @@ -1598,7 +1608,7 @@ where
})
}

fn cf_subscribe_prewitness_swaps(
async fn cf_subscribe_prewitness_swaps(
&self,
pending_sink: PendingSubscriptionSink,
base_asset: Asset,
Expand All @@ -1622,6 +1632,7 @@ where
})
},
)
.await
}

fn cf_prewitness_swaps(
Expand All @@ -1645,7 +1656,7 @@ where
})
}

fn cf_subscribe_lp_order_fills(&self, sink: PendingSubscriptionSink) {
async fn cf_subscribe_lp_order_fills(&self, sink: PendingSubscriptionSink) {
self.new_subscription_with_state(
false, /* only_on_changes */
true, /* end_on_error */
Expand All @@ -1670,6 +1681,7 @@ where
RpcResult::Ok((fills, pools))
},
)
.await
}

fn cf_supported_assets(&self) -> RpcResult<Vec<Asset>> {
Expand Down Expand Up @@ -1743,7 +1755,7 @@ where
+ BlockchainEvents<B>,
C::Api: CustomRuntimeApi<B>,
{
fn new_subscription<
async fn new_subscription<
T: Serialize + Send + Clone + Eq + 'static,
F: Fn(&C, state_chain_runtime::Hash) -> Result<T, CfApiError> + Send + Clone + 'static,
>(
Expand All @@ -1759,13 +1771,14 @@ where
sink,
move |client, hash, _state| f(client, hash).map(|res| (res, ())),
)
.await
}

/// The subscription will return the first value immediately and then either return new values
/// only when it changes, or every new block. Note in both cases this can skip blocks. Also this
/// subscription can either filter out, or end the stream if the provided async closure returns
/// an error.
fn new_subscription_with_state<
async fn new_subscription_with_state<
T: Serialize + Send + Clone + Eq + 'static,
// State to carry forward between calls to the closure.
S: 'static + Clone + Send,
Expand All @@ -1787,11 +1800,8 @@ where
let (initial_item, initial_state) = match f(&self.client, info.best_hash, None) {
Ok(initial) => initial,
Err(e) => {
self.executor.spawn(
"cf-rpc-update-subscription",
Some("rpc"),
pending_sink.reject(e).boxed(),
);
log::warn!(target: "cf-rpc", "Error in subscription initialization: {:?}", e);
pending_sink.reject(e).await;
return;
},
};
Expand Down Expand Up @@ -1824,12 +1834,22 @@ where
data: new_item,
}))
},
Err(error) if end_on_error => Some(Err(ErrorObjectOwned::from(error))),
Err(error) => {
log::warn!("Subscription Error: {error}.");
if end_on_error {
log::warn!("Closing Subscription.");
Some(Err(ErrorObjectOwned::from(error)))
} else {
None
}
},
_ => None,
})
}
}),
);
)
.take_while(|item| futures::future::ready(item.is_ok()))
.map(Result::unwrap);

self.executor.spawn(
"cf-rpc-update-subscription",
Expand Down

0 comments on commit 31e8fa8

Please sign in to comment.