From f8793588c0ad17c0a29e3b3ed379fb65d5933fa9 Mon Sep 17 00:00:00 2001 From: David Salami <31099392+Wizdave97@users.noreply.github.com> Date: Mon, 23 Sep 2024 21:03:46 +0100 Subject: [PATCH] Fisherman updates and fixes (#313) Co-authored-by: Seun Lanlege --- Cargo.lock | 2 + modules/ismp/pallets/asset-gateway/src/lib.rs | 3 - modules/ismp/pallets/demo/src/lib.rs | 36 +++++- .../src/tests/pallet_asset_gateway.rs | 2 - .../src/tests/xcm_integration_test.rs | 4 +- modules/utils/subxt/src/lib.rs | 17 ++- parachain/runtimes/gargantua/Cargo.toml | 2 + parachain/runtimes/gargantua/src/ismp.rs | 12 +- parachain/runtimes/gargantua/src/lib.rs | 3 +- parachain/runtimes/nexus/src/ismp.rs | 4 + tesseract/evm/src/byzantine.rs | 95 +++++++++++++- tesseract/evm/src/provider.rs | 2 +- tesseract/fisherman/src/lib.rs | 42 +++--- tesseract/primitives/src/lib.rs | 23 +++- tesseract/primitives/src/mocks.rs | 8 ++ tesseract/relayer/src/cli.rs | 9 +- tesseract/substrate/Cargo.toml | 1 + tesseract/substrate/src/byzantine.rs | 122 ++++++++++++++++-- tesseract/substrate/src/calls.rs | 4 +- tesseract/substrate/src/provider.rs | 17 ++- tesseract/substrate/src/testing.rs | 10 +- 21 files changed, 352 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b1c698efd..c555394c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6102,6 +6102,7 @@ dependencies = [ "pallet-call-decompressor", "pallet-collator-selection", "pallet-collective", + "pallet-fishermen", "pallet-ismp", "pallet-ismp-demo", "pallet-ismp-host-executive", @@ -23801,6 +23802,7 @@ dependencies = [ "serde", "serde-hex-utils", "sp-core 34.0.0", + "sp-runtime 39.0.0", "substrate-state-machine", "subxt", "subxt-utils", diff --git a/modules/ismp/pallets/asset-gateway/src/lib.rs b/modules/ismp/pallets/asset-gateway/src/lib.rs index 656b42ea5..8f4ea9c83 100644 --- a/modules/ismp/pallets/asset-gateway/src/lib.rs +++ b/modules/ismp/pallets/asset-gateway/src/lib.rs @@ -234,7 +234,6 @@ where convert_to_erc20(amount).to_big_endian(&mut bytes); alloy_primitives::U256::from_be_bytes(bytes) }, - max_fee: Default::default(), asset_id, redeem: false, from: from.into(), @@ -279,8 +278,6 @@ alloy_sol_macro::sol! { struct Body { // Amount of the asset to be sent uint256 amount; - // Maximum amount to pay for liquidity fees - uint256 max_fee; // The asset identifier bytes32 asset_id; // Flag to redeem the erc20 asset on the destination diff --git a/modules/ismp/pallets/demo/src/lib.rs b/modules/ismp/pallets/demo/src/lib.rs index 72d744130..8f610e4eb 100644 --- a/modules/ismp/pallets/demo/src/lib.rs +++ b/modules/ismp/pallets/demo/src/lib.rs @@ -51,8 +51,9 @@ pub mod pallet { }; use frame_system::pallet_prelude::*; use ismp::{ + consensus::{StateCommitment, StateMachineHeight}, dispatcher::{DispatchGet, DispatchPost, DispatchRequest, FeeMetadata, IsmpDispatcher}, - host::StateMachine, + host::{IsmpHost, StateMachine}, }; #[pallet::pallet] @@ -70,7 +71,8 @@ pub mod pallet { /// Native currency implementation type NativeCurrency: Mutate; /// Ismp message disptacher - type IsmpDispatcher: IsmpDispatcher::Balance> + type IsmpHost: IsmpHost + + IsmpDispatcher::Balance> + Default; } @@ -162,7 +164,7 @@ pub mod pallet { }; // dispatch the request - let dispatcher = T::IsmpDispatcher::default(); + let dispatcher = T::IsmpHost::default(); dispatcher .dispatch_request( DispatchRequest::Post(post), @@ -202,7 +204,7 @@ pub mod pallet { context: Default::default(), }; - let dispatcher = T::IsmpDispatcher::default(); + let dispatcher = T::IsmpHost::default(); dispatcher .dispatch_request( DispatchRequest::Get(get), @@ -224,7 +226,7 @@ pub mod pallet { timeout: params.timeout, body: b"Hello from polkadot".to_vec(), }; - let dispatcher = T::IsmpDispatcher::default(); + let dispatcher = T::IsmpHost::default(); for _ in 0..params.count { // dispatch the request dispatcher @@ -236,6 +238,30 @@ pub mod pallet { } Ok(()) } + + /// Insert an unverified state commitment into the host, this is for testing purposes only. + #[pallet::weight(Weight::from_parts(1_000_000, 0))] + #[pallet::call_index(3)] + pub fn set_state_commitment( + origin: OriginFor, + height: StateMachineHeight, + commitment: StateCommitment, + ) -> DispatchResult { + use ismp::events::{Event, StateMachineUpdated}; + ensure_root(origin)?; + let host = T::IsmpHost::default(); + + // shouldn't return an error + host.store_state_machine_commitment(height, commitment).unwrap(); + host.store_state_machine_update_time(height, host.timestamp()).unwrap(); + + // deposit the event + pallet_ismp::Pallet::::deposit_pallet_event(Event::StateMachineUpdated( + StateMachineUpdated { state_machine_id: height.id, latest_height: height.height }, + )); + + Ok(()) + } } /// Transfer payload diff --git a/modules/ismp/pallets/testsuite/src/tests/pallet_asset_gateway.rs b/modules/ismp/pallets/testsuite/src/tests/pallet_asset_gateway.rs index 96906cdc6..53f66bf89 100644 --- a/modules/ismp/pallets/testsuite/src/tests/pallet_asset_gateway.rs +++ b/modules/ismp/pallets/testsuite/src/tests/pallet_asset_gateway.rs @@ -149,7 +149,6 @@ fn should_process_on_accept_module_callback_correctly() { }, asset_id: pallet_asset_gateway::Pallet::::dot_asset_id().0.into(), redeem: false, - max_fee: Default::default(), from: alloy_primitives::B256::from_slice(ALICE.as_slice()), to: alloy_primitives::B256::from_slice(ALICE.as_slice()), }; @@ -260,7 +259,6 @@ fn should_process_on_timeout_module_callback_correctly() { }, asset_id: pallet_asset_gateway::Pallet::::dot_asset_id().0.into(), redeem: false, - max_fee: Default::default(), from: alloy_primitives::FixedBytes::<32>::from_slice(ALICE.as_slice()), to: alloy_primitives::FixedBytes::<32>::from_slice(&[0u8; 32]), }; diff --git a/modules/ismp/pallets/testsuite/src/tests/xcm_integration_test.rs b/modules/ismp/pallets/testsuite/src/tests/xcm_integration_test.rs index 501ee2c5b..edd698c48 100644 --- a/modules/ismp/pallets/testsuite/src/tests/xcm_integration_test.rs +++ b/modules/ismp/pallets/testsuite/src/tests/xcm_integration_test.rs @@ -86,7 +86,7 @@ async fn should_dispatch_ismp_request_when_xcm_is_received() -> anyhow::Result<( ) .encode_call_data(&client.metadata())?; let tx = Extrinsic::new("Sudo", "sudo", encoded_call); - send_extrinsic(&client, signer, tx).await?; + send_extrinsic(&client, signer, tx, None).await?; } let ext = Extrinsic::new( @@ -102,7 +102,7 @@ async fn should_dispatch_ismp_request_when_xcm_is_received() -> anyhow::Result<( .ok_or_else(|| anyhow!("Failed to fetch latest header"))? .number(); - send_extrinsic(&client, signer, ext).await?; + send_extrinsic(&client, signer, ext, None).await?; let mut sub = para_client.rpc().subscribe_finalized_block_headers().await?; diff --git a/modules/utils/subxt/src/lib.rs b/modules/utils/subxt/src/lib.rs index fa66fc565..f37be639c 100644 --- a/modules/utils/subxt/src/lib.rs +++ b/modules/utils/subxt/src/lib.rs @@ -365,13 +365,14 @@ pub mod signer { client: &OnlineClient, signer: InMemorySigner, payload: Tx, + tip: Option, ) -> Result where >::OtherParams: Default + Send + Sync + From>, T::Signature: From + Send + Sync, { - let other_params = BaseExtrinsicParamsBuilder::new(); + let other_params = BaseExtrinsicParamsBuilder::new().tip(tip.unwrap_or_default()); let ext = client.tx().create_signed(&payload, &signer, other_params.into()).await?; let progress = ext.submit_and_watch().await.context("Failed to submit signed extrinsic")?; let ext_hash = progress.extrinsic_hash(); @@ -442,3 +443,17 @@ pub fn host_params_storage_key(state_machine: StateMachine) -> Vec { [pallet_prefix, storage_prefix, key_1, state_machine.encode()].concat() } + +pub fn fisherman_storage_key(address: Vec) -> Vec { + let address = { + let mut dest = [0u8; 32]; + dest.copy_from_slice(&address); + dest + }; + let pallet_prefix = twox_128(b"Fishermen").to_vec(); + + let storage_prefix = twox_128(b"Fishermen").to_vec(); + let key_1 = twox_64(&address.encode()).to_vec(); + + [pallet_prefix, storage_prefix, key_1, address.encode()].concat() +} diff --git a/parachain/runtimes/gargantua/Cargo.toml b/parachain/runtimes/gargantua/Cargo.toml index eff0caec2..fa30f2144 100644 --- a/parachain/runtimes/gargantua/Cargo.toml +++ b/parachain/runtimes/gargantua/Cargo.toml @@ -82,6 +82,7 @@ parachains-common = { workspace = true } # local modules ismp = { workspace = true } pallet-ismp = { workspace = true, features = ["unsigned"] } +pallet-fishermen = { workspace = true } pallet-ismp-demo = { workspace = true } pallet-ismp-runtime-api = { workspace = true } ismp-sync-committee = { workspace = true } @@ -135,6 +136,7 @@ std = [ "pallet-transaction-payment/std", "pallet-xcm/std", "pallet-treasury/std", + "pallet-fishermen/std", "pallet-asset-rate/std", "pallet-collective/std", "pallet-ismp/std", diff --git a/parachain/runtimes/gargantua/src/ismp.rs b/parachain/runtimes/gargantua/src/ismp.rs index 585117706..f3986b950 100644 --- a/parachain/runtimes/gargantua/src/ismp.rs +++ b/parachain/runtimes/gargantua/src/ismp.rs @@ -112,7 +112,7 @@ impl pallet_ismp_demo::Config for Runtime { type RuntimeEvent = RuntimeEvent; type Balance = Balance; type NativeCurrency = Balances; - type IsmpDispatcher = Ismp; + type IsmpHost = Ismp; } impl pallet_ismp_relayer::Config for Runtime { @@ -134,6 +134,11 @@ impl ismp_parachain::Config for Runtime { type IsmpHost = Ismp; } +impl pallet_fishermen::Config for Runtime { + type RuntimeEvent = RuntimeEvent; + type IsmpHost = Ismp; +} + // todo: set corrrect parameters parameter_types! { pub const AssetPalletId: PalletId = PalletId(*b"asset-tx"); @@ -193,6 +198,11 @@ impl pallet_assets::Config for Runtime { impl IsmpModule for ProxyModule { fn on_accept(&self, request: PostRequest) -> Result<(), Error> { if request.dest != HostStateMachine::get() { + let token_gateway = Gateway::token_gateway_address(&request.dest); + if request.source.is_substrate() && request.from == token_gateway.0.to_vec() { + Err(Error::Custom("Illegal request!".into()))? + } + Ismp::dispatch_request( Request::Post(request), FeeMetadata:: { payer: [0u8; 32].into(), fee: Default::default() }, diff --git a/parachain/runtimes/gargantua/src/lib.rs b/parachain/runtimes/gargantua/src/lib.rs index 91b1e5326..49e3bad05 100644 --- a/parachain/runtimes/gargantua/src/lib.rs +++ b/parachain/runtimes/gargantua/src/lib.rs @@ -232,7 +232,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { spec_name: create_runtime_str!("gargantua"), impl_name: create_runtime_str!("gargantua"), authoring_version: 1, - spec_version: 900, + spec_version: 1130, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 1, @@ -707,6 +707,7 @@ construct_runtime!( Assets: pallet_assets = 58, TokenGovernor: pallet_token_governor = 59, StateCoprocessor: pallet_state_coprocessor = 60, + Fishermen: pallet_fishermen = 61, // Governance TechnicalCollective: pallet_collective = 80, diff --git a/parachain/runtimes/nexus/src/ismp.rs b/parachain/runtimes/nexus/src/ismp.rs index f822d07b3..118a1b67d 100644 --- a/parachain/runtimes/nexus/src/ismp.rs +++ b/parachain/runtimes/nexus/src/ismp.rs @@ -175,6 +175,10 @@ impl pallet_assets::Config for Runtime { impl IsmpModule for ProxyModule { fn on_accept(&self, request: PostRequest) -> Result<(), Error> { if request.dest != HostStateMachine::get() { + let token_gateway = Gateway::token_gateway_address(&request.dest); + if request.source.is_substrate() && request.from == token_gateway.0.to_vec() { + Err(Error::Custom("Illegal request!".into()))? + } Ismp::dispatch_request( Request::Post(request), FeeMetadata:: { payer: [0u8; 32].into(), fee: Default::default() }, diff --git a/tesseract/evm/src/byzantine.rs b/tesseract/evm/src/byzantine.rs index 201ce04c6..fc4f00ed0 100644 --- a/tesseract/evm/src/byzantine.rs +++ b/tesseract/evm/src/byzantine.rs @@ -1,11 +1,14 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; +use anyhow::{anyhow, Error}; use ethers::providers::Middleware; +use futures::FutureExt; use ismp::{ consensus::{StateMachineHeight, StateMachineId}, - events::StateMachineUpdated, + events::{Event, StateMachineUpdated}, + host::StateMachine, }; -use tesseract_primitives::{ByzantineHandler, IsmpProvider}; +use tesseract_primitives::{BoxStream, ByzantineHandler, IsmpProvider}; use crate::EvmClient; @@ -13,6 +16,7 @@ use crate::EvmClient; impl ByzantineHandler for EvmClient { async fn check_for_byzantine_attack( &self, + _coprocessor: StateMachine, counterparty: Arc, event: StateMachineUpdated, ) -> Result<(), anyhow::Error> { @@ -46,4 +50,89 @@ impl ByzantineHandler for EvmClient { Ok(()) } + + async fn state_machine_updates( + &self, + _counterparty_state_id: StateMachineId, + ) -> Result>, Error> { + use futures::StreamExt; + let (tx, recv) = tokio::sync::broadcast::channel(512); + + let initial_height = self.client.get_block_number().await?.low_u64(); + let client = self.clone(); + let poll_interval = 5; + tokio::spawn(async move { + let mut latest_height = initial_height; + let state_machine = client.state_machine; + loop { + tokio::time::sleep(Duration::from_secs(poll_interval)).await; + // wait for an update with a greater height + let block_number = match client.client.get_block_number().await { + Ok(number) => number.low_u64(), + Err(err) => { + if let Err(err) = tx + .send(Err(anyhow!( + "Error fetching latest block height on {state_machine:?} {err:?}" + ).into())) + { + log::error!(target: "tesseract", "Failed to send message over channel on {state_machine:?} \n {err:?}"); + return + } + continue; + }, + }; + + if block_number <= latest_height { + continue; + } + + let event = StateMachineUpdated { + state_machine_id: client.state_machine_id(), + latest_height: block_number, + }; + + let events = match client.query_ismp_events(latest_height, event).await { + Ok(events) => events, + Err(err) => { + if let Err(err) = tx + .send(Err(anyhow!( + "Error encountered while querying ismp events {err:?}" + ).into())) + { + log::error!(target: "tesseract", "Failed to send message over channel on {state_machine:?} \n {err:?}"); + return + } + latest_height = block_number; + continue; + }, + }; + + let events = events + .into_iter() + .filter_map(|ev| match ev { + Event::StateMachineUpdated(update) => Some(update), + _ => None, + }).collect::>(); + + if !events.is_empty() { + if let Err(err) = tx + .send(Ok(events)) + { + log::error!(target: "tesseract", "Failed to send message over channel on {state_machine:?} \n {err:?}"); + return + } + } + latest_height = block_number; + } + }.boxed()); + + let stream = tokio_stream::wrappers::BroadcastStream::new(recv).filter_map(|res| async { + match res { + Ok(res) => Some(res), + Err(err) => Some(Err(anyhow!("{err:?}").into())), + } + }); + + Ok(Box::pin(stream)) + } } diff --git a/tesseract/evm/src/provider.rs b/tesseract/evm/src/provider.rs index ab4fdd13f..23e8fd07d 100644 --- a/tesseract/evm/src/provider.rs +++ b/tesseract/evm/src/provider.rs @@ -676,7 +676,7 @@ impl IsmpProvider for EvmClient { let provider = Arc::new(client.clone()); // Yield if the challenge period elapses and the state commitment is not vetoed tokio::select! { - _res = wait_for_challenge_period(provider, state_machine_update_time, challenge_period) => { + _res = wait_for_challenge_period(provider, state_machine_update_time, challenge_period, counterparty_state_id.state_id) => { match _res { Ok(_) => { if let Err(err) = tx.send(Ok(event.clone())) { diff --git a/tesseract/fisherman/src/lib.rs b/tesseract/fisherman/src/lib.rs index 21d53e855..bebbc9df6 100644 --- a/tesseract/fisherman/src/lib.rs +++ b/tesseract/fisherman/src/lib.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use anyhow::anyhow; use futures::StreamExt; +use ismp::host::StateMachine; use sc_service::TaskManager; use tesseract_primitives::IsmpProvider; @@ -26,30 +27,18 @@ pub async fn fish( chain_a: Arc, chain_b: Arc, task_manager: &TaskManager, + coprocessor: StateMachine, ) -> Result<(), anyhow::Error> { { let chain_a = chain_a.clone(); let chain_b = chain_b.clone(); + let coprocessor = coprocessor.clone(); let name = format!("fisherman-{}-{}", chain_a.name(), chain_b.name()); task_manager.spawn_essential_handle().spawn_blocking( Box::leak(Box::new(name.clone())), "fisherman", async move { - let res = handle_notification(chain_a, chain_b).await; - tracing::error!(target: "tesseract", "{name} has terminated with result {res:?}") - }, - ) - } - - { - let chain_a = chain_a.clone(); - let chain_b = chain_b.clone(); - let name = format!("fisherman-{}-{}", chain_b.name(), chain_a.name()); - task_manager.spawn_essential_handle().spawn_blocking( - Box::leak(Box::new(name.clone())), - "fisherman", - async move { - let res = handle_notification(chain_b, chain_a).await; + let res = handle_notification(chain_a, chain_b, coprocessor).await; tracing::error!(target: "tesseract", "{name} has terminated with result {res:?}") }, ) @@ -61,21 +50,28 @@ pub async fn fish( async fn handle_notification( chain_a: Arc, chain_b: Arc, + coprocessor: StateMachine, ) -> Result<(), anyhow::Error> { let mut state_machine_update_stream = chain_a - .state_machine_update_notification(chain_b.state_machine_id()) + .state_machine_updates(chain_b.state_machine_id()) .await .map_err(|err| anyhow!("StateMachineUpdated stream subscription failed: {err:?}"))?; while let Some(item) = state_machine_update_stream.next().await { match item { - Ok(state_machine_update) => { - let res = - chain_b.check_for_byzantine_attack(chain_a.clone(), state_machine_update).await; - if let Err(err) = res { - log::error!("Failed to check for byzantine behavior: {err:?}") - } - }, + Ok(state_machine_updates) => + for state_machine_update in state_machine_updates { + let res = chain_b + .check_for_byzantine_attack( + coprocessor, + chain_a.clone(), + state_machine_update, + ) + .await; + if let Err(err) = res { + log::error!("Failed to check for byzantine behavior: {err:?}") + } + }, Err(e) => { log::error!(target: "tesseract","Fisherman task {}-{} encountered an error: {e:?}", chain_a.name(), chain_b.name()) }, diff --git a/tesseract/primitives/src/lib.rs b/tesseract/primitives/src/lib.rs index 3f78e606b..ab9e3178f 100644 --- a/tesseract/primitives/src/lib.rs +++ b/tesseract/primitives/src/lib.rs @@ -363,9 +363,17 @@ pub trait ByzantineHandler { /// Check the state machine update event for byzantine behaviour and challenge it. async fn check_for_byzantine_attack( &self, + coprocessor: StateMachine, counterparty: Arc, challenge_event: StateMachineUpdated, ) -> Result<(), anyhow::Error>; + + /// Return a stream that watches for updates to [`counterparty_state_id`], yields when new + /// [`Vec`] event is observed for [`counterparty_state_id`] + async fn state_machine_updates( + &self, + counterparty_state_id: StateMachineId, + ) -> Result>, anyhow::Error>; } /// Provides an interface for the chain to the relayer core for submitting Ismp messages as well as @@ -455,9 +463,14 @@ pub async fn wait_for_challenge_period( client: Arc, last_consensus_update: Duration, challenge_period: Duration, + counterparty_state_id: StateMachine, ) -> anyhow::Result<()> { if challenge_period != Duration::ZERO { - log::info!("Waiting for challenge period {challenge_period:?}"); + log::info!( + "Waiting for challenge period {challenge_period:?} for {} on {}", + counterparty_state_id, + client.name() + ); } tokio::time::sleep(challenge_period).await; @@ -509,6 +522,12 @@ pub async fn observe_challenge_period( let challenge_period = hyperbridge.query_challenge_period(chain.state_machine_id()).await?; let height = StateMachineHeight { id: chain.state_machine_id(), height }; let last_consensus_update = hyperbridge.query_state_machine_update_time(height).await?; - wait_for_challenge_period(hyperbridge, last_consensus_update, challenge_period).await?; + wait_for_challenge_period( + hyperbridge, + last_consensus_update, + challenge_period, + chain.state_machine_id().state_id, + ) + .await?; Ok(()) } diff --git a/tesseract/primitives/src/mocks.rs b/tesseract/primitives/src/mocks.rs index eb09a59f7..b60e0b8bf 100644 --- a/tesseract/primitives/src/mocks.rs +++ b/tesseract/primitives/src/mocks.rs @@ -57,11 +57,19 @@ impl HyperbridgeClaim for MockHost { impl ByzantineHandler for MockHost { async fn check_for_byzantine_attack( &self, + _coprocessor: StateMachine, _counterparty: Arc, _challenge_event: StateMachineUpdated, ) -> Result<(), Error> { Err(anyhow!("No byzantine faults")) } + + async fn state_machine_updates( + &self, + _counterparty_state_id: StateMachineId, + ) -> Result>, anyhow::Error> { + Err(anyhow!("No byzantine faults")) + } } #[async_trait::async_trait] diff --git a/tesseract/relayer/src/cli.rs b/tesseract/relayer/src/cli.rs index b80f7c213..8da40454b 100644 --- a/tesseract/relayer/src/cli.rs +++ b/tesseract/relayer/src/cli.rs @@ -110,8 +110,13 @@ impl Cli { .await?; if relayer.fisherman.unwrap_or_default() { - tesseract_fisherman::fish(Arc::new(new_hyperbridge), client.clone(), &task_manager) - .await? + tesseract_fisherman::fish( + Arc::new(new_hyperbridge), + client.clone(), + &task_manager, + coprocessor, + ) + .await? } metadata.push(( diff --git a/tesseract/substrate/Cargo.toml b/tesseract/substrate/Cargo.toml index 857db2a4a..7c486364c 100644 --- a/tesseract/substrate/Cargo.toml +++ b/tesseract/substrate/Cargo.toml @@ -33,6 +33,7 @@ substrate-state-machine = { workspace = true, default-features = true } pallet-hyperbridge = { workspace = true, default-features = true } serde-hex-utils = { workspace = true, default-features = false } pallet-state-coprocessor = { workspace = true, default-features = true } +sp-runtime = { workspace = true, default-features = true } [features] testing = [] diff --git a/tesseract/substrate/src/byzantine.rs b/tesseract/substrate/src/byzantine.rs index ce8d0ae9f..3d5d40e06 100644 --- a/tesseract/substrate/src/byzantine.rs +++ b/tesseract/substrate/src/byzantine.rs @@ -1,20 +1,24 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; -use anyhow::anyhow; +use anyhow::{anyhow, Error}; use codec::{Decode, Encode}; +use futures::FutureExt; use ismp::{ consensus::{StateMachineHeight, StateMachineId}, - events::StateMachineUpdated, + events::{Event, StateMachineUpdated}, + host::StateMachine, }; use sp_core::H256; +use substrate_state_machine::fetch_overlay_root_and_timestamp; use subxt::{ config::{ extrinsic_params::BaseExtrinsicParamsBuilder, polkadot::PlainTip, - substrate::SubstrateHeader, ExtrinsicParams, + substrate::SubstrateHeader, ExtrinsicParams, Header, }, ext::sp_runtime::{AccountId32, MultiSignature}, }; -use tesseract_primitives::{ByzantineHandler, IsmpProvider}; + +use tesseract_primitives::{BoxStream, ByzantineHandler, IsmpProvider}; use crate::SubstrateClient; @@ -31,6 +35,7 @@ where { async fn check_for_byzantine_attack( &self, + coprocessor: StateMachine, counterparty: Arc, event: StateMachineUpdated, ) -> Result<(), anyhow::Error> { @@ -46,12 +51,15 @@ where self.client.rpc().block_hash(Some(event.latest_height.into())).await? else { // If block header is not found veto the state commitment + log::info!( - "Vetoing state commitment for {} on {}", + "Vetoing state commitment for {} on {}: block header not found for {}", self.state_machine_id().state_id, - counterparty.state_machine_id().state_id + counterparty.state_machine_id().state_id, + event.latest_height ); counterparty.veto_state_commitment(height).await?; + return Ok(()) }; let header = self @@ -63,12 +71,21 @@ where let header = SubstrateHeader::::decode(&mut &*header.encode())?; + let digest = sp_runtime::generic::Digest::decode(&mut &*header.digest.encode())?; + let digest_result = fetch_overlay_root_and_timestamp(&digest, Default::default()) + .map_err(|_| anyhow!("Failed to extract disgest logs in byzantine handler"))?; + + let state_root = if self.state_machine_id().state_id == coprocessor { + digest_result.ismp_digest.child_trie_root + } else { + header.state_root.into() + }; let finalized_state_commitment = counterparty.query_state_machine_commitment(height).await?; - if finalized_state_commitment.state_root != header.state_root.into() { + if finalized_state_commitment.state_root != state_root.into() { log::info!( - "Vetoing state commitment for {} on {}", + "Vetoing state commitment for {} on {}, state commitment mismatch", self.state_machine_id().state_id, counterparty.state_machine_id().state_id ); @@ -77,4 +94,91 @@ where Ok(()) } + + async fn state_machine_updates( + &self, + counterparty_state_id: StateMachineId, + ) -> Result>, Error> { + use futures::StreamExt; + let client = self.clone(); + let (tx, recv) = tokio::sync::broadcast::channel(512); + let latest_height = client.query_finalized_height().await?; + + tokio::task::spawn(async move { + let mut latest_height = latest_height; + let state_machine = client.state_machine; + loop { + tokio::time::sleep(Duration::from_secs(3)).await; + let header = match client.client.rpc().header(None).await { + Ok(Some(header)) => header, + _ => { + if let Err(err) = tx + .send(Err(anyhow!( + "Error encountered while fetching finalized head" + ).into())) + { + log::error!(target: "tesseract", "Failed to send message over channel on {state_machine:?} \n {err:?}"); + return + } + continue; + }, + }; + + if header.number().into() <= latest_height { + continue; + } + + let event = StateMachineUpdated { + state_machine_id: client.state_machine_id(), + latest_height: header.number().into(), + }; + + let events = match client.query_ismp_events(latest_height, event).await { + Ok(e) => e, + Err(err) => { + if let Err(err) = tx + .send(Err(anyhow!( + "Error encountered while querying ismp events {err:?}" + ).into())) + { + log::error!(target: "tesseract", "Failed to send message over channel on {state_machine:?} \n {err:?}"); + return + } + latest_height = header.number().into(); + continue; + }, + }; + + let events = events + .into_iter() + .filter_map(|event| match event { + Event::StateMachineUpdated(e) + if e.state_machine_id == counterparty_state_id => + Some(e), + _ => None, + }) + .collect::>(); + + if !events.is_empty() { + if let Err(err) = tx + .send(Ok(events)) + { + log::error!(target: "tesseract", "Failed to send message over channel on {state_machine:?} \n {err:?}"); + return + } + } + + latest_height = header.number().into(); + } + }.boxed()); + + let stream = tokio_stream::wrappers::BroadcastStream::new(recv).filter_map(|res| async { + match res { + Ok(res) => Some(res), + Err(err) => Some(Err(anyhow!("{err:?}").into())), + } + }); + + Ok(Box::pin(stream)) + } } diff --git a/tesseract/substrate/src/calls.rs b/tesseract/substrate/src/calls.rs index efaf6a427..e145d263f 100644 --- a/tesseract/substrate/src/calls.rs +++ b/tesseract/substrate/src/calls.rs @@ -76,7 +76,7 @@ where let call = Extrinsic::new("Ismp", "create_consensus_client", call) .encode_call_data(&self.client.metadata())?; let tx = Extrinsic::new("Sudo", "sudo", call); - send_extrinsic(&self.client, signer, tx).await?; + send_extrinsic(&self.client, signer, tx, None).await?; Ok(()) } @@ -89,7 +89,7 @@ where .encode_call_data(&self.client.metadata())?; let tx = Extrinsic::new("Sudo", "sudo", encoded_call); let signer = InMemorySigner::new(self.signer()); - send_extrinsic(&self.client, signer, tx).await?; + send_extrinsic(&self.client, signer, tx, None).await?; Ok(()) } diff --git a/tesseract/substrate/src/provider.rs b/tesseract/substrate/src/provider.rs index 2b3a482fd..b39dc085a 100644 --- a/tesseract/substrate/src/provider.rs +++ b/tesseract/substrate/src/provider.rs @@ -55,7 +55,10 @@ use subxt::{ tx::TxPayload, }; -use subxt_utils::{host_params_storage_key, send_extrinsic, state_machine_update_time_storage_key}; +use subxt_utils::{ + fisherman_storage_key, host_params_storage_key, send_extrinsic, + state_machine_update_time_storage_key, +}; use tesseract_primitives::{ wait_for_challenge_period, BoxStream, EstimateGasReturnParams, IsmpProvider, Query, StateMachineUpdated, StateProofQueryType, TxReceipt, @@ -601,7 +604,7 @@ where let provider = Arc::new(client.clone()); tokio::select! { - _res = wait_for_challenge_period(provider, state_machine_update_time, challenge_period) => { + _res = wait_for_challenge_period(provider, state_machine_update_time, challenge_period, counterparty_state_id.state_id) => { match _res { Ok(_) => { if let Err(err) = tx.send(Ok(event.clone())) { @@ -617,7 +620,7 @@ where _res = state_commitment_vetoed_stream.next() => { match _res { Some(Ok(_)) => { - log::error!(target: "tesseract", "State Commitment for {event:?} was vetoed on {state_machine}"); + log::info!(target: "tesseract", "State Commitment for {event:?} was vetoed on {state_machine}"); } _ => { log::error!(target: "tesseract", "Error in state machine vetoed stream {state_machine:?} - {:?}", counterparty_state_id.state_id); @@ -762,6 +765,12 @@ where } async fn veto_state_commitment(&self, height: StateMachineHeight) -> Result<(), Error> { + let key = fisherman_storage_key(self.address()); + let raw_params = self.client.storage().at_latest().await?.fetch_raw(&key).await?; + if raw_params.is_none() { + return Ok(()) + } + let signer = InMemorySigner { account_id: MultiSigner::Sr25519(self.signer.public()).into_account().into(), signer: self.signer.clone(), @@ -769,7 +778,7 @@ where let call = height.encode(); let call = Extrinsic::new("Fishermen", "veto_state_commitment", call); - send_extrinsic(&self.client, signer, call).await?; + send_extrinsic(&self.client, signer, call, Some(PlainTip::new(100))).await?; Ok(()) } diff --git a/tesseract/substrate/src/testing.rs b/tesseract/substrate/src/testing.rs index 99d435590..b8e46a887 100644 --- a/tesseract/substrate/src/testing.rs +++ b/tesseract/substrate/src/testing.rs @@ -57,7 +57,7 @@ where let tx = Extrinsic::new("IsmpDemo", "transfer", call); let signer = InMemorySigner::new(self.signer()); - let tx_block_hash = send_extrinsic(&self.client, signer, tx).await?; + let tx_block_hash = send_extrinsic(&self.client, signer, tx, None).await?; Ok(tx_block_hash) } @@ -65,7 +65,7 @@ where let call = params.encode(); let tx = Extrinsic::new("IsmpDemo", "dispatch_to_evm", call); let signer = InMemorySigner::new(self.signer()); - send_extrinsic(&self.client, signer, tx).await?; + send_extrinsic(&self.client, signer, tx, None).await?; Ok(()) } @@ -74,7 +74,7 @@ where let call = get_req.encode(); let tx = Extrinsic::new("IsmpDemo", "get_request", call); let signer = InMemorySigner::new(self.signer()); - let tx_block_hash = send_extrinsic(&self.client, signer, tx).await?; + let tx_block_hash = send_extrinsic(&self.client, signer, tx, None).await?; Ok(tx_block_hash) } @@ -128,7 +128,7 @@ where .encode_call_data(&self.client.metadata())?; let tx = Extrinsic::new("Sudo", "sudo", encoded_call); let signer = InMemorySigner::new(self.signer()); - send_extrinsic(&self.client, signer, tx).await?; + send_extrinsic(&self.client, signer, tx, None).await?; Ok(()) } @@ -139,7 +139,7 @@ where .encode_call_data(&self.client.metadata())?; let tx = Extrinsic::new("Sudo", "sudo", encoded_call); let signer = InMemorySigner::new(self.signer()); - send_extrinsic(&self.client, signer, tx).await?; + send_extrinsic(&self.client, signer, tx, None).await?; Ok(()) }