Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rpc subscription return types #5374

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions api/bin/chainflip-lp-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use custom_rpc::{
};
use futures::{try_join, FutureExt, StreamExt};
use jsonrpsee::{
core::{async_trait, ClientError},
core::{async_trait, ClientError, SubscriptionResult},
proc_macros::rpc,
server::ServerBuilder,
types::{ErrorCode, ErrorObject, ErrorObjectOwned},
Expand Down Expand Up @@ -199,7 +199,7 @@ pub trait Rpc {
) -> RpcResult<Hash>;

#[subscription(name = "subscribe_order_fills", item = BlockUpdate<OrderFills>)]
fn subscribe_order_fills(&self);
async fn subscribe_order_fills(&self) -> SubscriptionResult;

#[method(name = "order_fills")]
async fn order_fills(&self, at: Option<Hash>) -> RpcResult<BlockUpdate<OrderFills>>;
Expand Down Expand Up @@ -486,27 +486,33 @@ impl RpcServer for RpcServerImpl {
.await?)
}

fn subscribe_order_fills(&self, pending_sink: PendingSubscriptionSink) {
async fn subscribe_order_fills(
&self,
pending_sink: PendingSubscriptionSink,
) -> SubscriptionResult {
let state_chain_client = self.api.state_chain_client.clone();
let sink = pending_sink.accept().await.map_err(|e| e.to_string())?;
tokio::spawn(async move {
if let Ok(sink) = pending_sink.accept().await {
let mut finalized_block_stream = state_chain_client.finalized_block_stream().await;
while let Some(block) = finalized_block_stream.next().await {
match order_fills(state_chain_client.clone(), block).await {
Ok(order_fills) => {
if sink
.send(sc_rpc::utils::to_sub_message(&sink, &order_fills))
.await
.is_err()
{
break;
}
},
Err(_) => break,
}
// Note we construct the subscription here rather than relying on the custom-rpc
// subscription. This is because we want to use finalized blocks.
// TODO: allow custom rpc subscriptions to use finalized blocks.
let mut finalized_block_stream = state_chain_client.finalized_block_stream().await;
while let Some(block) = finalized_block_stream.next().await {
match order_fills(state_chain_client.clone(), block).await {
Ok(order_fills) => {
if sink
.send(sc_rpc::utils::to_sub_message(&sink, &order_fills))
.await
.is_err()
{
break;
}
},
Err(_) => break,
}
}
});
Ok(())
}

async fn order_fills(&self, at: Option<Hash>) -> RpcResult<BlockUpdate<OrderFills>> {
Expand Down
62 changes: 41 additions & 21 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use cf_primitives::{
use cf_utilities::rpc::NumberOrHex;
use core::ops::Range;
use jsonrpsee::{
core::async_trait,
proc_macros::rpc,
types::{
error::{ErrorObject, ErrorObjectOwned},
Expand Down Expand Up @@ -834,21 +835,26 @@ pub trait CustomApi {

#[method(name = "max_swap_amount")]
fn cf_max_swap_amount(&self, asset: Asset) -> RpcResult<Option<AssetAmount>>;
#[subscription(name = "subscribe_pool_price", item = PoolPriceV1)]
fn cf_subscribe_pool_price(&self, from_asset: Asset, to_asset: Asset);
#[subscription(name = "subscribe_pool_price", item = BlockUpdate<PoolPriceV1>)]
async fn cf_subscribe_pool_price(&self, from_asset: Asset, to_asset: Asset);
#[subscription(name = "subscribe_pool_price_v2", item = BlockUpdate<PoolPriceV2>)]
fn cf_subscribe_pool_price_v2(&self, base_asset: Asset, quote_asset: Asset);
async fn cf_subscribe_pool_price_v2(&self, base_asset: Asset, quote_asset: Asset);
#[subscription(name = "subscribe_prewitness_swaps", item = BlockUpdate<RpcPrewitnessedSwap>)]
fn cf_subscribe_prewitness_swaps(&self, base_asset: Asset, quote_asset: Asset, side: Side);
async fn cf_subscribe_prewitness_swaps(
&self,
base_asset: Asset,
quote_asset: Asset,
side: Side,
);

// Subscribe to a stream that on every block produces a list of all scheduled/pending
// swaps in the base_asset/quote_asset pool, including any "implicit" half-swaps (as a
// part of a swap involving two pools)
#[subscription(name = "subscribe_scheduled_swaps", item = BlockUpdate<SwapResponse>)]
fn cf_subscribe_scheduled_swaps(&self, base_asset: Asset, quote_asset: Asset);
async fn cf_subscribe_scheduled_swaps(&self, base_asset: Asset, quote_asset: Asset);

#[subscription(name = "subscribe_lp_order_fills", item = BlockUpdate<OrderFills>)]
fn cf_subscribe_lp_order_fills(&self);
async fn cf_subscribe_lp_order_fills(&self);

#[method(name = "scheduled_swaps")]
fn cf_scheduled_swaps(
Expand Down Expand Up @@ -1139,6 +1145,7 @@ where
}
}

#[async_trait]
impl<C, B> CustomApiServer for CustomRpc<C, B>
where
B: BlockT<Hash = state_chain_runtime::Hash, Header = state_chain_runtime::Header>,
Expand Down Expand Up @@ -1505,7 +1512,7 @@ where
})
}

fn cf_subscribe_pool_price(
async fn cf_subscribe_pool_price(
&self,
pending_sink: PendingSubscriptionSink,
from_asset: Asset,
Expand All @@ -1519,9 +1526,10 @@ where
Ok((*client.runtime_api()).cf_pool_price(hash, from_asset, to_asset)?)
},
)
.await
}

fn cf_subscribe_pool_price_v2(
async fn cf_subscribe_pool_price_v2(
&self,
pending_sink: PendingSubscriptionSink,
base_asset: Asset,
Expand All @@ -1543,9 +1551,10 @@ where
})
},
)
.await
}

