diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs index 403ad4c779..84c6aee343 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs @@ -34,25 +34,30 @@ struct EnvironmentParameters { async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> EnvironmentParameters { use state_chain_runtime::Runtime; - let latest_hash = state_chain_client.latest_finalized_hash(); let eth_chain_id = state_chain_client - .storage_value::>(latest_hash) + .storage_value::>( + state_chain_client.latest_finalized_block().hash, + ) .await .expect("State Chain client connection failed"); let eth_vault_address = state_chain_client - .storage_value::>(latest_hash) + .storage_value::>( + state_chain_client.latest_finalized_block().hash, + ) .await .expect("Failed to get Vault contract address from SC"); let eth_address_checker_address = state_chain_client - .storage_value::>(latest_hash) + .storage_value::>( + state_chain_client.latest_finalized_block().hash, + ) .await .expect("State Chain client connection failed"); let supported_erc20_tokens: HashMap<_, _> = state_chain_client .storage_map::, _>( - state_chain_client.latest_finalized_hash(), + state_chain_client.latest_finalized_block().hash, ) .await .expect("Failed to fetch Ethereum supported assets"); @@ -70,14 +75,14 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro let dot_genesis_hash = state_chain_client .storage_value::>( - state_chain_client.latest_finalized_hash(), + state_chain_client.latest_finalized_block().hash, ) .await .expect(STATE_CHAIN_CONNECTION); let btc_network = state_chain_client .storage_value::>( - state_chain_client.latest_finalized_hash(), + state_chain_client.latest_finalized_block().hash, ) .await .expect(STATE_CHAIN_CONNECTION) diff --git a/api/lib/src/queries.rs b/api/lib/src/queries.rs index 29306692c9..b873c482e1 100644 --- a/api/lib/src/queries.rs +++ b/api/lib/src/queries.rs @@ -61,7 +61,7 @@ impl QueryApi { pallet_cf_ingress_egress::Config, { let block_hash = - block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_hash()); + block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_block().hash); let (channel_details, network_environment) = tokio::try_join!( self.state_chain_client @@ -99,7 +99,7 @@ impl QueryApi { block_hash: Option, ) -> Result> { let block_hash = - block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_hash()); + block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_block().hash); futures::future::join_all(Asset::all().iter().map(|asset| async { Ok(( @@ -125,7 +125,7 @@ impl QueryApi { account_id: Option, ) -> Result, anyhow::Error> { let block_hash = - block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_hash()); + block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_block().hash); let account_id = account_id.unwrap_or_else(|| self.state_chain_client.account_id()); Ok(self @@ -143,7 +143,7 @@ impl QueryApi { account_id: Option, ) -> Result, anyhow::Error> { let block_hash = - block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_hash()); + block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_block().hash); let account_id = account_id.unwrap_or_else(|| self.state_chain_client.account_id()); Ok(self @@ -161,7 +161,7 @@ impl QueryApi { account_id: Option, ) -> Result> { let block_hash = - block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_hash()); + block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_block().hash); let account_id = account_id.unwrap_or_else(|| self.state_chain_client.account_id()); Ok(self @@ -179,7 +179,7 @@ impl QueryApi { account_id: Option, ) -> Result { let block_hash = - block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_hash()); + block_hash.unwrap_or_else(|| self.state_chain_client.latest_finalized_block().hash); let account_id = account_id.unwrap_or_else(|| self.state_chain_client.account_id()); let mut result = diff --git a/engine/src/main.rs b/engine/src/main.rs index 7b928c3f97..9bccfaa9e9 100644 --- a/engine/src/main.rs +++ b/engine/src/main.rs @@ -107,7 +107,7 @@ async fn run_main(settings: Settings) -> anyhow::Result<()> { ) = p2p::start( state_chain_client.clone(), settings.node_p2p.clone(), - state_chain_stream.cache().block_hash, + state_chain_stream.cache().hash, ) .await .context("Failed to start p2p")?; @@ -124,7 +124,7 @@ async fn run_main(settings: Settings) -> anyhow::Result<()> { .storage_value::>(state_chain_stream.cache().block_hash) + >>(state_chain_stream.cache().hash) .await .context("Failed to get Ethereum CeremonyIdCounter from SC")?, ); @@ -141,7 +141,7 @@ async fn run_main(settings: Settings) -> anyhow::Result<()> { .storage_value::>(state_chain_stream.cache().block_hash) + >>(state_chain_stream.cache().hash) .await .context("Failed to get Polkadot CeremonyIdCounter from SC")?, ); @@ -158,7 +158,7 @@ async fn run_main(settings: Settings) -> anyhow::Result<()> { .storage_value::>(state_chain_stream.cache().block_hash) + >>(state_chain_stream.cache().hash) .await .context("Failed to get Bitcoin CeremonyIdCounter from SC")?, ); @@ -170,7 +170,7 @@ async fn run_main(settings: Settings) -> anyhow::Result<()> { let expected_eth_chain_id = web3::types::U256::from( state_chain_client .storage_value::>( - state_chain_client.latest_finalized_hash(), + state_chain_client.latest_finalized_block().hash, ) .await .expect(STATE_CHAIN_CONNECTION), @@ -187,7 +187,7 @@ async fn run_main(settings: Settings) -> anyhow::Result<()> { state_chain_client .storage_value::>(state_chain_client.latest_finalized_hash()) + >>(state_chain_client.latest_finalized_block().hash) .await .expect(STATE_CHAIN_CONNECTION), ); @@ -197,7 +197,7 @@ async fn run_main(settings: Settings) -> anyhow::Result<()> { let expected_dot_genesis_hash = PolkadotHash::from( state_chain_client .storage_value::>( - state_chain_client.latest_finalized_hash(), + state_chain_client.latest_finalized_block().hash, ) .await .expect(STATE_CHAIN_CONNECTION), diff --git a/engine/src/state_chain_observer/client/chain_api.rs b/engine/src/state_chain_observer/client/chain_api.rs index 0747fff48b..ac1f3af6d3 100644 --- a/engine/src/state_chain_observer/client/chain_api.rs +++ b/engine/src/state_chain_observer/client/chain_api.rs @@ -1,4 +1,4 @@ pub trait ChainApi { - fn latest_finalized_hash(&self) -> state_chain_runtime::Hash; - fn latest_unfinalized_hash(&self) -> state_chain_runtime::Hash; + fn latest_finalized_block(&self) -> super::BlockInfo; + fn latest_unfinalized_block(&self) -> super::BlockInfo; } diff --git a/engine/src/state_chain_observer/client/extrinsic_api/signed.rs b/engine/src/state_chain_observer/client/extrinsic_api/signed.rs index f5a5d5a51d..fee65c4751 100644 --- a/engine/src/state_chain_observer/client/extrinsic_api/signed.rs +++ b/engine/src/state_chain_observer/client/extrinsic_api/signed.rs @@ -163,8 +163,8 @@ impl SignedExtrinsicClient { scope, signer, account_nonce, - state_chain_stream.cache().block_hash, - state_chain_stream.cache().block_number, + state_chain_stream.cache().hash, + state_chain_stream.cache().number, base_rpc_client.runtime_version().await?, genesis_hash, SIGNED_EXTRINSIC_LIFETIME, @@ -182,11 +182,11 @@ impl SignedExtrinsicClient { let submission_details = submission_watcher.watch_for_submission_in_block() => { submission_watcher.on_submission_in_block(&mut requests, submission_details).await?; }, - if let Some((block_hash, block_header)) = state_chain_stream.next() => { - trace!("Received state chain block: {number} ({block_hash:x?})", number = block_header.number); + if let Some(block) = state_chain_stream.next() => { + trace!("Received state chain block: {} ({:x?})", block.number, block.hash); submission_watcher.on_block_finalized( &mut requests, - block_hash, + block.hash, ).await?; } else break Ok(()), } diff --git a/engine/src/state_chain_observer/client/mod.rs b/engine/src/state_chain_observer/client/mod.rs index f517d8b9b0..a0251c4930 100644 --- a/engine/src/state_chain_observer/client/mod.rs +++ b/engine/src/state_chain_observer/client/mod.rs @@ -40,31 +40,27 @@ const SUBSTRATE_BEHAVIOUR: &str = "Unexpected state chain node behaviour"; const SYNC_POLL_INTERVAL: Duration = Duration::from_secs(4); -#[derive(Clone)] -pub struct StreamCache { - pub block_number: state_chain_runtime::BlockNumber, - pub block_hash: state_chain_runtime::Hash, +#[derive(Copy, Clone)] +pub struct BlockInfo { + pub parent_hash: state_chain_runtime::Hash, + pub hash: state_chain_runtime::Hash, + pub number: state_chain_runtime::BlockNumber, +} +impl From for BlockInfo { + fn from(value: state_chain_runtime::Header) -> Self { + Self { parent_hash: value.parent_hash, hash: value.hash(), number: value.number } + } } pub trait StateChainStreamApi: - CachedStream< - Cache = StreamCache, - Item = (state_chain_runtime::Hash, state_chain_runtime::Header), - > + Send - + Sync - + Unpin - + 'static + CachedStream + Send + Sync + Unpin + 'static { } -impl StateChainStreamApi for utilities::InnerCachedStream +impl StateChainStreamApi for utilities::InnerCachedStream where - S: Stream - + Send - + Sync - + Unpin - + 'static, - F: FnMut(&S::Item) -> StreamCache + Send + Sync + Unpin + 'static, + S: Stream + Send + Sync + Unpin + 'static, + F: FnMut(&S::Item) -> BlockInfo + Send + Sync + Unpin + 'static, { } @@ -94,8 +90,8 @@ pub struct StateChainClient< unsigned_extrinsic_client: unsigned::UnsignedExtrinsicClient, _block_producer_handles: (ScopedJoinHandle<()>, ScopedJoinHandle<()>), pub base_rpc_client: Arc, - latest_finalized_block_hash_watcher: tokio::sync::watch::Receiver, - latest_unfinalized_block_hash_watcher: tokio::sync::watch::Receiver, + latest_finalized_block_watcher: tokio::sync::watch::Receiver, + latest_unfinalized_block_watcher: tokio::sync::watch::Receiver, } impl StateChainClient { @@ -200,31 +196,27 @@ async fn create_finalized_block_subscription< base_rpc_client: Arc, signed_extrinsic_client_builder: &mut SignedExtrinsicClientBuilder, required_version_and_wait: Option<(SemVer, bool)>, -) -> Result<(watch::Receiver, impl StateChainStreamApi + Clone, ScopedJoinHandle<()>)> { - let mut finalized_block_header_stream = { +) -> Result<(watch::Receiver, impl StateChainStreamApi + Clone, ScopedJoinHandle<()>)> { + let mut finalized_block_stream = { // https://substrate.stackexchange.com/questions/3667/api-rpc-chain-subscribefinalizedheads-missing-blocks // https://arxiv.org/abs/2007.01560 - let sparse_finalized_block_header_stream = base_rpc_client + let sparse_finalized_block_stream = base_rpc_client .subscribe_finalized_block_headers() .await? .map_err(Into::into) + .map_ok(|header| -> BlockInfo { header.into() }) .chain(futures::stream::once(std::future::ready(Err(anyhow::anyhow!( "sparse_finalized_block_header_stream unexpectedly ended" ))))); - let mut finalized_block_header_stream = Box::pin( - inject_intervening_headers( - sparse_finalized_block_header_stream, - base_rpc_client.clone(), - ) - .await?, + let mut finalized_block_stream = Box::pin( + inject_intervening_headers(sparse_finalized_block_stream, base_rpc_client.clone()) + .await?, ); - let latest_finalized_header: state_chain_runtime::Header = - finalized_block_header_stream.next().await.unwrap()?; + let latest_finalized_block: BlockInfo = finalized_block_stream.next().await.unwrap()?; - finalized_block_header_stream - .make_try_cached(latest_finalized_header, |header| header.clone()) + finalized_block_stream.make_try_cached(latest_finalized_block, |header| *header) }; // Often `finalized_header` returns a significantly newer latest block than the stream @@ -232,12 +224,12 @@ async fn create_finalized_block_subscription< { let finalised_header_hash = base_rpc_client.latest_finalized_block_hash().await?; let finalised_header = base_rpc_client.block_header(finalised_header_hash).await?; - if finalized_block_header_stream.cache().number < finalised_header.number { + if finalized_block_stream.cache().number < finalised_header.number { let blocks_to_skip = - finalized_block_header_stream.cache().number + 1..=finalised_header.number; + finalized_block_stream.cache().number + 1..=finalised_header.number; for block_number in blocks_to_skip { assert_eq!( - finalized_block_header_stream.next().await.unwrap()?.number, + finalized_block_stream.next().await.unwrap()?.number, block_number, "{SUBSTRATE_BEHAVIOUR}" ); @@ -246,49 +238,45 @@ async fn create_finalized_block_subscription< } signed_extrinsic_client_builder - .pre_compatibility(base_rpc_client.clone(), &mut finalized_block_header_stream) + .pre_compatibility(base_rpc_client.clone(), &mut finalized_block_stream) .await?; if let Some((required_version, wait_for_required_version)) = required_version_and_wait { - let latest_block_header = finalized_block_header_stream.cache().clone(); + let latest_block = *finalized_block_stream.cache(); if wait_for_required_version { let incompatible_blocks = - futures::stream::once(futures::future::ready(Ok::<_, anyhow::Error>( - latest_block_header, - ))) - .chain(finalized_block_header_stream.by_ref()) - .and_then(|block_header| { - let base_rpc_client = &base_rpc_client; - let block_hash = block_header.hash(); - async move { - Ok::<_, anyhow::Error>(( - block_hash, - base_rpc_client - .storage_value::>(block_hash) - .await?, - )) - } - }) - .try_take_while(|(_block_header, current_release_version)| { - futures::future::ready({ - Ok::<_, anyhow::Error>( - !required_version.is_compatible_with(*current_release_version), - ) + futures::stream::once(futures::future::ready(Ok::<_, anyhow::Error>(latest_block))) + .chain(finalized_block_stream.by_ref()) + .and_then(|block| { + let base_rpc_client = &base_rpc_client; + async move { + Ok::<_, anyhow::Error>(( + block.hash, + base_rpc_client + .storage_value::>(block.hash) + .await?, + )) + } }) - }) - .boxed(); + .try_take_while(|(_block_hash, current_release_version)| { + futures::future::ready({ + Ok::<_, anyhow::Error>( + !required_version.is_compatible_with(*current_release_version), + ) + }) + }) + .boxed(); incompatible_blocks.try_for_each(move |(block_hash, current_release_version)| futures::future::ready({ info!("This version '{}' is incompatible with the current release '{}' at block: {}. WAITING for a compatible release version.", required_version, current_release_version, block_hash); Ok::<_, anyhow::Error>(()) })).await?; } else { - let latest_block_hash = latest_block_header.hash(); let current_release_version = base_rpc_client .storage_value::>( - latest_block_hash, + latest_block.hash, ) .await?; if !required_version.is_compatible_with(current_release_version) { @@ -296,58 +284,50 @@ async fn create_finalized_block_subscription< "This version '{}' is incompatible with the current release '{}' at block: {}.", required_version, current_release_version, - latest_block_hash, + latest_block.hash, ); } } } const BLOCK_CAPACITY: usize = 10; - let (block_sender, block_receiver) = - spmc::channel::<(state_chain_runtime::Hash, state_chain_runtime::Header)>(BLOCK_CAPACITY); + let (block_sender, block_receiver) = spmc::channel::(BLOCK_CAPACITY); - let latest_block_header = finalized_block_header_stream.cache(); - let latest_block_hash = latest_block_header.hash(); + let latest_block = *finalized_block_stream.cache(); - let (latest_block_hash_sender, latest_block_hash_watcher) = - tokio::sync::watch::channel::(latest_block_hash); + let (latest_block_sender, latest_block_watcher) = + tokio::sync::watch::channel::(latest_block); Ok(( - latest_block_hash_watcher, - FinalizedCachedStream::new(block_receiver.make_cached( - StreamCache { - block_hash: latest_block_hash, - block_number: latest_block_header.number, - }, - |(block_hash, block_header): &( - state_chain_runtime::Hash, - state_chain_runtime::Header, - )| StreamCache { block_hash: *block_hash, block_number: block_header.number }, - )), - scope.spawn_with_handle({ - let base_rpc_client = base_rpc_client.clone(); - let mut finalized_block_header_stream = finalized_block_header_stream.into_inner(); - async move { - loop { - let block_header = - finalized_block_header_stream.next().await.unwrap()?; - let block_hash = block_header.hash(); - if let Some((required_version, _)) = required_version_and_wait { - let current_release_version = base_rpc_client.storage_value::>(block_hash).await?; - if !required_version.is_compatible_with(current_release_version) { - break Err(anyhow!("This version '{}' is no longer compatible with the release version '{}' at block: {}", required_version, current_release_version, block_hash)) + latest_block_watcher, + FinalizedCachedStream::new(block_receiver.make_cached( + latest_block, + |block: &BlockInfo| *block, + )), + scope.spawn_with_handle({ + let base_rpc_client = base_rpc_client.clone(); + let mut finalized_block_stream = finalized_block_stream.into_inner(); + async move { + loop { + let block = + finalized_block_stream.next().await.unwrap()?; + if let Some((required_version, _)) = required_version_and_wait { + let current_release_version = base_rpc_client.storage_value::>(block.hash).await?; + if !required_version.is_compatible_with(current_release_version) { + break Err(anyhow!("This version '{}' is no longer compatible with the release version '{}' at block: {}", required_version, current_release_version, block.hash)) + } } - } - if !block_sender.send((block_hash, block_header)).await { - break Ok(()) - } - if latest_block_hash_sender.send(block_hash).is_err() { - break Ok(()) + if !block_sender.send(block).await { + break Ok(()) + } + if latest_block_sender.send(block).is_err() { + break Ok(()) + } } } - } - }))) + }) + )) } async fn create_unfinalized_block_subscription< @@ -355,46 +335,39 @@ async fn create_unfinalized_block_subscription< >( scope: &Scope<'_, anyhow::Error>, base_rpc_client: Arc, -) -> Result<(watch::Receiver, impl StateChainStreamApi + Clone, ScopedJoinHandle<()>)> -{ - let mut block_header_stream = base_rpc_client +) -> Result<( + watch::Receiver, + impl StateChainStreamApi + Clone, + ScopedJoinHandle<()>, +)> { + let mut sparse_block_stream = base_rpc_client .subscribe_unfinalized_block_headers() .await? .map_err(Into::into) + .map_ok(|header| -> BlockInfo { header.into() }) .chain(futures::stream::once(std::future::ready(Err(anyhow::anyhow!( - "sparse_block_header_stream unexpectedly ended" + "sparse_block_stream unexpectedly ended" ))))); - let first_block_header = block_header_stream.next().await.unwrap()?; - - let latest_block_hash = first_block_header.hash(); - let latest_block_number = first_block_header.number; + let first_block = sparse_block_stream.next().await.unwrap()?; const BLOCK_CAPACITY: usize = 10; - let (block_sender, block_receiver) = - spmc::channel::<(state_chain_runtime::Hash, state_chain_runtime::Header)>(BLOCK_CAPACITY); + let (block_sender, block_receiver) = spmc::channel::(BLOCK_CAPACITY); - let (latest_block_hash_sender, latest_block_hash_watcher) = - tokio::sync::watch::channel::(latest_block_hash); + let (latest_block_sender, latest_block_watcher) = + tokio::sync::watch::channel::(first_block); Ok(( - latest_block_hash_watcher, - block_receiver.make_cached( - StreamCache { block_hash: latest_block_hash, block_number: latest_block_number }, - |(block_hash, block_header): &( - state_chain_runtime::Hash, - state_chain_runtime::Header, - )| StreamCache { block_hash: *block_hash, block_number: block_header.number }, - ), + latest_block_watcher, + block_receiver.make_cached(first_block, |block: &BlockInfo| *block), scope.spawn_with_handle(async move { loop { - let block_header = block_header_stream.next().await.unwrap()?; - let block_hash = block_header.hash(); + let block = sparse_block_stream.next().await.unwrap()?; - if !block_sender.send((block_hash, block_header)).await { + if !block_sender.send(block).await { break Ok(()) } - if latest_block_hash_sender.send(block_hash).is_err() { + if latest_block_sender.send(block).is_err() { break Ok(()) } } @@ -405,66 +378,60 @@ async fn create_unfinalized_block_subscription< async fn inject_intervening_headers< BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static, >( - sparse_block_header_stream: impl Stream> + Send + 'static, + sparse_block_stream: impl Stream> + Send + 'static, base_rpc_client: Arc, -) -> Result>> { - let mut sparse_block_header_stream = Box::pin(sparse_block_header_stream); +) -> Result>> { + let mut sparse_block_stream = Box::pin(sparse_block_stream); - let latest_finalized_header: state_chain_runtime::Header = sparse_block_header_stream - .next() - .await - .ok_or(anyhow!("initial header missing"))??; + let latest_finalized_block: BlockInfo = + sparse_block_stream.next().await.ok_or(anyhow!("initial header missing"))??; let stream_rest = utilities::assert_stream_send( - sparse_block_header_stream + sparse_block_stream .and_then({ - let mut latest_finalized_header = latest_finalized_header.clone(); - move |next_finalized_header| { + // 1 + let mut latest_finalized_block = latest_finalized_block; + move |next_finalized_block| { assert!( - latest_finalized_header.number < next_finalized_header.number, + latest_finalized_block.number < next_finalized_block.number, "{SUBSTRATE_BEHAVIOUR}", ); - let prev_finalized_header = std::mem::replace( - &mut latest_finalized_header, - next_finalized_header.clone(), - ); + let prev_finalized_block = + std::mem::replace(&mut latest_finalized_block, next_finalized_block); let base_rpc_client = base_rpc_client.clone(); async move { let base_rpc_client = &base_rpc_client; - let intervening_headers: Vec<_> = futures::stream::iter( - prev_finalized_header.number + 1..next_finalized_header.number, + let intervening_blocks: Vec<_> = futures::stream::iter( + prev_finalized_block.number + 1..next_finalized_block.number, ) .then(|block_number| async move { let block_hash = base_rpc_client .block_hash(block_number) .await? .expect(SUBSTRATE_BEHAVIOUR); - let block_header = base_rpc_client.block_header(block_hash).await?; - assert_eq!(block_header.hash(), block_hash, "{SUBSTRATE_BEHAVIOUR}"); - assert_eq!(block_header.number, block_number, "{SUBSTRATE_BEHAVIOUR}",); - Result::<_, anyhow::Error>::Ok((block_hash, block_header)) + let block: BlockInfo = + base_rpc_client.block_header(block_hash).await?.into(); + assert_eq!(block.hash, block_hash, "{SUBSTRATE_BEHAVIOUR}"); + assert_eq!(block.number, block_number, "{SUBSTRATE_BEHAVIOUR}",); + Result::<_, anyhow::Error>::Ok(block) }) .try_collect() .await?; - for (block_hash, next_block_header) in Iterator::zip( - std::iter::once(&prev_finalized_header.hash()) - .chain(intervening_headers.iter().map(|(hash, _header)| hash)), - intervening_headers - .iter() - .map(|(_hash, header)| header) - .chain(std::iter::once(&next_finalized_header)), + // 3 + for (previous_block, next_block) in Iterator::zip( + std::iter::once(&prev_finalized_block).chain(intervening_blocks.iter()), + intervening_blocks.iter().chain(std::iter::once(&next_finalized_block)), ) { - assert_eq!(*block_hash, next_block_header.parent_hash); + assert_eq!(previous_block.hash, next_block.parent_hash); } Result::<_, anyhow::Error>::Ok(futures::stream::iter( - intervening_headers + intervening_blocks .into_iter() - .map(|(_hash, header)| header) - .chain(std::iter::once(next_finalized_header)) + .chain(std::iter::once(next_finalized_block)) .map(Result::<_, anyhow::Error>::Ok), )) } @@ -473,7 +440,7 @@ async fn inject_intervening_headers< .try_flatten(), ); - Ok(futures::stream::once(async { Ok(latest_finalized_header) }).chain(stream_rest)) + Ok(futures::stream::once(async move { Ok(latest_finalized_block) }).chain(stream_rest)) } impl @@ -506,7 +473,7 @@ impl, - > + Send - + Unpin, + FinalizedBlockStream: TryCachedStream> + Send + Unpin, >( &mut self, base_rpc_client: Arc, @@ -596,11 +559,7 @@ impl SignedExtrinsicClientBuilderTrait for () { async fn pre_compatibility< BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static, - FinalizedBlockStream: TryCachedStream< - Cache = state_chain_runtime::Header, - Item = Result, - > + Send - + Unpin, + FinalizedBlockStream: TryCachedStream> + Send + Unpin, >( &mut self, _base_rpc_client: Arc, @@ -639,11 +598,7 @@ impl SignedExtrinsicClientBuilderTrait for SignedExtrinsicClientBuilder { async fn pre_compatibility< BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static, - FinalizedBlockStream: TryCachedStream< - Cache = state_chain_runtime::Header, - Item = Result, - > + Send - + Unpin, + FinalizedBlockStream: TryCachedStream> + Send + Unpin, >( &mut self, base_rpc_client: Arc, @@ -674,7 +629,7 @@ impl SignedExtrinsicClientBuilderTrait for SignedExtrinsicClientBuilder { let account_nonce = { loop { - let block_hash = finalized_block_stream.cache().hash(); + let block_hash = finalized_block_stream.cache().hash; match base_rpc_client .storage_map_entry::>( @@ -702,7 +657,7 @@ impl SignedExtrinsicClientBuilderTrait for SignedExtrinsicClientBuilder { finalized_block_stream.next().await.expect(OR_CANCEL)?; } - let block_hash = finalized_block_stream.cache().hash(); + let block_hash = finalized_block_stream.cache().hash; base_rpc_client .storage_map_entry::>( @@ -912,12 +867,12 @@ impl< SignedExtrinsicClient: Send + Sync + 'static, > ChainApi for StateChainClient { - fn latest_finalized_hash(&self) -> state_chain_runtime::Hash { - *self.latest_finalized_block_hash_watcher.borrow() + fn latest_finalized_block(&self) -> BlockInfo { + *self.latest_finalized_block_watcher.borrow() } - fn latest_unfinalized_hash(&self) -> state_chain_runtime::Hash { - *self.latest_unfinalized_block_hash_watcher.borrow() + fn latest_unfinalized_block(&self) -> BlockInfo { + *self.latest_unfinalized_block_watcher.borrow() } } @@ -937,7 +892,7 @@ pub mod mocks { use super::{ extrinsic_api::{self, unsigned}, - storage_api, + storage_api, BlockInfo, }; mock! { @@ -987,8 +942,8 @@ pub mod mocks { } #[async_trait] impl ChainApi for StateChainClient { - fn latest_finalized_hash(&self) -> state_chain_runtime::Hash; - fn latest_unfinalized_hash(&self) -> state_chain_runtime::Hash; + fn latest_finalized_block(&self) -> BlockInfo; + fn latest_unfinalized_block(&self) -> BlockInfo; } #[async_trait] impl StorageApi for StateChainClient { @@ -1047,10 +1002,9 @@ mod tests { use std::collections::BTreeMap; + use sp_runtime::Digest; use state_chain_runtime::Header; - use crate::state_chain_observer::test_helpers::test_header; - use super::{base_rpc_api::MockBaseRpcApi, *}; struct TestChain { @@ -1063,9 +1017,16 @@ mod tests { let mut headers = Vec::
::with_capacity(total_blocks); for index in 0..total_blocks { - let parent_hash = index.checked_sub(1).map(|parent_i| headers[parent_i].hash()); - let header = test_header(index as u32, parent_hash); - headers.push(header); + headers.push(Header { + number: index as u32, + parent_hash: index + .checked_sub(1) + .map(|parent_i| headers[parent_i].hash()) + .unwrap_or_default(), + state_root: H256::default(), + extrinsics_root: H256::default(), + digest: Digest { logs: Vec::new() }, + }); } let hashes = headers.iter().map(|header| header.hash()).collect(); @@ -1102,7 +1063,7 @@ mod tests { ) -> Result> { let sparse_stream = tokio_stream::iter(block_numbers).map(move |num| { let hash = &chain.hashes[num as usize]; - Ok(chain.headers[hash].clone()) + Ok(chain.headers[hash].clone().into()) }); let stream = inject_intervening_headers(sparse_stream, rpc).await?; diff --git a/engine/src/state_chain_observer/sc_observer/mod.rs b/engine/src/state_chain_observer/sc_observer/mod.rs index 2535cfefc0..179719f7df 100644 --- a/engine/src/state_chain_observer/sc_observer/mod.rs +++ b/engine/src/state_chain_observer/sc_observer/mod.rs @@ -289,10 +289,10 @@ where let mut sc_block_stream = Box::pin(sc_block_stream); loop { match sc_block_stream.next().await { - Some((current_block_hash, current_block_header)) => { - debug!("Processing SC block {} with block hash: {current_block_hash:#x}", current_block_header.number); + Some(current_block) => { + debug!("Processing SC block {} with block hash: {:#x}", current_block.number, current_block.hash); - match state_chain_client.storage_value::>(current_block_hash).await { + match state_chain_client.storage_value::>(current_block.hash).await { Ok(events) => { for event_record in events { match_event! {event_record.event, { @@ -590,24 +590,24 @@ where } }}}} Err(error) => { - error!("Failed to decode events at block {}. {error}", current_block_header.number); + error!("Failed to decode events at block {}. {error}", current_block.number); } } // All nodes must send a heartbeat regardless of their validator status (at least for now). // We send it every `blocks_per_heartbeat` from the block they started up at. - if ((current_block_header.number - last_heartbeat_submitted_at) >= blocks_per_heartbeat + if ((current_block.number - last_heartbeat_submitted_at) >= blocks_per_heartbeat // Submitting earlier than one minute in may falsely indicate liveness. ) && has_submitted_init_heartbeat.load(Ordering::Relaxed) { - info!("Sending heartbeat at block: {}", current_block_header.number); + info!("Sending heartbeat at block: {}", current_block.number); state_chain_client .finalize_signed_extrinsic( pallet_cf_reputation::Call::heartbeat {}, ) .await; - last_heartbeat_submitted_at = current_block_header.number; + last_heartbeat_submitted_at = current_block.number; } } None => { diff --git a/engine/src/state_chain_observer/sc_observer/tests.rs b/engine/src/state_chain_observer/sc_observer/tests.rs index ad98976ec1..81dc4b8c46 100644 --- a/engine/src/state_chain_observer/sc_observer/tests.rs +++ b/engine/src/state_chain_observer/sc_observer/tests.rs @@ -5,7 +5,7 @@ use crate::{ dot::retry_rpc::mocks::MockDotHttpRpcClient, eth::retry_rpc::mocks::MockEthRetryRpcClient, state_chain_observer::{ - client::{extrinsic_api, finalized_stream::FinalizedCachedStream, StreamCache}, + client::{extrinsic_api, finalized_stream::FinalizedCachedStream}, test_helpers::test_header, }, }; @@ -15,7 +15,7 @@ use cf_chains::{ }; use cf_primitives::{AccountRole, GENESIS_EPOCH}; use frame_system::Phase; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use mockall::predicate::eq; use multisig::{eth::EvmCryptoScheme, ChainSigning, SignatureToThresholdSignature}; use pallet_cf_broadcast::BroadcastAttemptId; @@ -73,7 +73,6 @@ async fn start_sc_observer< // historical epochs we were a part of #[tokio::test] async fn only_encodes_and_signs_when_specified() { - let initial_block_hash = H256::default(); let account_id = AccountId::new([0; 32]); let mut state_chain_client = MockStateChainClient::new(); @@ -83,20 +82,13 @@ async fn only_encodes_and_signs_when_specified() { || account_id }); - let block_header = test_header(21, None); - let sc_block_stream = tokio_stream::iter([block_header.clone()]) - .map(|block_header| (block_header.hash(), block_header)) - .make_cached( - StreamCache { block_hash: initial_block_hash, block_number: 20 }, - |(block_hash, block_header): &( - state_chain_runtime::Hash, - state_chain_runtime::Header, - )| StreamCache { block_hash: *block_hash, block_number: block_header.number }, - ); + let block = test_header(21, None); + let sc_block_stream = + tokio_stream::iter([block]).make_cached(test_header(20, None), |block| *block); state_chain_client .expect_storage_value::>() - .with(eq(block_header.hash())) + .with(eq(block.hash)) .once() .return_once(move |_| { Ok(vec![ diff --git a/engine/src/state_chain_observer/test_helpers.rs b/engine/src/state_chain_observer/test_helpers.rs index cefb8651e8..4678a57642 100644 --- a/engine/src/state_chain_observer/test_helpers.rs +++ b/engine/src/state_chain_observer/test_helpers.rs @@ -1,13 +1,11 @@ use sp_core::H256; -use sp_runtime::Digest; -use state_chain_runtime::Header; -pub fn test_header(number: u32, parent_hash: Option) -> Header { - Header { +use super::client::BlockInfo; + +pub fn test_header(number: u32, parent_hash: Option) -> BlockInfo { + BlockInfo { number, parent_hash: parent_hash.unwrap_or_default(), - state_root: H256::default(), - extrinsics_root: H256::default(), - digest: Digest { logs: Vec::new() }, + hash: H256::from_low_u64_le(number.into()), } } diff --git a/engine/src/witness/common/chunked_chain_source/chunked_by_vault/deposit_addresses.rs b/engine/src/witness/common/chunked_chain_source/chunked_by_vault/deposit_addresses.rs index 80e5be7770..ac8a7b2935 100644 --- a/engine/src/witness/common/chunked_chain_source/chunked_by_vault/deposit_addresses.rs +++ b/engine/src/witness/common/chunked_chain_source/chunked_by_vault/deposit_addresses.rs @@ -108,7 +108,7 @@ where let (sender, receiver) = watch::channel( Self::get_chain_state_and_addresses( &*state_chain_client, - state_chain_stream.cache().block_hash, + state_chain_stream.cache().hash, ) .await, ); @@ -116,9 +116,9 @@ where scope.spawn(async move { utilities::loop_select! { let _ = sender.closed() => { break Ok(()) }, - if let Some((_block_hash, _block_header)) = state_chain_stream.next() => { + if let Some(_block_header) = state_chain_stream.next() => { // Note it is still possible for engines to inconsistently select addresses to witness for a block due to how the SC expiries ingress addresses - let _result = sender.send(Self::get_chain_state_and_addresses(&*state_chain_client, state_chain_stream.cache().block_hash).await); + let _result = sender.send(Self::get_chain_state_and_addresses(&*state_chain_client, state_chain_stream.cache().hash).await); } else break Ok(()), } }); diff --git a/engine/src/witness/common/chunked_chain_source/chunked_by_vault/egress_items.rs b/engine/src/witness/common/chunked_chain_source/chunked_by_vault/egress_items.rs index 1727f0cdb0..0fad3f39bf 100644 --- a/engine/src/witness/common/chunked_chain_source/chunked_by_vault/egress_items.rs +++ b/engine/src/witness/common/chunked_chain_source/chunked_by_vault/egress_items.rs @@ -66,18 +66,15 @@ where state_chain_client: Arc, ) -> Self { let (sender, receiver) = tokio::sync::watch::channel( - Self::get_transaction_out_ids( - &*state_chain_client, - state_chain_stream.cache().block_hash, - ) - .await, + Self::get_transaction_out_ids(&*state_chain_client, state_chain_stream.cache().hash) + .await, ); scope.spawn(async move { utilities::loop_select! { let _ = sender.closed() => { break Ok(()) }, - if let Some((_block_hash, _block_header)) = state_chain_stream.next() => { - let _result = sender.send(Self::get_transaction_out_ids(&*state_chain_client, _block_hash).await); + if let Some(block) = state_chain_stream.next() => { + let _result = sender.send(Self::get_transaction_out_ids(&*state_chain_client, block.hash).await); } else break Ok(()), } }); diff --git a/engine/src/witness/common/epoch_source.rs b/engine/src/witness/common/epoch_source.rs index c56c77bdc5..3833850d64 100644 --- a/engine/src/witness/common/epoch_source.rs +++ b/engine/src/witness/common/epoch_source.rs @@ -84,7 +84,7 @@ impl EpochSource<(), ()> { ) -> EpochSourceBuilder<'a, 'env, StateChainClient, (), ()> { let (epoch_update_sender, epoch_update_receiver) = spmc::channel(CHANNEL_BUFFER); - let initial_block_hash = state_chain_stream.cache().block_hash; + let initial_block_hash = state_chain_stream.cache().hash; let mut current_epoch = state_chain_client .storage_value::>( @@ -115,28 +115,28 @@ impl EpochSource<(), ()> { async move { utilities::loop_select! { let _ = epoch_update_sender.closed() => { break Ok(()) }, - if let Some((block_hash, _block_header)) = state_chain_stream.next() => { + if let Some(block) = state_chain_stream.next() => { let old_current_epoch = std::mem::replace(&mut current_epoch, state_chain_client .storage_value::>(block_hash) + >>(block.hash) .await .expect(STATE_CHAIN_CONNECTION)); if old_current_epoch != current_epoch { - epoch_update_sender.send((old_current_epoch, block_hash, EpochUpdate::Historic(()))).await; - epoch_update_sender.send((current_epoch, block_hash, EpochUpdate::NewCurrent(()))).await; + epoch_update_sender.send((old_current_epoch, block.hash, EpochUpdate::Historic(()))).await; + epoch_update_sender.send((current_epoch, block.hash, EpochUpdate::NewCurrent(()))).await; historic_epochs.insert(old_current_epoch); } let old_historic_epochs = std::mem::replace(&mut historic_epochs, BTreeSet::from_iter( state_chain_client.storage_map::, Vec<_>>(block_hash).await.expect(STATE_CHAIN_CONNECTION).into_iter().map(|(_, index)| index) + >, Vec<_>>(block.hash).await.expect(STATE_CHAIN_CONNECTION).into_iter().map(|(_, index)| index) )); assert!(!historic_epochs.contains(¤t_epoch)); assert!(old_historic_epochs.is_superset(&historic_epochs)); for expired_epoch in old_historic_epochs.difference(&historic_epochs) { - epoch_update_sender.send((*expired_epoch, block_hash, EpochUpdate::Expired)).await; + epoch_update_sender.send((*expired_epoch, block.hash, EpochUpdate::Expired)).await; } } else break Ok(()), } diff --git a/engine/src/witness/eth.rs b/engine/src/witness/eth.rs index 26070817ad..d7ff5a4857 100644 --- a/engine/src/witness/eth.rs +++ b/engine/src/witness/eth.rs @@ -69,28 +69,28 @@ where { let state_chain_gateway_address = state_chain_client .storage_value::>( - state_chain_client.latest_finalized_hash(), + state_chain_client.latest_finalized_block().hash, ) .await .context("Failed to get StateChainGateway address from SC")?; let key_manager_address = state_chain_client .storage_value::>( - state_chain_client.latest_finalized_hash(), + state_chain_client.latest_finalized_block().hash, ) .await .context("Failed to get KeyManager address from SC")?; let vault_address = state_chain_client .storage_value::>( - state_chain_client.latest_finalized_hash(), + state_chain_client.latest_finalized_block().hash, ) .await .context("Failed to get Vault contract address from SC")?; let address_checker_address = state_chain_client .storage_value::>( - state_chain_client.latest_finalized_hash(), + state_chain_client.latest_finalized_block().hash, ) .await .expect(STATE_CHAIN_CONNECTION); @@ -98,7 +98,7 @@ where let supported_erc20_tokens: HashMap = state_chain_client .storage_map::, _>( - state_chain_client.latest_finalized_hash(), + state_chain_client.latest_finalized_block().hash, ) .await .context("Failed to fetch Ethereum supported assets")?;