Skip to content

Commit

Permalink
feat: get new block streams from scc after creation (#4217)
Browse files Browse the repository at this point in the history
* feat: get new block streams from scc

* fix bad rebase
  • Loading branch information
AlastairHolmes authored Nov 7, 2023
1 parent 4321cc2 commit 22c713b
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 51 deletions.
8 changes: 8 additions & 0 deletions engine/src/state_chain_observer/client/chain_api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
use async_trait::async_trait;

use super::StateChainStreamApi;

#[async_trait]
pub trait ChainApi {
fn latest_finalized_block(&self) -> super::BlockInfo;
fn latest_unfinalized_block(&self) -> super::BlockInfo;

async fn finalized_block_stream(&self) -> Box<dyn StateChainStreamApi>;
async fn unfinalized_block_stream(&self) -> Box<dyn StateChainStreamApi<false>>;
}
145 changes: 94 additions & 51 deletions engine/src/state_chain_observer/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ use cf_primitives::{AccountRole, SemVer};
use futures::{StreamExt, TryStreamExt};

use futures_core::Stream;
use futures_util::FutureExt;
use sp_core::{Pair, H256};
use state_chain_runtime::AccountId;
use std::{sync::Arc, time::Duration};
use tokio::sync::watch;
use tracing::{info, warn};

use utilities::{
make_periodic_tick, read_clean_and_decode_hex_str_file, spmc,
task_scope::{Scope, ScopedJoinHandle, OR_CANCEL},
loop_select, make_periodic_tick, read_clean_and_decode_hex_str_file, spmc,
task_scope::{Scope, OR_CANCEL},
CachedStream, MakeCachedStream, MakeTryCachedStream, TryCachedStream,
};

Expand Down Expand Up @@ -88,8 +89,12 @@ pub struct StateChainClient<
genesis_hash: state_chain_runtime::Hash,
signed_extrinsic_client: SignedExtrinsicClient,
unsigned_extrinsic_client: unsigned::UnsignedExtrinsicClient,
_block_producer_handles: (ScopedJoinHandle<()>, ScopedJoinHandle<()>),
pub base_rpc_client: Arc<BaseRpcClient>,
finalized_block_stream_request_sender:
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Sender<Box<dyn StateChainStreamApi>>>,
unfinalized_block_stream_request_sender: tokio::sync::mpsc::Sender<
tokio::sync::oneshot::Sender<Box<dyn StateChainStreamApi<false>>>,
>,
latest_finalized_block_watcher: tokio::sync::watch::Receiver<BlockInfo>,
latest_unfinalized_block_watcher: tokio::sync::watch::Receiver<BlockInfo>,
}
Expand Down Expand Up @@ -196,7 +201,11 @@ async fn create_finalized_block_subscription<
base_rpc_client: Arc<BaseRpcClient>,
signed_extrinsic_client_builder: &mut SignedExtrinsicClientBuilder,
required_version_and_wait: Option<(SemVer, bool)>,
) -> Result<(watch::Receiver<BlockInfo>, impl StateChainStreamApi + Clone, ScopedJoinHandle<()>)> {
) -> Result<(
watch::Receiver<BlockInfo>,
impl StateChainStreamApi + Clone,
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Sender<Box<dyn StateChainStreamApi>>>,
)> {
let mut finalized_block_stream = {
// https://substrate.stackexchange.com/questions/3667/api-rpc-chain-subscribefinalizedheads-missing-blocks
// https://arxiv.org/abs/2007.01560
Expand Down Expand Up @@ -291,42 +300,49 @@ async fn create_finalized_block_subscription<
}

const BLOCK_CAPACITY: usize = 10;
let (block_sender, block_receiver) = spmc::channel::<BlockInfo>(BLOCK_CAPACITY);
let (mut block_sender, block_receiver) = spmc::channel::<BlockInfo>(BLOCK_CAPACITY);

let latest_block = *finalized_block_stream.cache();

let (latest_block_sender, latest_block_watcher) =
tokio::sync::watch::channel::<BlockInfo>(latest_block);

Ok((
latest_block_watcher,
FinalizedCachedStream::new(block_receiver.make_cached(
latest_block,
|block: &BlockInfo| *block,
)),
scope.spawn_with_handle({
let base_rpc_client = base_rpc_client.clone();
let mut finalized_block_stream = finalized_block_stream.into_inner();
async move {
loop {
let block =
finalized_block_stream.next().await.unwrap()?;
let (block_stream_request_sender, block_stream_request_receiver) =
tokio::sync::mpsc::channel::<tokio::sync::oneshot::Sender<Box<dyn StateChainStreamApi>>>(1);

scope.spawn({
let base_rpc_client = base_rpc_client.clone();
let mut finalized_block_stream = finalized_block_stream.into_inner();
let mut block_stream_request_receiver: tokio_stream::wrappers::ReceiverStream<_> =
tokio_stream::wrappers::ReceiverStream::new(block_stream_request_receiver);
let mut latest_block = latest_block;
async move {
loop_select!(
let result_block = finalized_block_stream.next().map(|option| option.unwrap()) => {
let block = result_block?;
latest_block = block;
if let Some((required_version, _)) = required_version_and_wait {
let current_release_version = base_rpc_client.storage_value::<pallet_cf_environment::CurrentReleaseVersion<state_chain_runtime::Runtime>>(block.hash).await?;
if !required_version.is_compatible_with(current_release_version) {
break Err(anyhow!("This version '{}' is no longer compatible with the release version '{}' at block: {}", required_version, current_release_version, block.hash))
}
}

if !block_sender.send(block).await {
break Ok(())
}
if latest_block_sender.send(block).is_err() {
break Ok(())
}
}
}
})
block_sender.send(block).await;
let _result = latest_block_sender.send(block);
},
if let Some(block_stream_request) = block_stream_request_receiver.next() => {
let _result = block_stream_request.send(Box::new(FinalizedCachedStream::new(block_sender.receiver().make_cached(latest_block, |block: &BlockInfo| *block))));
} else break Ok(()),
)
}
});

Ok((
latest_block_watcher,
FinalizedCachedStream::new(
block_receiver.make_cached(latest_block, |block: &BlockInfo| *block),
),
block_stream_request_sender,
))
}

Expand All @@ -338,7 +354,7 @@ async fn create_unfinalized_block_subscription<
) -> Result<(
watch::Receiver<BlockInfo>,
impl StateChainStreamApi<false> + Clone,
ScopedJoinHandle<()>,
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Sender<Box<dyn StateChainStreamApi<false>>>>,
)> {
let mut sparse_block_stream = base_rpc_client
.subscribe_unfinalized_block_headers()
Expand All @@ -352,26 +368,39 @@ async fn create_unfinalized_block_subscription<
let first_block = sparse_block_stream.next().await.unwrap()?;

const BLOCK_CAPACITY: usize = 10;
let (block_sender, block_receiver) = spmc::channel::<BlockInfo>(BLOCK_CAPACITY);
let (mut block_sender, block_receiver) = spmc::channel::<BlockInfo>(BLOCK_CAPACITY);

let (latest_block_sender, latest_block_watcher) =
tokio::sync::watch::channel::<BlockInfo>(first_block);

let (block_stream_request_sender, block_stream_request_receiver) = tokio::sync::mpsc::channel::<
tokio::sync::oneshot::Sender<Box<dyn StateChainStreamApi<false>>>,
>(1);

scope.spawn({
let mut block_stream_request_receiver: tokio_stream::wrappers::ReceiverStream<_> =
tokio_stream::wrappers::ReceiverStream::new(block_stream_request_receiver);
let mut latest_block = first_block;
async move {
loop_select!(
let result_block = sparse_block_stream.next().map(|option| option.unwrap()) => {
let block = result_block?;
latest_block = block;

block_sender.send(block).await;
let _result = latest_block_sender.send(block);
},
if let Some(block_stream_request) = block_stream_request_receiver.next() => {
let _result = block_stream_request.send(Box::new(block_sender.receiver().make_cached(latest_block, |block: &BlockInfo| *block)));
} else break Ok(()),
)
}
});

Ok((
latest_block_watcher,
block_receiver.make_cached(first_block, |block: &BlockInfo| *block),
scope.spawn_with_handle(async move {
loop {
let block = sparse_block_stream.next().await.unwrap()?;

if !block_sender.send(block).await {
break Ok(())
}
if latest_block_sender.send(block).is_err() {
break Ok(())
}
}
}),
block_stream_request_sender,
))
}

Expand All @@ -389,7 +418,6 @@ async fn inject_intervening_headers<
let stream_rest = utilities::assert_stream_send(
sparse_block_stream
.and_then({
// 1
let mut latest_finalized_block = latest_finalized_block;
move |next_finalized_block| {
assert!(
Expand Down Expand Up @@ -420,7 +448,6 @@ async fn inject_intervening_headers<
.try_collect()
.await?;

// 3
for (previous_block, next_block) in Iterator::zip(
std::iter::once(&prev_finalized_block).chain(intervening_blocks.iter()),
intervening_blocks.iter().chain(std::iter::once(&next_finalized_block)),
Expand Down Expand Up @@ -475,7 +502,7 @@ impl<BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static, SignedExtr
let (
latest_finalized_block_watcher,
mut finalized_state_chain_stream,
finalized_block_producer_handle,
finalized_block_stream_request_sender,
) = create_finalized_block_subscription(
scope,
base_rpc_client.clone(),
Expand All @@ -487,7 +514,7 @@ impl<BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static, SignedExtr
let (
latest_unfinalized_block_watcher,
unfinalized_state_chain_stream,
unfinalized_block_producer_handle,
unfinalized_block_stream_request_sender,
) = create_unfinalized_block_subscription(scope, base_rpc_client.clone()).await?;

let state_chain_client = Arc::new(StateChainClient {
Expand All @@ -504,10 +531,8 @@ impl<BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static, SignedExtr
scope,
base_rpc_client.clone(),
),
_block_producer_handles: (
finalized_block_producer_handle,
unfinalized_block_producer_handle,
),
finalized_block_stream_request_sender,
unfinalized_block_stream_request_sender,
base_rpc_client,
latest_finalized_block_watcher,
latest_unfinalized_block_watcher,
Expand Down Expand Up @@ -874,6 +899,21 @@ impl<
fn latest_unfinalized_block(&self) -> BlockInfo {
*self.latest_unfinalized_block_watcher.borrow()
}

async fn finalized_block_stream(&self) -> Box<dyn StateChainStreamApi> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.finalized_block_stream_request_sender.send(sender).await.expect(OR_CANCEL);
receiver.await.expect(OR_CANCEL)
}

async fn unfinalized_block_stream(&self) -> Box<dyn StateChainStreamApi<false>> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.unfinalized_block_stream_request_sender
.send(sender)
.await
.expect(OR_CANCEL);
receiver.await.expect(OR_CANCEL)
}
}

#[cfg(test)]
Expand All @@ -892,7 +932,7 @@ pub mod mocks {

use super::{
extrinsic_api::{self, unsigned},
storage_api, BlockInfo,
storage_api, BlockInfo, StateChainStreamApi,
};

mock! {
Expand Down Expand Up @@ -944,6 +984,9 @@ pub mod mocks {
impl ChainApi for StateChainClient {
fn latest_finalized_block(&self) -> BlockInfo;
fn latest_unfinalized_block(&self) -> BlockInfo;

async fn finalized_block_stream(&self) -> Box<dyn StateChainStreamApi>;
async fn unfinalized_block_stream(&self) -> Box<dyn StateChainStreamApi<false>>;
}
#[async_trait]
impl StorageApi for StateChainClient {
Expand Down

0 comments on commit 22c713b

Please sign in to comment.