Skip to content

Commit

Permalink
refactor: use async closures for SC specific witnessing functionality (
Browse files Browse the repository at this point in the history
…#4049)

* refactor: pull out extrinsic submission from erc20 witnessing

* refactor: pull out eth deposit submission

* refactor: factor out finalize extrinsic code

* refactor: remove unnecessary macro

* factor out extrinsic code for BTC

* refactor: factor out extrinsic submission for DOT

* refactor: use it for the rest of the witnessers for consistency

* chore: remove unnecessary clippy allow
  • Loading branch information
kylezs authored and dandanlen committed Sep 29, 2023
1 parent 072c27a commit a3f4c81
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 334 deletions.
73 changes: 37 additions & 36 deletions engine/src/witness/btc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use cf_chains::{
btc::{deposit_address::DepositAddress, ScriptPubkey, UtxoId, CHANGE_ADDRESS_SALT},
Bitcoin,
};
use cf_primitives::chains::assets::btc;
use cf_primitives::{chains::assets::btc, EpochIndex};
use futures_core::Future;
use pallet_cf_ingress_egress::{DepositChannelDetails, DepositWitness};
use secp256k1::hashes::Hash;
use state_chain_runtime::BitcoinInstance;
Expand All @@ -29,9 +30,10 @@ use anyhow::Result;

const SAFETY_MARGIN: usize = 6;

pub async fn start<StateChainClient, StateChainStream>(
pub async fn start<StateChainClient, StateChainStream, ProcessCall, ProcessingFut>(
scope: &Scope<'_, anyhow::Error>,
btc_client: BtcRetryRpcClient,
process_call: ProcessCall,
state_chain_client: Arc<StateChainClient>,
state_chain_stream: StateChainStream,
epoch_source: EpochSourceBuilder<'_, '_, StateChainClient, (), ()>,
Expand All @@ -40,6 +42,12 @@ pub async fn start<StateChainClient, StateChainStream>(
where
StateChainClient: StorageApi + SignedExtrinsicApi + 'static + Send + Sync,
StateChainStream: StateChainStreamApi + Clone + 'static + Send + Sync,
ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut
+ Send
+ Sync
+ Clone
+ 'static,
ProcessingFut: Future<Output = ()> + Send + 'static,
{
let btc_source = BtcSource::new(btc_client.clone()).shared(scope);

Expand Down Expand Up @@ -68,9 +76,9 @@ where
.deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.then({
let state_chain_client = state_chain_client.clone();
let process_call = process_call.clone();
move |epoch, header| {
let state_chain_client = state_chain_client.clone();
let process_call = process_call.clone();
async move {
// TODO: Make addresses a Map of some kind?
let (((), txs), addresses) = header.data;
Expand All @@ -81,18 +89,12 @@ where

// Submit all deposit witnesses for the block.
if !deposit_witnesses.is_empty() {
state_chain_client
.finalize_signed_extrinsic(pallet_cf_witnesser::Call::witness_at_epoch {
call: Box::new(
pallet_cf_ingress_egress::Call::<_, BitcoinInstance>::process_deposits {
deposit_witnesses,
block_height: header.index,
}
.into(),
),
epoch_index: epoch.index,
})
.await;
process_call(
pallet_cf_ingress_egress::Call::<_, BitcoinInstance>::process_deposits {
deposit_witnesses,
block_height: header.index,
}.into(),
epoch.index).await;
}
txs
}
Expand All @@ -101,30 +103,29 @@ where
.egress_items(scope, state_chain_stream, state_chain_client.clone())
.await
.then(move |epoch, header| {
let state_chain_client = state_chain_client.clone();
let process_call = process_call.clone();
async move {
let (txs, monitored_tx_hashes) = header.data;

for tx_hash in success_witnesses(&monitored_tx_hashes, &txs) {
state_chain_client
.finalize_signed_extrinsic(pallet_cf_witnesser::Call::witness_at_epoch {
call: Box::new(state_chain_runtime::RuntimeCall::BitcoinBroadcaster(
pallet_cf_broadcast::Call::transaction_succeeded {
tx_out_id: tx_hash,
signer_id: DepositAddress::new(
epoch.info.0.public_key.current,
CHANGE_ADDRESS_SALT,
)
.script_pubkey(),
// TODO: Ideally we can submit an empty type here. For
// Bitcoin and some other chains fee tracking is not
// necessary. PRO-370.
tx_fee: Default::default(),
},
)),
epoch_index: epoch.index,
})
.await;
process_call(
state_chain_runtime::RuntimeCall::BitcoinBroadcaster(
pallet_cf_broadcast::Call::transaction_succeeded {
tx_out_id: tx_hash,
signer_id: DepositAddress::new(
epoch.info.0.public_key.current,
CHANGE_ADDRESS_SALT,
)
.script_pubkey(),
// TODO: Ideally we can submit an empty type here. For
// Bitcoin and some other chains fee tracking is not
// necessary. PRO-370.
tx_fee: Default::default(),
},
),
epoch.index,
)
.await;
}
}
})
Expand Down
89 changes: 40 additions & 49 deletions engine/src/witness/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use cf_chains::{
dot::{PolkadotAccountId, PolkadotBalance, PolkadotExtrinsicIndex, PolkadotUncheckedExtrinsic},
Polkadot,
};
use cf_primitives::{chains::assets, PolkadotBlockNumber, TxId};
use cf_primitives::{chains::assets, EpochIndex, PolkadotBlockNumber, TxId};
use futures_core::Future;
use pallet_cf_ingress_egress::{DepositChannelDetails, DepositWitness};
use state_chain_runtime::PolkadotInstance;
use subxt::{
Expand Down Expand Up @@ -81,9 +82,10 @@ fn filter_map_events(
}
}

pub async fn start<StateChainClient, StateChainStream>(
pub async fn start<StateChainClient, StateChainStream, ProcessCall, ProcessingFut>(
scope: &Scope<'_, anyhow::Error>,
dot_client: DotRetryRpcClient,
process_call: ProcessCall,
state_chain_client: Arc<StateChainClient>,
state_chain_stream: StateChainStream,
epoch_source: EpochSourceBuilder<'_, '_, StateChainClient, (), ()>,
Expand All @@ -92,6 +94,12 @@ pub async fn start<StateChainClient, StateChainStream>(
where
StateChainClient: StorageApi + SignedExtrinsicApi + 'static + Send + Sync,
StateChainStream: StateChainStreamApi + Clone,
ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut
+ Send
+ Sync
+ Clone
+ 'static,
ProcessingFut: Future<Output = ()> + Send + 'static,
{
DotUnfinalisedSource::new(dot_client.clone())
.then(|header| async move { header.data.iter().filter_map(filter_map_events).collect() })
Expand Down Expand Up @@ -127,9 +135,9 @@ where
.await
// Deposit witnessing
.then({
let state_chain_client = state_chain_client.clone();
let process_call = process_call.clone();
move |epoch, header| {
let state_chain_client = state_chain_client.clone();
let process_call = process_call.clone();
async move {
let (events, addresses_and_details) = header.data;

Expand All @@ -139,18 +147,14 @@ where
deposit_witnesses(header.index, addresses, &events, &epoch.info.1);

if !deposit_witnesses.is_empty() {
state_chain_client
.finalize_signed_extrinsic(pallet_cf_witnesser::Call::witness_at_epoch {
call: Box::new(
pallet_cf_ingress_egress::Call::<_, PolkadotInstance>::process_deposits {
deposit_witnesses,
block_height: header.index,
}
.into(),
),
epoch_index: epoch.index,
})
.await;
process_call(
pallet_cf_ingress_egress::Call::<_, PolkadotInstance>::process_deposits {
deposit_witnesses,
block_height: header.index,
}
.into(),
epoch.index
).await
}

(events, broadcast_indices)
Expand All @@ -159,22 +163,17 @@ where
})
// Proxy added witnessing
.then({
let state_chain_client = state_chain_client.clone();
let process_call = process_call.clone();
move |epoch, header| {
let state_chain_client = state_chain_client.clone();
let process_call = process_call.clone();
async move {
let (events, mut broadcast_indices) = header.data;

let (vault_key_rotated_calls, mut proxy_added_broadcasts) = proxy_addeds(header.index, &events, &epoch.info.1);
broadcast_indices.append(&mut proxy_added_broadcasts);

for call in vault_key_rotated_calls {
state_chain_client
.finalize_signed_extrinsic(pallet_cf_witnesser::Call::witness_at_epoch {
call,
epoch_index: epoch.index,
})
.await;
process_call(call, epoch.index).await;
}

(events, broadcast_indices)
Expand All @@ -185,10 +184,10 @@ where
.egress_items(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.then({
let state_chain_client = state_chain_client.clone();
let process_call = process_call.clone();
let dot_client = dot_client.clone();
move |epoch, header| {
let state_chain_client = state_chain_client.clone();
let process_call = process_call.clone();
let dot_client = dot_client.clone();
async move {
let ((events, broadcast_indices), monitored_egress_ids) = header.data;
Expand All @@ -206,25 +205,18 @@ where
if let Some(signature) = unchecked.signature() {
if monitored_egress_ids.contains(&signature) {
tracing::info!("Witnessing transaction_succeeded. signature: {signature:?}");
state_chain_client
.finalize_signed_extrinsic(
pallet_cf_witnesser::Call::witness_at_epoch {
call:
Box::new(
pallet_cf_broadcast::Call::<
_,
PolkadotInstance,
>::transaction_succeeded {
tx_out_id: signature,
signer_id: epoch.info.1,
tx_fee,
}
.into(),
),
epoch_index: epoch.index,
},
)
.await;
process_call(
pallet_cf_broadcast::Call::<
_,
PolkadotInstance,
>::transaction_succeeded {
tx_out_id: signature,
signer_id: epoch.info.1,
tx_fee,
}
.into(),
epoch.index,
).await;
}
}
} else {
Expand Down Expand Up @@ -312,12 +304,11 @@ fn transaction_fee_paids(
indices_with_fees
}

#[allow(clippy::vec_box)]
fn proxy_addeds(
block_number: PolkadotBlockNumber,
events: &Vec<(Phase, EventWrapper)>,
our_vault: &PolkadotAccountId,
) -> (Vec<Box<state_chain_runtime::RuntimeCall>>, BTreeSet<PolkadotExtrinsicIndex>) {
) -> (Vec<state_chain_runtime::RuntimeCall>, BTreeSet<PolkadotExtrinsicIndex>) {
let mut vault_key_rotated_calls = vec![];
let mut extrinsic_indices = BTreeSet::new();
for (phase, wrapped_event) in events {
Expand All @@ -329,13 +320,13 @@ fn proxy_addeds(

tracing::info!("Witnessing ProxyAdded. new delegatee: {delegatee:?} at block number {block_number} and extrinsic_index; {extrinsic_index}");

vault_key_rotated_calls.push(Box::new(
vault_key_rotated_calls.push(
pallet_cf_vaults::Call::<_, PolkadotInstance>::vault_key_rotated {
block_number,
tx_id: TxId { block_number, extrinsic_index },
}
.into(),
));
);

extrinsic_indices.insert(extrinsic_index);
}
Expand Down
28 changes: 18 additions & 10 deletions engine/src/witness/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ pub mod vault;

use std::{collections::HashMap, sync::Arc};

use cf_primitives::chains::assets::eth;
use cf_primitives::{chains::assets::eth, EpochIndex};
use futures_core::Future;
use sp_core::H160;
use utilities::task_scope::Scope;

Expand All @@ -33,9 +34,10 @@ use anyhow::{Context, Result};

const SAFETY_MARGIN: usize = 7;

pub async fn start<StateChainClient, StateChainStream>(
pub async fn start<StateChainClient, StateChainStream, ProcessCall, ProcessingFut>(
scope: &Scope<'_, anyhow::Error>,
eth_client: EthersRetryRpcClient,
process_call: ProcessCall,
state_chain_client: Arc<StateChainClient>,
state_chain_stream: StateChainStream,
epoch_source: EpochSourceBuilder<'_, '_, StateChainClient, (), ()>,
Expand All @@ -44,6 +46,12 @@ pub async fn start<StateChainClient, StateChainStream>(
where
StateChainClient: StorageApi + ChainApi + SignedExtrinsicApi + 'static + Send + Sync,
StateChainStream: StateChainStreamApi + Clone,
ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut
+ Send
+ Sync
+ Clone
+ 'static,
ProcessingFut: Future<Output = ()> + Send + 'static,
{
let state_chain_gateway_address = state_chain_client
.storage_value::<pallet_cf_environment::EthereumStateChainGatewayAddress<state_chain_runtime::Runtime>>(
Expand Down Expand Up @@ -111,15 +119,15 @@ where

eth_safe_vault_source
.clone()
.key_manager_witnessing(state_chain_client.clone(), eth_client.clone(), key_manager_address)
.key_manager_witnessing(process_call.clone(), eth_client.clone(), key_manager_address)
.continuous("KeyManager".to_string(), db.clone())
.logging("KeyManager")
.spawn(scope);

eth_safe_vault_source
.clone()
.state_chain_gateway_witnessing(
state_chain_client.clone(),
process_call.clone(),
eth_client.clone(),
state_chain_gateway_address,
)
Expand All @@ -131,8 +139,8 @@ where
.clone()
.deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.erc20_deposits::<_, _, UsdcEvents>(
state_chain_client.clone(),
.erc20_deposits::<_, _, _, UsdcEvents>(
process_call.clone(),
eth_client.clone(),
cf_primitives::chains::assets::eth::Asset::Usdc,
usdc_contract_address,
Expand All @@ -146,8 +154,8 @@ where
.clone()
.deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.erc20_deposits::<_, _, FlipEvents>(
state_chain_client.clone(),
.erc20_deposits::<_, _, _, FlipEvents>(
process_call.clone(),
eth_client.clone(),
cf_primitives::chains::assets::eth::Asset::Flip,
flip_contract_address,
Expand All @@ -162,7 +170,7 @@ where
.deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.ethereum_deposits(
state_chain_client.clone(),
process_call.clone(),
eth_client.clone(),
eth::Asset::Eth,
address_checker_address,
Expand All @@ -175,7 +183,7 @@ where

eth_safe_vault_source
.vault_witnessing(
state_chain_client.clone(),
process_call,
eth_client.clone(),
vault_address,
cf_primitives::Asset::Eth,
Expand Down
Loading

0 comments on commit a3f4c81

Please sign in to comment.