From de9ead3d458882a51c8824f95f82b01c892de22d Mon Sep 17 00:00:00 2001 From: Alastair Holmes <42404303+AlastairHolmes@users.noreply.github.com> Date: Fri, 13 Oct 2023 11:40:01 +0200 Subject: [PATCH 1/8] chore: delete unneeded function (#4116) chore: delete unneeded functions --- .../client/base_rpc_api.rs | 22 +------------------ 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/engine/src/state_chain_observer/client/base_rpc_api.rs b/engine/src/state_chain_observer/client/base_rpc_api.rs index 8589f88df5..87afc4f830 100644 --- a/engine/src/state_chain_observer/client/base_rpc_api.rs +++ b/engine/src/state_chain_observer/client/base_rpc_api.rs @@ -1,7 +1,5 @@ use async_trait::async_trait; -use cf_amm::{common::Tick, range_orders::Liquidity}; -use cf_primitives::Asset; use jsonrpsee::core::{ client::{ClientT, Subscription, SubscriptionClientT}, RpcResult, @@ -11,7 +9,7 @@ use sp_core::{ storage::{StorageData, StorageKey}, Bytes, }; -use sp_runtime::{traits::BlakeTwo256, AccountId32}; +use sp_runtime::traits::BlakeTwo256; use sp_version::RuntimeVersion; use state_chain_runtime::SignedBlock; @@ -130,13 +128,6 @@ pub trait BaseRpcApi { async fn runtime_version(&self) -> RpcResult; - async fn pool_minted_positions( - &self, - lp: AccountId32, - asset: Asset, - at: state_chain_runtime::Hash, - ) -> RpcResult>; - async fn dry_run( &self, extrinsic: Bytes, @@ -240,17 +231,6 @@ impl BaseRpcApi for BaseRpcClient RpcResult> { - // TODO: Add function that gets minted range and limit orders #3082 - //self.raw_rpc_client.cf_pool_minted_positions(lp, asset, Some(at)).await - Err(jsonrpsee::core::Error::Custom("Not implemented".to_string())) - } - async fn dry_run( &self, extrinsic: Bytes, From 99c3e0a2fd054900bb30c33632b9a28433e0cd31 Mon Sep 17 00:00:00 2001 From: kylezs Date: Fri, 13 Oct 2023 23:46:31 +1100 Subject: [PATCH 2/8] chore: storage migration delete NextCompatibilityVersion (#4115) --- state-chain/pallets/cf-environment/src/lib.rs | 2 +- .../pallets/cf-environment/src/migrations.rs | 6 ++++- .../cf-environment/src/migrations/v5.rs | 23 +++++++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 state-chain/pallets/cf-environment/src/migrations/v5.rs diff --git a/state-chain/pallets/cf-environment/src/lib.rs b/state-chain/pallets/cf-environment/src/lib.rs index 5116f9ce19..0c1935b789 100644 --- a/state-chain/pallets/cf-environment/src/lib.rs +++ b/state-chain/pallets/cf-environment/src/lib.rs @@ -31,7 +31,7 @@ pub mod weights; pub use weights::WeightInfo; mod migrations; -pub const PALLET_VERSION: StorageVersion = StorageVersion::new(4); +pub const PALLET_VERSION: StorageVersion = StorageVersion::new(5); type SignatureNonce = u64; diff --git a/state-chain/pallets/cf-environment/src/migrations.rs b/state-chain/pallets/cf-environment/src/migrations.rs index 0758482acf..49027375d6 100644 --- a/state-chain/pallets/cf-environment/src/migrations.rs +++ b/state-chain/pallets/cf-environment/src/migrations.rs @@ -1,6 +1,10 @@ pub mod v3; pub mod v4; +pub mod v5; use cf_runtime_upgrade_utilities::VersionedMigration; -pub type PalletMigration = (VersionedMigration, v4::Migration, 3, 4>,); +pub type PalletMigration = ( + VersionedMigration, v4::Migration, 3, 4>, + VersionedMigration, v5::Migration, 4, 5>, +); diff --git a/state-chain/pallets/cf-environment/src/migrations/v5.rs b/state-chain/pallets/cf-environment/src/migrations/v5.rs new file mode 100644 index 0000000000..1af139a5d6 --- /dev/null +++ b/state-chain/pallets/cf-environment/src/migrations/v5.rs @@ -0,0 +1,23 @@ +use crate::*; + +use frame_support::traits::OnRuntimeUpgrade; +use sp_std::marker::PhantomData; + +pub struct Migration(PhantomData); + +mod old { + use super::*; + use frame_support::pallet_prelude::ValueQuery; + + #[frame_support::storage_alias] + pub type NextCompatibilityVersion = + StorageValue, Option, ValueQuery>; +} + +impl OnRuntimeUpgrade for Migration { + fn on_runtime_upgrade() -> frame_support::weights::Weight { + old::NextCompatibilityVersion::::take(); + + Weight::zero() + } +} From 4e73511e25b9c3de4087860d475830f1a8ba3f2b Mon Sep 17 00:00:00 2001 From: Andrew Dibble Date: Fri, 13 Oct 2023 20:01:39 +0200 Subject: [PATCH 3/8] feat(custom-rpc): add flip balance to account info (#4119) --- state-chain/custom-rpc/src/lib.rs | 90 +++++++++++++++---------- state-chain/runtime/src/lib.rs | 29 ++++---- state-chain/runtime/src/runtime_apis.rs | 3 +- 3 files changed, 72 insertions(+), 50 deletions(-) diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index c1f6747ad7..91dc192e9b 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -35,14 +35,19 @@ use std::{ #[derive(Serialize, Deserialize)] #[serde(tag = "role", rename_all = "snake_case")] pub enum RpcAccountInfo { - None, - Broker, + None { + flip_balance: NumberOrHex, + }, + Broker { + flip_balance: NumberOrHex, + }, LiquidityProvider { balances: HashMap>, refund_addresses: HashMap>, + flip_balance: NumberOrHex, }, Validator { - balance: NumberOrHex, + flip_balance: NumberOrHex, bond: NumberOrHex, last_heartbeat: u32, online_credits: u32, @@ -59,15 +64,15 @@ pub enum RpcAccountInfo { } impl RpcAccountInfo { - fn none() -> Self { - Self::None + fn none(balance: u128) -> Self { + Self::None { flip_balance: balance.into() } } - fn broker() -> Self { - Self::Broker + fn broker(balance: u128) -> Self { + Self::Broker { flip_balance: balance.into() } } - fn lp(info: LiquidityProviderInfo) -> Self { + fn lp(info: LiquidityProviderInfo, balance: u128) -> Self { let mut balances = HashMap::new(); for (asset, balance) in info.balances { @@ -78,6 +83,7 @@ impl RpcAccountInfo { } Self::LiquidityProvider { + flip_balance: balance.into(), balances, refund_addresses: info .refund_addresses @@ -96,7 +102,7 @@ impl RpcAccountInfo { fn validator(info: RuntimeApiAccountInfoV2) -> Self { Self::Validator { - balance: info.balance.into(), + flip_balance: info.balance.into(), bond: info.bond.into(), last_heartbeat: info.last_heartbeat, online_credits: info.online_credits, @@ -529,26 +535,28 @@ where ) -> RpcResult { let api = self.client.runtime_api(); + let hash = self.unwrap_or_best(at); + + let balance = api.cf_account_flip_balance(hash, &account_id).map_err(to_rpc_error)?; + Ok( match api - .cf_account_role(self.unwrap_or_best(at), account_id.clone()) + .cf_account_role(hash, account_id.clone()) .map_err(to_rpc_error)? .unwrap_or(AccountRole::None) { - AccountRole::None => RpcAccountInfo::none(), - AccountRole::Broker => RpcAccountInfo::broker(), + AccountRole::None => RpcAccountInfo::none(balance), + AccountRole::Broker => RpcAccountInfo::broker(balance), AccountRole::LiquidityProvider => { let info = api - .cf_liquidity_provider_info(self.unwrap_or_best(at), account_id) + .cf_liquidity_provider_info(hash, account_id) .map_err(to_rpc_error)? .expect("role already validated"); - RpcAccountInfo::lp(info) + RpcAccountInfo::lp(info, balance) }, AccountRole::Validator => { - let info = api - .cf_account_info_v2(self.unwrap_or_best(at), account_id) - .map_err(to_rpc_error)?; + let info = api.cf_account_info_v2(hash, &account_id).map_err(to_rpc_error)?; RpcAccountInfo::validator(info) }, @@ -564,7 +572,7 @@ where let account_info = self .client .runtime_api() - .cf_account_info_v2(self.unwrap_or_best(at), account_id) + .cf_account_info_v2(self.unwrap_or_best(at), &account_id) .map_err(to_rpc_error)?; Ok(RpcAccountInfoV2 { @@ -939,33 +947,41 @@ mod test { #[test] fn test_account_info_serialization() { assert_eq!( - serde_json::to_value(RpcAccountInfo::none()).unwrap(), - json!({ "role": "none" }) + serde_json::to_value(RpcAccountInfo::none(0)).unwrap(), + json!({ "role": "none", "flip_balance": "0x0" }) ); assert_eq!( - serde_json::to_value(RpcAccountInfo::broker()).unwrap(), - json!({ "role":"broker" }) + serde_json::to_value(RpcAccountInfo::broker(0)).unwrap(), + json!({ "role":"broker", "flip_balance": "0x0" }) ); - let lp = RpcAccountInfo::lp(LiquidityProviderInfo { - refund_addresses: vec![ - ( - ForeignChain::Ethereum, - Some(cf_chains::ForeignChainAddress::Eth(H160::from([1; 20]))), - ), - ( - ForeignChain::Polkadot, - Some(cf_chains::ForeignChainAddress::Dot(Default::default())), - ), - (ForeignChain::Bitcoin, None), - ], - balances: vec![(Asset::Eth, u128::MAX), (Asset::Btc, 0), (Asset::Flip, u128::MAX / 2)], - }); + let lp = RpcAccountInfo::lp( + LiquidityProviderInfo { + refund_addresses: vec![ + ( + ForeignChain::Ethereum, + Some(cf_chains::ForeignChainAddress::Eth(H160::from([1; 20]))), + ), + ( + ForeignChain::Polkadot, + Some(cf_chains::ForeignChainAddress::Dot(Default::default())), + ), + (ForeignChain::Bitcoin, None), + ], + balances: vec![ + (Asset::Eth, u128::MAX), + (Asset::Btc, 0), + (Asset::Flip, u128::MAX / 2), + ], + }, + 0, + ); assert_eq!( serde_json::to_value(lp).unwrap(), json!({ "role": "liquidity_provider", + "flip_balance": "0x0", "balances": { "Ethereum": { "Flip": "0x7fffffffffffffffffffffffffffffff", @@ -999,7 +1015,7 @@ mod test { assert_eq!( serde_json::to_value(validator).unwrap(), json!({ - "balance": "0xde0b6b3a7640000", + "flip_balance": "0xde0b6b3a7640000", "bond": "0xde0b6b3a7640000", "bound_redeem_address": "0x0101010101010101010101010101010101010101", "is_bidding": false, diff --git a/state-chain/runtime/src/lib.rs b/state-chain/runtime/src/lib.rs index 0107bb062b..fd16b909c4 100644 --- a/state-chain/runtime/src/lib.rs +++ b/state-chain/runtime/src/lib.rs @@ -913,27 +913,32 @@ impl_runtime_apis! { }) .collect() } - fn cf_account_info_v2(account_id: AccountId) -> RuntimeApiAccountInfoV2 { - let is_current_backup = pallet_cf_validator::Backups::::get().contains_key(&account_id); - let key_holder_epochs = pallet_cf_validator::HistoricalActiveEpochs::::get(&account_id); - let is_qualified = <::KeygenQualification as QualifyNode<_>>::is_qualified(&account_id); - let is_current_authority = pallet_cf_validator::CurrentAuthorities::::get().contains(&account_id); - let is_bidding = pallet_cf_funding::ActiveBidder::::get(&account_id); - let bound_redeem_address = pallet_cf_funding::BoundRedeemAddress::::get(&account_id); - let reputation_info = pallet_cf_reputation::Reputations::::get(&account_id); - let account_info = pallet_cf_flip::Account::::get(&account_id); - let restricted_balances = pallet_cf_funding::RestrictedBalances::::get(&account_id); + + fn cf_account_flip_balance(account_id: &AccountId) -> u128 { + pallet_cf_flip::Account::::get(account_id).total() + } + + fn cf_account_info_v2(account_id: &AccountId) -> RuntimeApiAccountInfoV2 { + let is_current_backup = pallet_cf_validator::Backups::::get().contains_key(account_id); + let key_holder_epochs = pallet_cf_validator::HistoricalActiveEpochs::::get(account_id); + let is_qualified = <::KeygenQualification as QualifyNode<_>>::is_qualified(account_id); + let is_current_authority = pallet_cf_validator::CurrentAuthorities::::get().contains(account_id); + let is_bidding = pallet_cf_funding::ActiveBidder::::get(account_id); + let bound_redeem_address = pallet_cf_funding::BoundRedeemAddress::::get(account_id); + let reputation_info = pallet_cf_reputation::Reputations::::get(account_id); + let account_info = pallet_cf_flip::Account::::get(account_id); + let restricted_balances = pallet_cf_funding::RestrictedBalances::::get(account_id); RuntimeApiAccountInfoV2 { balance: account_info.total(), bond: account_info.bond(), - last_heartbeat: pallet_cf_reputation::LastHeartbeat::::get(&account_id).unwrap_or(0), + last_heartbeat: pallet_cf_reputation::LastHeartbeat::::get(account_id).unwrap_or(0), online_credits: reputation_info.online_credits, reputation_points: reputation_info.reputation_points, keyholder_epochs: key_holder_epochs, is_current_authority, is_current_backup, is_qualified: is_bidding && is_qualified, - is_online: Reputation::is_qualified(&account_id), + is_online: Reputation::is_qualified(account_id), is_bidding, bound_redeem_address, restricted_balances, diff --git a/state-chain/runtime/src/runtime_apis.rs b/state-chain/runtime/src/runtime_apis.rs index 9da9b759bf..a02a0e7838 100644 --- a/state-chain/runtime/src/runtime_apis.rs +++ b/state-chain/runtime/src/runtime_apis.rs @@ -103,7 +103,8 @@ decl_runtime_apis!( /// Returns the flip supply in the form [total_issuance, offchain_funds] fn cf_flip_supply() -> (u128, u128); fn cf_accounts() -> Vec<(AccountId32, VanityName)>; - fn cf_account_info_v2(account_id: AccountId32) -> RuntimeApiAccountInfoV2; + fn cf_account_flip_balance(account_id: &AccountId32) -> u128; + fn cf_account_info_v2(account_id: &AccountId32) -> RuntimeApiAccountInfoV2; fn cf_penalties() -> Vec<(Offence, RuntimeApiPenalty)>; fn cf_suspensions() -> Vec<(Offence, Vec<(u32, AccountId32)>)>; fn cf_generate_gov_key_call_hash(call: Vec) -> GovCallHash; From 5c70808fc7d011feff8ee4f3968cab5fc33a755d Mon Sep 17 00:00:00 2001 From: Maxim Shishmarev Date: Mon, 16 Oct 2023 13:18:31 +1100 Subject: [PATCH 4/8] Feat: ensure correct process termination in ingress/egress tracker (#4101) * refactor: move most of btc into separate module (no new changes) * chore: tests submodule (allows collapsing all tests when not needed) * refactor: move the rest of btc into separate module (with a few changes) * chore: remove stale comment * feat: ensure correct process termination (using task_scope) * chore: fix clippy * chore: address review --- .../src/main.rs | 585 ++---------------- .../src/witnessing.rs | 96 ++- .../src/witnessing/btc.rs | 533 ++++++++++++++++ 3 files changed, 623 insertions(+), 591 deletions(-) create mode 100644 api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs diff --git a/api/bin/chainflip-ingress-egress-tracker/src/main.rs b/api/bin/chainflip-ingress-egress-tracker/src/main.rs index b8b3a38a7e..b59c3fa605 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/main.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/main.rs @@ -1,291 +1,12 @@ -use anyhow::anyhow; -use async_trait::async_trait; use chainflip_engine::settings::WsHttpEndpoints; -use futures::future; +use futures::FutureExt; use jsonrpsee::{core::Error, server::ServerBuilder, RpcModule}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{ - collections::{HashMap, HashSet}, - env, - io::Write, - net::SocketAddr, - path::PathBuf, - sync::{Arc, Mutex}, - time::Duration, -}; -use tokio::{task, time}; +use std::{env, io::Write, net::SocketAddr, path::PathBuf}; use tracing::log; - -type TxHash = String; -type BlockHash = String; -type Address = String; +use utilities::task_scope; mod witnessing; -#[derive(Deserialize)] -struct BestBlockResult { - result: BlockHash, -} - -#[derive(Deserialize)] -struct MemPoolResult { - result: Vec, -} - -#[derive(Deserialize, Clone)] -struct ScriptPubKey { - address: Option
, -} - -#[derive(Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -struct Vout { - value: f64, - script_pub_key: ScriptPubKey, -} - -#[derive(Deserialize, Clone)] -struct RawTx { - txid: TxHash, - vout: Vec, -} - -#[derive(Deserialize)] -struct RawTxResult { - result: Option, -} - -#[derive(Deserialize, Clone)] -struct Block { - previousblockhash: BlockHash, - tx: Vec, -} - -#[derive(Deserialize)] -struct BlockResult { - result: Block, -} - -#[derive(Clone, Serialize)] -struct QueryResult { - confirmations: u32, - destination: Address, - value: f64, - tx_hash: TxHash, -} - -#[derive(Default, Clone)] -enum CacheStatus { - #[default] - Init, - Ready, - Down, -} - -#[derive(Default, Clone)] -struct Cache { - status: CacheStatus, - best_block_hash: BlockHash, - transactions: HashMap, - known_tx_hashes: HashSet, -} - -const SAFETY_MARGIN: u32 = 10; -const REFRESH_INTERVAL: u64 = 10; - -#[async_trait] -trait BtcNode { - async fn getrawmempool(&self) -> anyhow::Result>; - async fn getrawtransactions( - &self, - tx_hashes: Vec, - ) -> anyhow::Result>>; - async fn getbestblockhash(&self) -> anyhow::Result; - async fn getblock(&self, block_hash: BlockHash) -> anyhow::Result; -} - -struct BtcRpc; - -impl BtcRpc { - async fn call( - &self, - method: &str, - params: Vec<&str>, - ) -> anyhow::Result> { - log::info!("Calling {} with batch size of {}", method, params.len()); - let url = env::var("BTC_ENDPOINT").unwrap_or("http://127.0.0.1:8332".to_string()); - let body = params - .iter() - .map(|param| { - format!(r#"{{"jsonrpc":"1.0","id":0,"method":"{}","params":[{}]}}"#, method, param) - }) - .collect::>() - .join(","); - - let username = env::var("BTC_USERNAME").unwrap_or("flip".to_string()); - let password = env::var("BTC_PASSWORD").unwrap_or("flip".to_string()); - reqwest::Client::new() - .post(url) - .basic_auth(username, Some(password)) - .header("Content-Type", "text/plain") - .body(format!("[{}]", body)) - .send() - .await? - .json::>() - .await - .map_err(|err| anyhow!(err)) - .and_then(|result| { - if result.len() == params.len() { - Ok(result) - } else { - Err(anyhow!("Batched request returned an incorrect number of results")) - } - }) - } -} - -#[async_trait] -impl BtcNode for BtcRpc { - async fn getrawmempool(&self) -> anyhow::Result> { - self.call::("getrawmempool", vec![""]) - .await - .map(|x| x[0].result.clone()) - } - async fn getrawtransactions( - &self, - tx_hashes: Vec, - ) -> anyhow::Result>> { - let params = tx_hashes - .iter() - .map(|tx_hash| format!("\"{}\", true", tx_hash)) - .collect::>(); - Ok(self - .call::( - "getrawtransaction", - params.iter().map(|x| x.as_str()).collect::>(), - ) - .await? - .into_iter() - .map(|x| x.result) - .collect::>>()) - } - async fn getbestblockhash(&self) -> anyhow::Result { - self.call::("getbestblockhash", vec![""]) - .await - .map(|x| x[0].result.clone()) - } - async fn getblock(&self, block_hash: String) -> anyhow::Result { - self.call::("getblock", vec![&format!("\"{}\", 2", block_hash)]) - .await - .map(|x| x[0].result.clone()) - } -} - -async fn get_updated_cache(btc: T, previous_cache: Cache) -> anyhow::Result { - let all_mempool_transactions: Vec = btc.getrawmempool().await?; - let mut new_transactions: HashMap = Default::default(); - let mut new_known_tx_hashes: HashSet = Default::default(); - let previous_mempool: HashMap = previous_cache - .clone() - .transactions - .into_iter() - .filter_map(|(_, query_result)| { - if query_result.confirmations == 0 { - Some((query_result.tx_hash.clone(), query_result)) - } else { - None - } - }) - .collect(); - let unknown_mempool_transactions: Vec = all_mempool_transactions - .into_iter() - .filter(|tx_hash| { - if let Some(known_transaction) = previous_mempool.get(tx_hash) { - new_known_tx_hashes.insert(tx_hash.clone()); - new_transactions - .insert(known_transaction.destination.clone(), known_transaction.clone()); - } else if previous_cache.known_tx_hashes.contains(tx_hash) { - new_known_tx_hashes.insert(tx_hash.clone()); - } else { - return true - } - false - }) - .collect(); - let transactions: Vec = btc - .getrawtransactions(unknown_mempool_transactions) - .await? - .iter() - .filter_map(|x| x.clone()) - .collect(); - for tx in transactions { - for vout in tx.vout { - new_known_tx_hashes.insert(tx.txid.clone()); - if let Some(destination) = vout.script_pub_key.address { - new_transactions.insert( - destination.clone(), - QueryResult { - destination, - confirmations: 0, - value: vout.value, - tx_hash: tx.txid.clone(), - }, - ); - } - } - } - let block_hash = btc.getbestblockhash().await?; - if previous_cache.best_block_hash == block_hash { - for entry in previous_cache.transactions { - if entry.1.confirmations > 0 { - new_transactions.insert(entry.0, entry.1); - } - } - } else { - log::info!("New block found: {}", block_hash); - let mut block_hash_to_query = block_hash.clone(); - for confirmations in 1..SAFETY_MARGIN { - let block = btc.getblock(block_hash_to_query).await?; - for tx in block.tx { - for vout in tx.vout { - if let Some(destination) = vout.script_pub_key.address { - new_transactions.insert( - destination.clone(), - QueryResult { - destination, - confirmations, - value: vout.value, - tx_hash: tx.txid.clone(), - }, - ); - } - } - } - block_hash_to_query = block.previousblockhash; - } - } - Ok(Cache { - status: CacheStatus::Ready, - best_block_hash: block_hash, - transactions: new_transactions, - known_tx_hashes: new_known_tx_hashes, - }) -} - -fn lookup_transactions( - cache: Cache, - addresses: Vec, -) -> Result>, Error> { - match cache.status { - CacheStatus::Ready => Ok(addresses - .iter() - .map(|address| cache.transactions.get(address).map(Clone::clone)) - .collect::>>()), - CacheStatus::Init => Err(anyhow!("Address cache is not initialised.").into()), - CacheStatus::Down => Err(anyhow!("Address cache is down - check btc connection.").into()), - } -} - pub struct DepositTrackerSettings { eth_node: WsHttpEndpoints, // The key shouldn't be necessary, but the current witnesser wants this @@ -293,82 +14,36 @@ pub struct DepositTrackerSettings { state_chain_ws_endpoint: String, } -#[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn start( + scope: &task_scope::Scope<'_, anyhow::Error>, + settings: DepositTrackerSettings, +) -> anyhow::Result<()> { tracing_subscriber::FmtSubscriber::builder() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .try_init() .expect("setting default subscriber failed"); - let cache: Arc> = Default::default(); - let updater = task::spawn({ - let cache = cache.clone(); - async move { - let mut interval = time::interval(Duration::from_secs(REFRESH_INTERVAL)); - interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); - loop { - interval.tick().await; - let cache_copy = cache.lock().unwrap().clone(); - match get_updated_cache(BtcRpc, cache_copy).await { - Ok(updated_cache) => { - let mut cache = cache.lock().unwrap(); - *cache = updated_cache; - }, - Err(err) => { - log::error!("Error when querying Bitcoin chain: {}", err); - let mut cache = cache.lock().unwrap(); - cache.status = CacheStatus::Down; - }, - } - } - } - }); - let server = ServerBuilder::default() - // It seems that if the client doesn't unsubscribe correctly, a "connection" - // won't be released, and we will eventually reach the limit, so we increase - // as a way to mitigate this issue. - // TODO: ensure that connections are always released - .build("0.0.0.0:13337".parse::()?) - .await?; let mut module = RpcModule::new(()); + + let btc_tracker = witnessing::btc::start(scope).await; + module.register_async_method("status", move |arguments, _context| { - let cache = cache.clone(); + let btc_tracker = btc_tracker.clone(); async move { - arguments - .parse::>() - .map_err(Error::Call) - .and_then(|addresses| lookup_transactions(cache.lock().unwrap().clone(), addresses)) + arguments.parse::>().map_err(Error::Call).and_then(|addresses| { + btc_tracker + .lookup_transactions(&addresses) + .map_err(|err| jsonrpsee::core::Error::Custom(err.to_string())) + }) } })?; - // Broadcast channel will drop old messages when the buffer if full to + // Broadcast channel will drop old messages when the buffer is full to // avoid "memory leaks" due to slow receivers. const EVENT_BUFFER_SIZE: usize = 1024; let (witness_sender, _) = tokio::sync::broadcast::channel::(EVENT_BUFFER_SIZE); - // Temporary hack: we don't actually use eth key, but the current witnesser is - // expecting a path with a valid key, so we create a temporary dummy key file here: - let mut eth_key_temp_file = tempfile::NamedTempFile::new()?; - eth_key_temp_file - .write_all(b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") - .unwrap(); - let eth_key_path = eth_key_temp_file.path(); - - let eth_ws_endpoint = env::var("ETH_WS_ENDPOINT").unwrap_or("ws://localhost:8546".to_string()); - let eth_http_endpoint = - env::var("ETH_HTTP_ENDPOINT").unwrap_or("http://localhost:8545".to_string()); - let sc_ws_endpoint = env::var("SC_WS_ENDPOINT").unwrap_or("ws://localhost:9944".to_string()); - - let settings = DepositTrackerSettings { - eth_node: WsHttpEndpoints { - ws_endpoint: eth_ws_endpoint.into(), - http_endpoint: eth_http_endpoint.into(), - }, - eth_key_path: eth_key_path.into(), - state_chain_ws_endpoint: sc_ws_endpoint, - }; - - witnessing::start(settings, witness_sender.clone()); + witnessing::start(scope, settings, witness_sender.clone()).await?; module.register_subscription( "subscribe_witnessing", @@ -390,204 +65,42 @@ async fn main() -> anyhow::Result<()> { }, )?; - let addr = server.local_addr()?; - log::info!("Listening on http://{}", addr); - let serverhandle = Box::pin(server.start(module)?.stopped()); - let _ = future::select(serverhandle, updater).await; - Ok(()) -} - -#[cfg(test)] -#[derive(Clone)] -struct MockBtcRpc { - mempool: Vec, - latest_block_hash: String, - blocks: HashMap, -} - -#[cfg(test)] -#[async_trait] -impl BtcNode for MockBtcRpc { - async fn getrawmempool(&self) -> anyhow::Result> { - Ok(self.mempool.iter().map(|x| x.txid.clone()).collect()) - } - async fn getrawtransactions( - &self, - tx_hashes: Vec, - ) -> anyhow::Result>> { - let mut result: Vec> = Default::default(); - for hash in tx_hashes { - for tx in self.mempool.clone() { - if tx.txid == hash { - result.push(Some(tx)) - } else { - result.push(None) - } - } - } - Ok(result) - } - async fn getbestblockhash(&self) -> anyhow::Result { - Ok(self.latest_block_hash.clone()) - } - async fn getblock(&self, block_hash: String) -> anyhow::Result { - self.blocks.get(&block_hash).cloned().ok_or(anyhow!("Block missing")) - } -} + scope.spawn(async { + let server = ServerBuilder::default().build("0.0.0.0:13337".parse::()?).await?; + let addr = server.local_addr()?; + log::info!("Listening on http://{}", addr); + server.start(module)?.stopped().await; + // If the server stops for some reason, we return + // error to terminate other tasks and the process. + Err(anyhow::anyhow!("RPC server stopped")) + }); -#[tokio::test] -async fn multiple_outputs_in_one_tx() { - let mempool = vec![RawTx { - txid: "tx1".into(), - vout: vec![ - Vout { value: 0.8, script_pub_key: ScriptPubKey { address: Some("address1".into()) } }, - Vout { value: 1.2, script_pub_key: ScriptPubKey { address: Some("address2".into()) } }, - ], - }]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..16 { - blocks.insert(i.to_string(), Block { previousblockhash: (i - 1).to_string(), tx: vec![] }); - } - let btc = MockBtcRpc { mempool, latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc, cache).await.unwrap(); - let result = lookup_transactions(cache, vec!["address1".into(), "address2".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + Ok(()) } -#[tokio::test] -async fn mempool_updates() { - let mempool = vec![ - RawTx { - txid: "tx1".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }, - RawTx { - txid: "tx2".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address2".into()) }, - }], - }, - ]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..16 { - blocks.insert(i.to_string(), Block { previousblockhash: (i - 1).to_string(), tx: vec![] }); - } - let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); - let result = lookup_transactions( - cache.clone(), - vec!["address1".into(), "address2".into(), "address3".into()], - ) - .unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); - assert!(result[2].is_none()); - - btc.mempool.append(&mut vec![RawTx { - txid: "tx3".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address3".into()) }, - }], - }]); - let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); - let result = lookup_transactions( - cache.clone(), - vec!["address1".into(), "address2".into(), "address3".into()], - ) - .unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); - assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Temporary hack: we don't actually use eth key, but the current witnesser is + // expecting a path with a valid key, so we create a temporary dummy key file here: + let mut eth_key_temp_file = tempfile::NamedTempFile::new()?; + eth_key_temp_file + .write_all(b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") + .unwrap(); + let eth_key_path = eth_key_temp_file.path(); - btc.mempool.remove(0); - let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); - let result = lookup_transactions( - cache.clone(), - vec!["address1".into(), "address2".into(), "address3".into()], - ) - .unwrap(); - assert!(result[0].is_none()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); - assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); -} + let eth_ws_endpoint = env::var("ETH_WS_ENDPOINT").unwrap_or("ws://localhost:8546".to_string()); + let eth_http_endpoint = + env::var("ETH_HTTP_ENDPOINT").unwrap_or("http://localhost:8545".to_string()); + let sc_ws_endpoint = env::var("SC_WS_ENDPOINT").unwrap_or("ws://localhost:9944".to_string()); -#[tokio::test] -async fn blocks() { - let mempool = vec![]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..19 { - blocks.insert(i.to_string(), Block { previousblockhash: (i - 1).to_string(), tx: vec![] }); - } - blocks.insert( - "15".to_string(), - Block { - previousblockhash: "14".to_string(), - tx: vec![RawTx { - txid: "tx1".into(), - vout: vec![Vout { - value: 12.5, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }], + let settings = DepositTrackerSettings { + eth_node: WsHttpEndpoints { + ws_endpoint: eth_ws_endpoint.into(), + http_endpoint: eth_http_endpoint.into(), }, - ); - let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); - let result = lookup_transactions(cache.clone(), vec!["address1".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[0].as_ref().unwrap().confirmations, 1); - - btc.latest_block_hash = "16".to_string(); - let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); - let result = lookup_transactions(cache.clone(), vec!["address1".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[0].as_ref().unwrap().confirmations, 2); -} + eth_key_path: eth_key_path.into(), + state_chain_ws_endpoint: sc_ws_endpoint, + }; -#[tokio::test] -async fn report_oldest_tx_only() { - let mempool = vec![RawTx { - txid: "tx2".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..16 { - blocks.insert(i.to_string(), Block { previousblockhash: (i - 1).to_string(), tx: vec![] }); - } - blocks.insert( - "13".to_string(), - Block { - previousblockhash: "12".to_string(), - tx: vec![RawTx { - txid: "tx1".into(), - vout: vec![Vout { - value: 12.5, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }], - }, - ); - let btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); - let result = lookup_transactions(cache.clone(), vec!["address1".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[0].as_ref().unwrap().confirmations, 3); - assert_eq!(result[0].as_ref().unwrap().value, 12.5); + task_scope::task_scope(|scope| async move { start(scope, settings).await }.boxed()).await } diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs index d57f77c98b..b40df7b2cf 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs @@ -1,3 +1,4 @@ +pub mod btc; mod eth; use std::collections::HashMap; @@ -10,7 +11,6 @@ use chainflip_engine::{ }, witness::common::epoch_source::EpochSource, }; -use futures::FutureExt; use sp_core::H160; use utilities::task_scope; @@ -72,62 +72,48 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro } } -pub(super) fn start( +pub(super) async fn start( + scope: &task_scope::Scope<'_, anyhow::Error>, settings: DepositTrackerSettings, witness_sender: tokio::sync::broadcast::Sender, -) { - tokio::spawn(async { - // TODO: ensure that if this panics, the whole process exists (probably by moving - // task scope to main) - task_scope::task_scope(|scope| { +) -> anyhow::Result<()> { + let (state_chain_stream, state_chain_client) = { + state_chain_observer::client::StateChainClient::connect_without_account( + scope, + &settings.state_chain_ws_endpoint, + ) + .await? + }; + + let env_params = get_env_parameters(&state_chain_client).await; + + let epoch_source = + EpochSource::builder(scope, state_chain_stream.clone(), state_chain_client.clone()).await; + + let witness_call = { + let witness_sender = witness_sender.clone(); + move |call: state_chain_runtime::RuntimeCall, _epoch_index| { + let witness_sender = witness_sender.clone(); async move { - let (state_chain_stream, state_chain_client) = { - state_chain_observer::client::StateChainClient::connect_without_account( - scope, - &settings.state_chain_ws_endpoint, - ) - .await? - }; - - let env_params = get_env_parameters(&state_chain_client).await; - - let epoch_source = EpochSource::builder( - scope, - state_chain_stream.clone(), - state_chain_client.clone(), - ) - .await; - - let witness_call = { - let witness_sender = witness_sender.clone(); - move |call: state_chain_runtime::RuntimeCall, _epoch_index| { - let witness_sender = witness_sender.clone(); - async move { - // Send may fail if there aren't any subscribers, - // but it is safe to ignore the error. - if let Ok(n) = witness_sender.send(call) { - tracing::info!("Broadcasting witnesser call to {} subscribers", n); - } - } - } - }; - - eth::start( - scope, - state_chain_client.clone(), - state_chain_stream.clone(), - settings, - env_params, - epoch_source.clone(), - witness_call, - ) - .await?; - - Ok(()) + // Send may fail if there aren't any subscribers, + // but it is safe to ignore the error. + if let Ok(n) = witness_sender.send(call.clone()) { + tracing::info!("Broadcasting witnesser call {:?} to {} subscribers", call, n); + } } - .boxed() - }) - .await - .unwrap() - }); + } + }; + + eth::start( + scope, + state_chain_client.clone(), + state_chain_stream.clone(), + settings, + env_params, + epoch_source.clone(), + witness_call, + ) + .await?; + + Ok(()) } diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs new file mode 100644 index 0000000000..67b63c1fea --- /dev/null +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs @@ -0,0 +1,533 @@ +use std::{ + collections::{HashMap, HashSet}, + env, + sync::{Arc, Mutex}, + time::Duration, +}; + +use anyhow::anyhow; +use async_trait::async_trait; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tracing::{error, info}; +use utilities::task_scope; + +type TxHash = String; +type BlockHash = String; +type Address = String; + +#[derive(Deserialize)] +struct BestBlockResult { + result: BlockHash, +} + +#[derive(Deserialize)] +struct MemPoolResult { + result: Vec, +} + +#[derive(Deserialize, Clone)] +struct ScriptPubKey { + address: Option
, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +struct Vout { + value: f64, + script_pub_key: ScriptPubKey, +} + +#[derive(Deserialize, Clone)] +struct RawTx { + txid: TxHash, + vout: Vec, +} + +#[derive(Deserialize)] +struct RawTxResult { + result: Option, +} + +#[derive(Deserialize, Clone)] +struct Block { + previousblockhash: BlockHash, + tx: Vec, +} + +#[derive(Deserialize)] +struct BlockResult { + result: Block, +} + +#[derive(Clone, Serialize)] +pub struct QueryResult { + confirmations: u32, + destination: Address, + value: f64, + tx_hash: TxHash, +} + +#[derive(Default, Clone)] +enum CacheStatus { + #[default] + Init, + Ready, + Down, +} + +#[derive(Default, Clone)] +struct Cache { + status: CacheStatus, + best_block_hash: BlockHash, + transactions: HashMap, + known_tx_hashes: HashSet, +} + +const SAFETY_MARGIN: u32 = 10; +const REFRESH_INTERVAL: u64 = 10; + +#[async_trait] +trait BtcNode { + async fn getrawmempool(&self) -> anyhow::Result>; + async fn getrawtransactions( + &self, + tx_hashes: Vec, + ) -> anyhow::Result>>; + async fn getbestblockhash(&self) -> anyhow::Result; + async fn getblock(&self, block_hash: BlockHash) -> anyhow::Result; +} + +struct BtcRpc; + +impl BtcRpc { + async fn call( + &self, + method: &str, + params: Vec<&str>, + ) -> anyhow::Result> { + info!("Calling {} with batch size of {}", method, params.len()); + let url = env::var("BTC_ENDPOINT").unwrap_or("http://127.0.0.1:8332".to_string()); + let body = params + .iter() + .map(|param| { + format!(r#"{{"jsonrpc":"1.0","id":0,"method":"{}","params":[{}]}}"#, method, param) + }) + .collect::>() + .join(","); + + let username = env::var("BTC_USERNAME").unwrap_or("flip".to_string()); + let password = env::var("BTC_PASSWORD").unwrap_or("flip".to_string()); + reqwest::Client::new() + .post(url) + .basic_auth(username, Some(password)) + .header("Content-Type", "text/plain") + .body(format!("[{}]", body)) + .send() + .await? + .json::>() + .await + .map_err(|err| anyhow!(err)) + .and_then(|result| { + if result.len() == params.len() { + Ok(result) + } else { + Err(anyhow!("Batched request returned an incorrect number of results")) + } + }) + } +} + +#[async_trait] +impl BtcNode for BtcRpc { + async fn getrawmempool(&self) -> anyhow::Result> { + self.call::("getrawmempool", vec![""]) + .await + .map(|x| x[0].result.clone()) + } + async fn getrawtransactions( + &self, + tx_hashes: Vec, + ) -> anyhow::Result>> { + let params = tx_hashes + .iter() + .map(|tx_hash| format!("\"{}\", true", tx_hash)) + .collect::>(); + Ok(self + .call::( + "getrawtransaction", + params.iter().map(|x| x.as_str()).collect::>(), + ) + .await? + .into_iter() + .map(|x| x.result) + .collect::>>()) + } + async fn getbestblockhash(&self) -> anyhow::Result { + self.call::("getbestblockhash", vec![""]) + .await + .map(|x| x[0].result.clone()) + } + async fn getblock(&self, block_hash: String) -> anyhow::Result { + self.call::("getblock", vec![&format!("\"{}\", 2", block_hash)]) + .await + .map(|x| x[0].result.clone()) + } +} + +async fn get_updated_cache(btc: T, previous_cache: Cache) -> anyhow::Result { + let all_mempool_transactions: Vec = btc.getrawmempool().await?; + let mut new_transactions: HashMap = Default::default(); + let mut new_known_tx_hashes: HashSet = Default::default(); + let previous_mempool: HashMap = previous_cache + .clone() + .transactions + .into_iter() + .filter_map(|(_, query_result)| { + if query_result.confirmations == 0 { + Some((query_result.tx_hash.clone(), query_result)) + } else { + None + } + }) + .collect(); + let unknown_mempool_transactions: Vec = all_mempool_transactions + .into_iter() + .filter(|tx_hash| { + if let Some(known_transaction) = previous_mempool.get(tx_hash) { + new_known_tx_hashes.insert(tx_hash.clone()); + new_transactions + .insert(known_transaction.destination.clone(), known_transaction.clone()); + } else if previous_cache.known_tx_hashes.contains(tx_hash) { + new_known_tx_hashes.insert(tx_hash.clone()); + } else { + return true + } + false + }) + .collect(); + let transactions: Vec = btc + .getrawtransactions(unknown_mempool_transactions) + .await? + .iter() + .filter_map(|x| x.clone()) + .collect(); + for tx in transactions { + for vout in tx.vout { + new_known_tx_hashes.insert(tx.txid.clone()); + if let Some(destination) = vout.script_pub_key.address { + new_transactions.insert( + destination.clone(), + QueryResult { + destination, + confirmations: 0, + value: vout.value, + tx_hash: tx.txid.clone(), + }, + ); + } + } + } + let block_hash = btc.getbestblockhash().await?; + if previous_cache.best_block_hash == block_hash { + for entry in previous_cache.transactions { + if entry.1.confirmations > 0 { + new_transactions.insert(entry.0, entry.1); + } + } + } else { + info!("New block found: {}", block_hash); + let mut block_hash_to_query = block_hash.clone(); + for confirmations in 1..SAFETY_MARGIN { + let block = btc.getblock(block_hash_to_query).await?; + for tx in block.tx { + for vout in tx.vout { + if let Some(destination) = vout.script_pub_key.address { + new_transactions.insert( + destination.clone(), + QueryResult { + destination, + confirmations, + value: vout.value, + tx_hash: tx.txid.clone(), + }, + ); + } + } + } + block_hash_to_query = block.previousblockhash; + } + } + Ok(Cache { + status: CacheStatus::Ready, + best_block_hash: block_hash, + transactions: new_transactions, + known_tx_hashes: new_known_tx_hashes, + }) +} + +fn lookup_transactions( + cache: &Cache, + addresses: &[String], +) -> anyhow::Result>> { + match cache.status { + CacheStatus::Ready => Ok(addresses + .iter() + .map(|address| cache.transactions.get(address).map(Clone::clone)) + .collect::>>()), + CacheStatus::Init => Err(anyhow!("Address cache is not initialised.")), + CacheStatus::Down => Err(anyhow!("Address cache is down - check btc connection.")), + } +} + +#[derive(Clone)] +pub struct BtcTracker { + cache: Arc>, +} + +impl BtcTracker { + pub fn lookup_transactions( + &self, + addresses: &[String], + ) -> anyhow::Result>> { + lookup_transactions(&self.cache.lock().unwrap(), addresses) + } +} + +pub async fn start(scope: &task_scope::Scope<'_, anyhow::Error>) -> BtcTracker { + let cache: Arc> = Default::default(); + scope.spawn({ + let cache = cache.clone(); + async move { + let mut interval = tokio::time::interval(Duration::from_secs(REFRESH_INTERVAL)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + interval.tick().await; + let cache_copy = cache.lock().unwrap().clone(); + match get_updated_cache(BtcRpc, cache_copy).await { + Ok(updated_cache) => { + let mut cache = cache.lock().unwrap(); + *cache = updated_cache; + }, + Err(err) => { + error!("Error when querying Bitcoin chain: {}", err); + let mut cache = cache.lock().unwrap(); + cache.status = CacheStatus::Down; + }, + } + } + } + }); + + BtcTracker { cache } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[derive(Clone)] + struct MockBtcRpc { + mempool: Vec, + latest_block_hash: String, + blocks: HashMap, + } + + #[async_trait] + impl BtcNode for MockBtcRpc { + async fn getrawmempool(&self) -> anyhow::Result> { + Ok(self.mempool.iter().map(|x| x.txid.clone()).collect()) + } + async fn getrawtransactions( + &self, + tx_hashes: Vec, + ) -> anyhow::Result>> { + let mut result: Vec> = Default::default(); + for hash in tx_hashes { + for tx in self.mempool.clone() { + if tx.txid == hash { + result.push(Some(tx)) + } else { + result.push(None) + } + } + } + Ok(result) + } + async fn getbestblockhash(&self) -> anyhow::Result { + Ok(self.latest_block_hash.clone()) + } + async fn getblock(&self, block_hash: String) -> anyhow::Result { + self.blocks.get(&block_hash).cloned().ok_or(anyhow!("Block missing")) + } + } + + #[tokio::test] + async fn multiple_outputs_in_one_tx() { + let mempool = vec![RawTx { + txid: "tx1".into(), + vout: vec![ + Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }, + Vout { + value: 1.2, + script_pub_key: ScriptPubKey { address: Some("address2".into()) }, + }, + ], + }]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..16 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + let btc = MockBtcRpc { mempool, latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(btc, cache).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into(), "address2".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + } + + #[tokio::test] + async fn mempool_updates() { + let mempool = vec![ + RawTx { + txid: "tx1".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }, + RawTx { + txid: "tx2".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address2".into()) }, + }], + }, + ]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..16 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); + let result = + lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) + .unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + assert!(result[2].is_none()); + + btc.mempool.append(&mut vec![RawTx { + txid: "tx3".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address3".into()) }, + }], + }]); + let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); + let result = + lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) + .unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); + + btc.mempool.remove(0); + let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); + let result = + lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) + .unwrap(); + assert!(result[0].is_none()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); + } + + #[tokio::test] + async fn blocks() { + let mempool = vec![]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..19 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + blocks.insert( + "15".to_string(), + Block { + previousblockhash: "14".to_string(), + tx: vec![RawTx { + txid: "tx1".into(), + vout: vec![Vout { + value: 12.5, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }], + }, + ); + let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[0].as_ref().unwrap().confirmations, 1); + + btc.latest_block_hash = "16".to_string(); + let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[0].as_ref().unwrap().confirmations, 2); + } + + #[tokio::test] + async fn report_oldest_tx_only() { + let mempool = vec![RawTx { + txid: "tx2".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..16 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + blocks.insert( + "13".to_string(), + Block { + previousblockhash: "12".to_string(), + tx: vec![RawTx { + txid: "tx1".into(), + vout: vec![Vout { + value: 12.5, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }], + }, + ); + let btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[0].as_ref().unwrap().confirmations, 3); + assert_eq!(result[0].as_ref().unwrap().value, 12.5); + } +} From b1fe871f4e8d052dbc70a5f7d550b5885ce6ec84 Mon Sep 17 00:00:00 2001 From: kylezs Date: Tue, 17 Oct 2023 00:11:00 +1100 Subject: [PATCH 5/8] feat: bouncer command for submitting runtime upgrades (#4122) * feat: bouncer command for submitting runtime upgrades * chore: linting --- bouncer/commands/submit_runtime_upgrade.ts | 31 ++++++++++++++++++++ bouncer/shared/submit_runtime_upgrade.ts | 33 ++++++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100755 bouncer/commands/submit_runtime_upgrade.ts create mode 100755 bouncer/shared/submit_runtime_upgrade.ts diff --git a/bouncer/commands/submit_runtime_upgrade.ts b/bouncer/commands/submit_runtime_upgrade.ts new file mode 100755 index 0000000000..e2a4f012b8 --- /dev/null +++ b/bouncer/commands/submit_runtime_upgrade.ts @@ -0,0 +1,31 @@ +#!/usr/bin/env -S pnpm tsx +// INSTRUCTIONS +// +// This command takes 1 mandatory argument, and 2 optional arguments. +// Arguments: +// 1. Path to the runtime wasm file +// 2. Optional: A JSON string representing the semver restriction for the upgrade. If not provided, the upgrade will not be restricted by semver. +// 3. Optional: A number representing the percentage of nodes that must be upgraded before the upgrade will be allowed to proceed. If not provided, the upgrade will not be restricted by the number of nodes that have upgraded. +// +// For example: ./commands/submit_runtime_upgrade.ts /path/to/state_chain_runtime.compact.compressed.wasm '{"major": 1, "minor": 2, "patch": 3}' 50 + +import { submitRuntimeUpgrade } from '../shared/submit_runtime_upgrade'; +import { runWithTimeout } from '../shared/utils'; + +async function main() { + const wasmPath = process.argv[2]; + + const arg3 = process.argv[3].trim(); + const semverRestriction = arg3 ? JSON.parse(arg3) : undefined; + + const arg4 = process.argv[4].trim(); + const percentNodesUpgraded = arg4 ? Number(arg4) : undefined; + + await submitRuntimeUpgrade(wasmPath, semverRestriction, percentNodesUpgraded); + process.exit(0); +} + +runWithTimeout(main(), 20000).catch((error) => { + console.error(error); + process.exit(-1); +}); diff --git a/bouncer/shared/submit_runtime_upgrade.ts b/bouncer/shared/submit_runtime_upgrade.ts new file mode 100755 index 0000000000..c23a928efa --- /dev/null +++ b/bouncer/shared/submit_runtime_upgrade.ts @@ -0,0 +1,33 @@ +import { compactAddLength } from '@polkadot/util'; +import { promises as fs } from 'fs'; +import { submitGovernanceExtrinsic } from './cf_governance'; +import { getChainflipApi } from '../shared/utils'; + +async function readRuntimeWasmFromFile(filePath: string): Promise { + return compactAddLength(new Uint8Array(await fs.readFile(filePath))); +} + +// By default we don't want to restrict that any of the nodes need to be upgraded. +export async function submitRuntimeUpgrade( + wasmPath: string, + semverRestriction?: Record, + percentNodesUpgraded = 0, +) { + const runtimeWasm = await readRuntimeWasmFromFile(wasmPath); + + console.log('Submitting runtime upgrade.'); + const chainflip = await getChainflipApi(); + + let versionPercentRestriction; + if (semverRestriction && percentNodesUpgraded) { + versionPercentRestriction = [semverRestriction, percentNodesUpgraded]; + } else { + versionPercentRestriction = undefined; + } + + await submitGovernanceExtrinsic( + chainflip.tx.governance.chainflipRuntimeUpgrade(versionPercentRestriction, runtimeWasm), + ); + + console.log('Runtime upgrade completed.'); +} From b90af3588b83e0530721d548406b40163458607d Mon Sep 17 00:00:00 2001 From: Martin Rieke <121793148+martin-chainflip@users.noreply.github.com> Date: Mon, 16 Oct 2023 16:47:28 +0200 Subject: [PATCH 6/8] add support for hex encoded amounts on limit order and range order methods in LP API (#4120) * add support for hex encoded amounts on limit order methods in LP API * handle hex for range orders also --------- Co-authored-by: Alastair Holmes --- api/bin/chainflip-lp-api/src/main.rs | 120 +++++++++++++++++---------- 1 file changed, 76 insertions(+), 44 deletions(-) diff --git a/api/bin/chainflip-lp-api/src/main.rs b/api/bin/chainflip-lp-api/src/main.rs index 2bb7df0ca4..bd3e859dd8 100644 --- a/api/bin/chainflip-lp-api/src/main.rs +++ b/api/bin/chainflip-lp-api/src/main.rs @@ -1,4 +1,3 @@ -use cf_primitives::AssetAmount; use cf_utilities::{ task_scope::{task_scope, Scope}, try_parse_number_or_hex, AnyhowRpcError, @@ -17,7 +16,7 @@ use clap::Parser; use futures::FutureExt; use jsonrpsee::{core::async_trait, proc_macros::rpc, server::ServerBuilder}; use pallet_cf_pools::{IncreaseOrDecrease, OrderId, RangeOrderSize}; -use rpc_types::OpenSwapChannels; +use rpc_types::{OpenSwapChannels, OrderIdJson, RangeOrderSizeJson}; use sp_rpc::number::NumberOrHex; use std::{collections::BTreeMap, ops::Range, path::PathBuf}; use tracing::log; @@ -25,35 +24,48 @@ use tracing::log; /// Contains RPC interface types that differ from internal types. pub mod rpc_types { use super::*; - use chainflip_api::{lp, primitives::AssetAmount, queries::SwapChannelInfo}; + use anyhow::anyhow; + use chainflip_api::queries::SwapChannelInfo; + use pallet_cf_pools::AssetsMap; use serde::{Deserialize, Serialize}; use sp_rpc::number::NumberOrHex; - #[derive(Serialize, Deserialize)] - pub struct AssetAmounts { - /// The amount of the unstable asset. - /// - /// This is side `zero` in the AMM. - unstable: NumberOrHex, - /// The amount of the stable asset (USDC). - /// - /// This is side `one` in the AMM. - stable: NumberOrHex, - } - - impl TryFrom for lp::SideMap { - type Error = >::Error; + #[derive(Copy, Clone, Debug, Serialize, Deserialize)] + pub struct OrderIdJson(NumberOrHex); + impl TryFrom for OrderId { + type Error = anyhow::Error; - fn try_from(value: AssetAmounts) -> Result { - Ok(lp::SideMap::from_array([value.unstable.try_into()?, value.stable.try_into()?])) + fn try_from(value: OrderIdJson) -> Result { + value.0.try_into().map_err(|_| anyhow!("Failed to convert order id to u64")) } } - #[derive(Serialize, Deserialize)] - pub struct RangeOrder { - pub lower_tick: i32, - pub upper_tick: i32, - pub liquidity: u128, + #[derive(Copy, Clone, Debug, Serialize, Deserialize)] + pub enum RangeOrderSizeJson { + AssetAmounts { maximum: AssetsMap, minimum: AssetsMap }, + Liquidity { liquidity: NumberOrHex }, + } + impl TryFrom for RangeOrderSize { + type Error = anyhow::Error; + + fn try_from(value: RangeOrderSizeJson) -> Result { + Ok(match value { + RangeOrderSizeJson::AssetAmounts { maximum, minimum } => + RangeOrderSize::AssetAmounts { + maximum: maximum + .try_map(TryInto::try_into) + .map_err(|_| anyhow!("Failed to convert maximums to u128"))?, + minimum: minimum + .try_map(TryInto::try_into) + .map_err(|_| anyhow!("Failed to convert minimums to u128"))?, + }, + RangeOrderSizeJson::Liquidity { liquidity } => RangeOrderSize::Liquidity { + liquidity: liquidity + .try_into() + .map_err(|_| anyhow!("Failed to convert liquidity to u128"))?, + }, + }) + } } #[derive(Serialize, Deserialize, Clone)] @@ -95,10 +107,10 @@ pub trait Rpc { &self, base_asset: Asset, pair_asset: Asset, - id: OrderId, + id: OrderIdJson, tick_range: Option>, increase_or_decrease: IncreaseOrDecrease, - size: RangeOrderSize, + size: RangeOrderSizeJson, ) -> Result, AnyhowRpcError>; #[method(name = "set_range_order")] @@ -106,9 +118,9 @@ pub trait Rpc { &self, base_asset: Asset, pair_asset: Asset, - id: OrderId, + id: OrderIdJson, tick_range: Option>, - size: RangeOrderSize, + size: RangeOrderSizeJson, ) -> Result, AnyhowRpcError>; #[method(name = "update_limit_order")] @@ -116,10 +128,10 @@ pub trait Rpc { &self, sell_asset: Asset, buy_asset: Asset, - id: OrderId, + id: OrderIdJson, tick: Option, increase_or_decrease: IncreaseOrDecrease, - amount: AssetAmount, + amount: NumberOrHex, ) -> Result, AnyhowRpcError>; #[method(name = "set_limit_order")] @@ -127,9 +139,9 @@ pub trait Rpc { &self, sell_asset: Asset, buy_asset: Asset, - id: OrderId, + id: OrderIdJson, tick: Option, - amount: AssetAmount, + amount: NumberOrHex, ) -> Result, AnyhowRpcError>; #[method(name = "asset_balances")] @@ -205,15 +217,22 @@ impl RpcServer for RpcServerImpl { &self, base_asset: Asset, pair_asset: Asset, - id: OrderId, + id: OrderIdJson, tick_range: Option>, increase_or_decrease: IncreaseOrDecrease, - size: RangeOrderSize, + size: RangeOrderSizeJson, ) -> Result, AnyhowRpcError> { Ok(self .api .lp_api() - .update_range_order(base_asset, pair_asset, id, tick_range, increase_or_decrease, size) + .update_range_order( + base_asset, + pair_asset, + id.try_into()?, + tick_range, + increase_or_decrease, + size.try_into()?, + ) .await?) } @@ -221,14 +240,14 @@ impl RpcServer for RpcServerImpl { &self, base_asset: Asset, pair_asset: Asset, - id: OrderId, + id: OrderIdJson, tick_range: Option>, - size: RangeOrderSize, + size: RangeOrderSizeJson, ) -> Result, AnyhowRpcError> { Ok(self .api .lp_api() - .set_range_order(base_asset, pair_asset, id, tick_range, size) + .set_range_order(base_asset, pair_asset, id.try_into()?, tick_range, size.try_into()?) .await?) } @@ -236,15 +255,22 @@ impl RpcServer for RpcServerImpl { &self, sell_asset: Asset, buy_asset: Asset, - id: OrderId, + id: OrderIdJson, tick: Option, increase_or_decrease: IncreaseOrDecrease, - amount: AssetAmount, + amount: NumberOrHex, ) -> Result, AnyhowRpcError> { Ok(self .api .lp_api() - .update_limit_order(sell_asset, buy_asset, id, tick, increase_or_decrease, amount) + .update_limit_order( + sell_asset, + buy_asset, + id.try_into()?, + tick, + increase_or_decrease, + try_parse_number_or_hex(amount)?, + ) .await?) } @@ -252,14 +278,20 @@ impl RpcServer for RpcServerImpl { &self, sell_asset: Asset, buy_asset: Asset, - id: OrderId, + id: OrderIdJson, tick: Option, - sell_amount: AssetAmount, + sell_amount: NumberOrHex, ) -> Result, AnyhowRpcError> { Ok(self .api .lp_api() - .set_limit_order(sell_asset, buy_asset, id, tick, sell_amount) + .set_limit_order( + sell_asset, + buy_asset, + id.try_into()?, + tick, + try_parse_number_or_hex(sell_amount)?, + ) .await?) } From 03c4f3c2f4530b3d36d020b62d801f850c479f2e Mon Sep 17 00:00:00 2001 From: Maxim Shishmarev Date: Tue, 17 Oct 2023 12:38:07 +1100 Subject: [PATCH 7/8] Feat: ingress-egress tracking for DOT (#4121) * refactor: extract dot witnessing procedures into reusable functions * feat: add dot ingress-egress tracking * chore: silence clippy type complexity * chore: address minor review comments --- Cargo.lock | 1 + .../Cargo.toml | 1 + .../src/main.rs | 29 ++-- .../src/witnessing.rs | 31 +++- .../src/witnessing/dot.rs | 93 +++++++++++ engine/src/witness.rs | 2 +- engine/src/witness/dot.rs | 156 +++++++++++------- engine/src/witness/dot/dot_deposits.rs | 2 + 8 files changed, 239 insertions(+), 76 deletions(-) create mode 100644 api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs diff --git a/Cargo.lock b/Cargo.lock index 035054cc9f..582789ff1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1524,6 +1524,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "cf-chains", "cf-primitives", "chainflip-engine", "futures", diff --git a/api/bin/chainflip-ingress-egress-tracker/Cargo.toml b/api/bin/chainflip-ingress-egress-tracker/Cargo.toml index 20b4ce28f3..593a8b00c7 100644 --- a/api/bin/chainflip-ingress-egress-tracker/Cargo.toml +++ b/api/bin/chainflip-ingress-egress-tracker/Cargo.toml @@ -29,3 +29,4 @@ utilities = { path = "../../../utilities" } cf-primitives = { path = "../../../state-chain/primitives" } pallet-cf-environment = { path = "../../../state-chain/pallets/cf-environment" } state-chain-runtime = { path = "../../../state-chain/runtime" } +cf-chains = { path = "../../../state-chain/chains" } diff --git a/api/bin/chainflip-ingress-egress-tracker/src/main.rs b/api/bin/chainflip-ingress-egress-tracker/src/main.rs index b59c3fa605..67ccef6243 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/main.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/main.rs @@ -7,10 +7,12 @@ use utilities::task_scope; mod witnessing; +#[derive(Clone)] pub struct DepositTrackerSettings { eth_node: WsHttpEndpoints, // The key shouldn't be necessary, but the current witnesser wants this eth_key_path: PathBuf, + dot_node: WsHttpEndpoints, state_chain_ws_endpoint: String, } @@ -86,20 +88,27 @@ async fn main() -> anyhow::Result<()> { eth_key_temp_file .write_all(b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") .unwrap(); - let eth_key_path = eth_key_temp_file.path(); - - let eth_ws_endpoint = env::var("ETH_WS_ENDPOINT").unwrap_or("ws://localhost:8546".to_string()); - let eth_http_endpoint = - env::var("ETH_HTTP_ENDPOINT").unwrap_or("http://localhost:8545".to_string()); - let sc_ws_endpoint = env::var("SC_WS_ENDPOINT").unwrap_or("ws://localhost:9944".to_string()); let settings = DepositTrackerSettings { eth_node: WsHttpEndpoints { - ws_endpoint: eth_ws_endpoint.into(), - http_endpoint: eth_http_endpoint.into(), + ws_endpoint: env::var("ETH_WS_ENDPOINT") + .unwrap_or("ws://localhost:8546".to_string()) + .into(), + http_endpoint: env::var("ETH_HTTP_ENDPOINT") + .unwrap_or("http://localhost:8545".to_string()) + .into(), + }, + eth_key_path: eth_key_temp_file.path().into(), + dot_node: WsHttpEndpoints { + ws_endpoint: env::var("DOT_WS_ENDPOINT") + .unwrap_or("ws://localhost:9945".to_string()) + .into(), + http_endpoint: env::var("DOT_HTTP_ENDPOINT") + .unwrap_or("http://localhost:9945".to_string()) + .into(), }, - eth_key_path: eth_key_path.into(), - state_chain_ws_endpoint: sc_ws_endpoint, + state_chain_ws_endpoint: env::var("SC_WS_ENDPOINT") + .unwrap_or("ws://localhost:9944".to_string()), }; task_scope::task_scope(|scope| async move { start(scope, settings).await }.boxed()).await diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs index b40df7b2cf..c8baa66068 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs @@ -1,21 +1,24 @@ pub mod btc; +mod dot; mod eth; use std::collections::HashMap; +use cf_chains::dot::PolkadotHash; use cf_primitives::chains::assets::eth::Asset; use chainflip_engine::{ state_chain_observer::{ self, client::{chain_api::ChainApi, storage_api::StorageApi, StateChainClient}, }, - witness::common::epoch_source::EpochSource, + witness::common::{epoch_source::EpochSource, STATE_CHAIN_CONNECTION}, }; use sp_core::H160; use utilities::task_scope; use crate::DepositTrackerSettings; +#[derive(Clone)] struct EnvironmentParameters { eth_chain_id: u64, eth_vault_address: H160, @@ -23,6 +26,7 @@ struct EnvironmentParameters { flip_contract_address: H160, usdc_contract_address: H160, supported_erc20_tokens: HashMap, + dot_genesis_hash: PolkadotHash, } async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> EnvironmentParameters { @@ -62,6 +66,15 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro .map(|(asset, address)| (address, asset.into())) .collect(); + let dot_genesis_hash = PolkadotHash::from( + state_chain_client + .storage_value::>( + state_chain_client.latest_finalized_hash(), + ) + .await + .expect(STATE_CHAIN_CONNECTION), + ); + EnvironmentParameters { eth_chain_id, eth_vault_address, @@ -69,6 +82,7 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro usdc_contract_address, eth_address_checker_address, supported_erc20_tokens, + dot_genesis_hash, } } @@ -108,10 +122,21 @@ pub(super) async fn start( scope, state_chain_client.clone(), state_chain_stream.clone(), - settings, - env_params, + settings.clone(), + env_params.clone(), epoch_source.clone(), + witness_call.clone(), + ) + .await?; + + dot::start( + scope, witness_call, + settings, + env_params, + state_chain_client, + state_chain_stream, + epoch_source, ) .await?; diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs new file mode 100644 index 0000000000..5ea1047b8a --- /dev/null +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs @@ -0,0 +1,93 @@ +use std::sync::Arc; + +use cf_primitives::EpochIndex; +use chainflip_engine::{ + dot::retry_rpc::DotRetryRpcClient, + settings::NodeContainer, + state_chain_observer::client::{ + storage_api::StorageApi, StateChainClient, StateChainStreamApi, + }, + witness::{ + common::{ + chain_source::extension::ChainSourceExt, epoch_source::EpochSourceBuilder, + STATE_CHAIN_CONNECTION, + }, + dot::{filter_map_events, process_egress, proxy_added_witnessing, DotUnfinalisedSource}, + }, +}; +use futures::Future; +use utilities::task_scope::Scope; + +use crate::DepositTrackerSettings; + +use super::EnvironmentParameters; + +pub(super) async fn start( + scope: &Scope<'_, anyhow::Error>, + witness_call: ProcessCall, + settings: DepositTrackerSettings, + env_params: EnvironmentParameters, + state_chain_client: Arc>, + state_chain_stream: impl StateChainStreamApi + Clone, + epoch_source: EpochSourceBuilder<'_, '_, StateChainClient<()>, (), ()>, +) -> anyhow::Result<()> +where + ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut + + Send + + Sync + + Clone + + 'static, + ProcessingFut: Future + Send + 'static, +{ + let dot_client = DotRetryRpcClient::new( + scope, + NodeContainer { primary: settings.dot_node, backup: None }, + env_params.dot_genesis_hash, + )?; + + let epoch_source = epoch_source + .filter_map( + |state_chain_client, _epoch_index, hash, _info| async move { + state_chain_client + .storage_value::>( + hash, + ) + .await + .expect(STATE_CHAIN_CONNECTION) + }, + |_state_chain_client, _epoch, _block_hash, historic_info| async move { historic_info }, + ) + .await; + + let vaults = epoch_source.vaults().await; + + DotUnfinalisedSource::new(dot_client.clone()) + .shared(scope) + .then(|header| async move { header.data.iter().filter_map(filter_map_events).collect() }) + .strictly_monotonic() + .shared(scope) + .chunk_by_vault(vaults.clone()) + .deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone()) + .await + // Deposit witnessing + .dot_deposits(witness_call.clone()) + // Proxy added witnessing + .then({ + let witness_call = witness_call.clone(); + move |epoch, header| proxy_added_witnessing(epoch, header, witness_call.clone()) + }) + // Broadcast success + .egress_items(scope, state_chain_stream.clone(), state_chain_client.clone()) + .await + .then({ + let witness_call = witness_call.clone(); + let dot_client = dot_client.clone(); + move |epoch, header| { + process_egress(epoch, header, witness_call.clone(), dot_client.clone()) + } + }) + .logging("witnessing") + .spawn(scope); + + Ok(()) +} diff --git a/engine/src/witness.rs b/engine/src/witness.rs index 600c6d268f..cfb99c4c7f 100644 --- a/engine/src/witness.rs +++ b/engine/src/witness.rs @@ -1,5 +1,5 @@ mod btc; pub mod common; -mod dot; +pub mod dot; pub mod eth; pub mod start; diff --git a/engine/src/witness/dot.rs b/engine/src/witness/dot.rs index 04e24320c8..08d093c700 100644 --- a/engine/src/witness/dot.rs +++ b/engine/src/witness/dot.rs @@ -3,7 +3,8 @@ mod dot_deposits; mod dot_source; use cf_chains::dot::{ - PolkadotAccountId, PolkadotBalance, PolkadotExtrinsicIndex, PolkadotUncheckedExtrinsic, + PolkadotAccountId, PolkadotBalance, PolkadotExtrinsicIndex, PolkadotHash, PolkadotSignature, + PolkadotUncheckedExtrinsic, }; use cf_primitives::{EpochIndex, PolkadotBlockNumber, TxId}; use futures_core::Future; @@ -29,9 +30,13 @@ use crate::{ witness::common::chain_source::extension::ChainSourceExt, }; use anyhow::Result; -use dot_source::{DotFinalisedSource, DotUnfinalisedSource}; +pub use dot_source::{DotFinalisedSource, DotUnfinalisedSource}; -use super::common::{epoch_source::EpochSourceBuilder, STATE_CHAIN_CONNECTION}; +use super::common::{ + chain_source::Header, + epoch_source::{EpochSourceBuilder, Vault}, + STATE_CHAIN_CONNECTION, +}; // To generate the metadata file, use the subxt-cli tool (`cargo install subxt-cli`): // subxt metadata --format=json --pallets Proxy,Balances,TransactionPayment --url @@ -51,7 +56,7 @@ use polkadot::{ transaction_payment::events::TransactionFeePaid, }; -fn filter_map_events( +pub fn filter_map_events( res_event_details: Result, subxt::Error>, ) -> Option<(Phase, EventWrapper)> { match res_event_details { @@ -81,6 +86,87 @@ fn filter_map_events( } } +pub async fn proxy_added_witnessing( + epoch: Vault, + header: Header, BTreeSet)>, + process_call: ProcessCall, +) -> (Vec<(Phase, EventWrapper)>, BTreeSet) +where + ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut + + Send + + Sync + + Clone + + 'static, + ProcessingFut: Future + Send + 'static, +{ + let (events, mut broadcast_indices) = header.data; + + let (vault_key_rotated_calls, mut proxy_added_broadcasts) = + proxy_addeds(header.index, &events, &epoch.info.1); + broadcast_indices.append(&mut proxy_added_broadcasts); + + for call in vault_key_rotated_calls { + process_call(call, epoch.index).await; + } + + (events, broadcast_indices) +} + +#[allow(clippy::type_complexity)] +pub async fn process_egress( + epoch: Vault, + header: Header< + PolkadotBlockNumber, + PolkadotHash, + ((Vec<(Phase, EventWrapper)>, BTreeSet), Vec), + >, + process_call: ProcessCall, + dot_client: DotRetryRpcClient, +) where + ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut + + Send + + Sync + + Clone + + 'static, + ProcessingFut: Future + Send + 'static, +{ + let ((events, broadcast_indices), monitored_egress_ids) = header.data; + + let extrinsics = dot_client.extrinsics(header.hash).await; + + for (extrinsic_index, tx_fee) in transaction_fee_paids(&broadcast_indices, &events) { + let xt = extrinsics.get(extrinsic_index as usize).expect( + "We know this exists since we got + this index from the event, from the block we are querying.", + ); + let mut xt_bytes = xt.0.as_slice(); + + let unchecked = PolkadotUncheckedExtrinsic::decode(&mut xt_bytes); + if let Ok(unchecked) = unchecked { + if let Some(signature) = unchecked.signature() { + if monitored_egress_ids.contains(&signature) { + tracing::info!("Witnessing transaction_succeeded. signature: {signature:?}"); + process_call( + pallet_cf_broadcast::Call::<_, PolkadotInstance>::transaction_succeeded { + tx_out_id: signature, + signer_id: epoch.info.1, + tx_fee, + } + .into(), + epoch.index, + ) + .await; + } + } + } else { + // We expect this to occur when attempting to decode + // a transaction that was not sent by us. + // We can safely ignore it, but we log it in case. + tracing::debug!("Failed to decode UncheckedExtrinsic {unchecked:?}"); + } + } +} + pub async fn start< StateChainClient, StateChainStream, @@ -168,22 +254,8 @@ where // Proxy added witnessing .then({ let process_call = process_call.clone(); - move |epoch, header| { - let process_call = process_call.clone(); - async move { - let (events, mut broadcast_indices) = header.data; - - let (vault_key_rotated_calls, mut proxy_added_broadcasts) = proxy_addeds(header.index, &events, &epoch.info.1); - broadcast_indices.append(&mut proxy_added_broadcasts); - - for call in vault_key_rotated_calls { - process_call(call, epoch.index).await; - } - - (events, broadcast_indices) - } - }} - ) + move |epoch, header| proxy_added_witnessing(epoch, header, process_call.clone()) + }) // Broadcast success .egress_items(scope, state_chain_stream.clone(), state_chain_client.clone()) .await @@ -191,49 +263,9 @@ where let process_call = process_call.clone(); let dot_client = dot_client.clone(); move |epoch, header| { - let process_call = process_call.clone(); - let dot_client = dot_client.clone(); - async move { - let ((events, broadcast_indices), monitored_egress_ids) = header.data; - - let extrinsics = dot_client - .extrinsics(header.hash) - .await; - - for (extrinsic_index, tx_fee) in transaction_fee_paids(&broadcast_indices, &events) { - let xt = extrinsics.get(extrinsic_index as usize).expect("We know this exists since we got this index from the event, from the block we are querying."); - let mut xt_bytes = xt.0.as_slice(); - - let unchecked = PolkadotUncheckedExtrinsic::decode(&mut xt_bytes); - if let Ok(unchecked) = unchecked { - if let Some(signature) = unchecked.signature() { - if monitored_egress_ids.contains(&signature) { - tracing::info!("Witnessing transaction_succeeded. signature: {signature:?}"); - process_call( - pallet_cf_broadcast::Call::< - _, - PolkadotInstance, - >::transaction_succeeded { - tx_out_id: signature, - signer_id: epoch.info.1, - tx_fee, - } - .into(), - epoch.index, - ).await; - } - } - } else { - // We expect this to occur when attempting to decode - // a transaction that was not sent by us. - // We can safely ignore it, but we log it in case. - tracing::debug!("Failed to decode UncheckedExtrinsic {unchecked:?}"); - } - } - } - } + process_egress(epoch, header, process_call.clone(), dot_client.clone()) } - ) + }) .continuous("Polkadot".to_string(), db) .logging("witnessing") .spawn(scope); diff --git a/engine/src/witness/dot/dot_deposits.rs b/engine/src/witness/dot/dot_deposits.rs index 3c6cd7f212..6cd7a45350 100644 --- a/engine/src/witness/dot/dot_deposits.rs +++ b/engine/src/witness/dot/dot_deposits.rs @@ -33,6 +33,7 @@ impl ChunkedByVaultBuilder { Data = (Vec<(Phase, EventWrapper)>, BTreeSet), Chain = Polkadot, ExtraInfo = PolkadotAccountId, + ExtraHistoricInfo = (), >, > where @@ -42,6 +43,7 @@ impl ChunkedByVaultBuilder { Data = (Vec<(Phase, EventWrapper)>, Addresses), Chain = Polkadot, ExtraInfo = PolkadotAccountId, + ExtraHistoricInfo = (), >, ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut + Send From d243595c773dc4dc707589ae89d4c069b1416a8c Mon Sep 17 00:00:00 2001 From: Albert Llimos <53186777+albert-llimos@users.noreply.github.com> Date: Tue, 17 Oct 2023 09:48:10 +0200 Subject: [PATCH 8/8] chore: get gas parameters from statechain event (#4125) * chore: get gas parameters from statechain event * chore: improvements --- bouncer/shared/gaslimit_ccm.ts | 56 ++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/bouncer/shared/gaslimit_ccm.ts b/bouncer/shared/gaslimit_ccm.ts index 88c7abf9ec..7c3e7964f4 100644 --- a/bouncer/shared/gaslimit_ccm.ts +++ b/bouncer/shared/gaslimit_ccm.ts @@ -20,7 +20,6 @@ import { signAndSendTxEthSilent } from './send_eth'; // on the lenght of the message. const MIN_BASE_GAS_OVERHEAD = 100000; const BASE_GAS_OVERHEAD_BUFFER = 20000; -const ETHEREUM_BASE_FEE_MULTIPLIER = 2; const CFE_GAS_LIMIT_CAP = 10000000; // Arbitrary gas consumption values for testing. The total default gas used is then ~360-380k depending on the parameters. let DEFAULT_GAS_CONSUMPTION = 260000; @@ -131,6 +130,20 @@ async function testGasLimitSwap( }, () => swapScheduledObserved, ); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const broadcastIdToTxPayload: { [key: string]: any } = {}; + const broadcastRequesthandle = observeEvent( + 'ethereumBroadcaster:TransactionBroadcastRequest', + chainflipApi, + (event) => { + broadcastIdToTxPayload[event.data.broadcastAttemptId.broadcastId] = + event.data.transactionPayload; + return false; + }, + () => swapScheduledObserved, + ); + await send(sourceAsset, depositAddress); const { @@ -141,36 +154,33 @@ async function testGasLimitSwap( !( swapId in swapIdToEgressAmount && swapId in swapIdToEgressId && - swapIdToEgressId[swapId] in egressIdToBroadcastId + swapIdToEgressId[swapId] in egressIdToBroadcastId && + egressIdToBroadcastId[swapIdToEgressId[swapId]] in broadcastIdToTxPayload ) ) { await sleep(3000); } swapScheduledObserved = true; - await Promise.all([swapExecutedHandle, swapEgressHandle, ccmBroadcastHandle]); + await Promise.all([ + swapExecutedHandle, + swapEgressHandle, + ccmBroadcastHandle, + broadcastRequesthandle, + ]); const egressBudgetAmount = sourceAsset !== Assets.ETH ? Number(swapIdToEgressAmount[swapId].replace(/,/g, '')) : messageMetadata.gasBudget; - const { baseFee, priorityFee } = await getChainFees(); - - // On the state chain the gasLimit is calculated from the egressBudget and the MaxFeePerGas - // max_fee_per_gas = 2 * baseFee + priorityFee - // gasLimitBudget = egressBudgetAmount / (1 * baseFee + priorityFee) - const currentFeePerGas = baseFee + priorityFee; - const maxFeePerGas = ETHEREUM_BASE_FEE_MULTIPLIER * baseFee + priorityFee; - const gasLimitBudget = egressBudgetAmount / currentFeePerGas; + const txPayload = broadcastIdToTxPayload[egressIdToBroadcastId[swapIdToEgressId[swapId]]]; + const maxFeePerGas = Number(txPayload.maxFeePerGas.replace(/,/g, '')); + const gasLimitBudget = Number(txPayload.gasLimit.replace(/,/g, '')); const byteLength = Web3.utils.hexToBytes(messageMetadata.message).length; const minGasLimitRequired = gasConsumption + MIN_BASE_GAS_OVERHEAD + byteLength * GAS_PER_BYTE; - console.log( - `${tag} baseFee: ${baseFee}, priorityFee: ${priorityFee}, maxFeePerGas: ${maxFeePerGas}`, - ); - // This is a very rough approximation for the gas limit required. A buffer is added to account for that. if (minGasLimitRequired + BASE_GAS_OVERHEAD_BUFFER >= gasLimitBudget) { observeCcmReceived( @@ -186,7 +196,9 @@ async function testGasLimitSwap( } }); // Expect Broadcast Aborted - console.log(`${tag} Gas budget is too low. Expecting BroadcastAborted event.`); + console.log( + `${tag} Gas budget of ${gasLimitBudget} is too low. Expecting BroadcastAborted event.`, + ); await observeEvent( 'ethereumBroadcaster:BroadcastAborted', await getChainflipApi(), @@ -199,6 +211,8 @@ async function testGasLimitSwap( }`, ); } else if (minGasLimitRequired < gasLimitBudget) { + console.log(`${tag} Gas budget ${gasLimitBudget}. Expecting successful broadcast.`); + const ccmReceived = await observeCcmReceived( sourceAsset, destAsset, @@ -217,18 +231,20 @@ async function testGasLimitSwap( // Priority fee is not fully deterministic so we just log it for now if (tx.maxFeePerGas !== maxFeePerGas.toString()) { - console.log( - `${tag} Max fee per gas in the transaction ${tx.maxFeePerGas} different than expected ${maxFeePerGas}`, + throw new Error( + `${tag} Tx Max fee per gas ${tx.maxFeePerGas} different than expected ${maxFeePerGas}`, ); } - if (Math.trunc(tx.gas) !== Math.min(Math.trunc(gasLimitBudget), CFE_GAS_LIMIT_CAP)) { - throw new Error(`${tag} Gas limit in the transaction is different than the one expected!`); + if (tx.gas !== Math.min(gasLimitBudget, CFE_GAS_LIMIT_CAP)) { + throw new Error(`${tag} Tx gas limit ${tx.gas} different than expected ${gasLimitBudget}`); } // This should not happen by definition, as maxFeePerGas * gasLimit < egressBudgetAmount if (totalFee > egressBudgetAmount) { throw new Error(`${tag} Transaction fee paid is higher than the budget paid by the user!`); } console.log(`${tag} Swap success! TxHash: ${ccmReceived?.txHash as string}!`); + } else { + console.log(`${tag} Budget too tight, can't determine if swap should succeed.`); } }