Skip to content

Commit

Permalink
refactor: extract dot witnessing procedures into reusable functions
Browse files Browse the repository at this point in the history
  • Loading branch information
msgmaxim committed Oct 16, 2023
1 parent 5c70808 commit fd2a1f8
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 62 deletions.
155 changes: 93 additions & 62 deletions engine/src/witness/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ mod dot_deposits;
mod dot_source;

use cf_chains::dot::{
PolkadotAccountId, PolkadotBalance, PolkadotExtrinsicIndex, PolkadotUncheckedExtrinsic,
PolkadotAccountId, PolkadotBalance, PolkadotExtrinsicIndex, PolkadotHash, PolkadotSignature,
PolkadotUncheckedExtrinsic,
};
use cf_primitives::{EpochIndex, PolkadotBlockNumber, TxId};
use futures_core::Future;
Expand All @@ -29,9 +30,13 @@ use crate::{
witness::common::chain_source::extension::ChainSourceExt,
};
use anyhow::Result;
use dot_source::{DotFinalisedSource, DotUnfinalisedSource};
pub use dot_source::{DotFinalisedSource, DotUnfinalisedSource};

use super::common::{epoch_source::EpochSourceBuilder, STATE_CHAIN_CONNECTION};
use super::common::{
chain_source::Header,
epoch_source::{EpochSourceBuilder, Vault},
STATE_CHAIN_CONNECTION,
};

// To generate the metadata file, use the subxt-cli tool (`cargo install subxt-cli`):
// subxt metadata --format=json --pallets Proxy,Balances,TransactionPayment --url
Expand All @@ -51,7 +56,7 @@ use polkadot::{
transaction_payment::events::TransactionFeePaid,
};

