Skip to content

Commit

Permalink
refactor(starknet_batcher): delete the proposal manager
Browse files Browse the repository at this point in the history
  • Loading branch information
dafnamatsry committed Dec 18, 2024
1 parent 3cc6040 commit acc4aba
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 667 deletions.
203 changes: 140 additions & 63 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,26 @@ use starknet_mempool_types::communication::SharedMempoolClient;
use starknet_mempool_types::mempool_types::CommitBlockArgs;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use tracing::{debug, error, info, instrument, trace};
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, trace, Instrument};

use crate::block_builder::{
BlockBuilderError,
BlockBuilderExecutionParams,
BlockBuilderFactory,
BlockBuilderFactoryTrait,
BlockBuilderTrait,
BlockMetadata,
};
use crate::config::BatcherConfig;
use crate::proposal_manager::{GenerateProposalError, ProposalManager, ProposalManagerTrait};
use crate::transaction_provider::{ProposeTransactionProvider, ValidateTransactionProvider};
use crate::utils::{
deadline_as_instant,
proposal_status_from,
verify_block_input,
ProposalOutput,
ProposalResult,
ProposalTask,
};

type OutputStreamReceiver = tokio::sync::mpsc::UnboundedReceiver<Transaction>;
Expand All @@ -63,11 +66,30 @@ pub struct Batcher {
pub l1_provider_client: SharedL1ProviderClient,
pub mempool_client: SharedMempoolClient,

// Used to create block builders.
// Using the factory pattern to allow for easier testing.
block_builder_factory: Box<dyn BlockBuilderFactoryTrait>,

// The height that the batcher is currently working on.
// All proposals are considered to be at this height.
active_height: Option<BlockNumber>,
proposal_manager: Box<dyn ProposalManagerTrait>,

block_builder_factory: Box<dyn BlockBuilderFactoryTrait>,
// The block proposal that is currently being built, if any.
// At any given time, there can be only one proposal being actively executed (either proposed
// or validated).
active_proposal: Arc<Mutex<Option<ProposalId>>>,
active_proposal_task: Option<ProposalTask>,

// Holds all the proposals that completed execution in the current height.
executed_proposals: Arc<Mutex<HashMap<ProposalId, ProposalResult<ProposalOutput>>>>,

// The propose blocks transaction streams, used to stream out the proposal transactions.
// Each stream is kept until all the transactions are streamed out, or a new height is started.
propose_tx_streams: HashMap<ProposalId, OutputStreamReceiver>,

// The validate blocks transaction streams, used to stream in the transactions to validate.
// Each stream is kept until SendProposalContent::Finish/Abort is received, or a new height is
// started.
validate_tx_streams: HashMap<ProposalId, InputStreamSender>,
}

