Skip to content

Commit

Permalink
feat(batcher): proposals manager module (#417)
Browse files Browse the repository at this point in the history
  • Loading branch information
yair-starkware authored Aug 25, 2024
1 parent 1276e72 commit 4d110c4
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 1 deletion.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion crates/batcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@ edition.workspace = true
license.workspace = true
repository.workspace = true


[lints]
workspace = true

[dependencies]
async-trait.workspace = true
papyrus_config.workspace = true
serde.workspace = true
starknet_api.workspace = true
starknet_batcher_types.workspace = true
starknet_mempool_infra.workspace = true
starknet_mempool_types.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
validator.workspace = true

[dev-dependencies]
assert_matches.workspace = true
mockall.workspace = true
3 changes: 3 additions & 0 deletions crates/batcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod batcher;
pub mod communication;
pub mod config;
pub mod proposals_manager;
#[cfg(test)]
mod proposals_manager_test;
199 changes: 199 additions & 0 deletions crates/batcher/src/proposals_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use starknet_mempool_types::communication::{MempoolClientError, SharedMempoolClient};
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument};

// TODO: Should be defined in SN_API probably (shared with the consensus).
pub type ProposalId = u64;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProposalsManagerConfig {
pub max_txs_per_mempool_request: usize,
}

impl Default for ProposalsManagerConfig {
fn default() -> Self {
// TODO: Get correct value for default max_txs_per_mempool_request.
Self { max_txs_per_mempool_request: 10 }
}
}

impl SerializeConfig for ProposalsManagerConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from_iter([ser_param(
"max_txs_per_mempool_request",
&self.max_txs_per_mempool_request,
"Maximum transactions to get from the mempool per iteration of proposal generation",
ParamPrivacyInput::Public,
)])
}
}

#[derive(Debug, Error)]
pub enum ProposalsManagerError {
#[error(
"Received proposal generation request with id {new_proposal_id} while already generating \
proposal with id {current_generating_proposal_id}."
)]
AlreadyGeneratingProposal {
current_generating_proposal_id: ProposalId,
new_proposal_id: ProposalId,
},
#[error(transparent)]
MempoolError(#[from] MempoolClientError),
}

pub type ProposalsManagerResult<T> = Result<T, ProposalsManagerError>;

/// Main struct for handling block proposals.
/// Taking care of:
/// - Proposing new blocks.
/// - Validating incoming proposals.
/// - Commiting accepted proposals to the storage.
///
/// Triggered by the consensus.
// TODO: Remove dead_code attribute.
#[allow(dead_code)]
pub(crate) struct ProposalsManager {
config: ProposalsManagerConfig,
mempool_client: SharedMempoolClient,
/// The block proposal that is currently being proposed, if any.
/// At any given time, there can be only one proposal being actively executed (either proposed
/// or validated).
proposal_in_generation: Arc<Mutex<Option<ProposalId>>>,
}

impl ProposalsManager {
// TODO: Remove dead_code attribute.
#[allow(dead_code)]
pub fn new(config: ProposalsManagerConfig, mempool_client: SharedMempoolClient) -> Self {
Self { config, mempool_client, proposal_in_generation: Arc::new(Mutex::new(None)) }
}

/// Starts a new block proposal generation task for the given proposal_id and height with
/// transactions from the mempool.
#[instrument(skip(self), err)]
pub async fn generate_block_proposal(
&mut self,
proposal_id: ProposalId,
timeout: tokio::time::Instant,
_height: BlockNumber,
) -> ProposalsManagerResult<()> {
info!("Starting generation of new proposal.");
self.set_proposal_in_generation(proposal_id).await?;

// TODO: Find where to join the task - needed to make sure it starts immediatly.
let _handle = tokio::spawn(
ProposalGenerationTask {
timeout,
mempool_client: self.mempool_client.clone(),
max_txs_per_mempool_request: self.config.max_txs_per_mempool_request,
proposal_in_generation: self.proposal_in_generation.clone(),
}
.run(),
);

Ok(())
}

// Checks if there is already a proposal being generated, and if not, sets the given proposal_id
// as the one being generated.
async fn set_proposal_in_generation(
&mut self,
proposal_id: ProposalId,
) -> ProposalsManagerResult<()> {
let mut lock = self.proposal_in_generation.lock().await;

if let Some(proposal_in_generation) = *lock {
return Err(ProposalsManagerError::AlreadyGeneratingProposal {
current_generating_proposal_id: proposal_in_generation,
new_proposal_id: proposal_id,
});
}

*lock = Some(proposal_id);
debug!("Set proposal {} as the one being generated.", proposal_id);
Ok(())
}
}