fn filter_map_events(
pub fn filter_map_events(
res_event_details: Result<EventDetails<PolkadotConfig>, subxt::Error>,
) -> Option<(Phase, EventWrapper)> {
match res_event_details {
Expand Down Expand Up @@ -81,6 +86,86 @@ fn filter_map_events(
}
}

pub async fn proxy_added_witnessing<ProcessCall, ProcessingFut>(
epoch: Vault<cf_chains::Polkadot, PolkadotAccountId, ()>,
header: Header<PolkadotBlockNumber, PolkadotHash, (Vec<(Phase, EventWrapper)>, BTreeSet<u32>)>,
process_call: ProcessCall,
) -> (Vec<(Phase, EventWrapper)>, BTreeSet<u32>)
where
ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut
+ Send
+ Sync
+ Clone
+ 'static,
ProcessingFut: Future<Output = ()> + Send + 'static,
{
let (events, mut broadcast_indices) = header.data;

let (vault_key_rotated_calls, mut proxy_added_broadcasts) =
proxy_addeds(header.index, &events, &epoch.info.1);
broadcast_indices.append(&mut proxy_added_broadcasts);

for call in vault_key_rotated_calls {
process_call(call, epoch.index).await;
}

(events, broadcast_indices)
}

pub async fn process_egress<ProcessCall, ProcessingFut>(
epoch: Vault<cf_chains::Polkadot, PolkadotAccountId, ()>,
header: Header<
PolkadotBlockNumber,
PolkadotHash,
((Vec<(Phase, EventWrapper)>, BTreeSet<u32>), Vec<PolkadotSignature>),
>,
process_call: ProcessCall,
dot_client: DotRetryRpcClient,
) where
ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut
+ Send
+ Sync
+ Clone
+ 'static,
ProcessingFut: Future<Output = ()> + Send + 'static,
{
let ((events, broadcast_indices), monitored_egress_ids) = header.data;

let extrinsics = dot_client.extrinsics(header.hash).await;

for (extrinsic_index, tx_fee) in transaction_fee_paids(&broadcast_indices, &events) {
let xt = extrinsics.get(extrinsic_index as usize).expect(
"We know this exists since we got
this index from the event, from the block we are querying.",
);
let mut xt_bytes = xt.0.as_slice();

let unchecked = PolkadotUncheckedExtrinsic::decode(&mut xt_bytes);
if let Ok(unchecked) = unchecked {
if let Some(signature) = unchecked.signature() {
if monitored_egress_ids.contains(&signature) {
tracing::info!("Witnessing transaction_succeeded. signature: {signature:?}");
process_call(
pallet_cf_broadcast::Call::<_, PolkadotInstance>::transaction_succeeded {
tx_out_id: signature,
signer_id: epoch.info.1,
tx_fee,
}
.into(),
epoch.index,
)
.await;
}
}
} else {
// We expect this to occur when attempting to decode
// a transaction that was not sent by us.
// We can safely ignore it, but we log it in case.
tracing::debug!("Failed to decode UncheckedExtrinsic {unchecked:?}");
}
}
}

pub async fn start<
StateChainClient,
StateChainStream,
Expand Down Expand Up @@ -168,72 +253,18 @@ where
// Proxy added witnessing
.then({
let process_call = process_call.clone();
move |epoch, header| {
let process_call = process_call.clone();
async move {
let (events, mut broadcast_indices) = header.data;

let (vault_key_rotated_calls, mut proxy_added_broadcasts) = proxy_addeds(header.index, &events, &epoch.info.1);
broadcast_indices.append(&mut proxy_added_broadcasts);

for call in vault_key_rotated_calls {
process_call(call, epoch.index).await;
}

(events, broadcast_indices)
}
}}
)
move |epoch, header| proxy_added_witnessing(epoch, header, process_call.clone())
})
// Broadcast success
.egress_items(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.then({
let process_call = process_call.clone();
let dot_client = dot_client.clone();
move |epoch, header| {
let process_call = process_call.clone();
let dot_client = dot_client.clone();
async move {
let ((events, broadcast_indices), monitored_egress_ids) = header.data;

let extrinsics = dot_client
.extrinsics(header.hash)
.await;

for (extrinsic_index, tx_fee) in transaction_fee_paids(&broadcast_indices, &events) {
let xt = extrinsics.get(extrinsic_index as usize).expect("We know this exists since we got this index from the event, from the block we are querying.");
let mut xt_bytes = xt.0.as_slice();

let unchecked = PolkadotUncheckedExtrinsic::decode(&mut xt_bytes);
if let Ok(unchecked) = unchecked {
if let Some(signature) = unchecked.signature() {
if monitored_egress_ids.contains(&signature) {
tracing::info!("Witnessing transaction_succeeded. signature: {signature:?}");
process_call(
pallet_cf_broadcast::Call::<
_,
PolkadotInstance,
>::transaction_succeeded {
tx_out_id: signature,
signer_id: epoch.info.1,
tx_fee,
}
.into(),
epoch.index,
).await;
}
}
} else {
// We expect this to occur when attempting to decode
// a transaction that was not sent by us.
// We can safely ignore it, but we log it in case.
tracing::debug!("Failed to decode UncheckedExtrinsic {unchecked:?}");
}
}
}
}
process_egress(epoch, header, process_call.clone(), dot_client.clone())
}
)
})
.continuous("Polkadot".to_string(), db)
.logging("witnessing")
.spawn(scope);
Expand Down
2 changes: 2 additions & 0 deletions engine/src/witness/dot/dot_deposits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl<Inner: ChunkedByVault> ChunkedByVaultBuilder<Inner> {
Data = (Vec<(Phase, EventWrapper)>, BTreeSet<u32>),
Chain = Polkadot,
ExtraInfo = PolkadotAccountId,
ExtraHistoricInfo = (),
>,
>
where
Expand All @@ -42,6 +43,7 @@ impl<Inner: ChunkedByVault> ChunkedByVaultBuilder<Inner> {
Data = (Vec<(Phase, EventWrapper)>, Addresses<Inner>),
Chain = Polkadot,
ExtraInfo = PolkadotAccountId,
ExtraHistoricInfo = (),
>,
ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut
+ Send
Expand Down

0 comments on commit fd2a1f8

Please sign in to comment.