diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index f527008ae9..599c399c1a 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -70,7 +70,7 @@ use state_chain_runtime::{ Block, Hash, NetworkFee, SolanaInstance, }; use std::{ - collections::{BTreeMap, HashMap, VecDeque}, + collections::{BTreeMap, HashMap}, marker::PhantomData, sync::Arc, }; @@ -1984,76 +1984,43 @@ where }; // construct either best or finalized blocks stream from the chain head subscription - let blocks_stream = Box::pin(stream::unfold( - (VecDeque::::new(), subscription), - move |(mut st_items_to_stream, mut st_subscription)| async move { - // before checking for events, stream any remaining items from the previous run - // This can happen since finalization can produce an array of finalized_block_hashes - // unlike best block production which always produces only 1 best_block_hash - if let Some(hash) = st_items_to_stream.pop_front() { - Some((hash, (st_items_to_stream, st_subscription))) - } else { - while let Some(result) = st_subscription.next().await { - match result { - Ok((event, _subs_id)) => { - if only_finalized { - // when only_finalized is set, handle both initialized and - // finalized events - match event { - FollowEvent::Initialized(mut ev_initialized) => { - // Initialized event guarantees that - // finalized_block_hashes is of at least one - // element, the last element is the current - // final block. - return Some(( - ev_initialized.finalized_block_hashes.pop() - .expect("received FollowEvent::Initialized event with less than 1 hashes"), - ( - VecDeque::with_capacity(0), - st_subscription, - ) - )) - }, - FollowEvent::Finalized(ev_finalized) => { - // In case of Finalized event, send the first - // element and send the remaining as part of - // the unfold state - let mut block_hashes = - VecDeque::from(ev_finalized.finalized_block_hashes); - return Some(( - block_hashes.pop_front() - .expect("received FollowEvent::Finalized event with less than 1 hashes"), - ( - block_hashes, - st_subscription, - ), - )) - }, - _ => continue, - } - } else { - // when !only_finalized, handle only bestBlockChanged events - match event { - // Ignore st_init_block to not send it twice - FollowEvent::BestBlockChanged(ev_best_block) => - return Some(( - ev_best_block.best_block_hash, - (VecDeque::with_capacity(0), st_subscription), - )), - _ => continue, - } - } - }, - Err(e) => { - log::warn!("ChainHead subscription error {:?}", e); - return None // Close the stream on subscription error - }, - } - } - None // Close the steam, if subscription is closed - } - }, - )); + let blocks_stream = stream::unfold(subscription, move |mut sub| async move { + match sub.next().await { + Some(Ok((event, _subs_id))) => Some((event, sub)), + Some(Err(e)) => { + log::warn!("ChainHead subscription error {:?}", e); + None + }, + _ => None, + } + }) + .filter_map(move |event| async move { + match (only_finalized, event) { + ( + true, + FollowEvent::Initialized(sc_rpc_spec_v2::chain_head::Initialized { + finalized_block_hashes, + .. + }), + ) => Some(finalized_block_hashes), + ( + true, + FollowEvent::Finalized(sc_rpc_spec_v2::chain_head::Finalized { + finalized_block_hashes, + .. + }), + ) => Some(finalized_block_hashes), + ( + false, + FollowEvent::BestBlockChanged(sc_rpc_spec_v2::chain_head::BestBlockChanged { + best_block_hash, + }), + ) => Some(vec![best_block_hash]), + _ => None, + } + }) + .map(stream::iter) + .flatten(); let stream = blocks_stream .filter_map({ @@ -2099,7 +2066,8 @@ where } }) .take_while(|item| futures::future::ready(item.is_ok())) - .map(Result::unwrap); + .map(Result::unwrap) + .boxed(); self.executor.spawn( "cf-rpc-update-subscription",