diff --git a/api/bin/chainflip-ingress-egress-tracker/src/main.rs b/api/bin/chainflip-ingress-egress-tracker/src/main.rs index 67ccef6243..0624361a11 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/main.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/main.rs @@ -1,4 +1,4 @@ -use chainflip_engine::settings::WsHttpEndpoints; +use chainflip_engine::settings::{HttpBasicAuthEndpoint, WsHttpEndpoints}; use futures::FutureExt; use jsonrpsee::{core::Error, server::ServerBuilder, RpcModule}; use std::{env, io::Write, net::SocketAddr, path::PathBuf}; @@ -14,6 +14,7 @@ pub struct DepositTrackerSettings { eth_key_path: PathBuf, dot_node: WsHttpEndpoints, state_chain_ws_endpoint: String, + btc: HttpBasicAuthEndpoint, } async fn start( @@ -26,7 +27,7 @@ async fn start( .expect("setting default subscriber failed"); let mut module = RpcModule::new(()); - let btc_tracker = witnessing::btc::start(scope).await; + let btc_tracker = witnessing::btc_mempool::start(scope, settings.btc.clone()).await; module.register_async_method("status", move |arguments, _context| { let btc_tracker = btc_tracker.clone(); @@ -109,6 +110,13 @@ async fn main() -> anyhow::Result<()> { }, state_chain_ws_endpoint: env::var("SC_WS_ENDPOINT") .unwrap_or("ws://localhost:9944".to_string()), + btc: HttpBasicAuthEndpoint { + http_endpoint: env::var("BTC_ENDPOINT") + .unwrap_or("http://127.0.0.1:8332".to_string()) + .into(), + basic_auth_user: env::var("BTC_USERNAME").unwrap_or("flip".to_string()), + basic_auth_password: env::var("BTC_PASSWORD").unwrap_or("flip".to_string()), + }, }; task_scope::task_scope(|scope| async move { start(scope, settings).await }.boxed()).await diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs index c8baa66068..db5fe79359 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs @@ -1,4 +1,5 @@ -pub mod btc; +mod btc; +pub mod btc_mempool; mod dot; mod eth; @@ -27,6 +28,7 @@ struct EnvironmentParameters { usdc_contract_address: H160, supported_erc20_tokens: HashMap, dot_genesis_hash: PolkadotHash, + btc_network: cf_chains::btc::BitcoinNetwork, } async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> EnvironmentParameters { @@ -66,14 +68,20 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro .map(|(asset, address)| (address, asset.into())) .collect(); - let dot_genesis_hash = PolkadotHash::from( - state_chain_client - .storage_value::>( - state_chain_client.latest_finalized_hash(), - ) - .await - .expect(STATE_CHAIN_CONNECTION), - ); + let dot_genesis_hash = state_chain_client + .storage_value::>( + state_chain_client.latest_finalized_hash(), + ) + .await + .expect(STATE_CHAIN_CONNECTION); + + let btc_network = state_chain_client + .storage_value::>( + state_chain_client.latest_finalized_hash(), + ) + .await + .expect(STATE_CHAIN_CONNECTION) + .into(); EnvironmentParameters { eth_chain_id, @@ -83,6 +91,7 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro eth_address_checker_address, supported_erc20_tokens, dot_genesis_hash, + btc_network, } } @@ -129,6 +138,17 @@ pub(super) async fn start( ) .await?; + btc::start( + scope, + witness_call.clone(), + settings.clone(), + env_params.clone(), + state_chain_client.clone(), + state_chain_stream.clone(), + epoch_source.clone(), + ) + .await?; + dot::start( scope, witness_call, diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs index 67b63c1fea..ec611e17e2 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs @@ -1,533 +1,69 @@ -use std::{ - collections::{HashMap, HashSet}, - env, - sync::{Arc, Mutex}, - time::Duration, +use std::sync::Arc; + +use cf_primitives::EpochIndex; +use chainflip_engine::{ + btc::retry_rpc::{BtcRetryRpcApi, BtcRetryRpcClient}, + settings::NodeContainer, + state_chain_observer::client::{StateChainClient, StateChainStreamApi}, + witness::{ + btc::{btc_source::BtcSource, process_egress}, + common::{chain_source::extension::ChainSourceExt, epoch_source::EpochSourceBuilder}, + }, }; - -use anyhow::anyhow; -use async_trait::async_trait; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use tracing::{error, info}; -use utilities::task_scope; - -type TxHash = String; -type BlockHash = String; -type Address = String; - -#[derive(Deserialize)] -struct BestBlockResult { - result: BlockHash, -} - -#[derive(Deserialize)] -struct MemPoolResult { - result: Vec, -} - -#[derive(Deserialize, Clone)] -struct ScriptPubKey { - address: Option
, -} - -#[derive(Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -struct Vout { - value: f64, - script_pub_key: ScriptPubKey, -} - -#[derive(Deserialize, Clone)] -struct RawTx { - txid: TxHash, - vout: Vec, -} - -#[derive(Deserialize)] -struct RawTxResult { - result: Option, -} - -#[derive(Deserialize, Clone)] -struct Block { - previousblockhash: BlockHash, - tx: Vec, -} - -#[derive(Deserialize)] -struct BlockResult { - result: Block, -} - -#[derive(Clone, Serialize)] -pub struct QueryResult { - confirmations: u32, - destination: Address, - value: f64, - tx_hash: TxHash, -} - -#[derive(Default, Clone)] -enum CacheStatus { - #[default] - Init, - Ready, - Down, -} - -#[derive(Default, Clone)] -struct Cache { - status: CacheStatus, - best_block_hash: BlockHash, - transactions: HashMap, - known_tx_hashes: HashSet, -} - -const SAFETY_MARGIN: u32 = 10; -const REFRESH_INTERVAL: u64 = 10; - -#[async_trait] -trait BtcNode { - async fn getrawmempool(&self) -> anyhow::Result>; - async fn getrawtransactions( - &self, - tx_hashes: Vec, - ) -> anyhow::Result>>; - async fn getbestblockhash(&self) -> anyhow::Result; - async fn getblock(&self, block_hash: BlockHash) -> anyhow::Result; -} - -struct BtcRpc; - -impl BtcRpc { - async fn call( - &self, - method: &str, - params: Vec<&str>, - ) -> anyhow::Result> { - info!("Calling {} with batch size of {}", method, params.len()); - let url = env::var("BTC_ENDPOINT").unwrap_or("http://127.0.0.1:8332".to_string()); - let body = params - .iter() - .map(|param| { - format!(r#"{{"jsonrpc":"1.0","id":0,"method":"{}","params":[{}]}}"#, method, param) - }) - .collect::>() - .join(","); - - let username = env::var("BTC_USERNAME").unwrap_or("flip".to_string()); - let password = env::var("BTC_PASSWORD").unwrap_or("flip".to_string()); - reqwest::Client::new() - .post(url) - .basic_auth(username, Some(password)) - .header("Content-Type", "text/plain") - .body(format!("[{}]", body)) - .send() - .await? - .json::>() - .await - .map_err(|err| anyhow!(err)) - .and_then(|result| { - if result.len() == params.len() { - Ok(result) - } else { - Err(anyhow!("Batched request returned an incorrect number of results")) +use futures::Future; +use utilities::task_scope::Scope; + +use crate::DepositTrackerSettings; + +use super::EnvironmentParameters; +pub(super) async fn start( + scope: &Scope<'_, anyhow::Error>, + witness_call: ProcessCall, + settings: DepositTrackerSettings, + env_params: EnvironmentParameters, + state_chain_client: Arc>, + state_chain_stream: impl StateChainStreamApi + Clone, + epoch_source: EpochSourceBuilder<'_, '_, StateChainClient<()>, (), ()>, +) -> anyhow::Result<()> +where + ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut + + Send + + Sync + + Clone + + 'static, + ProcessingFut: Future + Send + 'static, +{ + let btc_client = BtcRetryRpcClient::new( + scope, + NodeContainer { primary: settings.btc, backup: None }, + env_params.btc_network, + ) + .await?; + + let vaults = epoch_source.vaults().await; + + BtcSource::new(btc_client.clone()) + .strictly_monotonic() + .then({ + let btc_client = btc_client.clone(); + move |header| { + let btc_client = btc_client.clone(); + async move { + let block = btc_client.block(header.hash).await; + (header.data, block.txdata) } - }) - } -} - -#[async_trait] -impl BtcNode for BtcRpc { - async fn getrawmempool(&self) -> anyhow::Result> { - self.call::("getrawmempool", vec![""]) - .await - .map(|x| x[0].result.clone()) - } - async fn getrawtransactions( - &self, - tx_hashes: Vec, - ) -> anyhow::Result>> { - let params = tx_hashes - .iter() - .map(|tx_hash| format!("\"{}\", true", tx_hash)) - .collect::>(); - Ok(self - .call::( - "getrawtransaction", - params.iter().map(|x| x.as_str()).collect::>(), - ) - .await? - .into_iter() - .map(|x| x.result) - .collect::>>()) - } - async fn getbestblockhash(&self) -> anyhow::Result { - self.call::("getbestblockhash", vec![""]) - .await - .map(|x| x[0].result.clone()) - } - async fn getblock(&self, block_hash: String) -> anyhow::Result { - self.call::("getblock", vec![&format!("\"{}\", 2", block_hash)]) - .await - .map(|x| x[0].result.clone()) - } -} - -async fn get_updated_cache(btc: T, previous_cache: Cache) -> anyhow::Result { - let all_mempool_transactions: Vec = btc.getrawmempool().await?; - let mut new_transactions: HashMap = Default::default(); - let mut new_known_tx_hashes: HashSet = Default::default(); - let previous_mempool: HashMap = previous_cache - .clone() - .transactions - .into_iter() - .filter_map(|(_, query_result)| { - if query_result.confirmations == 0 { - Some((query_result.tx_hash.clone(), query_result)) - } else { - None - } - }) - .collect(); - let unknown_mempool_transactions: Vec = all_mempool_transactions - .into_iter() - .filter(|tx_hash| { - if let Some(known_transaction) = previous_mempool.get(tx_hash) { - new_known_tx_hashes.insert(tx_hash.clone()); - new_transactions - .insert(known_transaction.destination.clone(), known_transaction.clone()); - } else if previous_cache.known_tx_hashes.contains(tx_hash) { - new_known_tx_hashes.insert(tx_hash.clone()); - } else { - return true } - false }) - .collect(); - let transactions: Vec = btc - .getrawtransactions(unknown_mempool_transactions) - .await? - .iter() - .filter_map(|x| x.clone()) - .collect(); - for tx in transactions { - for vout in tx.vout { - new_known_tx_hashes.insert(tx.txid.clone()); - if let Some(destination) = vout.script_pub_key.address { - new_transactions.insert( - destination.clone(), - QueryResult { - destination, - confirmations: 0, - value: vout.value, - tx_hash: tx.txid.clone(), - }, - ); - } - } - } - let block_hash = btc.getbestblockhash().await?; - if previous_cache.best_block_hash == block_hash { - for entry in previous_cache.transactions { - if entry.1.confirmations > 0 { - new_transactions.insert(entry.0, entry.1); - } - } - } else { - info!("New block found: {}", block_hash); - let mut block_hash_to_query = block_hash.clone(); - for confirmations in 1..SAFETY_MARGIN { - let block = btc.getblock(block_hash_to_query).await?; - for tx in block.tx { - for vout in tx.vout { - if let Some(destination) = vout.script_pub_key.address { - new_transactions.insert( - destination.clone(), - QueryResult { - destination, - confirmations, - value: vout.value, - tx_hash: tx.txid.clone(), - }, - ); - } - } - } - block_hash_to_query = block.previousblockhash; - } - } - Ok(Cache { - status: CacheStatus::Ready, - best_block_hash: block_hash, - transactions: new_transactions, - known_tx_hashes: new_known_tx_hashes, - }) -} - -fn lookup_transactions( - cache: &Cache, - addresses: &[String], -) -> anyhow::Result>> { - match cache.status { - CacheStatus::Ready => Ok(addresses - .iter() - .map(|address| cache.transactions.get(address).map(Clone::clone)) - .collect::>>()), - CacheStatus::Init => Err(anyhow!("Address cache is not initialised.")), - CacheStatus::Down => Err(anyhow!("Address cache is down - check btc connection.")), - } -} - -#[derive(Clone)] -pub struct BtcTracker { - cache: Arc>, -} - -impl BtcTracker { - pub fn lookup_transactions( - &self, - addresses: &[String], - ) -> anyhow::Result>> { - lookup_transactions(&self.cache.lock().unwrap(), addresses) - } -} - -pub async fn start(scope: &task_scope::Scope<'_, anyhow::Error>) -> BtcTracker { - let cache: Arc> = Default::default(); - scope.spawn({ - let cache = cache.clone(); - async move { - let mut interval = tokio::time::interval(Duration::from_secs(REFRESH_INTERVAL)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - interval.tick().await; - let cache_copy = cache.lock().unwrap().clone(); - match get_updated_cache(BtcRpc, cache_copy).await { - Ok(updated_cache) => { - let mut cache = cache.lock().unwrap(); - *cache = updated_cache; - }, - Err(err) => { - error!("Error when querying Bitcoin chain: {}", err); - let mut cache = cache.lock().unwrap(); - cache.status = CacheStatus::Down; - }, - } - } - } - }); - - BtcTracker { cache } -} - -#[cfg(test)] -mod tests { - - use super::*; - - #[derive(Clone)] - struct MockBtcRpc { - mempool: Vec, - latest_block_hash: String, - blocks: HashMap, - } - - #[async_trait] - impl BtcNode for MockBtcRpc { - async fn getrawmempool(&self) -> anyhow::Result> { - Ok(self.mempool.iter().map(|x| x.txid.clone()).collect()) - } - async fn getrawtransactions( - &self, - tx_hashes: Vec, - ) -> anyhow::Result>> { - let mut result: Vec> = Default::default(); - for hash in tx_hashes { - for tx in self.mempool.clone() { - if tx.txid == hash { - result.push(Some(tx)) - } else { - result.push(None) - } - } - } - Ok(result) - } - async fn getbestblockhash(&self) -> anyhow::Result { - Ok(self.latest_block_hash.clone()) - } - async fn getblock(&self, block_hash: String) -> anyhow::Result { - self.blocks.get(&block_hash).cloned().ok_or(anyhow!("Block missing")) - } - } - - #[tokio::test] - async fn multiple_outputs_in_one_tx() { - let mempool = vec![RawTx { - txid: "tx1".into(), - vout: vec![ - Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }, - Vout { - value: 1.2, - script_pub_key: ScriptPubKey { address: Some("address2".into()) }, - }, - ], - }]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..16 { - blocks.insert( - i.to_string(), - Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, - ); - } - let btc = MockBtcRpc { mempool, latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc, cache).await.unwrap(); - let result = lookup_transactions(&cache, &["address1".into(), "address2".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); - } - - #[tokio::test] - async fn mempool_updates() { - let mempool = vec![ - RawTx { - txid: "tx1".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }, - RawTx { - txid: "tx2".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address2".into()) }, - }], - }, - ]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..16 { - blocks.insert( - i.to_string(), - Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, - ); - } - let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); - let result = - lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) - .unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); - assert!(result[2].is_none()); - - btc.mempool.append(&mut vec![RawTx { - txid: "tx3".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address3".into()) }, - }], - }]); - let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); - let result = - lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) - .unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); - assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); - - btc.mempool.remove(0); - let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); - let result = - lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) - .unwrap(); - assert!(result[0].is_none()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); - assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); - } - - #[tokio::test] - async fn blocks() { - let mempool = vec![]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..19 { - blocks.insert( - i.to_string(), - Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, - ); - } - blocks.insert( - "15".to_string(), - Block { - previousblockhash: "14".to_string(), - tx: vec![RawTx { - txid: "tx1".into(), - vout: vec![Vout { - value: 12.5, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }], - }, - ); - let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); - let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[0].as_ref().unwrap().confirmations, 1); - - btc.latest_block_hash = "16".to_string(); - let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); - let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[0].as_ref().unwrap().confirmations, 2); - } - - #[tokio::test] - async fn report_oldest_tx_only() { - let mempool = vec![RawTx { - txid: "tx2".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..16 { - blocks.insert( - i.to_string(), - Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, - ); - } - blocks.insert( - "13".to_string(), - Block { - previousblockhash: "12".to_string(), - tx: vec![RawTx { - txid: "tx1".into(), - vout: vec![Vout { - value: 12.5, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }], - }, - ); - let btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); - let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[0].as_ref().unwrap().confirmations, 3); - assert_eq!(result[0].as_ref().unwrap().value, 12.5); - } + .shared(scope) + .chunk_by_vault(vaults) + .deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone()) + .await + .btc_deposits(witness_call.clone()) + .egress_items(scope, state_chain_stream, state_chain_client) + .await + .then(move |epoch, header| process_egress(epoch, header, witness_call.clone())) + .logging("witnessing") + .spawn(scope); + + Ok(()) } diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc_mempool.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc_mempool.rs new file mode 100644 index 0000000000..be87643556 --- /dev/null +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc_mempool.rs @@ -0,0 +1,536 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, Mutex}, + time::Duration, +}; + +use anyhow::anyhow; +use async_trait::async_trait; +use chainflip_engine::settings::HttpBasicAuthEndpoint; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tracing::{error, info}; +use utilities::task_scope; + +type TxHash = String; +type BlockHash = String; +type Address = String; + +#[derive(Deserialize)] +struct BestBlockResult { + result: BlockHash, +} + +#[derive(Deserialize)] +struct MemPoolResult { + result: Vec, +} + +#[derive(Deserialize, Clone)] +struct ScriptPubKey { + address: Option
, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +struct Vout { + value: f64, + script_pub_key: ScriptPubKey, +} + +#[derive(Deserialize, Clone)] +struct RawTx { + txid: TxHash, + vout: Vec, +} + +#[derive(Deserialize)] +struct RawTxResult { + result: Option, +} + +#[derive(Deserialize, Clone)] +struct Block { + previousblockhash: BlockHash, + tx: Vec, +} + +#[derive(Deserialize)] +struct BlockResult { + result: Block, +} + +#[derive(Clone, Serialize)] +pub struct QueryResult { + confirmations: u32, + destination: Address, + value: f64, + tx_hash: TxHash, +} + +#[derive(Default, Clone)] +enum CacheStatus { + #[default] + Init, + Ready, + Down, +} + +#[derive(Default, Clone)] +struct Cache { + status: CacheStatus, + best_block_hash: BlockHash, + transactions: HashMap, + known_tx_hashes: HashSet, +} + +const SAFETY_MARGIN: u32 = 10; +const REFRESH_INTERVAL: u64 = 10; + +#[async_trait] +trait BtcNode { + async fn getrawmempool(&self) -> anyhow::Result>; + async fn getrawtransactions( + &self, + tx_hashes: Vec, + ) -> anyhow::Result>>; + async fn getbestblockhash(&self) -> anyhow::Result; + async fn getblock(&self, block_hash: BlockHash) -> anyhow::Result; +} + +struct BtcRpc { + settings: HttpBasicAuthEndpoint, +} + +impl BtcRpc { + async fn call( + &self, + method: &str, + params: Vec<&str>, + ) -> anyhow::Result> { + info!("Calling {} with batch size of {}", method, params.len()); + let body = params + .iter() + .map(|param| { + format!(r#"{{"jsonrpc":"1.0","id":0,"method":"{}","params":[{}]}}"#, method, param) + }) + .collect::>() + .join(","); + + reqwest::Client::new() + .post(self.settings.http_endpoint.as_ref()) + .basic_auth(&self.settings.basic_auth_user, Some(&self.settings.basic_auth_password)) + .header("Content-Type", "text/plain") + .body(format!("[{}]", body)) + .send() + .await? + .json::>() + .await + .map_err(|err| anyhow!(err)) + .and_then(|result| { + if result.len() == params.len() { + Ok(result) + } else { + Err(anyhow!("Batched request returned an incorrect number of results")) + } + }) + } +} + +#[async_trait] +impl BtcNode for BtcRpc { + async fn getrawmempool(&self) -> anyhow::Result> { + self.call::("getrawmempool", vec![""]) + .await + .map(|x| x[0].result.clone()) + } + async fn getrawtransactions( + &self, + tx_hashes: Vec, + ) -> anyhow::Result>> { + let params = tx_hashes + .iter() + .map(|tx_hash| format!("\"{}\", true", tx_hash)) + .collect::>(); + Ok(self + .call::( + "getrawtransaction", + params.iter().map(|x| x.as_str()).collect::>(), + ) + .await? + .into_iter() + .map(|x| x.result) + .collect::>>()) + } + async fn getbestblockhash(&self) -> anyhow::Result { + self.call::("getbestblockhash", vec![""]) + .await + .map(|x| x[0].result.clone()) + } + async fn getblock(&self, block_hash: String) -> anyhow::Result { + self.call::("getblock", vec![&format!("\"{}\", 2", block_hash)]) + .await + .map(|x| x[0].result.clone()) + } +} + +async fn get_updated_cache(btc: &T, previous_cache: Cache) -> anyhow::Result { + let all_mempool_transactions: Vec = btc.getrawmempool().await?; + let mut new_transactions: HashMap = Default::default(); + let mut new_known_tx_hashes: HashSet = Default::default(); + let previous_mempool: HashMap = previous_cache + .clone() + .transactions + .into_iter() + .filter_map(|(_, query_result)| { + if query_result.confirmations == 0 { + Some((query_result.tx_hash.clone(), query_result)) + } else { + None + } + }) + .collect(); + let unknown_mempool_transactions: Vec = all_mempool_transactions + .into_iter() + .filter(|tx_hash| { + if let Some(known_transaction) = previous_mempool.get(tx_hash) { + new_known_tx_hashes.insert(tx_hash.clone()); + new_transactions + .insert(known_transaction.destination.clone(), known_transaction.clone()); + } else if previous_cache.known_tx_hashes.contains(tx_hash) { + new_known_tx_hashes.insert(tx_hash.clone()); + } else { + return true + } + false + }) + .collect(); + let transactions: Vec = btc + .getrawtransactions(unknown_mempool_transactions) + .await? + .iter() + .filter_map(|x| x.clone()) + .collect(); + for tx in transactions { + for vout in tx.vout { + new_known_tx_hashes.insert(tx.txid.clone()); + if let Some(destination) = vout.script_pub_key.address { + new_transactions.insert( + destination.clone(), + QueryResult { + destination, + confirmations: 0, + value: vout.value, + tx_hash: tx.txid.clone(), + }, + ); + } + } + } + let block_hash = btc.getbestblockhash().await?; + if previous_cache.best_block_hash == block_hash { + for entry in previous_cache.transactions { + if entry.1.confirmations > 0 { + new_transactions.insert(entry.0, entry.1); + } + } + } else { + info!("New block found: {}", block_hash); + let mut block_hash_to_query = block_hash.clone(); + for confirmations in 1..SAFETY_MARGIN { + let block = btc.getblock(block_hash_to_query).await?; + for tx in block.tx { + for vout in tx.vout { + if let Some(destination) = vout.script_pub_key.address { + new_transactions.insert( + destination.clone(), + QueryResult { + destination, + confirmations, + value: vout.value, + tx_hash: tx.txid.clone(), + }, + ); + } + } + } + block_hash_to_query = block.previousblockhash; + } + } + Ok(Cache { + status: CacheStatus::Ready, + best_block_hash: block_hash, + transactions: new_transactions, + known_tx_hashes: new_known_tx_hashes, + }) +} + +fn lookup_transactions( + cache: &Cache, + addresses: &[String], +) -> anyhow::Result>> { + match cache.status { + CacheStatus::Ready => Ok(addresses + .iter() + .map(|address| cache.transactions.get(address).map(Clone::clone)) + .collect::>>()), + CacheStatus::Init => Err(anyhow!("Address cache is not initialised.")), + CacheStatus::Down => Err(anyhow!("Address cache is down - check btc connection.")), + } +} + +#[derive(Clone)] +pub struct BtcTracker { + cache: Arc>, +} + +impl BtcTracker { + pub fn lookup_transactions( + &self, + addresses: &[String], + ) -> anyhow::Result>> { + lookup_transactions(&self.cache.lock().unwrap(), addresses) + } +} + +pub async fn start( + scope: &task_scope::Scope<'_, anyhow::Error>, + endpoint: HttpBasicAuthEndpoint, +) -> BtcTracker { + let cache: Arc> = Default::default(); + scope.spawn({ + let cache = cache.clone(); + async move { + let btc_rpc = BtcRpc { settings: endpoint }; + let mut interval = tokio::time::interval(Duration::from_secs(REFRESH_INTERVAL)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + interval.tick().await; + let cache_copy = cache.lock().unwrap().clone(); + match get_updated_cache(&btc_rpc, cache_copy).await { + Ok(updated_cache) => { + let mut cache = cache.lock().unwrap(); + *cache = updated_cache; + }, + Err(err) => { + error!("Error when querying Bitcoin chain: {}", err); + let mut cache = cache.lock().unwrap(); + cache.status = CacheStatus::Down; + }, + } + } + } + }); + + BtcTracker { cache } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[derive(Clone)] + struct MockBtcRpc { + mempool: Vec, + latest_block_hash: String, + blocks: HashMap, + } + + #[async_trait] + impl BtcNode for MockBtcRpc { + async fn getrawmempool(&self) -> anyhow::Result> { + Ok(self.mempool.iter().map(|x| x.txid.clone()).collect()) + } + async fn getrawtransactions( + &self, + tx_hashes: Vec, + ) -> anyhow::Result>> { + let mut result: Vec> = Default::default(); + for hash in tx_hashes { + for tx in self.mempool.clone() { + if tx.txid == hash { + result.push(Some(tx)) + } else { + result.push(None) + } + } + } + Ok(result) + } + async fn getbestblockhash(&self) -> anyhow::Result { + Ok(self.latest_block_hash.clone()) + } + async fn getblock(&self, block_hash: String) -> anyhow::Result { + self.blocks.get(&block_hash).cloned().ok_or(anyhow!("Block missing")) + } + } + + #[tokio::test] + async fn multiple_outputs_in_one_tx() { + let mempool = vec![RawTx { + txid: "tx1".into(), + vout: vec![ + Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }, + Vout { + value: 1.2, + script_pub_key: ScriptPubKey { address: Some("address2".into()) }, + }, + ], + }]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..16 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + let btc = MockBtcRpc { mempool, latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(&btc, cache).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into(), "address2".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + } + + #[tokio::test] + async fn mempool_updates() { + let mempool = vec![ + RawTx { + txid: "tx1".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }, + RawTx { + txid: "tx2".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address2".into()) }, + }], + }, + ]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..16 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(&btc, cache).await.unwrap(); + let result = + lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) + .unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + assert!(result[2].is_none()); + + btc.mempool.append(&mut vec![RawTx { + txid: "tx3".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address3".into()) }, + }], + }]); + let cache = get_updated_cache(&btc, cache.clone()).await.unwrap(); + let result = + lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) + .unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); + + btc.mempool.remove(0); + let cache = get_updated_cache(&btc, cache.clone()).await.unwrap(); + let result = + lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) + .unwrap(); + assert!(result[0].is_none()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); + } + + #[tokio::test] + async fn blocks() { + let mempool = vec![]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..19 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + blocks.insert( + "15".to_string(), + Block { + previousblockhash: "14".to_string(), + tx: vec![RawTx { + txid: "tx1".into(), + vout: vec![Vout { + value: 12.5, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }], + }, + ); + let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(&btc, cache).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[0].as_ref().unwrap().confirmations, 1); + + btc.latest_block_hash = "16".to_string(); + let cache = get_updated_cache(&btc, cache.clone()).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[0].as_ref().unwrap().confirmations, 2); + } + + #[tokio::test] + async fn report_oldest_tx_only() { + let mempool = vec![RawTx { + txid: "tx2".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..16 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + blocks.insert( + "13".to_string(), + Block { + previousblockhash: "12".to_string(), + tx: vec![RawTx { + txid: "tx1".into(), + vout: vec![Vout { + value: 12.5, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }], + }, + ); + let btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(&btc, cache).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[0].as_ref().unwrap().confirmations, 3); + assert_eq!(result[0].as_ref().unwrap().value, 12.5); + } +} diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs index 5ea1047b8a..95715b8695 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs @@ -62,7 +62,6 @@ where let vaults = epoch_source.vaults().await; DotUnfinalisedSource::new(dot_client.clone()) - .shared(scope) .then(|header| async move { header.data.iter().filter_map(filter_map_events).collect() }) .strictly_monotonic() .shared(scope) @@ -79,12 +78,8 @@ where // Broadcast success .egress_items(scope, state_chain_stream.clone(), state_chain_client.clone()) .await - .then({ - let witness_call = witness_call.clone(); - let dot_client = dot_client.clone(); - move |epoch, header| { - process_egress(epoch, header, witness_call.clone(), dot_client.clone()) - } + .then(move |epoch, header| { + process_egress(epoch, header, witness_call.clone(), dot_client.clone()) }) .logging("witnessing") .spawn(scope); diff --git a/engine/src/witness.rs b/engine/src/witness.rs index cfb99c4c7f..8d1acc8f4b 100644 --- a/engine/src/witness.rs +++ b/engine/src/witness.rs @@ -1,4 +1,4 @@ -mod btc; +pub mod btc; pub mod common; pub mod dot; pub mod eth; diff --git a/engine/src/witness/btc.rs b/engine/src/witness/btc.rs index 9f9aface5e..38b97a2db5 100644 --- a/engine/src/witness/btc.rs +++ b/engine/src/witness/btc.rs @@ -1,10 +1,10 @@ mod btc_chain_tracking; mod btc_deposits; -mod btc_source; +pub mod btc_source; use std::sync::Arc; -use bitcoin::Transaction; +use bitcoin::{BlockHash, Transaction}; use cf_chains::btc::{deposit_address::DepositAddress, CHANGE_ADDRESS_SALT}; use cf_primitives::EpochIndex; use futures_core::Future; @@ -20,13 +20,49 @@ use crate::{ }; use btc_source::BtcSource; -use super::common::{chain_source::extension::ChainSourceExt, epoch_source::EpochSourceBuilder}; +use super::common::{ + chain_source::{extension::ChainSourceExt, Header}, + epoch_source::{EpochSourceBuilder, Vault}, +}; use anyhow::Result; // safety margin of 5 implies 6 block confirmations const SAFETY_MARGIN: usize = 5; +pub async fn process_egress( + epoch: Vault, + header: Header, Vec<[u8; 32]>)>, + process_call: ProcessCall, +) where + ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut + + Send + + Sync + + Clone + + 'static, + ProcessingFut: Future + Send + 'static, +{ + let (txs, monitored_tx_hashes) = header.data; + + for tx_hash in success_witnesses(&monitored_tx_hashes, &txs) { + process_call( + state_chain_runtime::RuntimeCall::BitcoinBroadcaster( + pallet_cf_broadcast::Call::transaction_succeeded { + tx_out_id: tx_hash, + signer_id: DepositAddress::new( + epoch.info.0.public_key.current, + CHANGE_ADDRESS_SALT, + ) + .script_pubkey(), + tx_fee: Default::default(), + }, + ), + epoch.index, + ) + .await; + } +} + pub async fn start< StateChainClient, StateChainStream, @@ -107,32 +143,9 @@ where .btc_deposits(process_call.clone()) .egress_items(scope, state_chain_stream, state_chain_client.clone()) .await - .then(move |epoch, header| { + .then({ let process_call = process_call.clone(); - async move { - let (txs, monitored_tx_hashes) = header.data; - - for tx_hash in success_witnesses(&monitored_tx_hashes, &txs) { - process_call( - state_chain_runtime::RuntimeCall::BitcoinBroadcaster( - pallet_cf_broadcast::Call::transaction_succeeded { - tx_out_id: tx_hash, - signer_id: DepositAddress::new( - epoch.info.0.public_key.current, - CHANGE_ADDRESS_SALT, - ) - .script_pubkey(), - // TODO: Ideally we can submit an empty type here. For - // Bitcoin and some other chains fee tracking is not - // necessary. PRO-370. - tx_fee: Default::default(), - }, - ), - epoch.index, - ) - .await; - } - } + move |epoch, header| process_egress(epoch, header, process_call.clone()) }) .continuous("Bitcoin".to_string(), db) .logging("witnessing") diff --git a/engine/src/witness/btc/btc_source.rs b/engine/src/witness/btc/btc_source.rs index 9474a34e57..480633ef7b 100644 --- a/engine/src/witness/btc/btc_source.rs +++ b/engine/src/witness/btc/btc_source.rs @@ -23,7 +23,7 @@ impl BtcSource { } } -const POLL_INTERVAL: Duration = Duration::from_secs(10); +const POLL_INTERVAL: Duration = Duration::from_secs(5); #[async_trait::async_trait] impl ChainSource for BtcSource