Skip to content

Commit

Permalink
refactor: move subscribe_transaction_screening_events to custom-rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
nmammeri committed Dec 6, 2024
1 parent b4cfe42 commit 9613338
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 54 deletions.
81 changes: 27 additions & 54 deletions api/bin/chainflip-broker-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ use chainflip_api::{
CcmChannelMetadata, DcaParameters,
},
settings::StateChain,
AccountId32, AddressString, BlockUpdate, BrokerApi, ChainApi, ChannelId, DepositMonitorApi,
OperatorApi, RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress,
TransactionInId, WithdrawFeesDetail,
AccountId32, AddressString, BlockUpdate, BrokerApi, ChannelId, DepositMonitorApi, OperatorApi,
RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, TransactionInId,
WithdrawFeesDetail,
};
use clap::Parser;
use custom_rpc::CustomApiClient;
use futures::{FutureExt, StreamExt};
use futures::{stream, FutureExt, StreamExt};
use jsonrpsee::{
core::{async_trait, ClientError, SubscriptionResult},
core::{async_trait, ClientError},
proc_macros::rpc,
server::ServerBuilder,
types::{ErrorCode, ErrorObject, ErrorObjectOwned},
Expand Down Expand Up @@ -127,7 +127,7 @@ pub trait Rpc {
) -> RpcResult<ChainAccounts>;

#[subscription(name = "subscribe_transaction_screening_events", item = BlockUpdate<TransactionScreeningEvents>)]
async fn subscribe_transaction_screening_events(&self) -> SubscriptionResult;
async fn subscribe_transaction_screening_events(&self);

#[method(name = "open_private_btc_channel", aliases = ["broker_openPrivateBtcChannel"])]
async fn open_private_btc_channel(&self) -> RpcResult<ChannelId>;
Expand Down Expand Up @@ -260,59 +260,32 @@ impl RpcServer for RpcServerImpl {
};

self.api
.state_chain_client
.base_rpc_client
.raw_rpc_client
.raw_client()
.cf_get_open_deposit_channels(account_id, None)
.await
.map_err(BrokerApiError::ClientError)
}

async fn subscribe_transaction_screening_events(
&self,
pending_sink: PendingSubscriptionSink,
) -> SubscriptionResult {
let state_chain_client = self.api.state_chain_client.clone();
let sink = pending_sink.accept().await.map_err(|e| e.to_string())?;
tokio::spawn(async move {
// Note we construct the subscription here rather than relying on the custom-rpc
// subscription. This is because we want to use finalized blocks.
// TODO: allow custom rpc subscriptions to use finalized blocks.
let mut finalized_block_stream = state_chain_client.finalized_block_stream().await;
while let Some(block) = finalized_block_stream.next().await {
match state_chain_client
.base_rpc_client
.raw_rpc_client
.cf_get_transaction_screening_events(Some(block.hash))
.await
{
Ok(events) => {
// We only want to send a notification if there have been events.
// If other chains are added, they have to be considered here as
// well.
if events.btc_events.is_empty() {
continue;
}

let block_update = BlockUpdate {
block_hash: block.hash,
block_number: block.number,
data: events,
};

if sink
.send(sc_rpc::utils::to_sub_message(&sink, &block_update))
.await
.is_err()
{
break;
}
},
Err(_) => break,
}
}
});
Ok(())
async fn subscribe_transaction_screening_events(&self, pending_sink: PendingSubscriptionSink) {
// pipe results through from custom-rpc subscription
match self.api.raw_client().cf_subscribe_transaction_screening_events().await {
Ok(subscription) => {
let stream = stream::unfold(subscription, move |mut sub| async move {
match sub.next().await {
Some(Ok(block_update)) => Some((block_update, sub)),
_ => None,
}
})
.boxed();

tokio::spawn(async move {
sc_rpc::utils::pipe_from_stream(pending_sink, stream).await;
});
},
Err(e) => {
pending_sink.reject(BrokerApiError::ClientError(e)).await;
},
}
}

async fn open_private_btc_channel(&self) -> RpcResult<ChannelId> {
Expand Down
22 changes: 22 additions & 0 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,9 @@ pub trait CustomApi {
#[subscription(name = "subscribe_lp_order_fills", item = BlockUpdate<OrderFills>)]
async fn cf_subscribe_lp_order_fills(&self);

#[subscription(name = "subscribe_transaction_screening_events", item = BlockUpdate<TransactionScreeningEvents>)]
async fn cf_subscribe_transaction_screening_events(&self);

#[method(name = "scheduled_swaps")]
fn cf_scheduled_swaps(
&self,
Expand Down Expand Up @@ -1669,6 +1672,25 @@ where
.await
}

async fn cf_subscribe_transaction_screening_events(
&self,
pending_sink: PendingSubscriptionSink,
) {
self.new_subscription(
NotificationBehaviour::Finalized, /* only_finalized */
false, /* only_on_changes */
true, /* end_on_error */
pending_sink,
move |client, hash| {
Ok(
(*client.runtime_api())
.cf_transaction_screening_events(hash)?
)
},
)
.await;
}

async fn cf_subscribe_scheduled_swaps(
&self,
pending_sink: PendingSubscriptionSink,
Expand Down

0 comments on commit 9613338

Please sign in to comment.