diff --git a/Cargo.lock b/Cargo.lock index 035054cc9f..582789ff1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1524,6 +1524,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "cf-chains", "cf-primitives", "chainflip-engine", "futures", diff --git a/api/bin/chainflip-ingress-egress-tracker/Cargo.toml b/api/bin/chainflip-ingress-egress-tracker/Cargo.toml index 20b4ce28f3..593a8b00c7 100644 --- a/api/bin/chainflip-ingress-egress-tracker/Cargo.toml +++ b/api/bin/chainflip-ingress-egress-tracker/Cargo.toml @@ -29,3 +29,4 @@ utilities = { path = "../../../utilities" } cf-primitives = { path = "../../../state-chain/primitives" } pallet-cf-environment = { path = "../../../state-chain/pallets/cf-environment" } state-chain-runtime = { path = "../../../state-chain/runtime" } +cf-chains = { path = "../../../state-chain/chains" } diff --git a/api/bin/chainflip-ingress-egress-tracker/src/main.rs b/api/bin/chainflip-ingress-egress-tracker/src/main.rs index b59c3fa605..67ccef6243 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/main.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/main.rs @@ -7,10 +7,12 @@ use utilities::task_scope; mod witnessing; +#[derive(Clone)] pub struct DepositTrackerSettings { eth_node: WsHttpEndpoints, // The key shouldn't be necessary, but the current witnesser wants this eth_key_path: PathBuf, + dot_node: WsHttpEndpoints, state_chain_ws_endpoint: String, } @@ -86,20 +88,27 @@ async fn main() -> anyhow::Result<()> { eth_key_temp_file .write_all(b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") .unwrap(); - let eth_key_path = eth_key_temp_file.path(); - - let eth_ws_endpoint = env::var("ETH_WS_ENDPOINT").unwrap_or("ws://localhost:8546".to_string()); - let eth_http_endpoint = - env::var("ETH_HTTP_ENDPOINT").unwrap_or("http://localhost:8545".to_string()); - let sc_ws_endpoint = env::var("SC_WS_ENDPOINT").unwrap_or("ws://localhost:9944".to_string()); let settings = DepositTrackerSettings { eth_node: WsHttpEndpoints { - ws_endpoint: eth_ws_endpoint.into(), - http_endpoint: eth_http_endpoint.into(), + ws_endpoint: env::var("ETH_WS_ENDPOINT") + .unwrap_or("ws://localhost:8546".to_string()) + .into(), + http_endpoint: env::var("ETH_HTTP_ENDPOINT") + .unwrap_or("http://localhost:8545".to_string()) + .into(), + }, + eth_key_path: eth_key_temp_file.path().into(), + dot_node: WsHttpEndpoints { + ws_endpoint: env::var("DOT_WS_ENDPOINT") + .unwrap_or("ws://localhost:9945".to_string()) + .into(), + http_endpoint: env::var("DOT_HTTP_ENDPOINT") + .unwrap_or("http://localhost:9945".to_string()) + .into(), }, - eth_key_path: eth_key_path.into(), - state_chain_ws_endpoint: sc_ws_endpoint, + state_chain_ws_endpoint: env::var("SC_WS_ENDPOINT") + .unwrap_or("ws://localhost:9944".to_string()), }; task_scope::task_scope(|scope| async move { start(scope, settings).await }.boxed()).await diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs index b40df7b2cf..c8baa66068 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs @@ -1,21 +1,24 @@ pub mod btc; +mod dot; mod eth; use std::collections::HashMap; +use cf_chains::dot::PolkadotHash; use cf_primitives::chains::assets::eth::Asset; use chainflip_engine::{ state_chain_observer::{ self, client::{chain_api::ChainApi, storage_api::StorageApi, StateChainClient}, }, - witness::common::epoch_source::EpochSource, + witness::common::{epoch_source::EpochSource, STATE_CHAIN_CONNECTION}, }; use sp_core::H160; use utilities::task_scope; use crate::DepositTrackerSettings; +#[derive(Clone)] struct EnvironmentParameters { eth_chain_id: u64, eth_vault_address: H160, @@ -23,6 +26,7 @@ struct EnvironmentParameters { flip_contract_address: H160, usdc_contract_address: H160, supported_erc20_tokens: HashMap, + dot_genesis_hash: PolkadotHash, } async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> EnvironmentParameters { @@ -62,6 +66,15 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro .map(|(asset, address)| (address, asset.into())) .collect(); + let dot_genesis_hash = PolkadotHash::from( + state_chain_client + .storage_value::>( + state_chain_client.latest_finalized_hash(), + ) + .await + .expect(STATE_CHAIN_CONNECTION), + ); + EnvironmentParameters { eth_chain_id, eth_vault_address, @@ -69,6 +82,7 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro usdc_contract_address, eth_address_checker_address, supported_erc20_tokens, + dot_genesis_hash, } } @@ -108,10 +122,21 @@ pub(super) async fn start( scope, state_chain_client.clone(), state_chain_stream.clone(), - settings, - env_params, + settings.clone(), + env_params.clone(), epoch_source.clone(), + witness_call.clone(), + ) + .await?; + + dot::start( + scope, witness_call, + settings, + env_params, + state_chain_client, + state_chain_stream, + epoch_source, ) .await?; diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs new file mode 100644 index 0000000000..5f35a2af8d --- /dev/null +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; + +use cf_primitives::EpochIndex; +use chainflip_engine::{ + dot::retry_rpc::DotRetryRpcClient, + settings::NodeContainer, + state_chain_observer::client::{ + storage_api::StorageApi, StateChainClient, StateChainStreamApi, + }, + witness::{ + common::{ + chain_source::extension::ChainSourceExt, epoch_source::EpochSourceBuilder, + STATE_CHAIN_CONNECTION, + }, + dot::{filter_map_events, process_egress, proxy_added_witnessing, DotUnfinalisedSource}, + }, +}; +use futures::Future; +use utilities::task_scope::Scope; + +use crate::DepositTrackerSettings; + +use super::EnvironmentParameters; + +pub(super) async fn start( + scope: &Scope<'_, anyhow::Error>, + witness_call: ProcessCall, + settings: DepositTrackerSettings, + env_params: EnvironmentParameters, + state_chain_client: Arc>, + state_chain_stream: impl StateChainStreamApi + Clone, + epoch_source: EpochSourceBuilder<'_, '_, StateChainClient<()>, (), ()>, +) -> anyhow::Result<()> +where + ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut + + Send + + Sync + + Clone + + 'static, + ProcessingFut: Future + Send + 'static, +{ + let dot_client = { + DotRetryRpcClient::new( + scope, + NodeContainer { primary: settings.dot_node, backup: None }, + env_params.dot_genesis_hash, + )? + }; + + let epoch_source = epoch_source + .filter_map( + |state_chain_client, _epoch_index, hash, _info| async move { + state_chain_client + .storage_value::>( + hash, + ) + .await + .expect(STATE_CHAIN_CONNECTION) + }, + |_state_chain_client, _epoch, _block_hash, historic_info| async move { historic_info }, + ) + .await; + + let vaults = epoch_source.vaults().await; + + DotUnfinalisedSource::new(dot_client.clone()) + .shared(scope) + .then(|header| async move { header.data.iter().filter_map(filter_map_events).collect() }) + .strictly_monotonic() + .shared(scope) + .chunk_by_vault(vaults.clone()) + .deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone()) + .await + // Deposit witnessing + .dot_deposits(witness_call.clone()) + // Proxy added witnessing + .then({ + let witness_call = witness_call.clone(); + move |epoch, header| proxy_added_witnessing(epoch, header, witness_call.clone()) + }) + // Broadcast success + .egress_items(scope, state_chain_stream.clone(), state_chain_client.clone()) + .await + .then({ + let process_call = witness_call.clone(); + let dot_client = dot_client.clone(); + move |epoch, header| { + process_egress(epoch, header, process_call.clone(), dot_client.clone()) + } + }) + .logging("DOT Witnessing") + .spawn(scope); + + Ok(()) +} diff --git a/engine/src/witness.rs b/engine/src/witness.rs index 600c6d268f..cfb99c4c7f 100644 --- a/engine/src/witness.rs +++ b/engine/src/witness.rs @@ -1,5 +1,5 @@ mod btc; pub mod common; -mod dot; +pub mod dot; pub mod eth; pub mod start;