From 4189824848c943e72357f105fb14dc08318d9e28 Mon Sep 17 00:00:00 2001 From: Maxim Urschumzew Date: Wed, 30 Oct 2024 09:55:06 +0100 Subject: [PATCH 1/7] Squashed commit of the following: commit 3166701641c19830c6777bf6959f95b13f5a1f5b Author: Maxim Urschumzew Date: Tue Oct 29 15:27:51 2024 +0100 Add `subscribe_tainted_btc_transaction_events` method to Broker API. It which forwards the node subscription of the same name. commit 010772dcf3db43d58d9066c8348cb80db928c981 Author: Maxim Urschumzew Date: Tue Oct 29 13:56:07 2024 +0100 Add `open_btc_deposit_channels` method to broker api. commit 9ea7e6fb2a2ebf5423ec67406a4256932f82e55e Author: Maxim Urschumzew Date: Tue Oct 29 11:19:07 2024 +0100 Add subscription endpoint for tainted transaction events. commit e81028a46d2a823f52b6ecbd84d012db33e468bb Author: Maxim Urschumzew Date: Mon Oct 28 15:12:47 2024 +0100 Implement `open_btc_deposit_channels`. commit 439ddd1a02a3e370e004e87bf214ed6f92d13b0d Author: Maxim Urschumzew Date: Mon Oct 28 14:51:22 2024 +0100 Add boilerplate for `open_btc_deposit_channels`. commit a3835f13dc231f80a84e86c529f4de2b88b52964 Author: Maxim Urschumzew Date: Mon Oct 28 10:25:51 2024 +0100 Add endpoint for marking btc tx as tainted. --- Cargo.lock | 3 + api/bin/chainflip-broker-api/Cargo.toml | 2 + api/bin/chainflip-broker-api/src/main.rs | 77 ++++++++++++++++++++++-- api/lib/src/lib.rs | 20 ++++++ state-chain/custom-rpc/Cargo.toml | 1 + state-chain/custom-rpc/src/lib.rs | 26 +++++++- state-chain/runtime/src/lib.rs | 35 ++++++++++- state-chain/runtime/src/runtime_apis.rs | 25 ++++++++ 8 files changed, 182 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 16c205dd85..066bc7b03d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,12 +1617,14 @@ name = "chainflip-broker-api" version = "1.7.0" dependencies = [ "anyhow", + "cf-chains", "chainflip-api", "clap", "custom-rpc", "futures", "hex", "jsonrpsee 0.23.2", + "sc-rpc", "serde", "sp-core 34.0.0", "sp-rpc", @@ -2547,6 +2549,7 @@ dependencies = [ "jsonrpsee 0.23.2", "log", "pallet-cf-governance", + "pallet-cf-ingress-egress", "pallet-cf-pools", "pallet-cf-swapping", "pallet-cf-witnesser", diff --git a/api/bin/chainflip-broker-api/Cargo.toml b/api/bin/chainflip-broker-api/Cargo.toml index be7606f79f..a47aa46526 100644 --- a/api/bin/chainflip-broker-api/Cargo.toml +++ b/api/bin/chainflip-broker-api/Cargo.toml @@ -23,6 +23,7 @@ workspace = true [dependencies] chainflip-api = { workspace = true } +cf-chains = { workspace = true, default-features = true } cf-utilities = { workspace = true, default-features = true } custom-rpc = { workspace = true } @@ -34,6 +35,7 @@ jsonrpsee = { workspace = true, features = ["full"] } serde = { workspace = true, default-features = true, features = ["derive"] } sp-core = { workspace = true } sp-rpc = { workspace = true } +sc-rpc = { workspace = true, default-features = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/api/bin/chainflip-broker-api/src/main.rs b/api/bin/chainflip-broker-api/src/main.rs index 8a80b4aee1..c8120f7142 100644 --- a/api/bin/chainflip-broker-api/src/main.rs +++ b/api/bin/chainflip-broker-api/src/main.rs @@ -6,24 +6,30 @@ use cf_utilities::{ }; use chainflip_api::{ self, - primitives::{AccountRole, Affiliates, Asset, BasisPoints, CcmChannelMetadata, DcaParameters}, + primitives::{ + state_chain_runtime::runtime_apis::TaintedBtcTransactionEvent, AccountRole, Affiliates, + Asset, BasisPoints, CcmChannelMetadata, DcaParameters, + }, settings::StateChain, - AccountId32, AddressString, BrokerApi, OperatorApi, RefundParameters, StateChainApi, - SwapDepositAddress, SwapPayload, WithdrawFeesDetail, + AccountId32, AddressString, BlockUpdate, BrokerApi, DepositMonitorApi, OperatorApi, + RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, SwapPayload, + WithdrawFeesDetail, }; use clap::Parser; -use futures::FutureExt; +use custom_rpc::CustomApiClient; +use futures::{FutureExt, TryStreamExt}; use jsonrpsee::{ core::{async_trait, ClientError}, proc_macros::rpc, server::ServerBuilder, types::{ErrorCode, ErrorObject, ErrorObjectOwned}, + PendingSubscriptionSink, }; use std::{ path::PathBuf, sync::{atomic::AtomicBool, Arc}, }; -use tracing::log; +use tracing::{event, log, Level}; #[derive(thiserror::Error, Debug)] pub enum BrokerApiError { @@ -100,6 +106,17 @@ pub trait Rpc { affiliate_fees: Option>, dca_parameters: Option, ) -> RpcResult; + + #[method(name = "mark_btc_transaction_as_tainted", aliases = ["broker_markBtcTransactionAsTainted"])] + async fn mark_btc_transaction_as_tainted(&self, tx_id: cf_chains::btc::Hash) -> RpcResult<()>; + + #[method(name = "open_btc_deposit_channels", aliases = ["broker_openBtcDepositChannels"])] + async fn open_btc_deposit_channels( + &self, + ) -> RpcResult::ChainAccount>>; + + #[subscription(name = "subscribe_tainted_btc_transaction_events", item = Result>,String>)] + async fn subscribe_tainted_btc_transaction_events(&self); } pub struct RpcServerImpl { @@ -194,6 +211,56 @@ impl RpcServer for RpcServerImpl { ) .await?) } + + async fn mark_btc_transaction_as_tainted(&self, tx_id: cf_chains::btc::Hash) -> RpcResult<()> { + self.api + .deposit_monitor_api() + .mark_btc_transaction_as_tainted(tx_id) + .await + .map_err(BrokerApiError::Other) + } + + async fn open_btc_deposit_channels( + &self, + ) -> RpcResult::ChainAccount>> { + let account_id = self.api.state_chain_client.account_id(); + + self.api + .state_chain_client + .base_rpc_client + .raw_rpc_client + .cf_open_btc_deposit_channels(account_id, None) + .await + .map_err(BrokerApiError::ClientError) + } + + async fn subscribe_tainted_btc_transaction_events( + &self, + pending_sink: PendingSubscriptionSink, + ) { + let result = self + .api + .state_chain_client + .base_rpc_client + .raw_rpc_client + .cf_subscribe_tainted_btc_transaction_events() + .await; + + match result { + Ok(subscription) => { + tokio::spawn(async { + sc_rpc::utils::pipe_from_stream( + pending_sink, + subscription.map_err(|err| format!("{err}")), + ) + .await + }); + }, + Err(err) => { + event!(Level::ERROR, "Could not subscribe to `tainted_btc_transaction_events` subscription on the node. Error: {err}"); + }, + } + } } #[derive(Parser, Debug, Clone, Default)] diff --git a/api/lib/src/lib.rs b/api/lib/src/lib.rs index 1c0e5c33f2..90dca8d175 100644 --- a/api/lib/src/lib.rs +++ b/api/lib/src/lib.rs @@ -159,6 +159,10 @@ impl StateChainApi { self.state_chain_client.clone() } + pub fn deposit_monitor_api(&self) -> Arc { + self.state_chain_client.clone() + } + pub fn query_api(&self) -> queries::QueryApi { queries::QueryApi { state_chain_client: self.state_chain_client.clone() } } @@ -176,6 +180,8 @@ impl BrokerApi for StateChainClient { impl OperatorApi for StateChainClient {} #[async_trait] impl ValidatorApi for StateChainClient {} +#[async_trait] +impl DepositMonitorApi for StateChainClient {} #[async_trait] pub trait ValidatorApi: SimpleSubmissionApi { @@ -615,6 +621,20 @@ pub fn clean_foreign_chain_address(chain: ForeignChain, address: &str) -> Result }) } +#[async_trait] +pub trait DepositMonitorApi: + SignedExtrinsicApi + StorageApi + Sized + Send + Sync + 'static +{ + async fn mark_btc_transaction_as_tainted(&self, tx_id: cf_chains::btc::Hash) -> Result<()> { + let _ = self + .submit_signed_extrinsic(state_chain_runtime::RuntimeCall::BitcoinIngressEgress( + pallet_cf_ingress_egress::Call::mark_transaction_as_tainted { tx_id }, + )) + .await; + Ok(()) + } +} + #[derive(Debug, Zeroize, PartialEq, Eq)] /// Public and Secret keys as bytes pub struct KeyPair { diff --git a/state-chain/custom-rpc/Cargo.toml b/state-chain/custom-rpc/Cargo.toml index 18fc15a789..66d10ba3a1 100644 --- a/state-chain/custom-rpc/Cargo.toml +++ b/state-chain/custom-rpc/Cargo.toml @@ -29,6 +29,7 @@ pallet-cf-governance = { workspace = true, default-features = true } pallet-cf-pools = { workspace = true, default-features = true } pallet-cf-witnesser = { workspace = true, default-features = true } pallet-cf-swapping = { workspace = true, default-features = true } +pallet-cf-ingress-egress = { workspace = true, default-features = true } sp-api = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index a84e888a1f..180f3d47be 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -54,7 +54,8 @@ use state_chain_runtime::{ runtime_apis::{ AuctionState, BoostPoolDepth, BoostPoolDetails, BrokerInfo, CustomRuntimeApi, DispatchErrorWithMessage, ElectoralRuntimeApi, FailingWitnessValidators, - LiquidityProviderBoostPoolInfo, LiquidityProviderInfo, RuntimeApiPenalty, ValidatorInfo, + LiquidityProviderBoostPoolInfo, LiquidityProviderInfo, RuntimeApiPenalty, + TaintedBtcTransactionEvent, ValidatorInfo, }, safe_mode::RuntimeSafeMode, Hash, NetworkFee, SolanaInstance, @@ -953,6 +954,16 @@ pub trait CustomApi { retry_duration: u32, at: Option, ) -> RpcResult<()>; + + #[method(name = "open_btc_deposit_channels")] + fn cf_open_btc_deposit_channels( + &self, + broker: state_chain_runtime::AccountId, + at: Option, + ) -> RpcResult::ChainAccount>>; + + #[subscription(name = "subscribe_tainted_btc_transaction_events", item = BlockUpdate>)] + fn cf_subscribe_tainted_btc_transaction_events(&self); } /// An RPC extension for the state chain node. @@ -1221,6 +1232,7 @@ where ) -> PoolPairsMap, cf_validate_dca_params(number_of_chunks: u32, chunk_interval: u32) -> (), cf_validate_refund_params(retry_duration: u32) -> (), + cf_open_btc_deposit_channels(account_id: state_chain_runtime::AccountId) -> Vec<::ChainAccount>, } fn cf_current_compatibility_version(&self) -> RpcResult { @@ -1742,6 +1754,18 @@ where ) -> RpcResult> { self.with_runtime_api(at, |api, hash| api.cf_filter_votes(hash, validator, proposed_votes)) } + + fn cf_subscribe_tainted_btc_transaction_events( + &self, + sink: jsonrpsee::PendingSubscriptionSink, + ) { + self.new_subscription( + false, /* only_on_changes */ + true, /* end_on_error */ + sink, + |client, hash| Ok(client.runtime_api().cf_tainted_btc_transaction_events(hash)?), + ) + } } impl CustomRpc diff --git a/state-chain/runtime/src/lib.rs b/state-chain/runtime/src/lib.rs index cc73d6b6bf..6e02e47d65 100644 --- a/state-chain/runtime/src/lib.rs +++ b/state-chain/runtime/src/lib.rs @@ -72,7 +72,7 @@ use pallet_cf_pools::{ PoolPriceV2, UnidirectionalPoolDepth, }; -use crate::chainflip::EvmLimit; +use crate::{chainflip::EvmLimit, runtime_apis::TaintedBtcTransactionEvent}; use pallet_cf_reputation::{ExclusionList, HeartbeatQualification, ReputationPointsQualification}; use pallet_cf_swapping::SwapLegInfo; @@ -2100,8 +2100,41 @@ impl_runtime_apis! { fn cf_validate_refund_params(retry_duration: u32) -> Result<(), DispatchErrorWithMessage> { pallet_cf_swapping::Pallet::::validate_refund_params(retry_duration).map_err(Into::into) } + + fn cf_open_btc_deposit_channels(account_id: AccountId) -> Result::ChainAccount>, DispatchErrorWithMessage> { + Ok(pallet_cf_ingress_egress::DepositChannelLookup::::iter() + .map(|(_, value)| value) + .filter(|channel_details| channel_details.owner == account_id) + .filter(|channel_details| match channel_details.action { + ChannelAction::Swap {..} => true, + ChannelAction::CcmTransfer {..} => true, + ChannelAction::LiquidityProvision {..} => false, + }) + .map(|channel_details| channel_details.deposit_channel.address) + .collect()) + } + + fn cf_tainted_btc_transaction_events() -> Vec { + + System::read_events_no_consensus().filter_map(|event_record| { + if let RuntimeEvent::BitcoinIngressEgress(btc_ie_event) = event_record.event { + match btc_ie_event { + pallet_cf_ingress_egress::Event::TaintedTransactionReportExpired{ account_id, tx_id } => + Some(TaintedBtcTransactionEvent::TaintedTransactionReportExpired{ account_id, tx_id }), + pallet_cf_ingress_egress::Event::TaintedTransactionReportReceived{ account_id, tx_id, expires_at: _ } => + Some(TaintedBtcTransactionEvent::TaintedTransactionReportReceived{account_id, tx_id }), + pallet_cf_ingress_egress::Event::TaintedTransactionRejected{ broadcast_id, tx_id } => + Some(TaintedBtcTransactionEvent::TaintedTransactionRejected{ broadcast_id, tx_id: tx_id.id.tx_id }), + _ => None, + } + } else { + None + } + }).collect() + } } + impl monitoring_apis::MonitoringRuntimeApi for Runtime { fn cf_authorities() -> AuthoritiesInfo { diff --git a/state-chain/runtime/src/runtime_apis.rs b/state-chain/runtime/src/runtime_apis.rs index 536ebc0bfd..0393ab212e 100644 --- a/state-chain/runtime/src/runtime_apis.rs +++ b/state-chain/runtime/src/runtime_apis.rs @@ -183,6 +183,24 @@ pub struct FailingWitnessValidators { pub validators: Vec<(cf_primitives::AccountId, String, bool)>, } +#[derive(Serialize, Deserialize, Encode, Decode, Eq, PartialEq, TypeInfo, Debug, Clone)] +pub enum TaintedBtcTransactionEvent { + TaintedTransactionReportReceived { + account_id: ::AccountId, + tx_id: <::ChainCrypto as cf_chains::ChainCrypto>::TransactionInId, + }, + + TaintedTransactionReportExpired { + account_id: ::AccountId, + tx_id: <::ChainCrypto as cf_chains::ChainCrypto>::TransactionInId, + }, + + TaintedTransactionRejected { + broadcast_id: BroadcastId, + tx_id: <::ChainCrypto as cf_chains::ChainCrypto>::TransactionInId, + }, +} + decl_runtime_apis!( /// Definition for all runtime API interfaces. pub trait CustomRuntimeApi { @@ -306,6 +324,13 @@ decl_runtime_apis!( chunk_interval: u32, ) -> Result<(), DispatchErrorWithMessage>; fn cf_validate_refund_params(retry_duration: u32) -> Result<(), DispatchErrorWithMessage>; + fn cf_open_btc_deposit_channels( + account_id: AccountId32, + ) -> Result< + Vec<::ChainAccount>, + DispatchErrorWithMessage, + >; + fn cf_tainted_btc_transaction_events() -> Vec; } ); From 2389c53aae3b5f5f5927d9bc9099991212f54780 Mon Sep 17 00:00:00 2001 From: Maxim Urschumzew Date: Tue, 5 Nov 2024 10:59:54 +0100 Subject: [PATCH 2/7] Apply suggestions from review. --- api/bin/chainflip-broker-api/Cargo.toml | 4 +- api/bin/chainflip-broker-api/src/main.rs | 92 ++++++++++++++---------- api/lib/src/lib.rs | 31 +++++--- state-chain/custom-rpc/src/lib.rs | 29 ++++---- state-chain/runtime/src/lib.rs | 35 +++++---- state-chain/runtime/src/runtime_apis.rs | 35 +++++---- 6 files changed, 136 insertions(+), 90 deletions(-) diff --git a/api/bin/chainflip-broker-api/Cargo.toml b/api/bin/chainflip-broker-api/Cargo.toml index a47aa46526..4ab0d5ec54 100644 --- a/api/bin/chainflip-broker-api/Cargo.toml +++ b/api/bin/chainflip-broker-api/Cargo.toml @@ -33,8 +33,8 @@ futures = { workspace = true } hex = { workspace = true, default-features = true } jsonrpsee = { workspace = true, features = ["full"] } serde = { workspace = true, default-features = true, features = ["derive"] } -sp-core = { workspace = true } -sp-rpc = { workspace = true } +sp-core = { workspace = true, default-features = true } +sp-rpc = { workspace = true, default-features = true } sc-rpc = { workspace = true, default-features = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/api/bin/chainflip-broker-api/src/main.rs b/api/bin/chainflip-broker-api/src/main.rs index c8120f7142..71fc3f844c 100644 --- a/api/bin/chainflip-broker-api/src/main.rs +++ b/api/bin/chainflip-broker-api/src/main.rs @@ -7,29 +7,30 @@ use cf_utilities::{ use chainflip_api::{ self, primitives::{ - state_chain_runtime::runtime_apis::TaintedBtcTransactionEvent, AccountRole, Affiliates, - Asset, BasisPoints, CcmChannelMetadata, DcaParameters, + state_chain_runtime::runtime_apis::{ChainAccounts, TaintedTransactionEvents}, + AccountRole, Affiliates, Asset, BasisPoints, CcmChannelMetadata, DcaParameters, }, settings::StateChain, AccountId32, AddressString, BlockUpdate, BrokerApi, DepositMonitorApi, OperatorApi, RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, SwapPayload, - WithdrawFeesDetail, + TransactionInId, WithdrawFeesDetail, }; use clap::Parser; use custom_rpc::CustomApiClient; -use futures::{FutureExt, TryStreamExt}; +use futures::{FutureExt, StreamExt}; use jsonrpsee::{ - core::{async_trait, ClientError}, + core::{async_trait, ClientError, SubscriptionResult}, proc_macros::rpc, server::ServerBuilder, types::{ErrorCode, ErrorObject, ErrorObjectOwned}, PendingSubscriptionSink, }; +use serde::{Deserialize, Serialize}; use std::{ path::PathBuf, sync::{atomic::AtomicBool, Arc}, }; -use tracing::{event, log, Level}; +use tracing::log; #[derive(thiserror::Error, Debug)] pub enum BrokerApiError { @@ -67,6 +68,12 @@ impl From for ErrorObjectOwned { } } +#[derive(Serialize, Deserialize)] +pub enum GetOpenDepositChannelsQuery { + All, + Mine, +} + #[rpc(server, client, namespace = "broker")] pub trait Rpc { #[method(name = "register_account", aliases = ["broker_registerAccount"])] @@ -107,16 +114,17 @@ pub trait Rpc { dca_parameters: Option, ) -> RpcResult; - #[method(name = "mark_btc_transaction_as_tainted", aliases = ["broker_markBtcTransactionAsTainted"])] - async fn mark_btc_transaction_as_tainted(&self, tx_id: cf_chains::btc::Hash) -> RpcResult<()>; + #[method(name = "mark_transaction_as_tainted", aliases = ["broker_markTransactionAsTainted"])] + async fn mark_transaction_as_tainted(&self, tx_id: TransactionInId) -> RpcResult<()>; - #[method(name = "open_btc_deposit_channels", aliases = ["broker_openBtcDepositChannels"])] - async fn open_btc_deposit_channels( + #[method(name = "get_open_deposit_channels", aliases = ["broker_getOpenDepositChannels"])] + async fn get_open_deposit_channels( &self, - ) -> RpcResult::ChainAccount>>; + query: GetOpenDepositChannelsQuery, + ) -> RpcResult; - #[subscription(name = "subscribe_tainted_btc_transaction_events", item = Result>,String>)] - async fn subscribe_tainted_btc_transaction_events(&self); + #[subscription(name = "subscribe_tainted_transaction_events", item = BlockUpdate)] + async fn subscribe_tainted_transaction_events(&self) -> SubscriptionResult; } pub struct RpcServerImpl { @@ -212,54 +220,60 @@ impl RpcServer for RpcServerImpl { .await?) } - async fn mark_btc_transaction_as_tainted(&self, tx_id: cf_chains::btc::Hash) -> RpcResult<()> { + async fn mark_transaction_as_tainted(&self, tx_id: TransactionInId) -> RpcResult<()> { self.api .deposit_monitor_api() - .mark_btc_transaction_as_tainted(tx_id) + .mark_transaction_as_tainted(tx_id) .await - .map_err(BrokerApiError::Other) + .map_err(BrokerApiError::Other)?; + Ok(()) } - async fn open_btc_deposit_channels( + async fn get_open_deposit_channels( &self, - ) -> RpcResult::ChainAccount>> { - let account_id = self.api.state_chain_client.account_id(); + query: GetOpenDepositChannelsQuery, + ) -> RpcResult { + let account_id = match query { + GetOpenDepositChannelsQuery::All => None, + GetOpenDepositChannelsQuery::Mine => Some(self.api.state_chain_client.account_id()), + }; self.api .state_chain_client .base_rpc_client .raw_rpc_client - .cf_open_btc_deposit_channels(account_id, None) + .cf_get_open_deposit_channels(account_id, None) .await .map_err(BrokerApiError::ClientError) } - async fn subscribe_tainted_btc_transaction_events( + async fn subscribe_tainted_transaction_events( &self, pending_sink: PendingSubscriptionSink, - ) { - let result = self + ) -> SubscriptionResult { + let subscription_stream = self .api .state_chain_client .base_rpc_client .raw_rpc_client - .cf_subscribe_tainted_btc_transaction_events() - .await; + .cf_subscribe_tainted_transaction_events() + .await? + .take_while(|item| { + futures::future::ready( + item.as_ref() + .inspect_err(|err| { + tracing::error!("Subscription error, could not deserialize message: {err}. Closing subscription.") + }) + .is_ok(), + ) + }) + .map(Result::unwrap); - match result { - Ok(subscription) => { - tokio::spawn(async { - sc_rpc::utils::pipe_from_stream( - pending_sink, - subscription.map_err(|err| format!("{err}")), - ) - .await - }); - }, - Err(err) => { - event!(Level::ERROR, "Could not subscribe to `tainted_btc_transaction_events` subscription on the node. Error: {err}"); - }, - } + tokio::spawn(async { + sc_rpc::utils::pipe_from_stream(pending_sink, subscription_stream).await + }); + + Ok(()) } } diff --git a/api/lib/src/lib.rs b/api/lib/src/lib.rs index 90dca8d175..7724e8c101 100644 --- a/api/lib/src/lib.rs +++ b/api/lib/src/lib.rs @@ -10,8 +10,8 @@ use cf_chains::{ dot::PolkadotAccountId, evm::to_evm_address, sol::SolAddress, - CcmChannelMetadata, ChannelRefundParametersEncoded, ChannelRefundParametersGeneric, - ForeignChain, ForeignChainAddress, + CcmChannelMetadata, Chain, ChainCrypto, ChannelRefundParametersEncoded, + ChannelRefundParametersGeneric, ForeignChain, ForeignChainAddress, }; pub use cf_primitives::{AccountRole, Affiliates, Asset, BasisPoints, ChannelId, SemVer}; use cf_primitives::{AssetAmount, BlockNumber, DcaParameters, NetworkEnvironment}; @@ -32,7 +32,7 @@ pub mod primitives { pub type RedemptionAmount = pallet_cf_funding::RedemptionAmount; pub use cf_chains::{ address::{EncodedAddress, ForeignChainAddress}, - CcmChannelMetadata, CcmDepositMetadata, + CcmChannelMetadata, CcmDepositMetadata, Chain, ChainCrypto, }; } pub use cf_chains::eth::Address as EthereumAddress; @@ -621,17 +621,28 @@ pub fn clean_foreign_chain_address(chain: ForeignChain, address: &str) -> Result }) } +pub type TransactionInIdFor = <::ChainCrypto as ChainCrypto>::TransactionInId; + +#[derive(Serialize, Deserialize)] +pub enum TransactionInId { + Bitcoin(TransactionInIdFor), + // other variants reserved for other chains. +} + #[async_trait] pub trait DepositMonitorApi: SignedExtrinsicApi + StorageApi + Sized + Send + Sync + 'static { - async fn mark_btc_transaction_as_tainted(&self, tx_id: cf_chains::btc::Hash) -> Result<()> { - let _ = self - .submit_signed_extrinsic(state_chain_runtime::RuntimeCall::BitcoinIngressEgress( - pallet_cf_ingress_egress::Call::mark_transaction_as_tainted { tx_id }, - )) - .await; - Ok(()) + async fn mark_transaction_as_tainted(&self, tx_id: TransactionInId) -> Result { + match tx_id { + TransactionInId::Bitcoin(tx_id) => + self.simple_submission_with_dry_run( + state_chain_runtime::RuntimeCall::BitcoinIngressEgress( + pallet_cf_ingress_egress::Call::mark_transaction_as_tainted { tx_id }, + ), + ) + .await, + } } } diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index 180f3d47be..ede0c3d1d7 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -52,10 +52,10 @@ use state_chain_runtime::{ PendingBroadcasts, PendingTssCeremonies, RedemptionsInfo, SolanaNonces, }, runtime_apis::{ - AuctionState, BoostPoolDepth, BoostPoolDetails, BrokerInfo, CustomRuntimeApi, - DispatchErrorWithMessage, ElectoralRuntimeApi, FailingWitnessValidators, + AuctionState, BoostPoolDepth, BoostPoolDetails, BrokerInfo, ChainAccounts, + CustomRuntimeApi, DispatchErrorWithMessage, ElectoralRuntimeApi, FailingWitnessValidators, LiquidityProviderBoostPoolInfo, LiquidityProviderInfo, RuntimeApiPenalty, - TaintedBtcTransactionEvent, ValidatorInfo, + TaintedTransactionEvents, ValidatorInfo, }, safe_mode::RuntimeSafeMode, Hash, NetworkFee, SolanaInstance, @@ -955,15 +955,15 @@ pub trait CustomApi { at: Option, ) -> RpcResult<()>; - #[method(name = "open_btc_deposit_channels")] - fn cf_open_btc_deposit_channels( + #[method(name = "get_open_deposit_channels")] + fn cf_get_open_deposit_channels( &self, - broker: state_chain_runtime::AccountId, + broker: Option, at: Option, - ) -> RpcResult::ChainAccount>>; + ) -> RpcResult; - #[subscription(name = "subscribe_tainted_btc_transaction_events", item = BlockUpdate>)] - fn cf_subscribe_tainted_btc_transaction_events(&self); + #[subscription(name = "subscribe_tainted_transaction_events", item = BlockUpdate)] + async fn cf_subscribe_tainted_transaction_events(&self); } /// An RPC extension for the state chain node. @@ -1216,6 +1216,7 @@ where cf_failed_call_arbitrum(broadcast_id: BroadcastId) -> Option<::Transaction>, cf_boost_pools_depth() -> Vec, cf_pool_price(from_asset: Asset, to_asset: Asset) -> Option, + cf_get_open_deposit_channels(account_id: Option) -> ChainAccounts, } pass_through_and_flatten! { @@ -1232,7 +1233,6 @@ where ) -> PoolPairsMap, cf_validate_dca_params(number_of_chunks: u32, chunk_interval: u32) -> (), cf_validate_refund_params(retry_duration: u32) -> (), - cf_open_btc_deposit_channels(account_id: state_chain_runtime::AccountId) -> Vec<::ChainAccount>, } fn cf_current_compatibility_version(&self) -> RpcResult { @@ -1755,16 +1755,17 @@ where self.with_runtime_api(at, |api, hash| api.cf_filter_votes(hash, validator, proposed_votes)) } - fn cf_subscribe_tainted_btc_transaction_events( + async fn cf_subscribe_tainted_transaction_events( &self, sink: jsonrpsee::PendingSubscriptionSink, ) { self.new_subscription( - false, /* only_on_changes */ - true, /* end_on_error */ + true, /* only_on_changes */ + true, /* end_on_error */ sink, - |client, hash| Ok(client.runtime_api().cf_tainted_btc_transaction_events(hash)?), + |client, hash| Ok(client.runtime_api().cf_tainted_transaction_events(hash)?), ) + .await } } diff --git a/state-chain/runtime/src/lib.rs b/state-chain/runtime/src/lib.rs index 6e02e47d65..4e06e15047 100644 --- a/state-chain/runtime/src/lib.rs +++ b/state-chain/runtime/src/lib.rs @@ -28,7 +28,8 @@ use crate::{ runtime_decl_for_custom_runtime_api::CustomRuntimeApiV1, AuctionState, BoostPoolDepth, BoostPoolDetails, BrokerInfo, DispatchErrorWithMessage, FailingWitnessValidators, LiquidityProviderBoostPoolInfo, LiquidityProviderInfo, RuntimeApiPenalty, - SimulateSwapAdditionalOrder, SimulatedSwapInformation, ValidatorInfo, + SimulateSwapAdditionalOrder, SimulatedSwapInformation, TaintedTransactionEvents, + ValidatorInfo, }, }; use cf_amm::{ @@ -71,8 +72,9 @@ use pallet_cf_pools::{ AskBidMap, AssetPair, HistoricalEarnedFees, OrderId, PoolLiquidity, PoolOrderbook, PoolPriceV1, PoolPriceV2, UnidirectionalPoolDepth, }; +use runtime_apis::ChainAccounts; -use crate::{chainflip::EvmLimit, runtime_apis::TaintedBtcTransactionEvent}; +use crate::{chainflip::EvmLimit, runtime_apis::TaintedTransactionEvent}; use pallet_cf_reputation::{ExclusionList, HeartbeatQualification, ReputationPointsQualification}; use pallet_cf_swapping::SwapLegInfo; @@ -2101,36 +2103,43 @@ impl_runtime_apis! { pallet_cf_swapping::Pallet::::validate_refund_params(retry_duration).map_err(Into::into) } - fn cf_open_btc_deposit_channels(account_id: AccountId) -> Result::ChainAccount>, DispatchErrorWithMessage> { - Ok(pallet_cf_ingress_egress::DepositChannelLookup::::iter() + fn cf_get_open_deposit_channels(account_id: Option) -> ChainAccounts { + let btc_chain_accounts = pallet_cf_ingress_egress::DepositChannelLookup::::iter() .map(|(_, value)| value) - .filter(|channel_details| channel_details.owner == account_id) + .filter(|channel_details| account_id.is_none() || Some(&channel_details.owner) == account_id.as_ref()) .filter(|channel_details| match channel_details.action { ChannelAction::Swap {..} => true, ChannelAction::CcmTransfer {..} => true, ChannelAction::LiquidityProvision {..} => false, }) .map(|channel_details| channel_details.deposit_channel.address) - .collect()) - } + .collect::>(); - fn cf_tainted_btc_transaction_events() -> Vec { + ChainAccounts { + btc_chain_accounts + } + } - System::read_events_no_consensus().filter_map(|event_record| { + fn cf_tainted_transaction_events() -> crate::runtime_apis::TaintedTransactionEvents { + let btc_events = System::read_events_no_consensus().filter_map(|event_record| { if let RuntimeEvent::BitcoinIngressEgress(btc_ie_event) = event_record.event { match btc_ie_event { pallet_cf_ingress_egress::Event::TaintedTransactionReportExpired{ account_id, tx_id } => - Some(TaintedBtcTransactionEvent::TaintedTransactionReportExpired{ account_id, tx_id }), + Some(TaintedTransactionEvent::TaintedTransactionReportExpired{ account_id, tx_id }), pallet_cf_ingress_egress::Event::TaintedTransactionReportReceived{ account_id, tx_id, expires_at: _ } => - Some(TaintedBtcTransactionEvent::TaintedTransactionReportReceived{account_id, tx_id }), + Some(TaintedTransactionEvent::TaintedTransactionReportReceived{account_id, tx_id }), pallet_cf_ingress_egress::Event::TaintedTransactionRejected{ broadcast_id, tx_id } => - Some(TaintedBtcTransactionEvent::TaintedTransactionRejected{ broadcast_id, tx_id: tx_id.id.tx_id }), + Some(TaintedTransactionEvent::TaintedTransactionRejected{ broadcast_id, tx_id: tx_id.id.tx_id }), _ => None, } } else { None } - }).collect() + }).collect(); + + TaintedTransactionEvents { + btc_events + } } } diff --git a/state-chain/runtime/src/runtime_apis.rs b/state-chain/runtime/src/runtime_apis.rs index 0393ab212e..8aeba4f198 100644 --- a/state-chain/runtime/src/runtime_apis.rs +++ b/state-chain/runtime/src/runtime_apis.rs @@ -4,7 +4,8 @@ use cf_amm::{ range_orders::Liquidity, }; use cf_chains::{ - assets::any::AssetMap, eth::Address as EthereumAddress, Chain, ForeignChainAddress, + self, assets::any::AssetMap, eth::Address as EthereumAddress, Chain, ChainCrypto, + ForeignChainAddress, }; use cf_primitives::{ AccountRole, Asset, AssetAmount, BlockNumber, BroadcastId, EpochIndex, FlipBalance, @@ -183,24 +184,39 @@ pub struct FailingWitnessValidators { pub validators: Vec<(cf_primitives::AccountId, String, bool)>, } +type ChainAccountFor = ::ChainAccount; + +#[derive(Serialize, Deserialize, Encode, Decode, Eq, PartialEq, TypeInfo, Debug, Clone)] +pub struct ChainAccounts { + pub btc_chain_accounts: Vec>, +} + #[derive(Serialize, Deserialize, Encode, Decode, Eq, PartialEq, TypeInfo, Debug, Clone)] -pub enum TaintedBtcTransactionEvent { +pub enum TaintedTransactionEvent { TaintedTransactionReportReceived { account_id: ::AccountId, - tx_id: <::ChainCrypto as cf_chains::ChainCrypto>::TransactionInId, + tx_id: TxId, }, TaintedTransactionReportExpired { account_id: ::AccountId, - tx_id: <::ChainCrypto as cf_chains::ChainCrypto>::TransactionInId, + tx_id: TxId, }, TaintedTransactionRejected { broadcast_id: BroadcastId, - tx_id: <::ChainCrypto as cf_chains::ChainCrypto>::TransactionInId, + tx_id: TxId, }, } +type TaintedTransactionEventFor = + TaintedTransactionEvent<<::ChainCrypto as ChainCrypto>::TransactionInId>; + +#[derive(Serialize, Deserialize, Encode, Decode, Eq, PartialEq, TypeInfo, Debug, Clone)] +pub struct TaintedTransactionEvents { + pub btc_events: Vec>, +} + decl_runtime_apis!( /// Definition for all runtime API interfaces. pub trait CustomRuntimeApi { @@ -324,13 +340,8 @@ decl_runtime_apis!( chunk_interval: u32, ) -> Result<(), DispatchErrorWithMessage>; fn cf_validate_refund_params(retry_duration: u32) -> Result<(), DispatchErrorWithMessage>; - fn cf_open_btc_deposit_channels( - account_id: AccountId32, - ) -> Result< - Vec<::ChainAccount>, - DispatchErrorWithMessage, - >; - fn cf_tainted_btc_transaction_events() -> Vec; + fn cf_get_open_deposit_channels(account_id: Option) -> ChainAccounts; + fn cf_tainted_transaction_events() -> TaintedTransactionEvents; } ); From 344b2fce038b7c9c21150ce5f14725fa5654049e Mon Sep 17 00:00:00 2001 From: Maxim Urschumzew Date: Tue, 5 Nov 2024 11:16:42 +0100 Subject: [PATCH 3/7] Return all channels for *all* channel actions. We do not filter out `LiquidityProvision` channels anymore. --- state-chain/runtime/src/lib.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/state-chain/runtime/src/lib.rs b/state-chain/runtime/src/lib.rs index 4e06e15047..6810e9724c 100644 --- a/state-chain/runtime/src/lib.rs +++ b/state-chain/runtime/src/lib.rs @@ -2107,11 +2107,6 @@ impl_runtime_apis! { let btc_chain_accounts = pallet_cf_ingress_egress::DepositChannelLookup::::iter() .map(|(_, value)| value) .filter(|channel_details| account_id.is_none() || Some(&channel_details.owner) == account_id.as_ref()) - .filter(|channel_details| match channel_details.action { - ChannelAction::Swap {..} => true, - ChannelAction::CcmTransfer {..} => true, - ChannelAction::LiquidityProvision {..} => false, - }) .map(|channel_details| channel_details.deposit_channel.address) .collect::>(); From 636ddc8c1f087f1a7c5ad8dd49aaa0081371f971 Mon Sep 17 00:00:00 2001 From: Maxim Urschumzew Date: Tue, 5 Nov 2024 14:50:14 +0100 Subject: [PATCH 4/7] Create tainted event subscription in broker API instead of forwarding from the node. This is now the same as in the LP API binary. The previous version didn't use finalized blocks and thus might not have been stable. Theoretically it would be nice to be able to forward the subscription from the node, see PRO-1768. --- api/bin/chainflip-broker-api/src/main.rs | 64 +++++++++++++++--------- state-chain/custom-rpc/src/lib.rs | 13 +++++ 2 files changed, 54 insertions(+), 23 deletions(-) diff --git a/api/bin/chainflip-broker-api/src/main.rs b/api/bin/chainflip-broker-api/src/main.rs index 71fc3f844c..f308cba045 100644 --- a/api/bin/chainflip-broker-api/src/main.rs +++ b/api/bin/chainflip-broker-api/src/main.rs @@ -11,9 +11,9 @@ use chainflip_api::{ AccountRole, Affiliates, Asset, BasisPoints, CcmChannelMetadata, DcaParameters, }, settings::StateChain, - AccountId32, AddressString, BlockUpdate, BrokerApi, DepositMonitorApi, OperatorApi, - RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, SwapPayload, - TransactionInId, WithdrawFeesDetail, + AccountId32, AddressString, BlockUpdate, BrokerApi, ChainApi, DepositMonitorApi, OperatorApi, + RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, + SwapPayload, TransactionInId, WithdrawFeesDetail, }; use clap::Parser; use custom_rpc::CustomApiClient; @@ -251,28 +251,46 @@ impl RpcServer for RpcServerImpl { &self, pending_sink: PendingSubscriptionSink, ) -> SubscriptionResult { - let subscription_stream = self - .api - .state_chain_client - .base_rpc_client - .raw_rpc_client - .cf_subscribe_tainted_transaction_events() - .await? - .take_while(|item| { - futures::future::ready( - item.as_ref() - .inspect_err(|err| { - tracing::error!("Subscription error, could not deserialize message: {err}. Closing subscription.") - }) - .is_ok(), - ) - }) - .map(Result::unwrap); + let state_chain_client = self.api.state_chain_client.clone(); + let sink = pending_sink.accept().await.map_err(|e| e.to_string())?; + tokio::spawn(async move { + // Note we construct the subscription here rather than relying on the custom-rpc + // subscription. This is because we want to use finalized blocks. + // TODO: allow custom rpc subscriptions to use finalized blocks. + let mut finalized_block_stream = state_chain_client.finalized_block_stream().await; + while let Some(block) = finalized_block_stream.next().await { + match state_chain_client + .base_rpc_client + .raw_rpc_client + .cf_get_tainted_transaction_events(Some(block.hash)) + .await + { + Ok(events) => { + // We only want to send a notification if there have been events. + // If other chains are added, they have to be considered here as + // well. + if events.btc_events.len() == 0 { + continue; + } - tokio::spawn(async { - sc_rpc::utils::pipe_from_stream(pending_sink, subscription_stream).await - }); + let block_update = BlockUpdate { + block_hash: block.hash, + block_number: block.number, + data: events, + }; + if sink + .send(sc_rpc::utils::to_sub_message(&sink, &block_update)) + .await + .is_err() + { + break; + } + }, + Err(_) => break, + } + } + }); Ok(()) } } diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index ede0c3d1d7..28f0396add 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -964,6 +964,12 @@ pub trait CustomApi { #[subscription(name = "subscribe_tainted_transaction_events", item = BlockUpdate)] async fn cf_subscribe_tainted_transaction_events(&self); + + #[method(name = "get_tainted_transaction_events")] + fn cf_get_tainted_transaction_events( + &self, + at: Option, + ) -> RpcResult; } /// An RPC extension for the state chain node. @@ -1767,6 +1773,13 @@ where ) .await } + + fn cf_get_tainted_transaction_events( + &self, + at: Option, + ) -> RpcResult { + self.with_runtime_api(at, |api, hash| api.cf_tainted_transaction_events(hash)) + } } impl CustomRpc From 676a12e9875646bfc1fdd5d2ecf9a983df346491 Mon Sep 17 00:00:00 2001 From: Maxim Urschumzew Date: Tue, 5 Nov 2024 15:21:53 +0100 Subject: [PATCH 5/7] Apply clippy suggestion. --- api/bin/chainflip-broker-api/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/bin/chainflip-broker-api/src/main.rs b/api/bin/chainflip-broker-api/src/main.rs index f308cba045..121432b701 100644 --- a/api/bin/chainflip-broker-api/src/main.rs +++ b/api/bin/chainflip-broker-api/src/main.rs @@ -12,8 +12,8 @@ use chainflip_api::{ }, settings::StateChain, AccountId32, AddressString, BlockUpdate, BrokerApi, ChainApi, DepositMonitorApi, OperatorApi, - RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, - SwapPayload, TransactionInId, WithdrawFeesDetail, + RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, SwapPayload, + TransactionInId, WithdrawFeesDetail, }; use clap::Parser; use custom_rpc::CustomApiClient; @@ -269,7 +269,7 @@ impl RpcServer for RpcServerImpl { // We only want to send a notification if there have been events. // If other chains are added, they have to be considered here as // well. - if events.btc_events.len() == 0 { + if events.btc_events.is_empty() { continue; } From 27e686e65fc7afd98797b7bad322d0cca794aef5 Mon Sep 17 00:00:00 2001 From: Maxim Urschumzew Date: Wed, 6 Nov 2024 13:21:18 +0100 Subject: [PATCH 6/7] Remove now unused `tainted_transaction_events` subscription. --- state-chain/custom-rpc/src/lib.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index 28f0396add..35bc0567c7 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -962,9 +962,6 @@ pub trait CustomApi { at: Option, ) -> RpcResult; - #[subscription(name = "subscribe_tainted_transaction_events", item = BlockUpdate)] - async fn cf_subscribe_tainted_transaction_events(&self); - #[method(name = "get_tainted_transaction_events")] fn cf_get_tainted_transaction_events( &self, @@ -1761,19 +1758,6 @@ where self.with_runtime_api(at, |api, hash| api.cf_filter_votes(hash, validator, proposed_votes)) } - async fn cf_subscribe_tainted_transaction_events( - &self, - sink: jsonrpsee::PendingSubscriptionSink, - ) { - self.new_subscription( - true, /* only_on_changes */ - true, /* end_on_error */ - sink, - |client, hash| Ok(client.runtime_api().cf_tainted_transaction_events(hash)?), - ) - .await - } - fn cf_get_tainted_transaction_events( &self, at: Option, From d78917743d3a206b5beff77d2495413f38ec534f Mon Sep 17 00:00:00 2001 From: Maxim Urschumzew Date: Wed, 6 Nov 2024 14:31:25 +0100 Subject: [PATCH 7/7] Apply suggestions from @kylezs's review. --- state-chain/runtime/src/lib.rs | 5 ++--- state-chain/runtime/src/runtime_apis.rs | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/state-chain/runtime/src/lib.rs b/state-chain/runtime/src/lib.rs index 6810e9724c..a929b94d12 100644 --- a/state-chain/runtime/src/lib.rs +++ b/state-chain/runtime/src/lib.rs @@ -2104,8 +2104,7 @@ impl_runtime_apis! { } fn cf_get_open_deposit_channels(account_id: Option) -> ChainAccounts { - let btc_chain_accounts = pallet_cf_ingress_egress::DepositChannelLookup::::iter() - .map(|(_, value)| value) + let btc_chain_accounts = pallet_cf_ingress_egress::DepositChannelLookup::::iter_values() .filter(|channel_details| account_id.is_none() || Some(&channel_details.owner) == account_id.as_ref()) .map(|channel_details| channel_details.deposit_channel.address) .collect::>(); @@ -2124,7 +2123,7 @@ impl_runtime_apis! { pallet_cf_ingress_egress::Event::TaintedTransactionReportReceived{ account_id, tx_id, expires_at: _ } => Some(TaintedTransactionEvent::TaintedTransactionReportReceived{account_id, tx_id }), pallet_cf_ingress_egress::Event::TaintedTransactionRejected{ broadcast_id, tx_id } => - Some(TaintedTransactionEvent::TaintedTransactionRejected{ broadcast_id, tx_id: tx_id.id.tx_id }), + Some(TaintedTransactionEvent::TaintedTransactionRejected{ refund_broadcast_id: broadcast_id, tx_id: tx_id.id.tx_id }), _ => None, } } else { diff --git a/state-chain/runtime/src/runtime_apis.rs b/state-chain/runtime/src/runtime_apis.rs index 8aeba4f198..05bd242d00 100644 --- a/state-chain/runtime/src/runtime_apis.rs +++ b/state-chain/runtime/src/runtime_apis.rs @@ -204,7 +204,7 @@ pub enum TaintedTransactionEvent { }, TaintedTransactionRejected { - broadcast_id: BroadcastId, + refund_broadcast_id: BroadcastId, tx_id: TxId, }, }