Skip to content

Commit

Permalink
feat: always return a single leading block; allow new block subscript…
Browse files Browse the repository at this point in the history
…ions;
  • Loading branch information
dandanlen authored and nmammeri committed Dec 9, 2024
1 parent cfa06e4 commit 8144ba8
Showing 1 changed file with 48 additions and 25 deletions.
73 changes: 48 additions & 25 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1632,9 +1632,9 @@ where
to_asset: Asset,
) {
self.new_subscription(
false, /* only_finalized */
true, /* only_on_changes */
false, /* end_on_error */
Default::default(), /* notification_behaviour */
true, /* only_on_changes */
false, /* end_on_error */
pending_sink,
move |client, hash| {
Ok((*client.runtime_api()).cf_pool_price(hash, from_asset, to_asset)?)
Expand All @@ -1650,9 +1650,9 @@ where
quote_asset: Asset,
) {
self.new_subscription(
false, /* only_finalized */
false, /* only_on_changes */
true, /* end_on_error */
Default::default(), /* notification_behaviour */
false, /* only_on_changes */
true, /* end_on_error */
pending_sink,
move |client, hash| {
Ok(PoolPriceV2 {
Expand Down Expand Up @@ -1686,9 +1686,9 @@ where
};

self.new_subscription(
false, /* only_finalized */
false, /* only_on_changes */
true, /* end_on_error */
Default::default(), /* notification_behaviour */
false, /* only_on_changes */
true, /* end_on_error */
pending_sink,
move |client, hash| {
Ok(SwapResponse {
Expand Down Expand Up @@ -1733,9 +1733,9 @@ where
side: Side,
) {
self.new_subscription(
false, /* only_finalized */
false, /* only_on_changes */
true, /* end_on_error */
Default::default(), /* notification_behaviour */
false, /* only_on_changes */
true, /* end_on_error */
pending_sink,
move |client, hash| {
Ok::<_, CfApiError>(RpcPrewitnessedSwap {
Expand Down Expand Up @@ -1776,9 +1776,9 @@ where

async fn cf_subscribe_lp_order_fills(&self, sink: PendingSubscriptionSink) {
self.new_subscription_with_state(
false, /* only_finalized */
false, /* only_on_changes */
true, /* end_on_error */
Default::default(), /* notification_behaviour */
false, /* only_on_changes */
true, /* end_on_error */
sink,
|client, hash, prev_pools| {
let pools = StorageQueryApi::new(client)
Expand Down Expand Up @@ -1904,6 +1904,19 @@ where
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum NotificationBehaviour {
/// Subscription will return finalized blocks.
Finalized,
/// Subscription will return best blocks. In the case of a re-org it might drop events.
#[default]
Best,
/// Subscription will return all new blocks. In the case of a re-org it might duplicate events.
///
/// The caller is responsible for de-duplicating events.
New,
}

impl<C, B, BE> CustomRpc<C, B, BE>
where
B: BlockT<Hash = state_chain_runtime::Hash, Header = state_chain_runtime::Header>,
Expand Down Expand Up @@ -1937,14 +1950,14 @@ where
F: Fn(&C, state_chain_runtime::Hash) -> Result<T, CfApiError> + Send + Clone + 'static,
>(
&self,
only_finalized: bool,
notification_behaviour: NotificationBehaviour,
only_on_changes: bool,
end_on_error: bool,
sink: PendingSubscriptionSink,
f: F,
) {
self.new_subscription_with_state(
only_finalized,
notification_behaviour,
only_on_changes,
end_on_error,
sink,
Expand All @@ -1967,7 +1980,7 @@ where
+ 'static,
>(
&self,
only_finalized: bool,
notification_behaviour: NotificationBehaviour,
only_on_changes: bool,
end_on_error: bool,
pending_sink: PendingSubscriptionSink,
Expand All @@ -1985,7 +1998,7 @@ where

// construct either best or finalized blocks stream from the chain head subscription
let blocks_stream = stream::unfold(subscription, move |mut sub| async move {
match sub.next().await {
match sub.next::<FollowEvent<Hash>>().await {
Some(Ok((event, _subs_id))) => Some((event, sub)),
Some(Err(e)) => {
log::warn!("ChainHead subscription error {:?}", e);
Expand All @@ -1995,27 +2008,37 @@ where
}
})
.filter_map(move |event| async move {
match (only_finalized, event) {
match (notification_behaviour, event) {
(
true,
// Always return the most recent finalized block hash as the first event.
// See: https://paritytech.github.io/json-rpc-interface-spec/api/chainHead_v1_follow.html
_,
FollowEvent::Initialized(sc_rpc_spec_v2::chain_head::Initialized {
finalized_block_hashes,
mut finalized_block_hashes,
..
}),
) => Some(finalized_block_hashes),
) => Some(vec![finalized_block_hashes
.pop()
.expect("Guaranteed to have at least one element.")]),
(
true,
NotificationBehaviour::Finalized,
FollowEvent::Finalized(sc_rpc_spec_v2::chain_head::Finalized {
finalized_block_hashes,
..
}),
) => Some(finalized_block_hashes),
(
false,
NotificationBehaviour::Best,
FollowEvent::BestBlockChanged(sc_rpc_spec_v2::chain_head::BestBlockChanged {
best_block_hash,
}),
) => Some(vec![best_block_hash]),
(
NotificationBehaviour::New,
FollowEvent::NewBlock(sc_rpc_spec_v2::chain_head::NewBlock {
block_hash, ..
}),
) => Some(vec![block_hash]),
_ => None,
}
})
Expand Down

0 comments on commit 8144ba8

Please sign in to comment.