diff --git a/Cargo.lock b/Cargo.lock index bbad0f58ed..ae44d390bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1628,12 +1628,14 @@ name = "chainflip-broker-api" version = "1.7.0" dependencies = [ "anyhow", + "cf-chains", "chainflip-api", "clap", "custom-rpc", "futures", "hex", "jsonrpsee 0.23.2", + "sc-rpc", "serde", "sp-core 34.0.0", "sp-rpc", @@ -2558,6 +2560,7 @@ dependencies = [ "jsonrpsee 0.23.2", "log", "pallet-cf-governance", + "pallet-cf-ingress-egress", "pallet-cf-pools", "pallet-cf-swapping", "pallet-cf-witnesser", diff --git a/api/bin/chainflip-broker-api/Cargo.toml b/api/bin/chainflip-broker-api/Cargo.toml index be7606f79f..4ab0d5ec54 100644 --- a/api/bin/chainflip-broker-api/Cargo.toml +++ b/api/bin/chainflip-broker-api/Cargo.toml @@ -23,6 +23,7 @@ workspace = true [dependencies] chainflip-api = { workspace = true } +cf-chains = { workspace = true, default-features = true } cf-utilities = { workspace = true, default-features = true } custom-rpc = { workspace = true } @@ -32,8 +33,9 @@ futures = { workspace = true } hex = { workspace = true, default-features = true } jsonrpsee = { workspace = true, features = ["full"] } serde = { workspace = true, default-features = true, features = ["derive"] } -sp-core = { workspace = true } -sp-rpc = { workspace = true } +sp-core = { workspace = true, default-features = true } +sp-rpc = { workspace = true, default-features = true } +sc-rpc = { workspace = true, default-features = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/api/bin/chainflip-broker-api/src/main.rs b/api/bin/chainflip-broker-api/src/main.rs index 5dcd076080..760781bd11 100644 --- a/api/bin/chainflip-broker-api/src/main.rs +++ b/api/bin/chainflip-broker-api/src/main.rs @@ -4,19 +4,26 @@ use cf_utilities::{ }; use chainflip_api::{ self, - primitives::{AccountRole, Affiliates, Asset, BasisPoints, CcmChannelMetadata, DcaParameters}, + primitives::{ + state_chain_runtime::runtime_apis::{ChainAccounts, TaintedTransactionEvents}, + AccountRole, Affiliates, Asset, BasisPoints, CcmChannelMetadata, DcaParameters, + }, settings::StateChain, - AccountId32, AddressString, BrokerApi, OperatorApi, RefundParameters, StateChainApi, - SwapDepositAddress, WithdrawFeesDetail, + AccountId32, AddressString, BlockUpdate, BrokerApi, ChainApi, DepositMonitorApi, OperatorApi, + RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, TransactionInId, + WithdrawFeesDetail, }; use clap::Parser; -use futures::FutureExt; +use custom_rpc::CustomApiClient; +use futures::{FutureExt, StreamExt}; use jsonrpsee::{ - core::{async_trait, ClientError}, + core::{async_trait, ClientError, SubscriptionResult}, proc_macros::rpc, server::ServerBuilder, types::{ErrorCode, ErrorObject, ErrorObjectOwned}, + PendingSubscriptionSink, }; +use serde::{Deserialize, Serialize}; use std::{ path::PathBuf, sync::{atomic::AtomicBool, Arc}, @@ -59,6 +66,12 @@ impl From for ErrorObjectOwned { } } +#[derive(Serialize, Deserialize)] +pub enum GetOpenDepositChannelsQuery { + All, + Mine, +} + #[rpc(server, client, namespace = "broker")] pub trait Rpc { #[method(name = "register_account", aliases = ["broker_registerAccount"])] @@ -84,6 +97,18 @@ pub trait Rpc { asset: Asset, destination_address: AddressString, ) -> RpcResult; + + #[method(name = "mark_transaction_as_tainted", aliases = ["broker_markTransactionAsTainted"])] + async fn mark_transaction_as_tainted(&self, tx_id: TransactionInId) -> RpcResult<()>; + + #[method(name = "get_open_deposit_channels", aliases = ["broker_getOpenDepositChannels"])] + async fn get_open_deposit_channels( + &self, + query: GetOpenDepositChannelsQuery, + ) -> RpcResult; + + #[subscription(name = "subscribe_tainted_transaction_events", item = BlockUpdate)] + async fn subscribe_tainted_transaction_events(&self) -> SubscriptionResult; } pub struct RpcServerImpl { @@ -149,6 +174,80 @@ impl RpcServer for RpcServerImpl { ) -> RpcResult { Ok(self.api.broker_api().withdraw_fees(asset, destination_address).await?) } + + async fn mark_transaction_as_tainted(&self, tx_id: TransactionInId) -> RpcResult<()> { + self.api + .deposit_monitor_api() + .mark_transaction_as_tainted(tx_id) + .await + .map_err(BrokerApiError::Other)?; + Ok(()) + } + + async fn get_open_deposit_channels( + &self, + query: GetOpenDepositChannelsQuery, + ) -> RpcResult { + let account_id = match query { + GetOpenDepositChannelsQuery::All => None, + GetOpenDepositChannelsQuery::Mine => Some(self.api.state_chain_client.account_id()), + }; + + self.api + .state_chain_client + .base_rpc_client + .raw_rpc_client + .cf_get_open_deposit_channels(account_id, None) + .await + .map_err(BrokerApiError::ClientError) + } + + async fn subscribe_tainted_transaction_events( + &self, + pending_sink: PendingSubscriptionSink, + ) -> SubscriptionResult { + let state_chain_client = self.api.state_chain_client.clone(); + let sink = pending_sink.accept().await.map_err(|e| e.to_string())?; + tokio::spawn(async move { + // Note we construct the subscription here rather than relying on the custom-rpc + // subscription. This is because we want to use finalized blocks. + // TODO: allow custom rpc subscriptions to use finalized blocks. + let mut finalized_block_stream = state_chain_client.finalized_block_stream().await; + while let Some(block) = finalized_block_stream.next().await { + match state_chain_client + .base_rpc_client + .raw_rpc_client + .cf_get_tainted_transaction_events(Some(block.hash)) + .await + { + Ok(events) => { + // We only want to send a notification if there have been events. + // If other chains are added, they have to be considered here as + // well. + if events.btc_events.is_empty() { + continue; + } + + let block_update = BlockUpdate { + block_hash: block.hash, + block_number: block.number, + data: events, + }; + + if sink + .send(sc_rpc::utils::to_sub_message(&sink, &block_update)) + .await + .is_err() + { + break; + } + }, + Err(_) => break, + } + } + }); + Ok(()) + } } #[derive(Parser, Debug, Clone, Default)] diff --git a/api/lib/src/lib.rs b/api/lib/src/lib.rs index 033d691f2a..03096f2bcc 100644 --- a/api/lib/src/lib.rs +++ b/api/lib/src/lib.rs @@ -7,7 +7,8 @@ use cf_chains::{ dot::PolkadotAccountId, evm::to_evm_address, sol::SolAddress, - CcmChannelMetadata, ChannelRefundParametersGeneric, ForeignChain, ForeignChainAddress, + CcmChannelMetadata, Chain, ChainCrypto, ChannelRefundParametersGeneric, ForeignChain, + ForeignChainAddress, }; pub use cf_primitives::{AccountRole, Affiliates, Asset, BasisPoints, ChannelId, SemVer}; use cf_primitives::{BlockNumber, DcaParameters, NetworkEnvironment, Price}; @@ -28,7 +29,7 @@ pub mod primitives { pub type RedemptionAmount = pallet_cf_funding::RedemptionAmount; pub use cf_chains::{ address::{EncodedAddress, ForeignChainAddress}, - CcmChannelMetadata, CcmDepositMetadata, + CcmChannelMetadata, CcmDepositMetadata, Chain, ChainCrypto, }; } pub use cf_chains::eth::Address as EthereumAddress; @@ -150,6 +151,10 @@ impl StateChainApi { self.state_chain_client.clone() } + pub fn deposit_monitor_api(&self) -> Arc { + self.state_chain_client.clone() + } + pub fn query_api(&self) -> queries::QueryApi { queries::QueryApi { state_chain_client: self.state_chain_client.clone() } } @@ -167,6 +172,8 @@ impl BrokerApi for StateChainClient { impl OperatorApi for StateChainClient {} #[async_trait] impl ValidatorApi for StateChainClient {} +#[async_trait] +impl DepositMonitorApi for StateChainClient {} #[async_trait] pub trait ValidatorApi: SimpleSubmissionApi { @@ -548,6 +555,31 @@ pub fn clean_foreign_chain_address(chain: ForeignChain, address: &str) -> Result }) } +pub type TransactionInIdFor = <::ChainCrypto as ChainCrypto>::TransactionInId; + +#[derive(Serialize, Deserialize)] +pub enum TransactionInId { + Bitcoin(TransactionInIdFor), + // other variants reserved for other chains. +} + +#[async_trait] +pub trait DepositMonitorApi: + SignedExtrinsicApi + StorageApi + Sized + Send + Sync + 'static +{ + async fn mark_transaction_as_tainted(&self, tx_id: TransactionInId) -> Result { + match tx_id { + TransactionInId::Bitcoin(tx_id) => + self.simple_submission_with_dry_run( + state_chain_runtime::RuntimeCall::BitcoinIngressEgress( + pallet_cf_ingress_egress::Call::mark_transaction_as_tainted { tx_id }, + ), + ) + .await, + } + } +} + #[derive(Debug, Zeroize, PartialEq, Eq)] /// Public and Secret keys as bytes pub struct KeyPair { diff --git a/state-chain/custom-rpc/Cargo.toml b/state-chain/custom-rpc/Cargo.toml index 18fc15a789..66d10ba3a1 100644 --- a/state-chain/custom-rpc/Cargo.toml +++ b/state-chain/custom-rpc/Cargo.toml @@ -29,6 +29,7 @@ pallet-cf-governance = { workspace = true, default-features = true } pallet-cf-pools = { workspace = true, default-features = true } pallet-cf-witnesser = { workspace = true, default-features = true } pallet-cf-swapping = { workspace = true, default-features = true } +pallet-cf-ingress-egress = { workspace = true, default-features = true } sp-api = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index e67cd48b50..a5a9a05d22 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -52,9 +52,10 @@ use state_chain_runtime::{ PendingBroadcasts, PendingTssCeremonies, RedemptionsInfo, SolanaNonces, }, runtime_apis::{ - AuctionState, BoostPoolDepth, BoostPoolDetails, BrokerInfo, CustomRuntimeApi, - DispatchErrorWithMessage, ElectoralRuntimeApi, FailingWitnessValidators, - LiquidityProviderBoostPoolInfo, LiquidityProviderInfo, RuntimeApiPenalty, ValidatorInfo, + AuctionState, BoostPoolDepth, BoostPoolDetails, BrokerInfo, ChainAccounts, + CustomRuntimeApi, DispatchErrorWithMessage, ElectoralRuntimeApi, FailingWitnessValidators, + LiquidityProviderBoostPoolInfo, LiquidityProviderInfo, RuntimeApiPenalty, + TaintedTransactionEvents, ValidatorInfo, }, safe_mode::RuntimeSafeMode, Hash, NetworkFee, SolanaInstance, @@ -938,6 +939,19 @@ pub trait CustomApi { proposed_votes: Vec, at: Option, ) -> RpcResult>; + + #[method(name = "get_open_deposit_channels")] + fn cf_get_open_deposit_channels( + &self, + broker: Option, + at: Option, + ) -> RpcResult; + + #[method(name = "get_tainted_transaction_events")] + fn cf_get_tainted_transaction_events( + &self, + at: Option, + ) -> RpcResult; } /// An RPC extension for the state chain node. @@ -1190,6 +1204,7 @@ where cf_failed_call_arbitrum(broadcast_id: BroadcastId) -> Option<::Transaction>, cf_boost_pools_depth() -> Vec, cf_pool_price(from_asset: Asset, to_asset: Asset) -> Option, + cf_get_open_deposit_channels(account_id: Option) -> ChainAccounts, } pass_through_and_flatten! { @@ -1725,6 +1740,13 @@ where ) -> RpcResult> { self.with_runtime_api(at, |api, hash| api.cf_filter_votes(hash, validator, proposed_votes)) } + + fn cf_get_tainted_transaction_events( + &self, + at: Option, + ) -> RpcResult { + self.with_runtime_api(at, |api, hash| api.cf_tainted_transaction_events(hash)) + } } impl CustomRpc diff --git a/state-chain/runtime/src/lib.rs b/state-chain/runtime/src/lib.rs index d09211334e..c7ab924f40 100644 --- a/state-chain/runtime/src/lib.rs +++ b/state-chain/runtime/src/lib.rs @@ -28,7 +28,8 @@ use crate::{ runtime_decl_for_custom_runtime_api::CustomRuntimeApiV1, AuctionState, BoostPoolDepth, BoostPoolDetails, BrokerInfo, DispatchErrorWithMessage, FailingWitnessValidators, LiquidityProviderBoostPoolInfo, LiquidityProviderInfo, RuntimeApiPenalty, - SimulateSwapAdditionalOrder, SimulatedSwapInformation, ValidatorInfo, + SimulateSwapAdditionalOrder, SimulatedSwapInformation, TaintedTransactionEvents, + ValidatorInfo, }, }; use cf_amm::{ @@ -71,8 +72,9 @@ use pallet_cf_pools::{ AskBidMap, AssetPair, HistoricalEarnedFees, OrderId, PoolLiquidity, PoolOrderbook, PoolPriceV1, PoolPriceV2, UnidirectionalPoolDepth, }; +use runtime_apis::ChainAccounts; -use crate::chainflip::EvmLimit; +use crate::{chainflip::EvmLimit, runtime_apis::TaintedTransactionEvent}; use pallet_cf_reputation::{ExclusionList, HeartbeatQualification, ReputationPointsQualification}; use pallet_cf_swapping::SwapLegInfo; @@ -2065,8 +2067,42 @@ impl_runtime_apis! { fn cf_pools() -> Vec> { LiquidityPools::pools() } + + fn cf_get_open_deposit_channels(account_id: Option) -> ChainAccounts { + let btc_chain_accounts = pallet_cf_ingress_egress::DepositChannelLookup::::iter_values() + .filter(|channel_details| account_id.is_none() || Some(&channel_details.owner) == account_id.as_ref()) + .map(|channel_details| channel_details.deposit_channel.address) + .collect::>(); + + ChainAccounts { + btc_chain_accounts + } + } + + fn cf_tainted_transaction_events() -> crate::runtime_apis::TaintedTransactionEvents { + let btc_events = System::read_events_no_consensus().filter_map(|event_record| { + if let RuntimeEvent::BitcoinIngressEgress(btc_ie_event) = event_record.event { + match btc_ie_event { + pallet_cf_ingress_egress::Event::TaintedTransactionReportExpired{ account_id, tx_id } => + Some(TaintedTransactionEvent::TaintedTransactionReportExpired{ account_id, tx_id }), + pallet_cf_ingress_egress::Event::TaintedTransactionReportReceived{ account_id, tx_id, expires_at: _ } => + Some(TaintedTransactionEvent::TaintedTransactionReportReceived{account_id, tx_id }), + pallet_cf_ingress_egress::Event::TaintedTransactionRejected{ broadcast_id, tx_id } => + Some(TaintedTransactionEvent::TaintedTransactionRejected{ refund_broadcast_id: broadcast_id, tx_id: tx_id.id.tx_id }), + _ => None, + } + } else { + None + } + }).collect(); + + TaintedTransactionEvents { + btc_events + } + } } + impl monitoring_apis::MonitoringRuntimeApi for Runtime { fn cf_authorities() -> AuthoritiesInfo { diff --git a/state-chain/runtime/src/runtime_apis.rs b/state-chain/runtime/src/runtime_apis.rs index 5c0479a739..7d8c19dd93 100644 --- a/state-chain/runtime/src/runtime_apis.rs +++ b/state-chain/runtime/src/runtime_apis.rs @@ -4,7 +4,8 @@ use cf_amm::{ range_orders::Liquidity, }; use cf_chains::{ - assets::any::AssetMap, eth::Address as EthereumAddress, Chain, ForeignChainAddress, + self, assets::any::AssetMap, eth::Address as EthereumAddress, Chain, ChainCrypto, + ForeignChainAddress, }; use cf_primitives::{ AccountRole, Asset, AssetAmount, BlockNumber, BroadcastId, EpochIndex, FlipBalance, @@ -183,6 +184,39 @@ pub struct FailingWitnessValidators { pub validators: Vec<(cf_primitives::AccountId, String, bool)>, } +type ChainAccountFor = ::ChainAccount; + +#[derive(Serialize, Deserialize, Encode, Decode, Eq, PartialEq, TypeInfo, Debug, Clone)] +pub struct ChainAccounts { + pub btc_chain_accounts: Vec>, +} + +#[derive(Serialize, Deserialize, Encode, Decode, Eq, PartialEq, TypeInfo, Debug, Clone)] +pub enum TaintedTransactionEvent { + TaintedTransactionReportReceived { + account_id: ::AccountId, + tx_id: TxId, + }, + + TaintedTransactionReportExpired { + account_id: ::AccountId, + tx_id: TxId, + }, + + TaintedTransactionRejected { + refund_broadcast_id: BroadcastId, + tx_id: TxId, + }, +} + +type TaintedTransactionEventFor = + TaintedTransactionEvent<<::ChainCrypto as ChainCrypto>::TransactionInId>; + +#[derive(Serialize, Deserialize, Encode, Decode, Eq, PartialEq, TypeInfo, Debug, Clone)] +pub struct TaintedTransactionEvents { + pub btc_events: Vec>, +} + decl_runtime_apis!( /// Definition for all runtime API interfaces. pub trait CustomRuntimeApi { @@ -301,6 +335,8 @@ decl_runtime_apis!( fn cf_swap_limits() -> SwapLimits; fn cf_lp_events() -> Vec>; fn cf_minimum_chunk_size(asset: Asset) -> AssetAmount; + fn cf_get_open_deposit_channels(account_id: Option) -> ChainAccounts; + fn cf_tainted_transaction_events() -> TaintedTransactionEvents; } );