diff --git a/Cargo.lock b/Cargo.lock index f7f84d80d11..ae44d390bde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1635,6 +1635,7 @@ dependencies = [ "futures", "hex", "jsonrpsee 0.23.2", + "sc-rpc", "serde", "sp-core 34.0.0", "sp-rpc", diff --git a/api/bin/chainflip-broker-api/Cargo.toml b/api/bin/chainflip-broker-api/Cargo.toml index f91b5eae4e2..a47aa465261 100644 --- a/api/bin/chainflip-broker-api/Cargo.toml +++ b/api/bin/chainflip-broker-api/Cargo.toml @@ -35,6 +35,7 @@ jsonrpsee = { workspace = true, features = ["full"] } serde = { workspace = true, default-features = true, features = ["derive"] } sp-core = { workspace = true } sp-rpc = { workspace = true } +sc-rpc = { workspace = true, default-features = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/api/bin/chainflip-broker-api/src/main.rs b/api/bin/chainflip-broker-api/src/main.rs index d29c10af6a4..c8120f7142b 100644 --- a/api/bin/chainflip-broker-api/src/main.rs +++ b/api/bin/chainflip-broker-api/src/main.rs @@ -6,25 +6,30 @@ use cf_utilities::{ }; use chainflip_api::{ self, - primitives::{AccountRole, Affiliates, Asset, BasisPoints, CcmChannelMetadata, DcaParameters}, + primitives::{ + state_chain_runtime::runtime_apis::TaintedBtcTransactionEvent, AccountRole, Affiliates, + Asset, BasisPoints, CcmChannelMetadata, DcaParameters, + }, settings::StateChain, - AccountId32, AddressString, BrokerApi, DepositMonitorApi, OperatorApi, RefundParameters, - SignedExtrinsicApi, StateChainApi, SwapDepositAddress, SwapPayload, WithdrawFeesDetail, + AccountId32, AddressString, BlockUpdate, BrokerApi, DepositMonitorApi, OperatorApi, + RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, SwapPayload, + WithdrawFeesDetail, }; use clap::Parser; use custom_rpc::CustomApiClient; -use futures::FutureExt; +use futures::{FutureExt, TryStreamExt}; use jsonrpsee::{ core::{async_trait, ClientError}, proc_macros::rpc, server::ServerBuilder, types::{ErrorCode, ErrorObject, ErrorObjectOwned}, + PendingSubscriptionSink, }; use std::{ path::PathBuf, sync::{atomic::AtomicBool, Arc}, }; -use tracing::log; +use tracing::{event, log, Level}; #[derive(thiserror::Error, Debug)] pub enum BrokerApiError { @@ -109,6 +114,9 @@ pub trait Rpc { async fn open_btc_deposit_channels( &self, ) -> RpcResult::ChainAccount>>; + + #[subscription(name = "subscribe_tainted_btc_transaction_events", item = Result>,String>)] + async fn subscribe_tainted_btc_transaction_events(&self); } pub struct RpcServerImpl { @@ -225,6 +233,34 @@ impl RpcServer for RpcServerImpl { .await .map_err(BrokerApiError::ClientError) } + + async fn subscribe_tainted_btc_transaction_events( + &self, + pending_sink: PendingSubscriptionSink, + ) { + let result = self + .api + .state_chain_client + .base_rpc_client + .raw_rpc_client + .cf_subscribe_tainted_btc_transaction_events() + .await; + + match result { + Ok(subscription) => { + tokio::spawn(async { + sc_rpc::utils::pipe_from_stream( + pending_sink, + subscription.map_err(|err| format!("{err}")), + ) + .await + }); + }, + Err(err) => { + event!(Level::ERROR, "Could not subscribe to `tainted_btc_transaction_events` subscription on the node. Error: {err}"); + }, + } + } } #[derive(Parser, Debug, Clone, Default)]