Skip to content

Commit

Permalink
feat: speedy scc
Browse files Browse the repository at this point in the history
  • Loading branch information
AlastairHolmes committed Sep 11, 2023
1 parent d77f68b commit 868a0bf
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 84 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ substrate-frame-rpc-system = { git = "https://github.com/chainflip-io/substrate.
frame-support = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+1" }
frame-system = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+1" }
sc-rpc-api = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+1" }
sc-transaction-pool-api = { git = "https://github.com/chainflip-io/substrate.git", tag = 'chainflip-monthly-2023-08+1' }
scale-info = { version = "2.5.0", features = ["derive"] }
sp-core = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+1" }
sp-rpc = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+1" }
Expand Down
23 changes: 16 additions & 7 deletions engine/src/state_chain_observer/client/base_rpc_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use async_trait::async_trait;
use cf_amm::{common::Tick, range_orders::Liquidity};
use cf_primitives::Asset;
use jsonrpsee::core::{
client::{ClientT, SubscriptionClientT},
client::{ClientT, Subscription, SubscriptionClientT},
RpcResult,
};
use sc_transaction_pool_api::TransactionStatus;
use sp_core::{
storage::{StorageData, StorageKey},
Bytes,
Expand Down Expand Up @@ -92,6 +93,11 @@ pub trait BaseRpcApi {
extrinsic: state_chain_runtime::UncheckedExtrinsic,
) -> RpcResult<sp_core::H256>;

async fn submit_and_watch_extrinsic(
&self,
extrinsic: state_chain_runtime::UncheckedExtrinsic,
) -> RpcResult<Subscription<TransactionStatus<sp_core::H256, sp_core::H256>>>;

async fn storage(
&self,
block_hash: state_chain_runtime::Hash,
Expand Down Expand Up @@ -120,9 +126,7 @@ pub trait BaseRpcApi {

async fn subscribe_finalized_block_headers(
&self,
) -> RpcResult<
jsonrpsee::core::client::Subscription<sp_runtime::generic::Header<u32, BlakeTwo256>>,
>;
) -> RpcResult<Subscription<sp_runtime::generic::Header<u32, BlakeTwo256>>>;

async fn runtime_version(&self) -> RpcResult<RuntimeVersion>;

Expand Down Expand Up @@ -171,6 +175,13 @@ impl<RawRpcClient: RawRpcApi + Send + Sync> BaseRpcApi for BaseRpcClient<RawRpcC
self.raw_rpc_client.submit_extrinsic(Bytes::from(extrinsic.encode())).await
}

async fn submit_and_watch_extrinsic(
&self,
extrinsic: state_chain_runtime::UncheckedExtrinsic,
) -> RpcResult<Subscription<TransactionStatus<sp_core::H256, sp_core::H256>>> {
self.raw_rpc_client.watch_extrinsic(Bytes::from(extrinsic.encode())).await
}

async fn storage(
&self,
block_hash: state_chain_runtime::Hash,
Expand Down Expand Up @@ -215,9 +226,7 @@ impl<RawRpcClient: RawRpcApi + Send + Sync> BaseRpcApi for BaseRpcClient<RawRpcC

async fn subscribe_finalized_block_headers(
&self,
) -> RpcResult<
jsonrpsee::core::client::Subscription<sp_runtime::generic::Header<u32, BlakeTwo256>>,
> {
) -> RpcResult<Subscription<sp_runtime::generic::Header<u32, BlakeTwo256>>> {
self.raw_rpc_client.subscribe_finalized_heads().await
}

Expand Down
2 changes: 1 addition & 1 deletion engine/src/state_chain_observer/client/error_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl ErrorDecoder {
}
}

#[derive(Error, Debug)]
#[derive(Error, Debug, Clone)]
pub enum DispatchError {
#[error("{0:?}")]
DispatchError(sp_runtime::DispatchError),
Expand Down
106 changes: 83 additions & 23 deletions engine/src/state_chain_observer/client/extrinsic_api/signed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,51 @@ mod submission_watcher;
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait UntilFinalized {
async fn until_finalized(self) -> submission_watcher::ExtrinsicResult;
async fn until_finalized(self) -> submission_watcher::FinalizationResult;
}
#[async_trait]
impl<W: UntilFinalized + Send> UntilFinalized for (state_chain_runtime::Hash, W) {
async fn until_finalized(self) -> submission_watcher::ExtrinsicResult {
async fn until_finalized(self) -> submission_watcher::FinalizationResult {
self.1.until_finalized().await
}
}
pub struct UntilFinalizedFuture(oneshot::Receiver<submission_watcher::ExtrinsicResult>);
#[async_trait]
impl<T: UntilInBlock + Send, W: UntilFinalized + Send> UntilFinalized for (T, W) {
async fn until_finalized(self) -> submission_watcher::FinalizationResult {
self.1.until_finalized().await
}
}

pub struct UntilFinalizedFuture(oneshot::Receiver<submission_watcher::FinalizationResult>);
#[async_trait]
impl UntilFinalized for UntilFinalizedFuture {
async fn until_finalized(self) -> submission_watcher::ExtrinsicResult {
async fn until_finalized(self) -> submission_watcher::FinalizationResult {
self.0.await.expect(OR_CANCEL)
}
}

// Wrapper type to avoid await.await on submits/finalize calls being possible
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait UntilInBlock {
async fn until_in_block(self) -> submission_watcher::InBlockResult;
}
#[async_trait]
impl<W: UntilInBlock + Send> UntilInBlock for (state_chain_runtime::Hash, W) {
async fn until_in_block(self) -> submission_watcher::InBlockResult {
self.1.until_in_block().await
}
}
#[async_trait]
impl<T: UntilFinalized + Send, W: UntilInBlock + Send> UntilInBlock for (W, T) {
async fn until_in_block(self) -> submission_watcher::InBlockResult {
self.0.until_in_block().await
}
}
pub struct UntilInBlockFuture(oneshot::Receiver<submission_watcher::InBlockResult>);
#[async_trait]
impl UntilInBlock for UntilInBlockFuture {
async fn until_in_block(self) -> submission_watcher::InBlockResult {
self.0.await.expect(OR_CANCEL)
}
}
Expand All @@ -44,10 +77,14 @@ impl UntilFinalized for UntilFinalizedFuture {
#[async_trait]
pub trait SignedExtrinsicApi {
type UntilFinalizedFuture: UntilFinalized + Send;
type UntilInBlockFuture: UntilInBlock + Send;

fn account_id(&self) -> AccountId;

async fn submit_signed_extrinsic<Call>(&self, call: Call) -> (H256, Self::UntilFinalizedFuture)
async fn submit_signed_extrinsic<Call>(
&self,
call: Call,
) -> (H256, (Self::UntilInBlockFuture, Self::UntilFinalizedFuture))
where
Call: Into<state_chain_runtime::RuntimeCall>
+ Clone
Expand All @@ -56,7 +93,10 @@ pub trait SignedExtrinsicApi {
+ Sync
+ 'static;

async fn finalize_signed_extrinsic<Call>(&self, call: Call) -> Self::UntilFinalizedFuture
async fn finalize_signed_extrinsic<Call>(
&self,
call: Call,
) -> (Self::UntilInBlockFuture, Self::UntilFinalizedFuture)
where
Call: Into<state_chain_runtime::RuntimeCall>
+ Clone
Expand All @@ -70,7 +110,8 @@ pub struct SignedExtrinsicClient {
account_id: AccountId,
request_sender: mpsc::Sender<(
state_chain_runtime::RuntimeCall,
oneshot::Sender<submission_watcher::ExtrinsicResult>,
oneshot::Sender<submission_watcher::InBlockResult>,
oneshot::Sender<submission_watcher::FinalizationResult>,
submission_watcher::RequestStrategy,
)>,
_task_handle: ScopedJoinHandle<()>,
Expand Down Expand Up @@ -151,9 +192,10 @@ impl SignedExtrinsicClient {
);

utilities::loop_select! {
if let Some((call, result_sender, strategy)) = request_receiver.recv() => {
submission_watcher.new_request(&mut requests, call, result_sender, strategy).await?;
if let Some((call, until_in_block_sender, until_finalized_sender, strategy)) = request_receiver.recv() => {
submission_watcher.new_request(&mut requests, call, until_in_block_sender, until_finalized_sender, strategy).await?;
} else break Ok(()),
let _ = submission_watcher.watch_for_block_inclusion(&mut requests) => {},
if let Some((block_hash, block_header)) = state_chain_stream.next() => {
trace!("Received state chain block: {number} ({block_hash:x?})", number = block_header.number);
submission_watcher.on_block_finalized(
Expand All @@ -171,12 +213,16 @@ impl SignedExtrinsicClient {
#[async_trait]
impl SignedExtrinsicApi for SignedExtrinsicClient {
type UntilFinalizedFuture = UntilFinalizedFuture;
type UntilInBlockFuture = UntilInBlockFuture;

fn account_id(&self) -> AccountId {
self.account_id.clone()
}

async fn submit_signed_extrinsic<Call>(&self, call: Call) -> (H256, Self::UntilFinalizedFuture)
async fn submit_signed_extrinsic<Call>(
&self,
call: Call,
) -> (H256, (Self::UntilInBlockFuture, Self::UntilFinalizedFuture))
where
Call: Into<state_chain_runtime::RuntimeCall>
+ Clone
Expand All @@ -185,23 +231,31 @@ impl SignedExtrinsicApi for SignedExtrinsicClient {
+ Sync
+ 'static,
{
let (result_sender, result_receiver) = oneshot::channel();
let (until_in_block_sender, until_in_block_receiver) = oneshot::channel();
let (until_finalized_sender, until_finalized_receiver) = oneshot::channel();
(
send_request(&self.request_sender, |hash_sender| {
(
call.into(),
result_sender,
until_in_block_sender,
until_finalized_sender,
submission_watcher::RequestStrategy::StrictlyOneSubmission(hash_sender),
)
})
.await
.await
.expect(OR_CANCEL),
UntilFinalizedFuture(result_receiver),
(
UntilInBlockFuture(until_in_block_receiver),
UntilFinalizedFuture(until_finalized_receiver),
),
)
}

async fn finalize_signed_extrinsic<Call>(&self, call: Call) -> Self::UntilFinalizedFuture
async fn finalize_signed_extrinsic<Call>(
&self,
call: Call,
) -> (Self::UntilInBlockFuture, Self::UntilFinalizedFuture)
where
Call: Into<state_chain_runtime::RuntimeCall>
+ Clone
Expand All @@ -210,15 +264,21 @@ impl SignedExtrinsicApi for SignedExtrinsicClient {
+ Sync
+ 'static,
{
UntilFinalizedFuture(
send_request(&self.request_sender, |result_sender| {
(
call.into(),
result_sender,
submission_watcher::RequestStrategy::AllowMultipleSubmissions,
)
})
.await,
let (until_finalized_sender, until_finalized_receiver) = oneshot::channel();

(
UntilInBlockFuture(
send_request(&self.request_sender, |until_in_block_sender| {
(
call.into(),
until_in_block_sender,
until_finalized_sender,
submission_watcher::RequestStrategy::AllowMultipleSubmissions,
)
})
.await,
),
UntilFinalizedFuture(until_finalized_receiver),
)
}
}
Loading

0 comments on commit 868a0bf

Please sign in to comment.