diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index b3c33561a4..18bc155863 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -260,6 +260,17 @@ pub trait CustomApi { fn cf_min_swap_amount(&self, asset: Asset) -> RpcResult; #[subscription(name = "subscribe_pool_price", item = Price)] fn cf_subscribe_pool_price(&self, from: Asset, to: Asset); + + #[subscription(name = "subscribe_prewitness_swaps", item = Vec)] + fn cf_subscribe_prewitness_swaps(&self, from: Asset, to: Asset); + + #[method(name = "prewitness_swaps")] + fn cf_prewitness_swaps( + &self, + from: Asset, + to: Asset, + at: Option, + ) -> RpcResult>>; } /// An RPC extension for the state chain node. @@ -694,7 +705,28 @@ where from: Asset, to: Asset, ) -> Result<(), SubscriptionEmptyError> { - self.new_subscription(sink, move |api, hash| api.cf_pool_price(hash, from, to)) + self.new_update_subscription(sink, move |api, hash| api.cf_pool_price(hash, from, to)) + } + + fn cf_subscribe_prewitness_swaps( + &self, + sink: SubscriptionSink, + from: Asset, + to: Asset, + ) -> Result<(), SubscriptionEmptyError> { + self.new_items_subscription(sink, move |api, hash| api.cf_prewitness_swaps(hash, from, to)) + } + + fn cf_prewitness_swaps( + &self, + from: Asset, + to: Asset, + at: Option, + ) -> RpcResult>> { + self.client + .runtime_api() + .cf_prewitness_swaps(self.unwrap_or_best(at), from, to) + .map_err(to_rpc_error) } } @@ -709,7 +741,9 @@ where + BlockchainEvents, C::Api: CustomRuntimeApi, { - fn new_subscription< + /// Upon subscribing returns the first value immediately and then subscribes to updates. i.e. it + /// will only return a value if it has changed from the previous value it returned. + fn new_update_subscription< T: Serialize + Send + Clone + Eq + 'static, E: std::error::Error + Send + Sync + 'static, F: Fn(&C::Api, state_chain_runtime::Hash) -> Result + Send + Clone + 'static, @@ -756,7 +790,44 @@ where sink.pipe_from_stream(stream).await; }; - self.executor.spawn("cf-rpc-subscription", Some("rpc"), fut.boxed()); + self.executor.spawn("cf-rpc-update-subscription", Some("rpc"), fut.boxed()); + + Ok(()) + } + + /// After creating the subscription it will return all the new prewitnessed swaps from this + /// point onwards. + fn new_items_subscription< + T: Serialize + Send + Clone + Eq + 'static, + E: std::error::Error + Send + Sync + 'static, + F: Fn(&C::Api, state_chain_runtime::Hash) -> Result, E> + Send + Clone + 'static, + >( + &self, + mut sink: SubscriptionSink, + f: F, + ) -> Result<(), SubscriptionEmptyError> { + use futures::{future::FutureExt, stream::StreamExt}; + + let client = self.client.clone(); + + let stream = self + .client + .import_notification_stream() + .filter(|n| futures::future::ready(n.is_new_best)) + .filter_map(move |n| { + let new = f(&client.runtime_api(), n.hash); + + match new { + Ok(new) => futures::future::ready(new), + _ => futures::future::ready(None), + } + }); + + let fut = async move { + sink.pipe_from_stream(stream).await; + }; + + self.executor.spawn("cf-rpc-stream-subscription", Some("rpc"), fut.boxed()); Ok(()) } diff --git a/state-chain/pallets/cf-swapping/src/lib.rs b/state-chain/pallets/cf-swapping/src/lib.rs index 4a558c1113..f111a15235 100644 --- a/state-chain/pallets/cf-swapping/src/lib.rs +++ b/state-chain/pallets/cf-swapping/src/lib.rs @@ -154,6 +154,13 @@ pub(crate) struct CcmSwap { gas_swap_id: Option, } +pub struct CcmSwapAmounts { + pub principal_swap_amount: AssetAmount, + pub gas_budget: AssetAmount, + // if the gas asset is different to the input asset, it will require a swap + pub other_gas_asset: Option, +} + #[derive(Clone, Debug, PartialEq, Eq, Encode, Decode, TypeInfo, MaxEncodedLen)] pub enum CcmFailReason { UnsupportedForTargetChain, @@ -684,6 +691,39 @@ pub mod pallet { } impl Pallet { + pub fn principal_and_gas_amounts( + deposit_amount: AssetAmount, + channel_metadata: &CcmChannelMetadata, + source_asset: Asset, + destination_asset: Asset, + ) -> Result { + let gas_budget = channel_metadata.gas_budget; + let principal_swap_amount = deposit_amount.saturating_sub(gas_budget); + + if ForeignChain::Ethereum != destination_asset.into() { + return Err(CcmFailReason::UnsupportedForTargetChain) + } else if deposit_amount < gas_budget { + return Err(CcmFailReason::InsufficientDepositAmount) + } else if source_asset != destination_asset && + !principal_swap_amount.is_zero() && + principal_swap_amount < MinimumSwapAmount::::get(source_asset) + { + // If the CCM's principal requires a swap and is non-zero, + // then the principal swap amount must be above minimum swap amount required. + return Err(CcmFailReason::PrincipalSwapAmountTooLow) + } + + // if the gas asset is different. + let output_gas_asset = ForeignChain::from(destination_asset).gas_asset(); + let other_gas_asset = if source_asset == output_gas_asset || gas_budget.is_zero() { + None + } else { + Some(output_gas_asset) + }; + + Ok(CcmSwapAmounts { principal_swap_amount, gas_budget, other_gas_asset }) + } + // The address and the asset being sent or withdrawn must be compatible. fn validate_destination_address( destination_address: &EncodedAddress, @@ -971,38 +1011,28 @@ pub mod pallet { // Caller should ensure that assets and addresses are compatible. debug_assert!(destination_address.chain() == ForeignChain::from(destination_asset)); - let principal_swap_amount = - deposit_amount.saturating_sub(deposit_metadata.channel_metadata.gas_budget); - - // Checks the validity of CCM. - let error = if ForeignChain::Ethereum != destination_asset.into() { - Some(CcmFailReason::UnsupportedForTargetChain) - } else if deposit_amount < deposit_metadata.channel_metadata.gas_budget { - Some(CcmFailReason::InsufficientDepositAmount) - } else if source_asset != destination_asset && - !principal_swap_amount.is_zero() && - principal_swap_amount < MinimumSwapAmount::::get(source_asset) - { - // If the CCM's principal requires a swap and is non-zero, - // then the principal swap amount must be above minimum swap amount required. - Some(CcmFailReason::PrincipalSwapAmountTooLow) - } else { - None - }; - - if let Some(reason) = error { - // Confiscate the deposit and emit an event. - CollectedRejectedFunds::::mutate(source_asset, |fund| { - *fund = fund.saturating_add(deposit_amount) - }); + let CcmSwapAmounts { principal_swap_amount, gas_budget, other_gas_asset } = + match Self::principal_and_gas_amounts( + deposit_amount, + &deposit_metadata.channel_metadata, + source_asset, + destination_asset, + ) { + Ok(amounts) => amounts, + Err(reason) => { + // Confiscate the deposit and emit an event. + CollectedRejectedFunds::::mutate(source_asset, |fund| { + *fund = fund.saturating_add(deposit_amount) + }); - Self::deposit_event(Event::::CcmFailed { - reason, - destination_address: encoded_destination_address, - deposit_metadata, - }); - return - } + Self::deposit_event(Event::::CcmFailed { + reason, + destination_address: encoded_destination_address, + deposit_metadata, + }); + return + }, + }; let ccm_id = CcmIdCounter::::mutate(|id| { id.saturating_accrue(1); @@ -1035,31 +1065,27 @@ pub mod pallet { Some(swap_id) }; - let output_gas_asset = ForeignChain::from(destination_asset).gas_asset(); - let gas_swap_id = if source_asset == output_gas_asset || - deposit_metadata.channel_metadata.gas_budget.is_zero() - { - // Deposit can be used as gas directly - swap_output.gas = Some(deposit_metadata.channel_metadata.gas_budget); - None - } else { + let gas_swap_id = if let Some(other_gas_asset) = other_gas_asset { let swap_id = Self::schedule_swap_internal( source_asset, - output_gas_asset, - deposit_metadata.channel_metadata.gas_budget, + other_gas_asset, + gas_budget, SwapType::CcmGas(ccm_id), ); Self::deposit_event(Event::::SwapScheduled { swap_id, source_asset, - deposit_amount: deposit_metadata.channel_metadata.gas_budget, - destination_asset: output_gas_asset, + deposit_amount: gas_budget, + destination_asset: other_gas_asset, destination_address: encoded_destination_address.clone(), origin, swap_type: SwapType::CcmGas(ccm_id), broker_commission: None, }); Some(swap_id) + } else { + swap_output.gas = Some(gas_budget); + None }; Self::deposit_event(Event::::CcmDepositReceived { diff --git a/state-chain/runtime/src/lib.rs b/state-chain/runtime/src/lib.rs index 665b12c865..018a0a8b59 100644 --- a/state-chain/runtime/src/lib.rs +++ b/state-chain/runtime/src/lib.rs @@ -24,13 +24,15 @@ use cf_chains::{ dot::{self, PolkadotCrypto}, eth::{self, api::EthereumApi, Address as EthereumAddress, Ethereum}, evm::EvmCrypto, - Bitcoin, Polkadot, + Bitcoin, CcmChannelMetadata, Polkadot, }; use core::ops::Range; pub use frame_system::Call as SystemCall; use pallet_cf_governance::GovCallHash; +use pallet_cf_ingress_egress::{ChannelAction, DepositWitness}; use pallet_cf_pools::{AssetsMap, Depth, PoolLiquidity}; use pallet_cf_reputation::ExclusionList; +use pallet_cf_swapping::CcmSwapAmounts; use pallet_transaction_payment::{ConstFeeMultiplier, Multiplier}; use sp_runtime::DispatchError; @@ -1065,6 +1067,119 @@ impl_runtime_apis! { fn cf_min_swap_amount(asset: Asset) -> AssetAmount { Swapping::minimum_swap_amount(asset) } + + /// This should *not* be fully trusted as if the deposits that are pre-witnessed will definitely go through. + /// This returns a list of swaps in the requested direction that are pre-witnessed in the current block. + fn cf_prewitness_swaps(from: Asset, to: Asset) -> Option> { + + fn filter_deposit_swaps(from: Asset, to: Asset, deposit_witnesses: Vec>) -> Vec + where Runtime: pallet_cf_ingress_egress::Config, + C: cf_chains::Chain>::TargetChain as cf_chains::Chain>::ChainAccount> + { + let mut filtered_swaps = Vec::new(); + for deposit in deposit_witnesses { + let Some(details) = pallet_cf_ingress_egress::DepositChannelLookup::::get( + deposit.deposit_address, + ) else { + continue + }; + let channel_asset: Asset = details.deposit_channel.asset.into(); + + match details.action { + ChannelAction::Swap { destination_asset, .. } + if destination_asset == to && channel_asset == from => + { + filtered_swaps.push(deposit.amount.into()); + }, + ChannelAction::CcmTransfer { destination_asset, channel_metadata, .. } => { + filtered_swaps.extend(ccm_swaps(from, to, channel_asset, destination_asset, deposit.amount.into(), channel_metadata)); + } + _ => { + // ignore other deposit actions + } + } + } + filtered_swaps + } + + fn ccm_swaps(from: Asset, to: Asset, source_asset: Asset, destination_asset: Asset, deposit_amount: AssetAmount, channel_metadata: CcmChannelMetadata) -> Vec { + if source_asset != from { + return Vec::new(); + } + + // There are two swaps for CCM, the principal swap, and the gas amount swap. + let Ok(CcmSwapAmounts { principal_swap_amount, gas_budget, other_gas_asset }) = Swapping::principal_and_gas_amounts(deposit_amount, &channel_metadata, source_asset, destination_asset) else { + // not a valid CCM + return Vec::new(); + }; + + let mut ccm_swaps = Vec::new(); + if destination_asset == to { + // the principal swap is in the requested direction. + ccm_swaps.push(principal_swap_amount); + } + + if let Some(gas_asset) = other_gas_asset { + if gas_asset == to { + // the gas swap is in the requested direction + ccm_swaps.push(gas_budget); + } + } + + ccm_swaps + } + + let mut all_prewitnessed_swaps = Vec::new(); + let current_block_events = System::read_events_no_consensus(); + + for event in current_block_events { + match *event { + frame_system::EventRecord:: { event: RuntimeEvent::Witnesser(pallet_cf_witnesser::Event::Prewitnessed { call }), ..} => { + match call { + RuntimeCall::Swapping(pallet_cf_swapping::Call::schedule_swap_from_contract { + from: swap_from, to: swap_to, deposit_amount, .. + }) if from == swap_from && to == swap_to => { + all_prewitnessed_swaps.push(deposit_amount); + } + RuntimeCall::EthereumIngressEgress(pallet_cf_ingress_egress::Call::process_deposits { + deposit_witnesses, .. + }) => { + all_prewitnessed_swaps.extend(filter_deposit_swaps::(from, to, deposit_witnesses)); + }, + RuntimeCall::BitcoinIngressEgress(pallet_cf_ingress_egress::Call::process_deposits { + deposit_witnesses, .. + }) => { + all_prewitnessed_swaps.extend(filter_deposit_swaps::(from, to, deposit_witnesses)); + }, + RuntimeCall::PolkadotIngressEgress(pallet_cf_ingress_egress::Call::process_deposits { + deposit_witnesses, .. + }) => { + all_prewitnessed_swaps.extend(filter_deposit_swaps::(from, to, deposit_witnesses)); + } + RuntimeCall::Swapping(pallet_cf_swapping::Call::ccm_deposit { + source_asset, deposit_amount, destination_asset, deposit_metadata, .. + }) => { + // There are two swaps for CCM, the principal swap, and the gas amount swap. + all_prewitnessed_swaps.extend(ccm_swaps(from, to, source_asset, destination_asset, deposit_amount, deposit_metadata.channel_metadata)); + } + _ => { + // ignore, we only care about calls that trigger swaps. + } + } + } + _ => { + // ignore, we only care about Prewitnessed calls + } + } + } + + // We don't want to return anything from the websocket stream if there are no items + if all_prewitnessed_swaps.is_empty() { + None + } else { + Some(all_prewitnessed_swaps) + } + } } // END custom runtime APIs @@ -1309,7 +1424,6 @@ impl_runtime_apis! { } } } - #[cfg(test)] mod test { use super::*; diff --git a/state-chain/runtime/src/runtime_apis.rs b/state-chain/runtime/src/runtime_apis.rs index da1779f94a..457985ce68 100644 --- a/state-chain/runtime/src/runtime_apis.rs +++ b/state-chain/runtime/src/runtime_apis.rs @@ -134,5 +134,6 @@ decl_runtime_apis!( ) -> Option, DispatchError>>; fn cf_environment() -> Environment; fn cf_min_swap_amount(asset: Asset) -> AssetAmount; + fn cf_prewitness_swaps(from: Asset, to: Asset) -> Option>; } );