Skip to content

Commit

Permalink
Merge pull request #15 from radiusxyz/feat/subprocess
Browse files Browse the repository at this point in the history
Feat/subprocess
  • Loading branch information
datactor authored May 31, 2024
2 parents 76779cd + d0f5949 commit 87695a3
Show file tree
Hide file tree
Showing 27 changed files with 2,274 additions and 948 deletions.
1,616 changes: 922 additions & 694 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/client/block-proposer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ repository = "https://github.com/keep-starknet-strange/madara"
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
async-std = { version = "1.12.0" }
codec = { package = "parity-scale-codec", version = "3.2.2" }
futures = { workspace = true }
futures-timer = { workspace = true }
Expand Down
165 changes: 2 additions & 163 deletions crates/client/block-proposer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,12 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::time;
use std::time::{Duration, UNIX_EPOCH};

use blockifier::transaction::account_transaction::AccountTransaction;
use blockifier::transaction::transactions::InvokeTransaction;
use codec::Encode;
use futures::channel::oneshot;
use futures::future::{Future, FutureExt};
use futures::{future, select};
use log::{debug, error, info, trace, warn};
use mc_rpc::submit_extrinsic_with_order;
use mc_transaction_pool::decryptor::{Decryptor, EncryptorInvokeTransaction};
use mc_transaction_pool::EncryptedTransactionPool;
use pallet_starknet_runtime_api::{ConvertTransactionRuntimeApi, StarknetRuntimeApi};
use prometheus_endpoint::Registry as PrometheusRegistry;
Expand Down Expand Up @@ -404,21 +399,10 @@ where
// - encrypted_txs_len: The length of encrypted transactions in the pool.
// - block_tx_pool_is_closed: A boolean indicating if the transaction pool for the block is closed.
//
let (
is_using_encrypted_mempool,
using_external_decryptor,
encrypted_transaction_pool_orders,
is_closed_block_encrypted_transaction_pool,
) = {
{
let encrypted_mempool = self.transaction_pool.encrypted_mempool().clone();
let mut locked_encrypted_mempool = encrypted_mempool.lock().await;
let is_using_encrypted_mempool = locked_encrypted_mempool.is_using_encrypted_mempool();
let mut encrypted_transaction_pool_orders = vec![];

if !is_using_encrypted_mempool {
(is_using_encrypted_mempool, false, encrypted_transaction_pool_orders, true)
} else {
let is_using_external_decryptor = locked_encrypted_mempool.is_using_external_decryptor();
if locked_encrypted_mempool.is_using_encrypted_mempool() {
let block_height = self.parent_number.to_string().parse::<u64>().map_err(|e| {
sp_blockchain::Error::Application(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
Expand All @@ -429,63 +413,12 @@ where
locked_encrypted_mempool.get_or_init_block_encrypted_transaction_pool(block_height);
let is_closed_block_encrypted_transaction_pool = block_encrypted_transaction_pool.is_closed();

let order = block_encrypted_transaction_pool.get_order();
let tx_cnt = block_encrypted_transaction_pool.get_tx_cnt();
let dec_cnt = block_encrypted_transaction_pool.get_submitted_tx_count();
let ready_cnt = self.transaction_pool.status().ready as u64;
log::debug!(
"block height: {}, current order: {}, (tx count:submitted tx count:ready count) = ({}:{}:{})",
block_height,
order,
tx_cnt,
dec_cnt,
ready_cnt
);

if !is_closed_block_encrypted_transaction_pool {
locked_encrypted_mempool.close(block_height).unwrap();
encrypted_transaction_pool_orders = locked_encrypted_mempool
.get_block_encrypted_transaction_pool(&block_height)
.unwrap()
.encrypted_transaction_pool_orders()
.cloned()
.collect::<Vec<u64>>();
}

(
is_using_encrypted_mempool,
is_using_external_decryptor,
encrypted_transaction_pool_orders,
is_closed_block_encrypted_transaction_pool,
)
}
};

// Processing of Encrypted Transactions
// This section of the code is executed if an encrypted transaction pool is being used and the
// transaction pool for the block is not closed. It performs the following operations:
// 1. Captures the current system time to log the start time of the decryption process.
// 2. Iterates through each encrypted transaction in the pool based on the number of encrypted
// transactions.
// 3. For each transaction, it calls a function to decrypt and submit the transaction to the block.
// The decision to use an external decryptor is also considered in this step.
//
// - If the encrypted transaction pool is in use (`is_using_encrypted_mempool` is true) and the
// transaction pool for the block is not closed (`block_tx_pool_is_closed` is false), then: a. The
// current system time is recorded. b. The decryption process starts, and each transaction is
// decrypted and submitted in sequence.
if is_using_encrypted_mempool && !is_closed_block_encrypted_transaction_pool {
// Records the start time of the decryption process
let start = std::time::SystemTime::now();
let since_the_epoch = start.duration_since(UNIX_EPOCH).expect("Time went backwards");
log::debug!("Decrypt Start in {:?}", since_the_epoch);

// Iterates through each encrypted transaction and processes them
encrypted_transaction_pool_orders.iter().for_each(|&order| {
self.decrypt_and_submit_transaction(order, using_external_decryptor);
});
}

// proceed with transactions
// We calculate soft deadline used only in case we start skipping transactions.
let now = (self.now)();
Expand Down Expand Up @@ -665,98 +598,4 @@ where
"hash" => ?<Block as BlockT>::Hash::from(block.header().hash()),
);
}

fn decrypt_and_submit_transaction(&self, order: u64, using_external_decryptor: bool) {
let block_height = self.parent_number.to_string().parse::<u64>().unwrap() + 1;
let best_block_hash = self.client.info().best_hash;
let client = self.client.clone();
let pool = self.transaction_pool.clone();

let encrypted_mempool = self.transaction_pool.encrypted_mempool();
self.spawn_handle.spawn_blocking(
"Decryptor",
None,
Box::pin(async move {
tokio::time::sleep(Duration::from_secs(1)).await;

let encrypted_invoke_transaction = {
let locked_encrypted_mempool = encrypted_mempool.lock().await;
let Some(encrypted_transaction_block) =
locked_encrypted_mempool.get_block_encrypted_transaction_pool(&block_height)
else {
log::error!("Something wrong. Not exist block_height: {block_height}");
return;
};

if encrypted_transaction_block.is_provided_decryption_key(order) {
log::debug!("Decryption key is already provided. Skip decrypting at order: {order}");
return;
}

let encrypted_tx = match encrypted_transaction_block.get_encrypted_invoke_tx(order) {
Ok(encrypted_tx) => encrypted_tx.clone(),
Err(e) => {
log::error!("Failed to get encrypted_invoke_transaction: {e}");
return;
}
};

encrypted_tx
};

let decryptor = Decryptor::default();
let invoke_tx_result: Result<EncryptorInvokeTransaction, _> = if using_external_decryptor {
decryptor.delegate_to_decrypt_encrypted_invoke_transaction(encrypted_invoke_transaction).await
} else {
decryptor.decrypt_encrypted_invoke_transaction(encrypted_invoke_transaction, None).await
};

let decrypted_invoke_tx = match invoke_tx_result {
Ok(tx) => tx,
Err(e) => {
// Should conduct an integrity check in advance to avoid wasting resources on decrypting invalid
// transactions.
log::error!("Error while decrypting transaction: {e}");
let mut locked_encrypted_mempool = encrypted_mempool.lock().await;
let encrypted_transaction_block =
locked_encrypted_mempool.get_mut_block_encrypted_transaction_pool(&block_height).unwrap();
encrypted_transaction_block.delete_invalid_encrypted_tx(order);

return;
}
};

{
let mut locked_encrypted_mempool = encrypted_mempool.lock().await;
let encrypted_transaction_block =
locked_encrypted_mempool.get_mut_block_encrypted_transaction_pool(&block_height).unwrap();

encrypted_transaction_block.increase_decrypted_tx_count();
}

let end = std::time::SystemTime::now();
let since_the_epoch = match end.duration_since(UNIX_EPOCH) {
Ok(duration) => duration,
Err(e) => {
log::error!("System time error: {e:?}");
return;
}
};
log::debug!("Decrypt {order} End in {since_the_epoch:?}");

let transaction = AccountTransaction::Invoke(InvokeTransaction::from(decrypted_invoke_tx));

let Ok(extrinsic) = client.runtime_api().convert_account_transaction(best_block_hash, transaction)
else {
log::error!("Failed to convert transaction to extrinsic.");
return;
};

match submit_extrinsic_with_order(pool, best_block_hash, extrinsic, order).await {
Ok(_hash) => log::debug!("Successfully submitted extrinsic"),
Err(e) => log::error!("Failed to submit extrinsic: {e:?}"),
}
}),
)
}
}
8 changes: 7 additions & 1 deletion crates/client/rpc-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use serde_with::serde_as;

pub mod utils;

use mc_transaction_pool::decryptor::DecryptedInvokeTransaction;
use mp_transactions::TransactionStatus;
use pallet_starknet::genesis_loader::PredeployedAccount;
use starknet_core::serde::unsigned_field_element::UfeHex;
Expand All @@ -27,6 +28,7 @@ use starknet_core::types::{
MaybePendingTransactionReceipt, MsgFromL1, SimulatedTransaction, SimulationFlag, SimulationFlagForEstimateFee,
SyncStatusType, Transaction, TransactionTrace, TransactionTraceWithHash,
};

pub mod types;

use crate::types::{
Expand Down Expand Up @@ -81,6 +83,10 @@ pub trait StarknetWriteRpcApi {
encrypted_invoke_transaction: EncryptedInvokeTransaction,
) -> RpcResult<EncryptedMempoolTransactionResult>;

/// Add a Decrypted Invoke Transaction to invoke a contract function
#[method(name = "addDecryptedInvokeTransaction")]
async fn add_decrypted_invoke_transaction(&self, decrypted_tx: DecryptedInvokeTransaction) -> RpcResult<InvokeTransactionResult>;

/// Allow a client to pass the decryption key, after the client received the order_commitment
/// from sequencer.
#[method(name = "provideDecryptionKey")]
Expand Down Expand Up @@ -197,7 +203,7 @@ pub trait StarknetReadRpcApi {

/// Decrypts an encrypted invoke transaction and returns the decrypted Invoketransaction.
#[method(name = "decryptEncryptedInvokeTransaction")]
async fn decrypt_encrypted_invoke_transaction(
fn decrypt_encrypted_invoke_transaction(
&self,
encrypted_invoke_transaction: EncryptedInvokeTransaction,
decryption_key: Option<String>,
Expand Down
1 change: 1 addition & 0 deletions crates/client/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
# Madara utils
mc-db = { workspace = true }
madara-runtime = { workspace = true }
mc-genesis-data-provider = { workspace = true }
mc-rpc-core = { workspace = true }
mc-storage = { workspace = true }
Expand Down
Loading

0 comments on commit 87695a3

Please sign in to comment.