Skip to content

Commit

Permalink
Close tainted events subscription if node returns un-deserializable d…
Browse files Browse the repository at this point in the history
…ata.
  • Loading branch information
MxmUrw committed Nov 1, 2024
1 parent ff30b19 commit b7441df
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions api/bin/chainflip-broker-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use chainflip_api::{
};
use clap::Parser;
use custom_rpc::CustomApiClient;
use futures::{FutureExt, TryStreamExt};
use futures::{FutureExt, StreamExt};
use jsonrpsee::{
core::{async_trait, ClientError},
core::{async_trait, ClientError, SubscriptionResult},
proc_macros::rpc,
server::ServerBuilder,
types::{ErrorCode, ErrorObject, ErrorObjectOwned},
Expand All @@ -30,7 +30,7 @@ use std::{
path::PathBuf,
sync::{atomic::AtomicBool, Arc},
};
use tracing::{event, log, Level};
use tracing::log;

#[derive(thiserror::Error, Debug)]
pub enum BrokerApiError {
Expand Down Expand Up @@ -124,7 +124,7 @@ pub trait Rpc {
) -> RpcResult<ChainAccounts>;

#[subscription(name = "subscribe_tainted_transaction_events", item = Result<BlockUpdate<TaintedTransactionEvents>,String>)]
async fn subscribe_tainted_transaction_events(&self);
async fn subscribe_tainted_transaction_events(&self) -> SubscriptionResult;
}

pub struct RpcServerImpl {
Expand Down Expand Up @@ -247,29 +247,30 @@ impl RpcServer for RpcServerImpl {
.map_err(BrokerApiError::ClientError)
}

async fn subscribe_tainted_transaction_events(&self, pending_sink: PendingSubscriptionSink) {
let result = self
async fn subscribe_tainted_transaction_events(&self, pending_sink: PendingSubscriptionSink) -> SubscriptionResult {
let subscription_stream = self
.api
.state_chain_client
.base_rpc_client
.raw_rpc_client
.cf_subscribe_tainted_transaction_events()
.await;
.await?
.take_while(|item| {
futures::future::ready(
item.as_ref()
.inspect_err(|err| {
tracing::error!("Subscription error, could not deserialize message: {err}. Closing subscription.")
})
.is_ok(),
)
})
.map(Result::unwrap);

match result {
Ok(subscription) => {
tokio::spawn(async {
sc_rpc::utils::pipe_from_stream(
pending_sink,
subscription.map_err(|err| format!("{err}")),
)
.await
});
},
Err(err) => {
event!(Level::ERROR, "Could not subscribe to `tainted_btc_transaction_events` subscription on the node. Error: {err}");
},
}
tokio::spawn(async {
sc_rpc::utils::pipe_from_stream(pending_sink, subscription_stream).await
});

Ok(())
}
}

Expand Down

0 comments on commit b7441df

Please sign in to comment.