Expand All @@ -79,17 +101,18 @@ impl Batcher {
l1_provider_client: SharedL1ProviderClient,
mempool_client: SharedMempoolClient,
block_builder_factory: Box<dyn BlockBuilderFactoryTrait>,
proposal_manager: Box<dyn ProposalManagerTrait>,
) -> Self {
Self {
config: config.clone(),
storage_reader,
storage_writer,
l1_provider_client,
mempool_client,
active_height: None,
block_builder_factory,
proposal_manager,
active_height: None,
active_proposal: Arc::new(Mutex::new(None)),
active_proposal_task: None,
executed_proposals: Arc::new(Mutex::new(HashMap::new())),
propose_tx_streams: HashMap::new(),
validate_tx_streams: HashMap::new(),
}
Expand Down Expand Up @@ -135,6 +158,8 @@ impl Batcher {
propose_block_input.retrospective_block_hash,
)?;

self.set_active_proposal(propose_block_input.proposal_id).await?;

let tx_provider = ProposeTransactionProvider::new(
self.mempool_client.clone(),
self.l1_provider_client.clone(),
Expand All @@ -160,8 +185,7 @@ impl Batcher {
)
.map_err(|_| BatcherError::InternalError)?;

self.proposal_manager
.spawn_proposal(propose_block_input.proposal_id, block_builder, abort_signal_sender)
self.spawn_proposal(propose_block_input.proposal_id, block_builder, abort_signal_sender)
.await?;

self.propose_tx_streams.insert(propose_block_input.proposal_id, output_tx_receiver);
Expand All @@ -180,6 +204,8 @@ impl Batcher {
validate_block_input.retrospective_block_hash,
)?;

self.set_active_proposal(validate_block_input.proposal_id).await?;

// A channel to send the transactions to include in the block being validated.
let (input_tx_sender, input_tx_receiver) =
tokio::sync::mpsc::channel(self.config.input_stream_content_buffer_size);
Expand All @@ -205,8 +231,7 @@ impl Batcher {
)
.map_err(|_| BatcherError::InternalError)?;

self.proposal_manager
.spawn_proposal(validate_block_input.proposal_id, block_builder, abort_signal_sender)
self.spawn_proposal(validate_block_input.proposal_id, block_builder, abort_signal_sender)
.await?;

self.validate_tx_streams.insert(validate_block_input.proposal_id, input_tx_sender);
Expand Down Expand Up @@ -234,7 +259,8 @@ impl Batcher {

/// Clear all the proposals from the previous height.
async fn abort_active_height(&mut self) {
self.proposal_manager.reset().await;
self.abort_active_proposal().await;
self.executed_proposals.lock().await.clear();
self.propose_tx_streams.clear();
self.validate_tx_streams.clear();
}
Expand Down Expand Up @@ -263,7 +289,7 @@ impl Batcher {
let proposal_result =
self.get_completed_proposal_result(proposal_id).await.expect("Proposal should exist.");
match proposal_result {
Ok(_) => Err(BatcherError::ProposalAlreadyFinished { proposal_id }),
Ok(_) => panic!("Proposal finished validation before all transactions were sent."),
Err(err) => Ok(SendProposalContentResponse { response: proposal_status_from(err)? }),
}
}
Expand All @@ -274,9 +300,9 @@ impl Batcher {
) -> BatcherResult<SendProposalContentResponse> {
debug!("Send proposal content done for {}", proposal_id);

self.close_input_transaction_stream(proposal_id)?;
self.validate_tx_streams.remove(&proposal_id).expect("validate tx stream should exist.");
if self.is_active(proposal_id).await {
self.proposal_manager.await_active_proposal().await;
self.await_active_proposal().await;
}

let proposal_result =
Expand All @@ -292,18 +318,17 @@ impl Batcher {
&mut self,
proposal_id: ProposalId,
) -> BatcherResult<SendProposalContentResponse> {
self.proposal_manager.abort_proposal(proposal_id).await;
self.close_input_transaction_stream(proposal_id)?;
if self.is_active(proposal_id).await {
self.abort_active_proposal().await;
self.executed_proposals
.lock()
.await
.insert(proposal_id, Err(Arc::new(BlockBuilderError::Aborted)));
}
self.validate_tx_streams.remove(&proposal_id);
Ok(SendProposalContentResponse { response: ProposalStatus::Aborted })
}

fn close_input_transaction_stream(&mut self, proposal_id: ProposalId) -> BatcherResult<()> {
self.validate_tx_streams
.remove(&proposal_id)
.ok_or(BatcherError::ProposalNotFound { proposal_id })?;
Ok(())
}

fn get_height_from_storage(&mut self) -> BatcherResult<BlockNumber> {
self.storage_reader.height().map_err(|err| {
error!("Failed to get height from storage: {}", err);
Expand Down Expand Up @@ -365,53 +390,112 @@ impl Batcher {
let tx_hashes = transaction_hashes.into_iter().collect();

// TODO(Arni): Assert the input `sync_block` corresponds to this `height`.
self.commit_proposal_and_block(state_diff, address_to_nonce, tx_hashes).await
let height = self.get_height_from_storage()?;
self.commit_proposal_and_block(height, state_diff, address_to_nonce, tx_hashes).await
}

#[instrument(skip(self), err)]
pub async fn decision_reached(
&mut self,
input: DecisionReachedInput,
) -> BatcherResult<DecisionReachedResponse> {
let height = self.active_height.ok_or(BatcherError::NoActiveHeight)?;

let proposal_id = input.proposal_id;
let proposal_output = self
.proposal_manager
.take_proposal_result(proposal_id)
.await
.ok_or(BatcherError::ExecutedProposalNotFound { proposal_id })?
.map_err(|_| BatcherError::InternalError)?;
let proposal_result = self.executed_proposals.lock().await.remove(&proposal_id);
let ProposalOutput { state_diff, nonces: address_to_nonce, tx_hashes, .. } =
proposal_output;
proposal_result
.ok_or(BatcherError::ExecutedProposalNotFound { proposal_id })?
.map_err(|_| BatcherError::InternalError)?;

self.commit_proposal_and_block(state_diff.clone(), address_to_nonce, tx_hashes).await?;
self.commit_proposal_and_block(height, state_diff.clone(), address_to_nonce, tx_hashes)
.await?;
Ok(DecisionReachedResponse { state_diff })
}

async fn commit_proposal_and_block(
&mut self,
height: BlockNumber,
state_diff: ThinStateDiff,
address_to_nonce: HashMap<ContractAddress, Nonce>,
tx_hashes: HashSet<TransactionHash>,
) -> BatcherResult<()> {
// TODO: Keep the height from start_height or get it from the input.
let height = self.get_height_from_storage()?;
info!("Committing block at height {} and notifying mempool of the block.", height);
trace!("Transactions: {:#?}, State diff: {:#?}.", tx_hashes, state_diff);

// Commit the proposal to the storage and notify the mempool.
self.storage_writer.commit_proposal(height, state_diff).map_err(|err| {
error!("Failed to commit proposal to storage: {}", err);
BatcherError::InternalError
})?;
if let Err(mempool_err) =
self.mempool_client.commit_block(CommitBlockArgs { address_to_nonce, tx_hashes }).await
{
let mempool_result =
self.mempool_client.commit_block(CommitBlockArgs { address_to_nonce, tx_hashes }).await;

if let Err(mempool_err) = mempool_result {
error!("Failed to commit block to mempool: {}", mempool_err);
// TODO: Should we rollback the state diff and return an error?
}
};

Ok(())
}

async fn is_active(&self, proposal_id: ProposalId) -> bool {
self.proposal_manager.get_active_proposal().await == Some(proposal_id)
*self.active_proposal.lock().await == Some(proposal_id)
}

// Sets a new active proposal task.
// Fails if there is another proposal being currently generated, or a proposal with the same ID
// already exists.
async fn set_active_proposal(&mut self, proposal_id: ProposalId) -> BatcherResult<()> {
if self.executed_proposals.lock().await.contains_key(&proposal_id) {
return Err(BatcherError::ProposalAlreadyExists { proposal_id });
}

let mut active_proposal = self.active_proposal.lock().await;
if let Some(active_proposal_id) = *active_proposal {
return Err(BatcherError::ServerBusy {
active_proposal_id,
new_proposal_id: proposal_id,
});
}

debug!("Set proposal {} as the one being generated.", proposal_id);
*active_proposal = Some(proposal_id);
Ok(())
}

// Starts a new block proposal generation task for the given proposal_id.
// Uses the given block_builder to generate the proposal.
async fn spawn_proposal(
&mut self,
proposal_id: ProposalId,
mut block_builder: Box<dyn BlockBuilderTrait>,
abort_signal_sender: tokio::sync::oneshot::Sender<()>,
) -> BatcherResult<()> {
info!("Starting generation of a new proposal with id {}.", proposal_id);

let active_proposal = self.active_proposal.clone();
let executed_proposals = self.executed_proposals.clone();

let join_handle = tokio::spawn(
async move {
let result =
block_builder.build_block().await.map(ProposalOutput::from).map_err(Arc::new);

// The proposal is done, clear the active proposal.
// Keep the proposal result only if it is the same as the active proposal.
// The active proposal might have changed if this proposal was aborted.
let mut active_proposal = active_proposal.lock().await;
if *active_proposal == Some(proposal_id) {
active_proposal.take();
executed_proposals.lock().await.insert(proposal_id, result);
}
}
.in_current_span(),
);

self.active_proposal_task = Some(ProposalTask { abort_signal_sender, join_handle });
Ok(())
}

// Returns a completed proposal result, either its commitment or an error if the proposal
Expand All @@ -420,8 +504,7 @@ impl Batcher {
&self,
proposal_id: ProposalId,
) -> Option<ProposalResult<ProposalCommitment>> {
let completed_proposals = self.proposal_manager.get_completed_proposals().await;
let guard = completed_proposals.lock().await;
let guard = self.executed_proposals.lock().await;
let proposal_result = guard.get(&proposal_id);

match proposal_result {
Expand All @@ -430,6 +513,21 @@ impl Batcher {
None => None,
}
}

// Ends the current active proposal.
// This call is non-blocking.
async fn abort_active_proposal(&mut self) {
self.active_proposal.lock().await.take();
if let Some(proposal_task) = self.active_proposal_task.take() {
proposal_task.abort_signal_sender.send(()).ok();
}
}

pub async fn await_active_proposal(&mut self) {
if let Some(proposal_task) = self.active_proposal_task.take() {
proposal_task.join_handle.await.ok();
}
}
}

pub fn create_batcher(
Expand All @@ -447,15 +545,13 @@ pub fn create_batcher(
});
let storage_reader = Arc::new(storage_reader);
let storage_writer = Box::new(storage_writer);
let proposal_manager = Box::new(ProposalManager::new());
Batcher::new(
config,
storage_reader,
storage_writer,
l1_provider_client,
mempool_client,
block_builder_factory,
proposal_manager,
)
}

Expand Down Expand Up @@ -491,23 +587,4 @@ impl BatcherStorageWriterTrait for papyrus_storage::StorageWriter {
}
}

impl From<GenerateProposalError> for BatcherError {
fn from(err: GenerateProposalError) -> Self {
match err {
GenerateProposalError::AlreadyGeneratingProposal {
current_generating_proposal_id,
new_proposal_id,
} => BatcherError::ServerBusy {
active_proposal_id: current_generating_proposal_id,
new_proposal_id,
},
GenerateProposalError::BlockBuilderError(..) => BatcherError::InternalError,
GenerateProposalError::NoActiveHeight => BatcherError::NoActiveHeight,
GenerateProposalError::ProposalAlreadyExists { proposal_id } => {
BatcherError::ProposalAlreadyExists { proposal_id }
}
}
}
}

impl ComponentStarter for Batcher {}
Loading

0 comments on commit acc4aba

Please sign in to comment.