From cbfad94b447a56606a9807679d68bc60fa836cf4 Mon Sep 17 00:00:00 2001 From: nmammeri Date: Wed, 4 Dec 2024 15:54:16 +0100 Subject: [PATCH] fix: properly handle finalized events + code cleanup --- state-chain/custom-rpc/src/lib.rs | 225 ++++++++++++++++-------------- 1 file changed, 122 insertions(+), 103 deletions(-) diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index 85d9bd9de0..9e32e1628a 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -28,7 +28,7 @@ use jsonrpsee::{ error::{ErrorObject, ErrorObjectOwned}, ErrorCode, }, - PendingSubscriptionSink, RpcModule, Subscription, + PendingSubscriptionSink, RpcModule, }; use order_fills::OrderFills; use pallet_cf_governance::GovCallHash; @@ -70,7 +70,7 @@ use state_chain_runtime::{ Block, Hash, NetworkFee, SolanaInstance, }; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, VecDeque}, marker::PhantomData, sync::Arc, }; @@ -1977,103 +1977,134 @@ where self.chain_head_api().subscribe_unbounded("chainHead_v1_follow", [false]).await else { pending_sink - .reject(CfApiError::from(internal_error("chainHead_v1_follow subscription failed"))) + .reject(internal_error("chainHead_v1_follow subscription failed")) .await; return; }; - // depending on only_finalized param, construct the blocks stream from the subscription - let seed_state = (only_finalized, subscription); - let blocks_stream = Box::pin(stream::unfold(seed_state, |mut state| async move { - while let Ok(event) = get_next_chain_head_event(&mut state.1).await { - if state.0 { - // only interested in initialized and finalized events - match event { - FollowEvent::Initialized(mut ev_initialized) => { - // Initialized event guarantees that finalized_block_hashes is of - // at least one element, ordered by increasing block number - let hash = ev_initialized.finalized_block_hashes.pop().unwrap(); - return Some((hash, state)) - }, - FollowEvent::Finalized(mut ev_finalized) => { - let hash = ev_finalized.finalized_block_hashes.pop().unwrap(); - return Some((hash, state)) - }, - _ => continue, - } + // construct either best or finalized blocks stream from the chain head subscription + let blocks_stream = Box::pin(stream::unfold( + (VecDeque::::new(), subscription, only_finalized), + |(mut st_items_to_stream, mut st_subscription, st_only_finalized)| async move { + // before checking for events, stream any remaining items from the previous run + // This can happen since finalization can produce an array of finalized_block_hashes + // unlike best block production which always produces only 1 best_block_hash + if let Some(hash) = st_items_to_stream.pop_front() { + Some((hash, (st_items_to_stream, st_subscription, st_only_finalized))) } else { - // only interested in bestBlockChanged events - match event { - FollowEvent::BestBlockChanged(ev_best_block) => - return Some((ev_best_block.best_block_hash, state)), - _ => continue, + while let Some(result) = st_subscription.next().await { + match result { + Ok((event, _subs_id)) => { + if st_only_finalized { + // when only_finalized is set, handle both initialized and + // finalized events + match event { + FollowEvent::Initialized(mut ev_initialized) => { + // Initialized event guarantees that + // finalized_block_hashes is of at least one + // element, the last element is the current + // final block. + return Some(( + ev_initialized.finalized_block_hashes.pop() + .expect("received FollowEvent::Initialized event with less than 1 hashes"), + ( + VecDeque::with_capacity(0), + st_subscription, + st_only_finalized, + ) + )) + }, + FollowEvent::Finalized(ev_finalized) => { + // In case of Finalized event, send the first + // element and send the remaining as part of + // the unfold state + let mut block_hashes = + VecDeque::from(ev_finalized.finalized_block_hashes); + return Some(( + block_hashes.pop_front() + .expect("received FollowEvent::Finalized event with less than 1 hashes"), + ( + block_hashes, + st_subscription, + st_only_finalized, + ), + )) + }, + _ => continue, + } + } else { + // when !only_finalized, handle only bestBlockChanged events + match event { + // Ignore st_init_block to not send it twice + FollowEvent::BestBlockChanged(ev_best_block) => + return Some(( + ev_best_block.best_block_hash, + ( + VecDeque::with_capacity(0), + st_subscription, + st_only_finalized, + ), + )), + _ => continue, + } + } + }, + Err(e) => { + log::warn!("ChainHead subscription error {:?}", e); + return None // Close the stream on subscription error + }, + } } + None // Close the steam, if subscription is closed } - } - None - })); - - // create the initial item and state - let info = self.client.info(); - let block_hash = if only_finalized { info.finalized_hash } else { info.best_hash }; - let block_number = if only_finalized { info.finalized_number } else { info.best_number }; - - let (initial_item, initial_state) = match f(&self.client, block_hash, None) { - Ok(initial) => initial, - Err(e) => { - log::warn!(target: "cf-rpc", "Error in subscription initialization: {:?}", e); - pending_sink.reject(e).await; - return; }, - }; - - let stream = futures::stream::iter(std::iter::once(Ok(BlockUpdate { - block_hash, - block_number, - data: initial_item.clone(), - }))) - .chain(blocks_stream.filter_map({ - let client = self.client.clone(); - - let mut previous_item = initial_item; - let mut previous_state = initial_state; - - move |hash| { - futures::future::ready(match f(&client, hash, Some(&previous_state)) { - Ok((new_item, new_state)) if !only_on_changes || new_item != previous_item => { - previous_item = new_item.clone(); - previous_state = new_state; - - if let Ok(Some(header)) = client.header(hash) { - Some(Ok(BlockUpdate { - block_hash: hash, - block_number: *header.number(), - data: new_item, - })) - } else if end_on_error { - Some(Err(internal_error(format!( - "Cannot fetch block header for block {:?}", - hash - )))) - } else { - None - } - }, - Err(error) => { - log::warn!("Subscription Error: {error}."); - if end_on_error { - log::warn!("Closing Subscription."); - Some(Err(ErrorObjectOwned::from(error))) - } else { - None - } - }, - _ => None, - }) - } - })) - .take_while(|item| futures::future::ready(item.is_ok())) - .map(Result::unwrap); + )); + + let stream = blocks_stream + .filter_map({ + let client = self.client.clone(); + + let mut previous_item = None; + let mut previous_state = None; + + move |hash| { + futures::future::ready(match f(&client, hash, previous_state.as_ref()) { + Ok((new_item, new_state)) + if !only_on_changes || Some(&new_item) != previous_item.as_ref() => + { + previous_item = Some(new_item.clone()); + previous_state = Some(new_state); + + if let Ok(Some(header)) = client.header(hash) { + Some(Ok(BlockUpdate { + block_hash: hash, + block_number: *header.number(), + data: new_item, + })) + } else if end_on_error { + Some(Err(internal_error(format!( + "Could not fetch block header for block {:?}", + hash + )))) + } else { + None + } + }, + Err(error) => { + log::warn!("Subscription Error: {error}."); + if end_on_error { + log::warn!("Closing Subscription."); + Some(Err(ErrorObjectOwned::from(error))) + } else { + None + } + }, + _ => None, + }) + } + }) + .take_while(|item| futures::future::ready(item.is_ok())) + .map(Result::unwrap); self.executor.spawn( "cf-rpc-update-subscription", @@ -2101,18 +2132,6 @@ where } } -async fn get_next_chain_head_event( - sub: &mut Subscription, -) -> Result, CfApiError> { - match sub.next().await { - Some(result) => match result { - Ok((event, _subs_id)) => Ok(event), - Err(e) => Err(CfApiError::from(internal_error(format!("ChainHead event error {e}")))), - }, - None => Err(CfApiError::from(internal_error("ChainHead deserialization error"))), - } -} - #[cfg(test)] mod test { use std::collections::BTreeSet;