diff --git a/Cargo.lock b/Cargo.lock index c1ac28bbd8..bc04ab4595 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2571,6 +2571,7 @@ dependencies = [ "sc-client-api", "sc-rpc", "sc-rpc-api", + "sc-rpc-spec-v2", "serde", "serde_json", "sp-api", @@ -7611,7 +7612,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ - "proc-macro-crate 1.1.3", + "proc-macro-crate 3.2.0", "proc-macro2", "quote", "syn 2.0.89", @@ -8462,9 +8463,9 @@ dependencies = [ [[package]] name = "parity-scale-codec" -version = "3.6.12" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "306800abfa29c7f16596b5970a588435e3d5b3149683d00c12b699cc19f895ee" +checksum = "8be4817d39f3272f69c59fe05d0535ae6456c2dc2fa1ba02910296c7e0a5c590" dependencies = [ "arrayvec 0.7.6", "bitvec", @@ -8472,19 +8473,20 @@ dependencies = [ "bytes", "impl-trait-for-tuples", "parity-scale-codec-derive", + "rustversion", "serde", ] [[package]] name = "parity-scale-codec-derive" -version = "3.6.12" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d830939c76d294956402033aee57a6da7b438f2294eb94864c37b0569053a42c" +checksum = "8781a75c6205af67215f382092b6e0a4ff3734798523e69073d4bcd294ec767b" dependencies = [ "proc-macro-crate 3.2.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.89", ] [[package]] diff --git a/state-chain/custom-rpc/Cargo.toml b/state-chain/custom-rpc/Cargo.toml index 66d10ba3a1..e766dacf0e 100644 --- a/state-chain/custom-rpc/Cargo.toml +++ b/state-chain/custom-rpc/Cargo.toml @@ -36,6 +36,7 @@ sp-core = { workspace = true, default-features = true } sp-rpc = { workspace = true, default-features = true } sc-rpc = { workspace = true, default-features = true } sc-rpc-api = { workspace = true, default-features = true } +sc-rpc-spec-v2 = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } sp-state-machine = { workspace = true, default-features = true } sc-client-api = { workspace = true, default-features = true } diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index d8cda371f8..816a37f9ed 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -20,6 +20,7 @@ use cf_primitives::{ }; use cf_utilities::rpc::NumberOrHex; use core::ops::Range; +use futures::{stream, stream::StreamExt, FutureExt}; use jsonrpsee::{ core::async_trait, proc_macros::rpc, @@ -27,7 +28,7 @@ use jsonrpsee::{ error::{ErrorObject, ErrorObjectOwned}, ErrorCode, }, - PendingSubscriptionSink, + PendingSubscriptionSink, RpcModule, }; use order_fills::OrderFills; use pallet_cf_governance::GovCallHash; @@ -36,7 +37,13 @@ use pallet_cf_pools::{ UnidirectionalPoolDepth, }; use pallet_cf_swapping::SwapLegInfo; -use sc_client_api::{BlockchainEvents, HeaderBackend}; +use sc_client_api::{ + blockchain::HeaderMetadata, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, + HeaderBackend, StorageProvider, +}; +use sc_rpc_spec_v2::chain_head::{ + api::ChainHeadApiServer, ChainHead, ChainHeadConfig, FollowEvent, +}; use serde::{Deserialize, Serialize}; use sp_api::{ApiError, ApiExt, CallApiAt}; use sp_core::U256; @@ -1027,13 +1034,14 @@ pub trait CustomApi { } /// An RPC extension for the state chain node. -pub struct CustomRpc { +pub struct CustomRpc { pub client: Arc, + pub backend: Arc, pub executor: Arc, pub _phantom: PhantomData, } -impl CustomRpc +impl CustomRpc where B: BlockT, C: Send + Sync + 'static + HeaderBackend, @@ -1043,7 +1051,7 @@ where } } -impl CustomRpc +impl CustomRpc where B: BlockT, C: Send + Sync + 'static + HeaderBackend + sp_api::ProvideRuntimeApi, @@ -1220,16 +1228,22 @@ where } #[async_trait] -impl CustomApiServer for CustomRpc +impl CustomApiServer for CustomRpc where B: BlockT, + B::Header: Unpin, + BE: Backend + Send + Sync + 'static, C: sp_api::ProvideRuntimeApi + Send + Sync + 'static + + BlockBackend + + ExecutorProvider + HeaderBackend + + HeaderMetadata + BlockchainEvents - + CallApiAt, + + CallApiAt + + StorageProvider, C::Api: CustomRuntimeApi + ElectoralRuntimeApi, { pass_through! { @@ -1618,8 +1632,9 @@ where to_asset: Asset, ) { self.new_subscription( - true, /* only_on_changes */ - false, /* end_on_error */ + Default::default(), /* notification_behaviour */ + true, /* only_on_changes */ + false, /* end_on_error */ pending_sink, move |client, hash| { Ok((*client.runtime_api()).cf_pool_price(hash, from_asset, to_asset)?) @@ -1635,8 +1650,9 @@ where quote_asset: Asset, ) { self.new_subscription( - false, /* only_on_changes */ - true, /* end_on_error */ + Default::default(), /* notification_behaviour */ + false, /* only_on_changes */ + true, /* end_on_error */ pending_sink, move |client, hash| { Ok(PoolPriceV2 { @@ -1665,12 +1681,14 @@ where base_asset, quote_asset, ) else { + pending_sink.reject(call_error("requested pool does not exist")).await; return; }; self.new_subscription( - false, /* only_on_changes */ - true, /* end_on_error */ + Default::default(), /* notification_behaviour */ + false, /* only_on_changes */ + true, /* end_on_error */ pending_sink, move |client, hash| { Ok(SwapResponse { @@ -1715,8 +1733,9 @@ where side: Side, ) { self.new_subscription( - false, /* only_on_changes */ - true, /* end_on_error */ + Default::default(), /* notification_behaviour */ + false, /* only_on_changes */ + true, /* end_on_error */ pending_sink, move |client, hash| { Ok::<_, CfApiError>(RpcPrewitnessedSwap { @@ -1757,8 +1776,9 @@ where async fn cf_subscribe_lp_order_fills(&self, sink: PendingSubscriptionSink) { self.new_subscription_with_state( - false, /* only_on_changes */ - true, /* end_on_error */ + Default::default(), /* notification_behaviour */ + false, /* only_on_changes */ + true, /* end_on_error */ sink, |client, hash, prev_pools| { let pools = StorageQueryApi::new(client) @@ -1884,28 +1904,60 @@ where } } -impl CustomRpc +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum NotificationBehaviour { + /// Subscription will return finalized blocks. + Finalized, + /// Subscription will return best blocks. In the case of a re-org it might drop events. + #[default] + Best, + /// Subscription will return all new blocks. In the case of a re-org it might duplicate events. + /// + /// The caller is responsible for de-duplicating events. + New, +} + +impl CustomRpc where B: BlockT, + B::Header: Unpin, + BE: Send + Sync + 'static + Backend, C: sp_api::ProvideRuntimeApi + Send + Sync + 'static + + BlockBackend + + ExecutorProvider + HeaderBackend - + BlockchainEvents, + + HeaderMetadata + + BlockchainEvents + + CallApiAt + + StorageProvider, C::Api: CustomRuntimeApi, { + fn chain_head_api(&self) -> RpcModule> { + ChainHead::new( + self.client.clone(), + self.backend.clone(), + self.executor.clone(), + ChainHeadConfig::default(), + ) + .into_rpc() + } + async fn new_subscription< T: Serialize + Send + Clone + Eq + 'static, F: Fn(&C, state_chain_runtime::Hash) -> Result + Send + Clone + 'static, >( &self, + notification_behaviour: NotificationBehaviour, only_on_changes: bool, end_on_error: bool, sink: PendingSubscriptionSink, f: F, ) { self.new_subscription_with_state( + notification_behaviour, only_on_changes, end_on_error, sink, @@ -1915,7 +1967,8 @@ where } /// The subscription will return the first value immediately and then either return new values - /// only when it changes, or every new block. Note in both cases this can skip blocks. Also this + /// only when it changes, or every new block. + /// Note depending on the notification_behaviour blocks can be skipped. Also this /// subscription can either filter out, or end the stream if the provided async closure returns /// an error. async fn new_subscription_with_state< @@ -1928,68 +1981,121 @@ where + 'static, >( &self, + notification_behaviour: NotificationBehaviour, only_on_changes: bool, end_on_error: bool, pending_sink: PendingSubscriptionSink, f: F, ) { - use futures::{stream::StreamExt, FutureExt}; + // subscribe to the chain head + let Ok(subscription) = + self.chain_head_api().subscribe_unbounded("chainHead_v1_follow", [false]).await + else { + pending_sink + .reject(internal_error("chainHead_v1_follow subscription failed")) + .await; + return; + }; - let info = self.client.info(); + // construct either best, new or finalized blocks stream from the chain head subscription + let blocks_stream = stream::unfold(subscription, move |mut sub| async move { + match sub.next::>().await { + Some(Ok((event, _subs_id))) => Some((event, sub)), + Some(Err(e)) => { + log::warn!("ChainHead subscription error {:?}", e); + None + }, + _ => None, + } + }) + .filter_map(move |event| async move { + // When NotificationBehaviour is: + // * NotificationBehaviour::Finalized: listen to initialized and finalized events + // * NotificationBehaviour::Best: listen to just bestBlockChanged events + // * NotificationBehaviour::New: listen to just newBlock events + // See: https://paritytech.github.io/json-rpc-interface-spec/api/chainHead_v1_follow.html + match (notification_behaviour, event) { + ( + // Always start from the most recent finalized block hash + NotificationBehaviour::Finalized, + FollowEvent::Initialized(sc_rpc_spec_v2::chain_head::Initialized { + mut finalized_block_hashes, + .. + }), + ) => Some(vec![finalized_block_hashes + .pop() + .expect("Guaranteed to have at least one element.")]), + ( + NotificationBehaviour::Finalized, + FollowEvent::Finalized(sc_rpc_spec_v2::chain_head::Finalized { + finalized_block_hashes, + .. + }), + ) => Some(finalized_block_hashes), + ( + NotificationBehaviour::Best, + FollowEvent::BestBlockChanged(sc_rpc_spec_v2::chain_head::BestBlockChanged { + best_block_hash, + }), + ) => Some(vec![best_block_hash]), + ( + NotificationBehaviour::New, + FollowEvent::NewBlock(sc_rpc_spec_v2::chain_head::NewBlock { + block_hash, .. + }), + ) => Some(vec![block_hash]), + _ => None, + } + }) + .map(stream::iter) + .flatten(); - let (initial_item, initial_state) = match f(&self.client, info.best_hash, None) { - Ok(initial) => initial, - Err(e) => { - log::warn!(target: "cf-rpc", "Error in subscription initialization: {:?}", e); - pending_sink.reject(e).await; - return; - }, - }; + let stream = blocks_stream + .filter_map({ + let client = self.client.clone(); + + let mut previous_item = None; + let mut previous_state = None; + + move |hash| { + futures::future::ready(match f(&client, hash, previous_state.as_ref()) { + Ok((new_item, new_state)) + if !only_on_changes || Some(&new_item) != previous_item.as_ref() => + { + previous_item = Some(new_item.clone()); + previous_state = Some(new_state); - let stream = futures::stream::iter(std::iter::once(Ok(BlockUpdate { - block_hash: info.best_hash, - block_number: info.best_number, - data: initial_item.clone(), - }))) - .chain( - self.client - .import_notification_stream() - .filter(|n| futures::future::ready(n.is_new_best)) - .filter_map({ - let client = self.client.clone(); - - let mut previous_item = initial_item; - let mut previous_state = initial_state; - - move |n| { - futures::future::ready(match f(&client, n.hash, Some(&previous_state)) { - Ok((new_item, new_state)) - if !only_on_changes || new_item != previous_item => - { - previous_item = new_item.clone(); - previous_state = new_state; + if let Ok(Some(header)) = client.header(hash) { Some(Ok(BlockUpdate { - block_hash: n.hash, - block_number: *n.header.number(), + block_hash: hash, + block_number: *header.number(), data: new_item, })) - }, - Err(error) => { - log::warn!("Subscription Error: {error}."); - if end_on_error { - log::warn!("Closing Subscription."); - Some(Err(ErrorObjectOwned::from(error))) - } else { - None - } - }, - _ => None, - }) - } - }), - ) - .take_while(|item| futures::future::ready(item.is_ok())) - .map(Result::unwrap); + } else if end_on_error { + Some(Err(internal_error(format!( + "Could not fetch block header for block {:?}", + hash + )))) + } else { + None + } + }, + Err(error) => { + log::warn!("Subscription Error: {error}."); + if end_on_error { + log::warn!("Closing Subscription."); + Some(Err(ErrorObjectOwned::from(error))) + } else { + None + } + }, + _ => None, + }) + } + }) + .take_while(|item| futures::future::ready(item.is_ok())) + .map(Result::unwrap) + .boxed(); self.executor.spawn( "cf-rpc-update-subscription", diff --git a/state-chain/custom-rpc/src/monitoring.rs b/state-chain/custom-rpc/src/monitoring.rs index 91b11bb5c7..fc6e2503be 100644 --- a/state-chain/custom-rpc/src/monitoring.rs +++ b/state-chain/custom-rpc/src/monitoring.rs @@ -87,9 +87,10 @@ pub trait MonitoringApi { ) -> RpcResult>; } -impl MonitoringApiServer for CustomRpc +impl MonitoringApiServer for CustomRpc where B: BlockT, + BE: Send + Sync + 'static, C: sp_api::ProvideRuntimeApi + Send + Sync diff --git a/state-chain/node/src/service.rs b/state-chain/node/src/service.rs index f8d26d27f5..20c0bad178 100644 --- a/state-chain/node/src/service.rs +++ b/state-chain/node/src/service.rs @@ -243,6 +243,7 @@ pub fn new_full< let rpc_builder = { let client = client.clone(); + let backend = backend.clone(); let pool = transaction_pool.clone(); let executor = Arc::new(task_manager.spawn_handle()); let chain_spec = config.chain_spec.cloned_box(); @@ -288,6 +289,7 @@ pub fn new_full< // Implement custom RPC extensions module.merge(CustomApiServer::into_rpc(CustomRpc { client: client.clone(), + backend: backend.clone(), _phantom: PhantomData, executor: executor.clone(), }))?; @@ -295,6 +297,7 @@ pub fn new_full< // Implement custom RPC extensions module.merge(MonitoringApiServer::into_rpc(CustomRpc { client: client.clone(), + backend: backend.clone(), _phantom: PhantomData, executor: executor.clone(), }))?;