Skip to content

Commit

Permalink
Create tainted event subscription in broker API instead of forwarding…
Browse files Browse the repository at this point in the history
… from the node.

This is now the same as in the LP API binary. The previous version didn't
use finalized blocks and thus might not have been stable.

Theoretically it would be nice to be able to forward the subscription from the node,
see PRO-1768.
  • Loading branch information
MxmUrw committed Nov 5, 2024
1 parent 98bdd12 commit 50a1b15
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 23 deletions.
64 changes: 41 additions & 23 deletions api/bin/chainflip-broker-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use chainflip_api::{
AccountRole, Affiliates, Asset, BasisPoints, CcmChannelMetadata, DcaParameters,
},
settings::StateChain,
AccountId32, AddressString, BlockUpdate, BrokerApi, DepositMonitorApi, OperatorApi,
RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress, SwapPayload,
TransactionInId, WithdrawFeesDetail,
AccountId32, AddressString, BlockUpdate, BrokerApi, ChainApi, DepositMonitorApi, OperatorApi,
RefundParameters, SignedExtrinsicApi, StateChainApi, SwapDepositAddress,
SwapPayload, TransactionInId, WithdrawFeesDetail,
};
use clap::Parser;
use custom_rpc::CustomApiClient;
Expand Down Expand Up @@ -251,28 +251,46 @@ impl RpcServer for RpcServerImpl {
&self,
pending_sink: PendingSubscriptionSink,
) -> SubscriptionResult {
let subscription_stream = self
.api
.state_chain_client
.base_rpc_client
.raw_rpc_client
.cf_subscribe_tainted_transaction_events()
.await?
.take_while(|item| {
futures::future::ready(
item.as_ref()
.inspect_err(|err| {
tracing::error!("Subscription error, could not deserialize message: {err}. Closing subscription.")
})
.is_ok(),
)
})
.map(Result::unwrap);
let state_chain_client = self.api.state_chain_client.clone();
let sink = pending_sink.accept().await.map_err(|e| e.to_string())?;
tokio::spawn(async move {
// Note we construct the subscription here rather than relying on the custom-rpc
// subscription. This is because we want to use finalized blocks.
// TODO: allow custom rpc subscriptions to use finalized blocks.
let mut finalized_block_stream = state_chain_client.finalized_block_stream().await;
while let Some(block) = finalized_block_stream.next().await {
match state_chain_client
.base_rpc_client
.raw_rpc_client
.cf_get_tainted_transaction_events(Some(block.hash))
.await
{
Ok(events) => {
// We only want to send a notification if there have been events.
// If other chains are added, they have to be considered here as
// well.
if events.btc_events.len() == 0 {
continue;
}

tokio::spawn(async {
sc_rpc::utils::pipe_from_stream(pending_sink, subscription_stream).await
});
let block_update = BlockUpdate {
block_hash: block.hash,
block_number: block.number,
data: events,
};

if sink
.send(sc_rpc::utils::to_sub_message(&sink, &block_update))
.await
.is_err()
{
break;
}
},
Err(_) => break,
}
}
});
Ok(())
}
}
Expand Down
13 changes: 13 additions & 0 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,12 @@ pub trait CustomApi {

#[subscription(name = "subscribe_tainted_transaction_events", item = BlockUpdate<TaintedTransactionEvents>)]
async fn cf_subscribe_tainted_transaction_events(&self);

#[method(name = "get_tainted_transaction_events")]
fn cf_get_tainted_transaction_events(
&self,
at: Option<state_chain_runtime::Hash>,
) -> RpcResult<TaintedTransactionEvents>;
}

/// An RPC extension for the state chain node.
Expand Down Expand Up @@ -1767,6 +1773,13 @@ where
)
.await
}

fn cf_get_tainted_transaction_events(
&self,
at: Option<state_chain_runtime::Hash>,
) -> RpcResult<TaintedTransactionEvents> {
self.with_runtime_api(at, |api, hash| api.cf_tainted_transaction_events(hash))
}
}

impl<C, B> CustomRpc<C, B>
Expand Down

0 comments on commit 50a1b15

Please sign in to comment.