fn cf_subscribe_scheduled_swaps(
async fn cf_subscribe_scheduled_swaps(
&self,
pending_sink: PendingSubscriptionSink,
base_asset: Asset,
Expand Down Expand Up @@ -1573,7 +1582,8 @@ where
.collect(),
})
},
);
)
.await;
}

fn cf_scheduled_swaps(
Expand All @@ -1598,7 +1608,7 @@ where
})
}

fn cf_subscribe_prewitness_swaps(
async fn cf_subscribe_prewitness_swaps(
&self,
pending_sink: PendingSubscriptionSink,
base_asset: Asset,
Expand All @@ -1622,6 +1632,7 @@ where
})
},
)
.await
}

fn cf_prewitness_swaps(
Expand All @@ -1645,7 +1656,7 @@ where
})
}

fn cf_subscribe_lp_order_fills(&self, sink: PendingSubscriptionSink) {
async fn cf_subscribe_lp_order_fills(&self, sink: PendingSubscriptionSink) {
self.new_subscription_with_state(
false, /* only_on_changes */
true, /* end_on_error */
Expand All @@ -1670,6 +1681,7 @@ where
RpcResult::Ok((fills, pools))
},
)
.await
}

fn cf_supported_assets(&self) -> RpcResult<Vec<Asset>> {
Expand Down Expand Up @@ -1743,7 +1755,7 @@ where
+ BlockchainEvents<B>,
C::Api: CustomRuntimeApi<B>,
{
fn new_subscription<
async fn new_subscription<
T: Serialize + Send + Clone + Eq + 'static,
F: Fn(&C, state_chain_runtime::Hash) -> Result<T, CfApiError> + Send + Clone + 'static,
>(
Expand All @@ -1759,13 +1771,14 @@ where
sink,
move |client, hash, _state| f(client, hash).map(|res| (res, ())),
)
.await
}

/// The subscription will return the first value immediately and then either return new values
/// only when it changes, or every new block. Note in both cases this can skip blocks. Also this
/// subscription can either filter out, or end the stream if the provided async closure returns
/// an error.
fn new_subscription_with_state<
async fn new_subscription_with_state<
T: Serialize + Send + Clone + Eq + 'static,
// State to carry forward between calls to the closure.
S: 'static + Clone + Send,
Expand All @@ -1787,11 +1800,8 @@ where
let (initial_item, initial_state) = match f(&self.client, info.best_hash, None) {
Ok(initial) => initial,
Err(e) => {
self.executor.spawn(
"cf-rpc-update-subscription",
Some("rpc"),
pending_sink.reject(e).boxed(),
);
log::warn!(target: "cf-rpc", "Error in subscription initialization: {:?}", e);
pending_sink.reject(e).await;
return;
},
};
Expand Down Expand Up @@ -1824,12 +1834,22 @@ where
data: new_item,
}))
},
Err(error) if end_on_error => Some(Err(ErrorObjectOwned::from(error))),
Err(error) => {
log::warn!("Subscription Error: {error}.");
if end_on_error {
log::warn!("Closing Subscription.");
Some(Err(ErrorObjectOwned::from(error)))
} else {
None
}
},
_ => None,
})
}
}),
);
)
.take_while(|item| futures::future::ready(item.is_ok()))
.map(Result::unwrap);

self.executor.spawn(
"cf-rpc-update-subscription",
Expand Down
Loading