From b191d5280c75223b96ad27b8ed869adc15df1122 Mon Sep 17 00:00:00 2001 From: Maxim Urschumzew Date: Fri, 1 Nov 2024 14:42:11 +0100 Subject: [PATCH] Close tainted events subscription if node returns un-deserializable data. --- api/bin/chainflip-broker-api/src/main.rs | 46 +++++++++++++----------- api/lib/src/lib.rs | 4 +-- state-chain/custom-rpc/src/lib.rs | 8 +++-- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/api/bin/chainflip-broker-api/src/main.rs b/api/bin/chainflip-broker-api/src/main.rs index bccc1ea2f77..2b400464c1c 100644 --- a/api/bin/chainflip-broker-api/src/main.rs +++ b/api/bin/chainflip-broker-api/src/main.rs @@ -17,9 +17,9 @@ use chainflip_api::{ }; use clap::Parser; use custom_rpc::CustomApiClient; -use futures::{FutureExt, TryStreamExt}; +use futures::{FutureExt, StreamExt}; use jsonrpsee::{ - core::{async_trait, ClientError}, + core::{async_trait, ClientError, SubscriptionResult}, proc_macros::rpc, server::ServerBuilder, types::{ErrorCode, ErrorObject, ErrorObjectOwned}, @@ -30,7 +30,7 @@ use std::{ path::PathBuf, sync::{atomic::AtomicBool, Arc}, }; -use tracing::{event, log, Level}; +use tracing::log; #[derive(thiserror::Error, Debug)] pub enum BrokerApiError { @@ -124,7 +124,7 @@ pub trait Rpc { ) -> RpcResult; #[subscription(name = "subscribe_tainted_transaction_events", item = Result,String>)] - async fn subscribe_tainted_transaction_events(&self); + async fn subscribe_tainted_transaction_events(&self) -> SubscriptionResult; } pub struct RpcServerImpl { @@ -247,29 +247,33 @@ impl RpcServer for RpcServerImpl { .map_err(BrokerApiError::ClientError) } - async fn subscribe_tainted_transaction_events(&self, pending_sink: PendingSubscriptionSink) { - let result = self + async fn subscribe_tainted_transaction_events( + &self, + pending_sink: PendingSubscriptionSink, + ) -> SubscriptionResult { + let subscription_stream = self .api .state_chain_client .base_rpc_client .raw_rpc_client .cf_subscribe_tainted_transaction_events() - .await; + .await? + .take_while(|item| { + futures::future::ready( + item.as_ref() + .inspect_err(|err| { + tracing::error!("Subscription error, could not deserialize message: {err}. Closing subscription.") + }) + .is_ok(), + ) + }) + .map(Result::unwrap); - match result { - Ok(subscription) => { - tokio::spawn(async { - sc_rpc::utils::pipe_from_stream( - pending_sink, - subscription.map_err(|err| format!("{err}")), - ) - .await - }); - }, - Err(err) => { - event!(Level::ERROR, "Could not subscribe to `tainted_btc_transaction_events` subscription on the node. Error: {err}"); - }, - } + tokio::spawn(async { + sc_rpc::utils::pipe_from_stream(pending_sink, subscription_stream).await + }); + + Ok(()) } } diff --git a/api/lib/src/lib.rs b/api/lib/src/lib.rs index 035a58a22ae..7724e8c1012 100644 --- a/api/lib/src/lib.rs +++ b/api/lib/src/lib.rs @@ -10,8 +10,8 @@ use cf_chains::{ dot::PolkadotAccountId, evm::to_evm_address, sol::SolAddress, - CcmChannelMetadata, Chain, ChainCrypto, ChannelRefundParametersEncoded, ChannelRefundParametersGeneric, ForeignChain, - ForeignChainAddress, + CcmChannelMetadata, Chain, ChainCrypto, ChannelRefundParametersEncoded, + ChannelRefundParametersGeneric, ForeignChain, ForeignChainAddress, }; pub use cf_primitives::{AccountRole, Affiliates, Asset, BasisPoints, ChannelId, SemVer}; use cf_primitives::{AssetAmount, BlockNumber, DcaParameters, NetworkEnvironment}; diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index 2cab3733dbc..ede0c3d1d7f 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -1755,13 +1755,17 @@ where self.with_runtime_api(at, |api, hash| api.cf_filter_votes(hash, validator, proposed_votes)) } - async fn cf_subscribe_tainted_transaction_events(&self, sink: jsonrpsee::PendingSubscriptionSink) { + async fn cf_subscribe_tainted_transaction_events( + &self, + sink: jsonrpsee::PendingSubscriptionSink, + ) { self.new_subscription( true, /* only_on_changes */ true, /* end_on_error */ sink, |client, hash| Ok(client.runtime_api().cf_tainted_transaction_events(hash)?), - ).await + ) + .await } }