Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
AlastairHolmes committed Sep 19, 2023
1 parent bb5bb6a commit 7c63ce3
Showing 1 changed file with 30 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use itertools::Itertools;
use sc_transaction_pool_api::TransactionStatus;
use sp_core::H256;
use sp_runtime::{traits::Hash, MultiAddress};
use state_chain_runtime::UncheckedExtrinsic;
use state_chain_runtime::{BlockNumber, Nonce, UncheckedExtrinsic};
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{debug, warn};
Expand Down Expand Up @@ -69,7 +69,7 @@ pub type SubmissionID = u64;
pub struct Request {
id: RequestID,
next_submission_id: SubmissionID,
pending_submissions: BTreeMap<SubmissionID, state_chain_runtime::Nonce>,
pending_submissions: BTreeMap<SubmissionID, Nonce>,
strictly_one_submission: bool,
resubmit_window: std::ops::RangeToInclusive<cf_primitives::BlockNumber>,
call: state_chain_runtime::RuntimeCall,
Expand All @@ -95,17 +95,17 @@ pub struct SubmissionWatcher<
BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static,
> {
scope: &'a Scope<'env, anyhow::Error>,
submissions_by_nonce: BTreeMap<state_chain_runtime::Nonce, BTreeMap<SubmissionID, Submission>>,
submissions_by_nonce: BTreeMap<Nonce, BTreeMap<SubmissionID, Submission>>,
#[allow(clippy::type_complexity)]
submission_status_futures:
FutureMap<(RequestID, SubmissionID), task_scope::ScopedJoinHandle<Option<(H256, usize)>>>,
signer: signer::PairSigner<sp_core::sr25519::Pair>,
finalized_nonce: state_chain_runtime::Nonce,
finalized_nonce: Nonce,
finalized_block_hash: state_chain_runtime::Hash,
finalized_block_number: state_chain_runtime::BlockNumber,
finalized_block_number: BlockNumber,
runtime_version: sp_version::RuntimeVersion,
genesis_hash: state_chain_runtime::Hash,
extrinsic_lifetime: state_chain_runtime::BlockNumber,
extrinsic_lifetime: BlockNumber,
#[allow(clippy::type_complexity)]
block_cache: VecDeque<(
state_chain_runtime::Hash,
Expand All @@ -127,12 +127,12 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>
pub fn new(
scope: &'a Scope<'env, anyhow::Error>,
signer: signer::PairSigner<sp_core::sr25519::Pair>,
finalized_nonce: state_chain_runtime::Nonce,
finalized_nonce: Nonce,
finalized_block_hash: state_chain_runtime::Hash,
finalized_block_number: state_chain_runtime::BlockNumber,
finalized_block_number: BlockNumber,
runtime_version: sp_version::RuntimeVersion,
genesis_hash: state_chain_runtime::Hash,
extrinsic_lifetime: state_chain_runtime::BlockNumber,
extrinsic_lifetime: BlockNumber,
base_rpc_client: Arc<BaseRpcClient>,
) -> (Self, BTreeMap<RequestID, Request>) {
(
Expand All @@ -151,14 +151,18 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>
base_rpc_client,
error_decoder: Default::default(),
},
// Return an empty requests map. This is done so that initial state of the requests
// matches the submission watchers state. The requests must be stored outside of
// the watcher so it can be manipulated by it's parent while holding a mut reference
// to the watcher.
Default::default(),
)
}

async fn submit_extrinsic_at_nonce(
&mut self,
request: &mut Request,
nonce: state_chain_runtime::Nonce,
nonce: Nonce,
) -> Result<Result<H256, SubmissionLogicError>, anyhow::Error> {
loop {
let (signed_extrinsic, lifetime) = self.signer.new_signed_extrinsic(
Expand Down Expand Up @@ -307,6 +311,8 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>
extrinsic_events: Vec<state_chain_runtime::RuntimeEvent>,
header: state_chain_runtime::Header,
) -> ExtrinsicResult<OtherError> {
// We expect to find a Success or Failed event, grab the dispatch info and send
// it with the events
extrinsic_events
.iter()
.find_map(|event| match event {
Expand Down Expand Up @@ -418,6 +424,7 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>

assert_eq!(block.header.number, self.finalized_block_number + 1, "{SUBSTRATE_BEHAVIOUR}");

// Get our account nonce and compare it to the finalized nonce
let nonce = self
.base_rpc_client
.storage_map_entry::<frame_system::Account<state_chain_runtime::Runtime>>(
Expand All @@ -430,10 +437,9 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>
if nonce < self.finalized_nonce {
Err(anyhow!("Extrinsic signer's account was reaped"))
} else {
// TODO: Get hash and number from the RPC and use std::cmp::max() here
// Update the finalized data
self.finalized_block_number = block.header.number;
self.finalized_block_hash = block_hash;

self.finalized_nonce = nonce;

for (extrinsic_index, extrinsic_events) in events
Expand All @@ -453,9 +459,11 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>
(extrinsic_index, extrinsic_events.map(|(_extrinsics_index, event)| event))
}) {
let extrinsic = &block.extrinsics[extrinsic_index as usize];
// TODO: Assumption needs checking

// Find any submissions that are for the nonce of the extrinsic
if let Some(submissions) = extrinsic.signature.as_ref().and_then(
|(address, _, (.., frame_system::CheckNonce(nonce), _, _))| {
// We only care about the extrinsic if it is from our account
(*address == MultiAddress::Id(self.signer.account_id.clone()))
.then_some(())
.and_then(|_| self.submissions_by_nonce.remove(nonce))
Expand All @@ -466,7 +474,7 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>
extrinsic,
);

let mut not_found_matching_submission = Some(extrinsic_events);
let mut optional_extrinsic_events = Some(extrinsic_events);

for (submission_id, submission) in submissions {
if let Some(request) = requests.get_mut(&submission.request_id) {
Expand All @@ -480,12 +488,12 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>
// notice the extrinsic was not actually the requested one,
// but otherwise would continue to work.
if let Some((extrinsic_events, matching_request)) =
(not_found_matching_submission.is_some() &&
submission.tx_hash == tx_hash)
(optional_extrinsic_events.is_some() && submission.tx_hash == tx_hash)
.then_some(())
.and_then(|_| requests.remove(&submission.request_id))
.map(|request| {
(not_found_matching_submission.take().unwrap(), request)
// If its the right hash, take the events and the request
(optional_extrinsic_events.take().unwrap(), request)
}) {
let extrinsic_events = extrinsic_events.collect::<Vec<_>>();
let result = self.decide_extrinsic_success(
Expand Down Expand Up @@ -514,6 +522,7 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>
}
}

// Remove any submissions that have expired
self.submissions_by_nonce.retain(|nonce, submissions| {
assert!(self.finalized_nonce <= *nonce, "{SUBSTRATE_BEHAVIOUR}");

Expand All @@ -533,6 +542,8 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>
!submissions.is_empty()
});

// Remove any requests that have all their submission have expired and whose
// resubmission window has past.
for (_request_id, request) in requests.extract_if(|_request_id, request| {
request.pending_submissions.is_empty() &&
(!request.resubmit_window.contains(&(block.header.number + 1)) ||
Expand All @@ -542,8 +553,8 @@ impl<'a, 'env, BaseRpcClient: base_rpc_api::BaseRpcApi + Send + Sync + 'static>
.until_finalized_sender
.send(Err(ExtrinsicError::Other(FinalizationError::NotFinalized)));
}

// Has to be a separate loop from the above due to not being able to await inside
// Resubmit any expired requests that have no unexpired submission.
// This has to be a separate loop from the above due to not being able to await inside
// extract_if
for (_request_id, request) in requests.iter_mut() {
if request.pending_submissions.is_empty() {
Expand Down

0 comments on commit 7c63ce3

Please sign in to comment.