diff --git a/api/bin/chainflip-broker-api/src/main.rs b/api/bin/chainflip-broker-api/src/main.rs index 71fc3f844c0..f308cba0459 100644 --- a/api/bin/chainflip-broker-api/src/main.rs +++ b/api/bin/chainflip-broker-api/src/main.rs @@ -11,9 +11,9 @@ use chainflip_api::{ AccountRole, Affiliates, Asset, BasisPoints, CcmChannelMetadata, DcaParameters, }, settings::StateChain, - AccountId32, AddressString, BlockUpdate, BrokerApi, DepositMonitorApi, OperatorApi, - RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, SwapPayload, - TransactionInId, WithdrawFeesDetail, + AccountId32, AddressString, BlockUpdate, BrokerApi, ChainApi, DepositMonitorApi, OperatorApi, + RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, + SwapPayload, TransactionInId, WithdrawFeesDetail, }; use clap::Parser; use custom_rpc::CustomApiClient; @@ -251,28 +251,46 @@ impl RpcServer for RpcServerImpl { &self, pending_sink: PendingSubscriptionSink, ) -> SubscriptionResult { - let subscription_stream = self - .api - .state_chain_client - .base_rpc_client - .raw_rpc_client - .cf_subscribe_tainted_transaction_events() - .await? - .take_while(|item| { - futures::future::ready( - item.as_ref() - .inspect_err(|err| { - tracing::error!("Subscription error, could not deserialize message: {err}. Closing subscription.") - }) - .is_ok(), - ) - }) - .map(Result::unwrap); + let state_chain_client = self.api.state_chain_client.clone(); + let sink = pending_sink.accept().await.map_err(|e| e.to_string())?; + tokio::spawn(async move { + // Note we construct the subscription here rather than relying on the custom-rpc + // subscription. This is because we want to use finalized blocks. + // TODO: allow custom rpc subscriptions to use finalized blocks. + let mut finalized_block_stream = state_chain_client.finalized_block_stream().await; + while let Some(block) = finalized_block_stream.next().await { + match state_chain_client + .base_rpc_client + .raw_rpc_client + .cf_get_tainted_transaction_events(Some(block.hash)) + .await + { + Ok(events) => { + // We only want to send a notification if there have been events. + // If other chains are added, they have to be considered here as + // well. + if events.btc_events.len() == 0 { + continue; + } - tokio::spawn(async { - sc_rpc::utils::pipe_from_stream(pending_sink, subscription_stream).await - }); + let block_update = BlockUpdate { + block_hash: block.hash, + block_number: block.number, + data: events, + }; + if sink + .send(sc_rpc::utils::to_sub_message(&sink, &block_update)) + .await + .is_err() + { + break; + } + }, + Err(_) => break, + } + } + }); Ok(()) } } diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index ede0c3d1d7f..28f0396add1 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -964,6 +964,12 @@ pub trait CustomApi { #[subscription(name = "subscribe_tainted_transaction_events", item = BlockUpdate)] async fn cf_subscribe_tainted_transaction_events(&self); + + #[method(name = "get_tainted_transaction_events")] + fn cf_get_tainted_transaction_events( + &self, + at: Option, + ) -> RpcResult; } /// An RPC extension for the state chain node. @@ -1767,6 +1773,13 @@ where ) .await } + + fn cf_get_tainted_transaction_events( + &self, + at: Option, + ) -> RpcResult { + self.with_runtime_api(at, |api, hash| api.cf_tainted_transaction_events(hash)) + } } impl CustomRpc