Skip to content

Commit

Permalink
Close tainted events subscription if node returns un-deserializable d…
Browse files Browse the repository at this point in the history
…ata.
  • Loading branch information
MxmUrw committed Nov 1, 2024
1 parent ff30b19 commit b191d52
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 25 deletions.
46 changes: 25 additions & 21 deletions api/bin/chainflip-broker-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use chainflip_api::{
};
use clap::Parser;
use custom_rpc::CustomApiClient;
use futures::{FutureExt, TryStreamExt};
use futures::{FutureExt, StreamExt};
use jsonrpsee::{
core::{async_trait, ClientError},
core::{async_trait, ClientError, SubscriptionResult},
proc_macros::rpc,
server::ServerBuilder,
types::{ErrorCode, ErrorObject, ErrorObjectOwned},
Expand All @@ -30,7 +30,7 @@ use std::{
path::PathBuf,
sync::{atomic::AtomicBool, Arc},
};
use tracing::{event, log, Level};
use tracing::log;

#[derive(thiserror::Error, Debug)]
pub enum BrokerApiError {
Expand Down Expand Up @@ -124,7 +124,7 @@ pub trait Rpc {
) -> RpcResult<ChainAccounts>;

#[subscription(name = "subscribe_tainted_transaction_events", item = Result<BlockUpdate<TaintedTransactionEvents>,String>)]
async fn subscribe_tainted_transaction_events(&self);
async fn subscribe_tainted_transaction_events(&self) -> SubscriptionResult;
}

pub struct RpcServerImpl {
Expand Down Expand Up @@ -247,29 +247,33 @@ impl RpcServer for RpcServerImpl {
.map_err(BrokerApiError::ClientError)
}

async fn subscribe_tainted_transaction_events(&self, pending_sink: PendingSubscriptionSink) {
let result = self
async fn subscribe_tainted_transaction_events(
&self,
pending_sink: PendingSubscriptionSink,
) -> SubscriptionResult {
let subscription_stream = self
.api
.state_chain_client
.base_rpc_client
.raw_rpc_client
.cf_subscribe_tainted_transaction_events()
.await;
.await?
.take_while(|item| {
futures::future::ready(
item.as_ref()
.inspect_err(|err| {
tracing::error!("Subscription error, could not deserialize message: {err}. Closing subscription.")
})
.is_ok(),
)
})
.map(Result::unwrap);

match result {
Ok(subscription) => {
tokio::spawn(async {
sc_rpc::utils::pipe_from_stream(
pending_sink,
subscription.map_err(|err| format!("{err}")),
)
.await
});
},
Err(err) => {
event!(Level::ERROR, "Could not subscribe to `tainted_btc_transaction_events` subscription on the node. Error: {err}");
},
}
tokio::spawn(async {
sc_rpc::utils::pipe_from_stream(pending_sink, subscription_stream).await
});

Ok(())
}
}

Expand Down
4 changes: 2 additions & 2 deletions api/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use cf_chains::{
dot::PolkadotAccountId,
evm::to_evm_address,
sol::SolAddress,
CcmChannelMetadata, Chain, ChainCrypto, ChannelRefundParametersEncoded, ChannelRefundParametersGeneric, ForeignChain,
ForeignChainAddress,
CcmChannelMetadata, Chain, ChainCrypto, ChannelRefundParametersEncoded,
ChannelRefundParametersGeneric, ForeignChain, ForeignChainAddress,
};
pub use cf_primitives::{AccountRole, Affiliates, Asset, BasisPoints, ChannelId, SemVer};
use cf_primitives::{AssetAmount, BlockNumber, DcaParameters, NetworkEnvironment};
Expand Down
8 changes: 6 additions & 2 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1755,13 +1755,17 @@ where
self.with_runtime_api(at, |api, hash| api.cf_filter_votes(hash, validator, proposed_votes))
}

async fn cf_subscribe_tainted_transaction_events(&self, sink: jsonrpsee::PendingSubscriptionSink) {
async fn cf_subscribe_tainted_transaction_events(
&self,
sink: jsonrpsee::PendingSubscriptionSink,
) {
self.new_subscription(
true, /* only_on_changes */
true, /* end_on_error */
sink,
|client, hash| Ok(client.runtime_api().cf_tainted_transaction_events(hash)?),
).await
)
.await
}
}

Expand Down

0 comments on commit b191d52

Please sign in to comment.