From 3f7413f77cfec8c6c1095a6ccb04796542d909b2 Mon Sep 17 00:00:00 2001 From: nmammeri Date: Fri, 6 Dec 2024 17:36:47 +0100 Subject: [PATCH 1/2] refactor: move subscribe_transaction_screening_events to custom-rpc --- api/bin/chainflip-broker-api/src/main.rs | 81 ++++++++---------------- state-chain/custom-rpc/src/lib.rs | 22 +++++++ 2 files changed, 49 insertions(+), 54 deletions(-) diff --git a/api/bin/chainflip-broker-api/src/main.rs b/api/bin/chainflip-broker-api/src/main.rs index cc54f2e404..43f20b41cc 100644 --- a/api/bin/chainflip-broker-api/src/main.rs +++ b/api/bin/chainflip-broker-api/src/main.rs @@ -14,15 +14,15 @@ use chainflip_api::{ CcmChannelMetadata, DcaParameters, }, settings::StateChain, - AccountId32, AddressString, BlockUpdate, BrokerApi, ChainApi, ChannelId, DepositMonitorApi, - OperatorApi, RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, - TransactionInId, WithdrawFeesDetail, + AccountId32, AddressString, BlockUpdate, BrokerApi, ChannelId, DepositMonitorApi, OperatorApi, + RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, TransactionInId, + WithdrawFeesDetail, }; use clap::Parser; use custom_rpc::CustomApiClient; -use futures::{FutureExt, StreamExt}; +use futures::{stream, FutureExt, StreamExt}; use jsonrpsee::{ - core::{async_trait, ClientError, SubscriptionResult}, + core::{async_trait, ClientError}, proc_macros::rpc, server::ServerBuilder, types::{ErrorCode, ErrorObject, ErrorObjectOwned}, @@ -127,7 +127,7 @@ pub trait Rpc { ) -> RpcResult; #[subscription(name = "subscribe_transaction_screening_events", item = BlockUpdate)] - async fn subscribe_transaction_screening_events(&self) -> SubscriptionResult; + async fn subscribe_transaction_screening_events(&self); #[method(name = "open_private_btc_channel", aliases = ["broker_openPrivateBtcChannel"])] async fn open_private_btc_channel(&self) -> RpcResult; @@ -260,59 +260,32 @@ impl RpcServer for RpcServerImpl { }; self.api - .state_chain_client - .base_rpc_client - .raw_rpc_client + .raw_client() .cf_get_open_deposit_channels(account_id, None) .await .map_err(BrokerApiError::ClientError) } - async fn subscribe_transaction_screening_events( - &self, - pending_sink: PendingSubscriptionSink, - ) -> SubscriptionResult { - let state_chain_client = self.api.state_chain_client.clone(); - let sink = pending_sink.accept().await.map_err(|e| e.to_string())?; - tokio::spawn(async move { - // Note we construct the subscription here rather than relying on the custom-rpc - // subscription. This is because we want to use finalized blocks. - // TODO: allow custom rpc subscriptions to use finalized blocks. - let mut finalized_block_stream = state_chain_client.finalized_block_stream().await; - while let Some(block) = finalized_block_stream.next().await { - match state_chain_client - .base_rpc_client - .raw_rpc_client - .cf_get_transaction_screening_events(Some(block.hash)) - .await - { - Ok(events) => { - // We only want to send a notification if there have been events. - // If other chains are added, they have to be considered here as - // well. - if events.btc_events.is_empty() { - continue; - } - - let block_update = BlockUpdate { - block_hash: block.hash, - block_number: block.number, - data: events, - }; - - if sink - .send(sc_rpc::utils::to_sub_message(&sink, &block_update)) - .await - .is_err() - { - break; - } - }, - Err(_) => break, - } - } - }); - Ok(()) + async fn subscribe_transaction_screening_events(&self, pending_sink: PendingSubscriptionSink) { + // pipe results through from custom-rpc subscription + match self.api.raw_client().cf_subscribe_transaction_screening_events().await { + Ok(subscription) => { + let stream = stream::unfold(subscription, move |mut sub| async move { + match sub.next().await { + Some(Ok(block_update)) => Some((block_update, sub)), + _ => None, + } + }) + .boxed(); + + tokio::spawn(async move { + sc_rpc::utils::pipe_from_stream(pending_sink, stream).await; + }); + }, + Err(e) => { + pending_sink.reject(BrokerApiError::ClientError(e)).await; + }, + } } async fn open_private_btc_channel(&self) -> RpcResult { diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index 816a37f9ed..782fa5eeef 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -898,6 +898,9 @@ pub trait CustomApi { #[subscription(name = "subscribe_lp_order_fills", item = BlockUpdate)] async fn cf_subscribe_lp_order_fills(&self); + #[subscription(name = "subscribe_transaction_screening_events", item = BlockUpdate)] + async fn cf_subscribe_transaction_screening_events(&self); + #[method(name = "scheduled_swaps")] fn cf_scheduled_swaps( &self, @@ -1669,6 +1672,25 @@ where .await } + async fn cf_subscribe_transaction_screening_events( + &self, + pending_sink: PendingSubscriptionSink, + ) { + self.new_subscription( + NotificationBehaviour::Finalized, /* only_finalized */ + false, /* only_on_changes */ + true, /* end_on_error */ + pending_sink, + move |client, hash| { + Ok( + (*client.runtime_api()) + .cf_transaction_screening_events(hash)? + ) + }, + ) + .await; + } + async fn cf_subscribe_scheduled_swaps( &self, pending_sink: PendingSubscriptionSink, From a2e4bf6b0b8219899fd7f3bde4c84c91020bfc77 Mon Sep 17 00:00:00 2001 From: nmammeri Date: Mon, 9 Dec 2024 09:36:36 +0100 Subject: [PATCH 2/2] refactor: cargo fmt --- state-chain/custom-rpc/src/lib.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index 782fa5eeef..3df1c001af 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -1678,17 +1678,12 @@ where ) { self.new_subscription( NotificationBehaviour::Finalized, /* only_finalized */ - false, /* only_on_changes */ - true, /* end_on_error */ + false, /* only_on_changes */ + true, /* end_on_error */ pending_sink, - move |client, hash| { - Ok( - (*client.runtime_api()) - .cf_transaction_screening_events(hash)? - ) - }, + move |client, hash| Ok((*client.runtime_api()).cf_transaction_screening_events(hash)?), ) - .await; + .await; } async fn cf_subscribe_scheduled_swaps(