diff --git a/engine/src/state_chain_observer/client/mod.rs b/engine/src/state_chain_observer/client/mod.rs index 13a8702e3d..2890b13fc1 100644 --- a/engine/src/state_chain_observer/client/mod.rs +++ b/engine/src/state_chain_observer/client/mod.rs @@ -16,7 +16,7 @@ use std::{sync::Arc, time::Duration}; use tracing::info; use utilities::{ - make_periodic_tick, read_clean_and_decode_hex_str_file, + make_periodic_tick, read_clean_and_decode_hex_str_file, spmc, task_scope::{Scope, ScopedJoinHandle}, CachedStream, MakeCachedStream, }; @@ -283,7 +283,7 @@ impl(BLOCK_CAPACITY); @@ -309,7 +309,7 @@ impl { // This branch failing causes `sender` to be dropped, this causes the proxy/duplicate streams to also end. - let _result = sender.send(item).await; + sender.send(item).await; }, let _ = sender.closed() => { break }, ) diff --git a/engine/src/witness/common/epoch_source.rs b/engine/src/witness/common/epoch_source.rs index 1123ed89a7..c56c77bdc5 100644 --- a/engine/src/witness/common/epoch_source.rs +++ b/engine/src/witness/common/epoch_source.rs @@ -12,7 +12,7 @@ use futures::StreamExt; use futures_core::{Future, Stream}; use futures_util::stream; use state_chain_runtime::PalletInstanceAlias; -use utilities::task_scope::Scope; +use utilities::{spmc, task_scope::Scope}; use super::{ActiveAndFuture, ExternalChain, RuntimeHasChain}; @@ -37,11 +37,8 @@ enum EpochUpdate { #[derive(Clone)] pub struct EpochSource { epochs: BTreeMap)>, - epoch_update_receiver: async_broadcast::Receiver<( - EpochIndex, - state_chain_runtime::Hash, - EpochUpdate, - )>, + epoch_update_receiver: + spmc::Receiver<(EpochIndex, state_chain_runtime::Hash, EpochUpdate)>, } impl<'a, 'env, StateChainClient, Info, HistoricInfo> @@ -58,11 +55,8 @@ pub struct EpochSourceBuilder<'a, 'env, StateChainClient, Info, HistoricInfo> { state_chain_client: Arc, initial_block_hash: state_chain_runtime::Hash, epochs: BTreeMap)>, - epoch_update_receiver: async_broadcast::Receiver<( - EpochIndex, - state_chain_runtime::Hash, - EpochUpdate, - )>, + epoch_update_receiver: + spmc::Receiver<(EpochIndex, state_chain_runtime::Hash, EpochUpdate)>, } impl<'a, 'env, StateChainClient, Info: Clone, HistoricInfo: Clone> Clone for EpochSourceBuilder<'a, 'env, StateChainClient, Info, HistoricInfo> @@ -88,8 +82,7 @@ impl EpochSource<(), ()> { mut state_chain_stream: StateChainStream, state_chain_client: Arc, ) -> EpochSourceBuilder<'a, 'env, StateChainClient, (), ()> { - let (epoch_update_sender, epoch_update_receiver) = - async_broadcast::broadcast(CHANNEL_BUFFER); + let (epoch_update_sender, epoch_update_receiver) = spmc::channel(CHANNEL_BUFFER); let initial_block_hash = state_chain_stream.cache().block_hash; @@ -121,7 +114,7 @@ impl EpochSource<(), ()> { let state_chain_client = state_chain_client.clone(); async move { utilities::loop_select! { - if epoch_update_sender.is_closed() => break Ok(()), + let _ = epoch_update_sender.closed() => { break Ok(()) }, if let Some((block_hash, _block_header)) = state_chain_stream.next() => { let old_current_epoch = std::mem::replace(&mut current_epoch, state_chain_client .storage_value:: { .await .expect(STATE_CHAIN_CONNECTION)); if old_current_epoch != current_epoch { - let _result = epoch_update_sender.broadcast((old_current_epoch, block_hash, EpochUpdate::Historic(()))).await; - let _result = epoch_update_sender.broadcast((current_epoch, block_hash, EpochUpdate::NewCurrent(()))).await; + epoch_update_sender.send((old_current_epoch, block_hash, EpochUpdate::Historic(()))).await; + epoch_update_sender.send((current_epoch, block_hash, EpochUpdate::NewCurrent(()))).await; historic_epochs.insert(old_current_epoch); } @@ -143,7 +136,7 @@ impl EpochSource<(), ()> { assert!(!historic_epochs.contains(¤t_epoch)); assert!(old_historic_epochs.is_superset(&historic_epochs)); for expired_epoch in old_historic_epochs.difference(&historic_epochs) { - let _result = epoch_update_sender.broadcast((*expired_epoch, block_hash, EpochUpdate::Expired)).await; + epoch_update_sender.send((*expired_epoch, block_hash, EpochUpdate::Expired)).await; } } else break Ok(()), } @@ -310,8 +303,7 @@ impl< epoch_update_receiver: mut unmapped_epoch_update_receiver, } = self; - let (epoch_update_sender, epoch_update_receiver) = - async_broadcast::broadcast(CHANNEL_BUFFER); + let (epoch_update_sender, epoch_update_receiver) = spmc::channel(CHANNEL_BUFFER); let epochs: BTreeMap<_, _> = futures::stream::iter(unmapped_epochs) .filter_map(|(epoch, (info, option_historic_info))| { @@ -354,18 +346,18 @@ impl< let mut epochs = epochs.keys().cloned().collect::>(); async move { utilities::loop_select! { - if epoch_update_sender.is_closed() => break Ok(()), + let _ = epoch_update_sender.closed() => { break Ok(()) }, if let Some((epoch, block_hash, update)) = unmapped_epoch_update_receiver.next() => { match update { EpochUpdate::NewCurrent(info) => { if let Some(mapped_info) = filter_map(state_chain_client.clone(), epoch, block_hash, info).await { epochs.insert(epoch); - let _result = epoch_update_sender.broadcast((epoch, block_hash, EpochUpdate::NewCurrent(mapped_info))).await; + epoch_update_sender.send((epoch, block_hash, EpochUpdate::NewCurrent(mapped_info))).await; } }, EpochUpdate::Historic(historic_info) => { if epochs.contains(&epoch) { - let _result = epoch_update_sender.broadcast(( + epoch_update_sender.send(( epoch, block_hash, EpochUpdate::Historic(map_historic_info(state_chain_client.clone(), epoch, block_hash, historic_info).await), diff --git a/utilities/src/with_std/spmc.rs b/utilities/src/with_std/spmc.rs index 9af6e37f2a..7ce98bf3c5 100644 --- a/utilities/src/with_std/spmc.rs +++ b/utilities/src/with_std/spmc.rs @@ -1,4 +1,5 @@ use futures::stream::{Stream, StreamExt}; +use tracing::warn; pub fn channel(capacity: usize) -> (Sender, Receiver) { let (sender, receiver) = async_broadcast::broadcast(capacity); @@ -11,11 +12,28 @@ pub struct Sender(async_broadcast::Sender, tokio::sync::watch::Sender<()>) impl Sender { /// Sends an item to all receivers - pub async fn send(&self, t: T) -> Result<(), async_broadcast::SendError> { - self.0 - .broadcast(t) - .await - .map(|option| assert!(option.is_none(), "async_broadcast overflow is off")) + #[allow(clippy::manual_async_fn)] + #[track_caller] + pub fn send(&self, msg: T) -> impl futures::Future + '_ { + async move { + match self.0.try_broadcast(msg) { + Ok(None) => true, + Ok(Some(_)) => unreachable!("async_broadcast feature unused"), + Err(error) => match error { + async_broadcast::TrySendError::Full(msg) => { + warn!("Waiting for space in channel which is currently full with a capacity of {} items at {}", self.0.capacity(), core::panic::Location::caller()); + match self.0.broadcast(msg).await { + Ok(None) => true, + Ok(Some(_)) => unreachable!("async_broadcast feature unused"), + Err(_) => false, + } + }, + async_broadcast::TrySendError::Closed(_msg) => false, + async_broadcast::TrySendError::Inactive(_msg) => + unreachable!("async_broadcast feature unused"), + }, + } + } } } impl Sender { @@ -66,10 +84,10 @@ mod test { async fn channel_allows_reconnection() { let (mut sender, receiver) = channel(2); drop(receiver); - assert!(sender.send(1).await.is_err()); + assert!(!sender.send(1).await); let mut receiver = sender.receiver(); - sender.send(1).await.unwrap(); - sender.send(1).await.unwrap(); + assert!(sender.send(1).await); + assert!(sender.send(1).await); drop(sender); assert_eq!(receiver.next().await, Some(1)); assert_eq!(receiver.next().await, Some(1)); @@ -82,7 +100,7 @@ mod test { let mut receiver_2 = sender.receiver(); let mut receiver_3 = receiver_1.clone(); - sender.send(1).await.unwrap(); + assert!(sender.send(1).await); assert_eq!(receiver_1.next().await, Some(1)); assert_eq!(receiver_2.next().await, Some(1)); @@ -103,7 +121,7 @@ mod test { assert!(sender.closed().now_or_never().is_none()); - sender.send(1).await.unwrap(); + assert!(sender.send(1).await); join( async move {