From 5c70808fc7d011feff8ee4f3968cab5fc33a755d Mon Sep 17 00:00:00 2001 From: Maxim Shishmarev Date: Mon, 16 Oct 2023 13:18:31 +1100 Subject: [PATCH] Feat: ensure correct process termination in ingress/egress tracker (#4101) * refactor: move most of btc into separate module (no new changes) * chore: tests submodule (allows collapsing all tests when not needed) * refactor: move the rest of btc into separate module (with a few changes) * chore: remove stale comment * feat: ensure correct process termination (using task_scope) * chore: fix clippy * chore: address review --- .../src/main.rs | 585 ++---------------- .../src/witnessing.rs | 96 ++- .../src/witnessing/btc.rs | 533 ++++++++++++++++ 3 files changed, 623 insertions(+), 591 deletions(-) create mode 100644 api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs diff --git a/api/bin/chainflip-ingress-egress-tracker/src/main.rs b/api/bin/chainflip-ingress-egress-tracker/src/main.rs index b8b3a38a7e..b59c3fa605 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/main.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/main.rs @@ -1,291 +1,12 @@ -use anyhow::anyhow; -use async_trait::async_trait; use chainflip_engine::settings::WsHttpEndpoints; -use futures::future; +use futures::FutureExt; use jsonrpsee::{core::Error, server::ServerBuilder, RpcModule}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{ - collections::{HashMap, HashSet}, - env, - io::Write, - net::SocketAddr, - path::PathBuf, - sync::{Arc, Mutex}, - time::Duration, -}; -use tokio::{task, time}; +use std::{env, io::Write, net::SocketAddr, path::PathBuf}; use tracing::log; - -type TxHash = String; -type BlockHash = String; -type Address = String; +use utilities::task_scope; mod witnessing; -#[derive(Deserialize)] -struct BestBlockResult { - result: BlockHash, -} - -#[derive(Deserialize)] -struct MemPoolResult { - result: Vec, -} - -#[derive(Deserialize, Clone)] -struct ScriptPubKey { - address: Option
, -} - -#[derive(Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -struct Vout { - value: f64, - script_pub_key: ScriptPubKey, -} - -#[derive(Deserialize, Clone)] -struct RawTx { - txid: TxHash, - vout: Vec, -} - -#[derive(Deserialize)] -struct RawTxResult { - result: Option, -} - -#[derive(Deserialize, Clone)] -struct Block { - previousblockhash: BlockHash, - tx: Vec, -} - -#[derive(Deserialize)] -struct BlockResult { - result: Block, -} - -#[derive(Clone, Serialize)] -struct QueryResult { - confirmations: u32, - destination: Address, - value: f64, - tx_hash: TxHash, -} - -#[derive(Default, Clone)] -enum CacheStatus { - #[default] - Init, - Ready, - Down, -} - -#[derive(Default, Clone)] -struct Cache { - status: CacheStatus, - best_block_hash: BlockHash, - transactions: HashMap, - known_tx_hashes: HashSet, -} - -const SAFETY_MARGIN: u32 = 10; -const REFRESH_INTERVAL: u64 = 10; - -#[async_trait] -trait BtcNode { - async fn getrawmempool(&self) -> anyhow::Result>; - async fn getrawtransactions( - &self, - tx_hashes: Vec, - ) -> anyhow::Result>>; - async fn getbestblockhash(&self) -> anyhow::Result; - async fn getblock(&self, block_hash: BlockHash) -> anyhow::Result; -} - -struct BtcRpc; - -impl BtcRpc { - async fn call( - &self, - method: &str, - params: Vec<&str>, - ) -> anyhow::Result> { - log::info!("Calling {} with batch size of {}", method, params.len()); - let url = env::var("BTC_ENDPOINT").unwrap_or("http://127.0.0.1:8332".to_string()); - let body = params - .iter() - .map(|param| { - format!(r#"{{"jsonrpc":"1.0","id":0,"method":"{}","params":[{}]}}"#, method, param) - }) - .collect::>() - .join(","); - - let username = env::var("BTC_USERNAME").unwrap_or("flip".to_string()); - let password = env::var("BTC_PASSWORD").unwrap_or("flip".to_string()); - reqwest::Client::new() - .post(url) - .basic_auth(username, Some(password)) - .header("Content-Type", "text/plain") - .body(format!("[{}]", body)) - .send() - .await? - .json::>() - .await - .map_err(|err| anyhow!(err)) - .and_then(|result| { - if result.len() == params.len() { - Ok(result) - } else { - Err(anyhow!("Batched request returned an incorrect number of results")) - } - }) - } -} - -#[async_trait] -impl BtcNode for BtcRpc { - async fn getrawmempool(&self) -> anyhow::Result> { - self.call::("getrawmempool", vec![""]) - .await - .map(|x| x[0].result.clone()) - } - async fn getrawtransactions( - &self, - tx_hashes: Vec, - ) -> anyhow::Result>> { - let params = tx_hashes - .iter() - .map(|tx_hash| format!("\"{}\", true", tx_hash)) - .collect::>(); - Ok(self - .call::( - "getrawtransaction", - params.iter().map(|x| x.as_str()).collect::>(), - ) - .await? - .into_iter() - .map(|x| x.result) - .collect::>>()) - } - async fn getbestblockhash(&self) -> anyhow::Result { - self.call::("getbestblockhash", vec![""]) - .await - .map(|x| x[0].result.clone()) - } - async fn getblock(&self, block_hash: String) -> anyhow::Result { - self.call::("getblock", vec![&format!("\"{}\", 2", block_hash)]) - .await - .map(|x| x[0].result.clone()) - } -} - -async fn get_updated_cache(btc: T, previous_cache: Cache) -> anyhow::Result { - let all_mempool_transactions: Vec = btc.getrawmempool().await?; - let mut new_transactions: HashMap = Default::default(); - let mut new_known_tx_hashes: HashSet = Default::default(); - let previous_mempool: HashMap = previous_cache - .clone() - .transactions - .into_iter() - .filter_map(|(_, query_result)| { - if query_result.confirmations == 0 { - Some((query_result.tx_hash.clone(), query_result)) - } else { - None - } - }) - .collect(); - let unknown_mempool_transactions: Vec = all_mempool_transactions - .into_iter() - .filter(|tx_hash| { - if let Some(known_transaction) = previous_mempool.get(tx_hash) { - new_known_tx_hashes.insert(tx_hash.clone()); - new_transactions - .insert(known_transaction.destination.clone(), known_transaction.clone()); - } else if previous_cache.known_tx_hashes.contains(tx_hash) { - new_known_tx_hashes.insert(tx_hash.clone()); - } else { - return true - } - false - }) - .collect(); - let transactions: Vec = btc - .getrawtransactions(unknown_mempool_transactions) - .await? - .iter() - .filter_map(|x| x.clone()) - .collect(); - for tx in transactions { - for vout in tx.vout { - new_known_tx_hashes.insert(tx.txid.clone()); - if let Some(destination) = vout.script_pub_key.address { - new_transactions.insert( - destination.clone(), - QueryResult { - destination, - confirmations: 0, - value: vout.value, - tx_hash: tx.txid.clone(), - }, - ); - } - } - } - let block_hash = btc.getbestblockhash().await?; - if previous_cache.best_block_hash == block_hash { - for entry in previous_cache.transactions { - if entry.1.confirmations > 0 { - new_transactions.insert(entry.0, entry.1); - } - } - } else { - log::info!("New block found: {}", block_hash); - let mut block_hash_to_query = block_hash.clone(); - for confirmations in 1..SAFETY_MARGIN { - let block = btc.getblock(block_hash_to_query).await?; - for tx in block.tx { - for vout in tx.vout { - if let Some(destination) = vout.script_pub_key.address { - new_transactions.insert( - destination.clone(), - QueryResult { - destination, - confirmations, - value: vout.value, - tx_hash: tx.txid.clone(), - }, - ); - } - } - } - block_hash_to_query = block.previousblockhash; - } - } - Ok(Cache { - status: CacheStatus::Ready, - best_block_hash: block_hash, - transactions: new_transactions, - known_tx_hashes: new_known_tx_hashes, - }) -} - -fn lookup_transactions( - cache: Cache, - addresses: Vec, -) -> Result>, Error> { - match cache.status { - CacheStatus::Ready => Ok(addresses - .iter() - .map(|address| cache.transactions.get(address).map(Clone::clone)) - .collect::>>()), - CacheStatus::Init => Err(anyhow!("Address cache is not initialised.").into()), - CacheStatus::Down => Err(anyhow!("Address cache is down - check btc connection.").into()), - } -} - pub struct DepositTrackerSettings { eth_node: WsHttpEndpoints, // The key shouldn't be necessary, but the current witnesser wants this @@ -293,82 +14,36 @@ pub struct DepositTrackerSettings { state_chain_ws_endpoint: String, } -#[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn start( + scope: &task_scope::Scope<'_, anyhow::Error>, + settings: DepositTrackerSettings, +) -> anyhow::Result<()> { tracing_subscriber::FmtSubscriber::builder() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .try_init() .expect("setting default subscriber failed"); - let cache: Arc> = Default::default(); - let updater = task::spawn({ - let cache = cache.clone(); - async move { - let mut interval = time::interval(Duration::from_secs(REFRESH_INTERVAL)); - interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); - loop { - interval.tick().await; - let cache_copy = cache.lock().unwrap().clone(); - match get_updated_cache(BtcRpc, cache_copy).await { - Ok(updated_cache) => { - let mut cache = cache.lock().unwrap(); - *cache = updated_cache; - }, - Err(err) => { - log::error!("Error when querying Bitcoin chain: {}", err); - let mut cache = cache.lock().unwrap(); - cache.status = CacheStatus::Down; - }, - } - } - } - }); - let server = ServerBuilder::default() - // It seems that if the client doesn't unsubscribe correctly, a "connection" - // won't be released, and we will eventually reach the limit, so we increase - // as a way to mitigate this issue. - // TODO: ensure that connections are always released - .build("0.0.0.0:13337".parse::()?) - .await?; let mut module = RpcModule::new(()); + + let btc_tracker = witnessing::btc::start(scope).await; + module.register_async_method("status", move |arguments, _context| { - let cache = cache.clone(); + let btc_tracker = btc_tracker.clone(); async move { - arguments - .parse::>() - .map_err(Error::Call) - .and_then(|addresses| lookup_transactions(cache.lock().unwrap().clone(), addresses)) + arguments.parse::>().map_err(Error::Call).and_then(|addresses| { + btc_tracker + .lookup_transactions(&addresses) + .map_err(|err| jsonrpsee::core::Error::Custom(err.to_string())) + }) } })?; - // Broadcast channel will drop old messages when the buffer if full to + // Broadcast channel will drop old messages when the buffer is full to // avoid "memory leaks" due to slow receivers. const EVENT_BUFFER_SIZE: usize = 1024; let (witness_sender, _) = tokio::sync::broadcast::channel::(EVENT_BUFFER_SIZE); - // Temporary hack: we don't actually use eth key, but the current witnesser is - // expecting a path with a valid key, so we create a temporary dummy key file here: - let mut eth_key_temp_file = tempfile::NamedTempFile::new()?; - eth_key_temp_file - .write_all(b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") - .unwrap(); - let eth_key_path = eth_key_temp_file.path(); - - let eth_ws_endpoint = env::var("ETH_WS_ENDPOINT").unwrap_or("ws://localhost:8546".to_string()); - let eth_http_endpoint = - env::var("ETH_HTTP_ENDPOINT").unwrap_or("http://localhost:8545".to_string()); - let sc_ws_endpoint = env::var("SC_WS_ENDPOINT").unwrap_or("ws://localhost:9944".to_string()); - - let settings = DepositTrackerSettings { - eth_node: WsHttpEndpoints { - ws_endpoint: eth_ws_endpoint.into(), - http_endpoint: eth_http_endpoint.into(), - }, - eth_key_path: eth_key_path.into(), - state_chain_ws_endpoint: sc_ws_endpoint, - }; - - witnessing::start(settings, witness_sender.clone()); + witnessing::start(scope, settings, witness_sender.clone()).await?; module.register_subscription( "subscribe_witnessing", @@ -390,204 +65,42 @@ async fn main() -> anyhow::Result<()> { }, )?; - let addr = server.local_addr()?; - log::info!("Listening on http://{}", addr); - let serverhandle = Box::pin(server.start(module)?.stopped()); - let _ = future::select(serverhandle, updater).await; - Ok(()) -} - -#[cfg(test)] -#[derive(Clone)] -struct MockBtcRpc { - mempool: Vec, - latest_block_hash: String, - blocks: HashMap, -} - -#[cfg(test)] -#[async_trait] -impl BtcNode for MockBtcRpc { - async fn getrawmempool(&self) -> anyhow::Result> { - Ok(self.mempool.iter().map(|x| x.txid.clone()).collect()) - } - async fn getrawtransactions( - &self, - tx_hashes: Vec, - ) -> anyhow::Result>> { - let mut result: Vec> = Default::default(); - for hash in tx_hashes { - for tx in self.mempool.clone() { - if tx.txid == hash { - result.push(Some(tx)) - } else { - result.push(None) - } - } - } - Ok(result) - } - async fn getbestblockhash(&self) -> anyhow::Result { - Ok(self.latest_block_hash.clone()) - } - async fn getblock(&self, block_hash: String) -> anyhow::Result { - self.blocks.get(&block_hash).cloned().ok_or(anyhow!("Block missing")) - } -} + scope.spawn(async { + let server = ServerBuilder::default().build("0.0.0.0:13337".parse::()?).await?; + let addr = server.local_addr()?; + log::info!("Listening on http://{}", addr); + server.start(module)?.stopped().await; + // If the server stops for some reason, we return + // error to terminate other tasks and the process. + Err(anyhow::anyhow!("RPC server stopped")) + }); -#[tokio::test] -async fn multiple_outputs_in_one_tx() { - let mempool = vec![RawTx { - txid: "tx1".into(), - vout: vec![ - Vout { value: 0.8, script_pub_key: ScriptPubKey { address: Some("address1".into()) } }, - Vout { value: 1.2, script_pub_key: ScriptPubKey { address: Some("address2".into()) } }, - ], - }]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..16 { - blocks.insert(i.to_string(), Block { previousblockhash: (i - 1).to_string(), tx: vec![] }); - } - let btc = MockBtcRpc { mempool, latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc, cache).await.unwrap(); - let result = lookup_transactions(cache, vec!["address1".into(), "address2".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + Ok(()) } -#[tokio::test] -async fn mempool_updates() { - let mempool = vec![ - RawTx { - txid: "tx1".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }, - RawTx { - txid: "tx2".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address2".into()) }, - }], - }, - ]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..16 { - blocks.insert(i.to_string(), Block { previousblockhash: (i - 1).to_string(), tx: vec![] }); - } - let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); - let result = lookup_transactions( - cache.clone(), - vec!["address1".into(), "address2".into(), "address3".into()], - ) - .unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); - assert!(result[2].is_none()); - - btc.mempool.append(&mut vec![RawTx { - txid: "tx3".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address3".into()) }, - }], - }]); - let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); - let result = lookup_transactions( - cache.clone(), - vec!["address1".into(), "address2".into(), "address3".into()], - ) - .unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); - assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Temporary hack: we don't actually use eth key, but the current witnesser is + // expecting a path with a valid key, so we create a temporary dummy key file here: + let mut eth_key_temp_file = tempfile::NamedTempFile::new()?; + eth_key_temp_file + .write_all(b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") + .unwrap(); + let eth_key_path = eth_key_temp_file.path(); - btc.mempool.remove(0); - let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); - let result = lookup_transactions( - cache.clone(), - vec!["address1".into(), "address2".into(), "address3".into()], - ) - .unwrap(); - assert!(result[0].is_none()); - assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); - assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); -} + let eth_ws_endpoint = env::var("ETH_WS_ENDPOINT").unwrap_or("ws://localhost:8546".to_string()); + let eth_http_endpoint = + env::var("ETH_HTTP_ENDPOINT").unwrap_or("http://localhost:8545".to_string()); + let sc_ws_endpoint = env::var("SC_WS_ENDPOINT").unwrap_or("ws://localhost:9944".to_string()); -#[tokio::test] -async fn blocks() { - let mempool = vec![]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..19 { - blocks.insert(i.to_string(), Block { previousblockhash: (i - 1).to_string(), tx: vec![] }); - } - blocks.insert( - "15".to_string(), - Block { - previousblockhash: "14".to_string(), - tx: vec![RawTx { - txid: "tx1".into(), - vout: vec![Vout { - value: 12.5, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }], + let settings = DepositTrackerSettings { + eth_node: WsHttpEndpoints { + ws_endpoint: eth_ws_endpoint.into(), + http_endpoint: eth_http_endpoint.into(), }, - ); - let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); - let result = lookup_transactions(cache.clone(), vec!["address1".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[0].as_ref().unwrap().confirmations, 1); - - btc.latest_block_hash = "16".to_string(); - let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); - let result = lookup_transactions(cache.clone(), vec!["address1".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[0].as_ref().unwrap().confirmations, 2); -} + eth_key_path: eth_key_path.into(), + state_chain_ws_endpoint: sc_ws_endpoint, + }; -#[tokio::test] -async fn report_oldest_tx_only() { - let mempool = vec![RawTx { - txid: "tx2".into(), - vout: vec![Vout { - value: 0.8, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }]; - let latest_block_hash = "15".to_string(); - let mut blocks: HashMap = Default::default(); - for i in 1..16 { - blocks.insert(i.to_string(), Block { previousblockhash: (i - 1).to_string(), tx: vec![] }); - } - blocks.insert( - "13".to_string(), - Block { - previousblockhash: "12".to_string(), - tx: vec![RawTx { - txid: "tx1".into(), - vout: vec![Vout { - value: 12.5, - script_pub_key: ScriptPubKey { address: Some("address1".into()) }, - }], - }], - }, - ); - let btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; - let cache: Cache = Default::default(); - let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); - let result = lookup_transactions(cache.clone(), vec!["address1".into()]).unwrap(); - assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); - assert_eq!(result[0].as_ref().unwrap().confirmations, 3); - assert_eq!(result[0].as_ref().unwrap().value, 12.5); + task_scope::task_scope(|scope| async move { start(scope, settings).await }.boxed()).await } diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs index d57f77c98b..b40df7b2cf 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs @@ -1,3 +1,4 @@ +pub mod btc; mod eth; use std::collections::HashMap; @@ -10,7 +11,6 @@ use chainflip_engine::{ }, witness::common::epoch_source::EpochSource, }; -use futures::FutureExt; use sp_core::H160; use utilities::task_scope; @@ -72,62 +72,48 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro } } -pub(super) fn start( +pub(super) async fn start( + scope: &task_scope::Scope<'_, anyhow::Error>, settings: DepositTrackerSettings, witness_sender: tokio::sync::broadcast::Sender, -) { - tokio::spawn(async { - // TODO: ensure that if this panics, the whole process exists (probably by moving - // task scope to main) - task_scope::task_scope(|scope| { +) -> anyhow::Result<()> { + let (state_chain_stream, state_chain_client) = { + state_chain_observer::client::StateChainClient::connect_without_account( + scope, + &settings.state_chain_ws_endpoint, + ) + .await? + }; + + let env_params = get_env_parameters(&state_chain_client).await; + + let epoch_source = + EpochSource::builder(scope, state_chain_stream.clone(), state_chain_client.clone()).await; + + let witness_call = { + let witness_sender = witness_sender.clone(); + move |call: state_chain_runtime::RuntimeCall, _epoch_index| { + let witness_sender = witness_sender.clone(); async move { - let (state_chain_stream, state_chain_client) = { - state_chain_observer::client::StateChainClient::connect_without_account( - scope, - &settings.state_chain_ws_endpoint, - ) - .await? - }; - - let env_params = get_env_parameters(&state_chain_client).await; - - let epoch_source = EpochSource::builder( - scope, - state_chain_stream.clone(), - state_chain_client.clone(), - ) - .await; - - let witness_call = { - let witness_sender = witness_sender.clone(); - move |call: state_chain_runtime::RuntimeCall, _epoch_index| { - let witness_sender = witness_sender.clone(); - async move { - // Send may fail if there aren't any subscribers, - // but it is safe to ignore the error. - if let Ok(n) = witness_sender.send(call) { - tracing::info!("Broadcasting witnesser call to {} subscribers", n); - } - } - } - }; - - eth::start( - scope, - state_chain_client.clone(), - state_chain_stream.clone(), - settings, - env_params, - epoch_source.clone(), - witness_call, - ) - .await?; - - Ok(()) + // Send may fail if there aren't any subscribers, + // but it is safe to ignore the error. + if let Ok(n) = witness_sender.send(call.clone()) { + tracing::info!("Broadcasting witnesser call {:?} to {} subscribers", call, n); + } } - .boxed() - }) - .await - .unwrap() - }); + } + }; + + eth::start( + scope, + state_chain_client.clone(), + state_chain_stream.clone(), + settings, + env_params, + epoch_source.clone(), + witness_call, + ) + .await?; + + Ok(()) } diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs new file mode 100644 index 0000000000..67b63c1fea --- /dev/null +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs @@ -0,0 +1,533 @@ +use std::{ + collections::{HashMap, HashSet}, + env, + sync::{Arc, Mutex}, + time::Duration, +}; + +use anyhow::anyhow; +use async_trait::async_trait; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tracing::{error, info}; +use utilities::task_scope; + +type TxHash = String; +type BlockHash = String; +type Address = String; + +#[derive(Deserialize)] +struct BestBlockResult { + result: BlockHash, +} + +#[derive(Deserialize)] +struct MemPoolResult { + result: Vec, +} + +#[derive(Deserialize, Clone)] +struct ScriptPubKey { + address: Option
, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +struct Vout { + value: f64, + script_pub_key: ScriptPubKey, +} + +#[derive(Deserialize, Clone)] +struct RawTx { + txid: TxHash, + vout: Vec, +} + +#[derive(Deserialize)] +struct RawTxResult { + result: Option, +} + +#[derive(Deserialize, Clone)] +struct Block { + previousblockhash: BlockHash, + tx: Vec, +} + +#[derive(Deserialize)] +struct BlockResult { + result: Block, +} + +#[derive(Clone, Serialize)] +pub struct QueryResult { + confirmations: u32, + destination: Address, + value: f64, + tx_hash: TxHash, +} + +#[derive(Default, Clone)] +enum CacheStatus { + #[default] + Init, + Ready, + Down, +} + +#[derive(Default, Clone)] +struct Cache { + status: CacheStatus, + best_block_hash: BlockHash, + transactions: HashMap, + known_tx_hashes: HashSet, +} + +const SAFETY_MARGIN: u32 = 10; +const REFRESH_INTERVAL: u64 = 10; + +#[async_trait] +trait BtcNode { + async fn getrawmempool(&self) -> anyhow::Result>; + async fn getrawtransactions( + &self, + tx_hashes: Vec, + ) -> anyhow::Result>>; + async fn getbestblockhash(&self) -> anyhow::Result; + async fn getblock(&self, block_hash: BlockHash) -> anyhow::Result; +} + +struct BtcRpc; + +impl BtcRpc { + async fn call( + &self, + method: &str, + params: Vec<&str>, + ) -> anyhow::Result> { + info!("Calling {} with batch size of {}", method, params.len()); + let url = env::var("BTC_ENDPOINT").unwrap_or("http://127.0.0.1:8332".to_string()); + let body = params + .iter() + .map(|param| { + format!(r#"{{"jsonrpc":"1.0","id":0,"method":"{}","params":[{}]}}"#, method, param) + }) + .collect::>() + .join(","); + + let username = env::var("BTC_USERNAME").unwrap_or("flip".to_string()); + let password = env::var("BTC_PASSWORD").unwrap_or("flip".to_string()); + reqwest::Client::new() + .post(url) + .basic_auth(username, Some(password)) + .header("Content-Type", "text/plain") + .body(format!("[{}]", body)) + .send() + .await? + .json::>() + .await + .map_err(|err| anyhow!(err)) + .and_then(|result| { + if result.len() == params.len() { + Ok(result) + } else { + Err(anyhow!("Batched request returned an incorrect number of results")) + } + }) + } +} + +#[async_trait] +impl BtcNode for BtcRpc { + async fn getrawmempool(&self) -> anyhow::Result> { + self.call::("getrawmempool", vec![""]) + .await + .map(|x| x[0].result.clone()) + } + async fn getrawtransactions( + &self, + tx_hashes: Vec, + ) -> anyhow::Result>> { + let params = tx_hashes + .iter() + .map(|tx_hash| format!("\"{}\", true", tx_hash)) + .collect::>(); + Ok(self + .call::( + "getrawtransaction", + params.iter().map(|x| x.as_str()).collect::>(), + ) + .await? + .into_iter() + .map(|x| x.result) + .collect::>>()) + } + async fn getbestblockhash(&self) -> anyhow::Result { + self.call::("getbestblockhash", vec![""]) + .await + .map(|x| x[0].result.clone()) + } + async fn getblock(&self, block_hash: String) -> anyhow::Result { + self.call::("getblock", vec![&format!("\"{}\", 2", block_hash)]) + .await + .map(|x| x[0].result.clone()) + } +} + +async fn get_updated_cache(btc: T, previous_cache: Cache) -> anyhow::Result { + let all_mempool_transactions: Vec = btc.getrawmempool().await?; + let mut new_transactions: HashMap = Default::default(); + let mut new_known_tx_hashes: HashSet = Default::default(); + let previous_mempool: HashMap = previous_cache + .clone() + .transactions + .into_iter() + .filter_map(|(_, query_result)| { + if query_result.confirmations == 0 { + Some((query_result.tx_hash.clone(), query_result)) + } else { + None + } + }) + .collect(); + let unknown_mempool_transactions: Vec = all_mempool_transactions + .into_iter() + .filter(|tx_hash| { + if let Some(known_transaction) = previous_mempool.get(tx_hash) { + new_known_tx_hashes.insert(tx_hash.clone()); + new_transactions + .insert(known_transaction.destination.clone(), known_transaction.clone()); + } else if previous_cache.known_tx_hashes.contains(tx_hash) { + new_known_tx_hashes.insert(tx_hash.clone()); + } else { + return true + } + false + }) + .collect(); + let transactions: Vec = btc + .getrawtransactions(unknown_mempool_transactions) + .await? + .iter() + .filter_map(|x| x.clone()) + .collect(); + for tx in transactions { + for vout in tx.vout { + new_known_tx_hashes.insert(tx.txid.clone()); + if let Some(destination) = vout.script_pub_key.address { + new_transactions.insert( + destination.clone(), + QueryResult { + destination, + confirmations: 0, + value: vout.value, + tx_hash: tx.txid.clone(), + }, + ); + } + } + } + let block_hash = btc.getbestblockhash().await?; + if previous_cache.best_block_hash == block_hash { + for entry in previous_cache.transactions { + if entry.1.confirmations > 0 { + new_transactions.insert(entry.0, entry.1); + } + } + } else { + info!("New block found: {}", block_hash); + let mut block_hash_to_query = block_hash.clone(); + for confirmations in 1..SAFETY_MARGIN { + let block = btc.getblock(block_hash_to_query).await?; + for tx in block.tx { + for vout in tx.vout { + if let Some(destination) = vout.script_pub_key.address { + new_transactions.insert( + destination.clone(), + QueryResult { + destination, + confirmations, + value: vout.value, + tx_hash: tx.txid.clone(), + }, + ); + } + } + } + block_hash_to_query = block.previousblockhash; + } + } + Ok(Cache { + status: CacheStatus::Ready, + best_block_hash: block_hash, + transactions: new_transactions, + known_tx_hashes: new_known_tx_hashes, + }) +} + +fn lookup_transactions( + cache: &Cache, + addresses: &[String], +) -> anyhow::Result>> { + match cache.status { + CacheStatus::Ready => Ok(addresses + .iter() + .map(|address| cache.transactions.get(address).map(Clone::clone)) + .collect::>>()), + CacheStatus::Init => Err(anyhow!("Address cache is not initialised.")), + CacheStatus::Down => Err(anyhow!("Address cache is down - check btc connection.")), + } +} + +#[derive(Clone)] +pub struct BtcTracker { + cache: Arc>, +} + +impl BtcTracker { + pub fn lookup_transactions( + &self, + addresses: &[String], + ) -> anyhow::Result>> { + lookup_transactions(&self.cache.lock().unwrap(), addresses) + } +} + +pub async fn start(scope: &task_scope::Scope<'_, anyhow::Error>) -> BtcTracker { + let cache: Arc> = Default::default(); + scope.spawn({ + let cache = cache.clone(); + async move { + let mut interval = tokio::time::interval(Duration::from_secs(REFRESH_INTERVAL)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + interval.tick().await; + let cache_copy = cache.lock().unwrap().clone(); + match get_updated_cache(BtcRpc, cache_copy).await { + Ok(updated_cache) => { + let mut cache = cache.lock().unwrap(); + *cache = updated_cache; + }, + Err(err) => { + error!("Error when querying Bitcoin chain: {}", err); + let mut cache = cache.lock().unwrap(); + cache.status = CacheStatus::Down; + }, + } + } + } + }); + + BtcTracker { cache } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[derive(Clone)] + struct MockBtcRpc { + mempool: Vec, + latest_block_hash: String, + blocks: HashMap, + } + + #[async_trait] + impl BtcNode for MockBtcRpc { + async fn getrawmempool(&self) -> anyhow::Result> { + Ok(self.mempool.iter().map(|x| x.txid.clone()).collect()) + } + async fn getrawtransactions( + &self, + tx_hashes: Vec, + ) -> anyhow::Result>> { + let mut result: Vec> = Default::default(); + for hash in tx_hashes { + for tx in self.mempool.clone() { + if tx.txid == hash { + result.push(Some(tx)) + } else { + result.push(None) + } + } + } + Ok(result) + } + async fn getbestblockhash(&self) -> anyhow::Result { + Ok(self.latest_block_hash.clone()) + } + async fn getblock(&self, block_hash: String) -> anyhow::Result { + self.blocks.get(&block_hash).cloned().ok_or(anyhow!("Block missing")) + } + } + + #[tokio::test] + async fn multiple_outputs_in_one_tx() { + let mempool = vec![RawTx { + txid: "tx1".into(), + vout: vec![ + Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }, + Vout { + value: 1.2, + script_pub_key: ScriptPubKey { address: Some("address2".into()) }, + }, + ], + }]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..16 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + let btc = MockBtcRpc { mempool, latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(btc, cache).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into(), "address2".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + } + + #[tokio::test] + async fn mempool_updates() { + let mempool = vec![ + RawTx { + txid: "tx1".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }, + RawTx { + txid: "tx2".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address2".into()) }, + }], + }, + ]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..16 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); + let result = + lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) + .unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + assert!(result[2].is_none()); + + btc.mempool.append(&mut vec![RawTx { + txid: "tx3".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address3".into()) }, + }], + }]); + let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); + let result = + lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) + .unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); + + btc.mempool.remove(0); + let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); + let result = + lookup_transactions(&cache, &["address1".into(), "address2".into(), "address3".into()]) + .unwrap(); + assert!(result[0].is_none()); + assert_eq!(result[1].as_ref().unwrap().destination, "address2".to_string()); + assert_eq!(result[2].as_ref().unwrap().destination, "address3".to_string()); + } + + #[tokio::test] + async fn blocks() { + let mempool = vec![]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..19 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + blocks.insert( + "15".to_string(), + Block { + previousblockhash: "14".to_string(), + tx: vec![RawTx { + txid: "tx1".into(), + vout: vec![Vout { + value: 12.5, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }], + }, + ); + let mut btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[0].as_ref().unwrap().confirmations, 1); + + btc.latest_block_hash = "16".to_string(); + let cache = get_updated_cache(btc.clone(), cache.clone()).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[0].as_ref().unwrap().confirmations, 2); + } + + #[tokio::test] + async fn report_oldest_tx_only() { + let mempool = vec![RawTx { + txid: "tx2".into(), + vout: vec![Vout { + value: 0.8, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }]; + let latest_block_hash = "15".to_string(); + let mut blocks: HashMap = Default::default(); + for i in 1..16 { + blocks.insert( + i.to_string(), + Block { previousblockhash: (i - 1).to_string(), tx: vec![] }, + ); + } + blocks.insert( + "13".to_string(), + Block { + previousblockhash: "12".to_string(), + tx: vec![RawTx { + txid: "tx1".into(), + vout: vec![Vout { + value: 12.5, + script_pub_key: ScriptPubKey { address: Some("address1".into()) }, + }], + }], + }, + ); + let btc = MockBtcRpc { mempool: mempool.clone(), latest_block_hash, blocks }; + let cache: Cache = Default::default(); + let cache = get_updated_cache(btc.clone(), cache).await.unwrap(); + let result = lookup_transactions(&cache, &["address1".into()]).unwrap(); + assert_eq!(result[0].as_ref().unwrap().destination, "address1".to_string()); + assert_eq!(result[0].as_ref().unwrap().confirmations, 3); + assert_eq!(result[0].as_ref().unwrap().value, 12.5); + } +}