// TODO: Should be defined elsewhere.
#[allow(dead_code)]
mod block_builder {
use starknet_api::executable_transaction::Transaction;
use starknet_api::state::StateDiff;

#[derive(Debug, PartialEq)]
pub enum Status {
Building,
Ready,
Timeout,
}

pub struct BlockBuilder {}

impl BlockBuilder {
pub fn status(&self) -> Status {
Status::Building
}

/// Returning true if the block is ready to be proposed.
pub fn add_txs(&self, _txs: &[Transaction]) -> bool {
false
}

pub fn close_block(&self) -> StateDiff {
StateDiff::default()
}
}
}

#[allow(dead_code)]
struct ProposalGenerationTask {
pub timeout: tokio::time::Instant,
pub mempool_client: SharedMempoolClient,
pub max_txs_per_mempool_request: usize,
pub proposal_in_generation: Arc<Mutex<Option<ProposalId>>>,
}

impl ProposalGenerationTask {
#[allow(dead_code)]
async fn run(self) -> ProposalsManagerResult<()> {
let block_builder = block_builder::BlockBuilder {};
loop {
if tokio::time::Instant::now() > self.timeout {
info!("Proposal reached timeout.");
break;
}
let mempool_txs = self.mempool_client.get_txs(self.max_txs_per_mempool_request).await?;
if mempool_txs.is_empty() {
// TODO: check if sleep is needed here.
tokio::task::yield_now().await;
continue;
}

// TODO: Get L1 transactions.
debug!("Adding {} mempool transactions to proposal in generation.", mempool_txs.len());
// TODO: This is cpu bound operation, should use spawn_blocking / Rayon / std::thread
// here or from inside the function.
let is_block_ready = block_builder.add_txs(mempool_txs.as_slice());
if is_block_ready {
break;
}
}

info!("Closing block.");
// TODO: Get state diff.
let mut proposal_id = self.proposal_in_generation.lock().await;
*proposal_id = None;

Ok(())
}
}
41 changes: 41 additions & 0 deletions crates/batcher/src/proposals_manager_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::sync::Arc;

use assert_matches::assert_matches;
use starknet_api::block::BlockNumber;
use starknet_mempool_types::communication::MockMempoolClient;

use crate::proposals_manager::{ProposalsManager, ProposalsManagerConfig, ProposalsManagerError};

const GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1);

#[tokio::test]
async fn multiple_proposals_generation_fails() {
let mut mempool_client = MockMempoolClient::new();
mempool_client.expect_get_txs().returning(|_| Ok(vec![]));
let mut proposals_manager =
ProposalsManager::new(ProposalsManagerConfig::default(), Arc::new(mempool_client));
proposals_manager
.generate_block_proposal(
0,
tokio::time::Instant::now() + GENERATION_TIMEOUT,
BlockNumber::default(),
)
.await
.unwrap();

let another_generate_request = proposals_manager
.generate_block_proposal(
1,
tokio::time::Instant::now() + GENERATION_TIMEOUT,
BlockNumber::default(),
)
.await;

assert_matches!(
another_generate_request,
Err(ProposalsManagerError::AlreadyGeneratingProposal {
current_generating_proposal_id,
new_proposal_id
}) if current_generating_proposal_id == 0 && new_proposal_id == 1
);
}

0 comments on commit 4d110c4

Please sign in to comment.