Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(starknet_batcher): delete the proposal manager #2618

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 140 additions & 63 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,26 @@ use starknet_mempool_types::communication::SharedMempoolClient;
use starknet_mempool_types::mempool_types::CommitBlockArgs;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use tracing::{debug, error, info, instrument, trace};
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, trace, Instrument};

use crate::block_builder::{
BlockBuilderError,
BlockBuilderExecutionParams,
BlockBuilderFactory,
BlockBuilderFactoryTrait,
BlockBuilderTrait,
BlockMetadata,
};
use crate::config::BatcherConfig;
use crate::proposal_manager::{GenerateProposalError, ProposalManager, ProposalManagerTrait};
use crate::transaction_provider::{ProposeTransactionProvider, ValidateTransactionProvider};
use crate::utils::{
deadline_as_instant,
proposal_status_from,
verify_block_input,
ProposalOutput,
ProposalResult,
ProposalTask,
};

type OutputStreamReceiver = tokio::sync::mpsc::UnboundedReceiver<Transaction>;
Expand All @@ -63,11 +66,30 @@ pub struct Batcher {
pub l1_provider_client: SharedL1ProviderClient,
pub mempool_client: SharedMempoolClient,

// Used to create block builders.
// Using the factory pattern to allow for easier testing.
block_builder_factory: Box<dyn BlockBuilderFactoryTrait>,

// The height that the batcher is currently working on.
// All proposals are considered to be at this height.
active_height: Option<BlockNumber>,
proposal_manager: Box<dyn ProposalManagerTrait>,

block_builder_factory: Box<dyn BlockBuilderFactoryTrait>,
// The block proposal that is currently being built, if any.
// At any given time, there can be only one proposal being actively executed (either proposed
// or validated).
active_proposal: Arc<Mutex<Option<ProposalId>>>,
active_proposal_task: Option<ProposalTask>,

// Holds all the proposals that completed execution in the current height.
executed_proposals: Arc<Mutex<HashMap<ProposalId, ProposalResult<ProposalOutput>>>>,

// The propose blocks transaction streams, used to stream out the proposal transactions.
// Each stream is kept until all the transactions are streamed out, or a new height is started.
propose_tx_streams: HashMap<ProposalId, OutputStreamReceiver>,

// The validate blocks transaction streams, used to stream in the transactions to validate.
// Each stream is kept until SendProposalContent::Finish/Abort is received, or a new height is
// started.
validate_tx_streams: HashMap<ProposalId, InputStreamSender>,
}

Expand All @@ -79,17 +101,18 @@ impl Batcher {
l1_provider_client: SharedL1ProviderClient,
mempool_client: SharedMempoolClient,
block_builder_factory: Box<dyn BlockBuilderFactoryTrait>,
proposal_manager: Box<dyn ProposalManagerTrait>,
) -> Self {
Self {
config: config.clone(),
storage_reader,
storage_writer,
l1_provider_client,
mempool_client,
active_height: None,
block_builder_factory,
proposal_manager,
active_height: None,
active_proposal: Arc::new(Mutex::new(None)),
active_proposal_task: None,
executed_proposals: Arc::new(Mutex::new(HashMap::new())),
propose_tx_streams: HashMap::new(),
validate_tx_streams: HashMap::new(),
}
Expand Down Expand Up @@ -135,6 +158,8 @@ impl Batcher {
propose_block_input.retrospective_block_hash,
)?;

self.set_active_proposal(propose_block_input.proposal_id).await?;

let tx_provider = ProposeTransactionProvider::new(
self.mempool_client.clone(),
self.l1_provider_client.clone(),
Expand All @@ -160,8 +185,7 @@ impl Batcher {
)
.map_err(|_| BatcherError::InternalError)?;

self.proposal_manager
.spawn_proposal(propose_block_input.proposal_id, block_builder, abort_signal_sender)
self.spawn_proposal(propose_block_input.proposal_id, block_builder, abort_signal_sender)
.await?;

self.propose_tx_streams.insert(propose_block_input.proposal_id, output_tx_receiver);
Expand All @@ -180,6 +204,8 @@ impl Batcher {
validate_block_input.retrospective_block_hash,
)?;

self.set_active_proposal(validate_block_input.proposal_id).await?;

// A channel to send the transactions to include in the block being validated.
let (input_tx_sender, input_tx_receiver) =
tokio::sync::mpsc::channel(self.config.input_stream_content_buffer_size);
Expand All @@ -205,8 +231,7 @@ impl Batcher {
)
.map_err(|_| BatcherError::InternalError)?;

self.proposal_manager
.spawn_proposal(validate_block_input.proposal_id, block_builder, abort_signal_sender)
self.spawn_proposal(validate_block_input.proposal_id, block_builder, abort_signal_sender)
.await?;

self.validate_tx_streams.insert(validate_block_input.proposal_id, input_tx_sender);
Expand Down Expand Up @@ -234,7 +259,8 @@ impl Batcher {

/// Clear all the proposals from the previous height.
async fn abort_active_height(&mut self) {
self.proposal_manager.reset().await;
self.abort_active_proposal().await;
self.executed_proposals.lock().await.clear();
self.propose_tx_streams.clear();
self.validate_tx_streams.clear();
}
Expand Down Expand Up @@ -263,7 +289,7 @@ impl Batcher {
let proposal_result =
self.get_completed_proposal_result(proposal_id).await.expect("Proposal should exist.");
match proposal_result {
Ok(_) => Err(BatcherError::ProposalAlreadyFinished { proposal_id }),
Ok(_) => panic!("Proposal finished validation before all transactions were sent."),
Err(err) => Ok(SendProposalContentResponse { response: proposal_status_from(err)? }),
}
}
Expand All @@ -274,9 +300,9 @@ impl Batcher {
) -> BatcherResult<SendProposalContentResponse> {
debug!("Send proposal content done for {}", proposal_id);

self.close_input_transaction_stream(proposal_id)?;
self.validate_tx_streams.remove(&proposal_id).expect("validate tx stream should exist.");
if self.is_active(proposal_id).await {
self.proposal_manager.await_active_proposal().await;
self.await_active_proposal().await;
}

let proposal_result =
Expand All @@ -292,18 +318,17 @@ impl Batcher {
&mut self,
proposal_id: ProposalId,
) -> BatcherResult<SendProposalContentResponse> {
self.proposal_manager.abort_proposal(proposal_id).await;
self.close_input_transaction_stream(proposal_id)?;
if self.is_active(proposal_id).await {
self.abort_active_proposal().await;
self.executed_proposals
.lock()
.await
.insert(proposal_id, Err(Arc::new(BlockBuilderError::Aborted)));
}
self.validate_tx_streams.remove(&proposal_id);
Ok(SendProposalContentResponse { response: ProposalStatus::Aborted })
}

fn close_input_transaction_stream(&mut self, proposal_id: ProposalId) -> BatcherResult<()> {
self.validate_tx_streams
.remove(&proposal_id)
.ok_or(BatcherError::ProposalNotFound { proposal_id })?;
Ok(())
}

fn get_height_from_storage(&mut self) -> BatcherResult<BlockNumber> {
self.storage_reader.height().map_err(|err| {
error!("Failed to get height from storage: {}", err);
Expand Down Expand Up @@ -365,53 +390,112 @@ impl Batcher {
let tx_hashes = transaction_hashes.into_iter().collect();

// TODO(Arni): Assert the input `sync_block` corresponds to this `height`.
self.commit_proposal_and_block(state_diff, address_to_nonce, tx_hashes).await
let height = self.get_height_from_storage()?;
self.commit_proposal_and_block(height, state_diff, address_to_nonce, tx_hashes).await
}

#[instrument(skip(self), err)]
pub async fn decision_reached(
&mut self,
input: DecisionReachedInput,
) -> BatcherResult<DecisionReachedResponse> {
let height = self.active_height.ok_or(BatcherError::NoActiveHeight)?;

let proposal_id = input.proposal_id;
let proposal_output = self
.proposal_manager
.take_proposal_result(proposal_id)
.await
.ok_or(BatcherError::ExecutedProposalNotFound { proposal_id })?
.map_err(|_| BatcherError::InternalError)?;
let proposal_result = self.executed_proposals.lock().await.remove(&proposal_id);
let ProposalOutput { state_diff, nonces: address_to_nonce, tx_hashes, .. } =
proposal_output;
proposal_result
.ok_or(BatcherError::ExecutedProposalNotFound { proposal_id })?
.map_err(|_| BatcherError::InternalError)?;

self.commit_proposal_and_block(state_diff.clone(), address_to_nonce, tx_hashes).await?;
self.commit_proposal_and_block(height, state_diff.clone(), address_to_nonce, tx_hashes)
.await?;
Ok(DecisionReachedResponse { state_diff })
}

async fn commit_proposal_and_block(
&mut self,
height: BlockNumber,
state_diff: ThinStateDiff,
address_to_nonce: HashMap<ContractAddress, Nonce>,
tx_hashes: HashSet<TransactionHash>,
) -> BatcherResult<()> {
// TODO: Keep the height from start_height or get it from the input.
let height = self.get_height_from_storage()?;
info!("Committing block at height {} and notifying mempool of the block.", height);
trace!("Transactions: {:#?}, State diff: {:#?}.", tx_hashes, state_diff);

// Commit the proposal to the storage and notify the mempool.
self.storage_writer.commit_proposal(height, state_diff).map_err(|err| {
error!("Failed to commit proposal to storage: {}", err);
BatcherError::InternalError
})?;
if let Err(mempool_err) =
self.mempool_client.commit_block(CommitBlockArgs { address_to_nonce, tx_hashes }).await
{
let mempool_result =
self.mempool_client.commit_block(CommitBlockArgs { address_to_nonce, tx_hashes }).await;

if let Err(mempool_err) = mempool_result {
error!("Failed to commit block to mempool: {}", mempool_err);
// TODO: Should we rollback the state diff and return an error?
}
};

Ok(())
}

async fn is_active(&self, proposal_id: ProposalId) -> bool {
self.proposal_manager.get_active_proposal().await == Some(proposal_id)
*self.active_proposal.lock().await == Some(proposal_id)
}

// Sets a new active proposal task.
// Fails if there is another proposal being currently generated, or a proposal with the same ID
// already exists.
async fn set_active_proposal(&mut self, proposal_id: ProposalId) -> BatcherResult<()> {
if self.executed_proposals.lock().await.contains_key(&proposal_id) {
return Err(BatcherError::ProposalAlreadyExists { proposal_id });
}

let mut active_proposal = self.active_proposal.lock().await;
if let Some(active_proposal_id) = *active_proposal {
return Err(BatcherError::ServerBusy {
active_proposal_id,
new_proposal_id: proposal_id,
});
}

debug!("Set proposal {} as the one being generated.", proposal_id);
*active_proposal = Some(proposal_id);
Ok(())
}

// Starts a new block proposal generation task for the given proposal_id.
// Uses the given block_builder to generate the proposal.
async fn spawn_proposal(
&mut self,
proposal_id: ProposalId,
mut block_builder: Box<dyn BlockBuilderTrait>,
abort_signal_sender: tokio::sync::oneshot::Sender<()>,
) -> BatcherResult<()> {
info!("Starting generation of a new proposal with id {}.", proposal_id);

let active_proposal = self.active_proposal.clone();
let executed_proposals = self.executed_proposals.clone();

let join_handle = tokio::spawn(
async move {
let result =
block_builder.build_block().await.map(ProposalOutput::from).map_err(Arc::new);

// The proposal is done, clear the active proposal.
// Keep the proposal result only if it is the same as the active proposal.
// The active proposal might have changed if this proposal was aborted.
let mut active_proposal = active_proposal.lock().await;
if *active_proposal == Some(proposal_id) {
active_proposal.take();
executed_proposals.lock().await.insert(proposal_id, result);
}
}
.in_current_span(),
);

self.active_proposal_task = Some(ProposalTask { abort_signal_sender, join_handle });
Ok(())
}

// Returns a completed proposal result, either its commitment or an error if the proposal
Expand All @@ -420,8 +504,7 @@ impl Batcher {
&self,
proposal_id: ProposalId,
) -> Option<ProposalResult<ProposalCommitment>> {
let completed_proposals = self.proposal_manager.get_completed_proposals().await;
let guard = completed_proposals.lock().await;
let guard = self.executed_proposals.lock().await;
let proposal_result = guard.get(&proposal_id);

match proposal_result {
Expand All @@ -430,6 +513,21 @@ impl Batcher {
None => None,
}
}

// Ends the current active proposal.
// This call is non-blocking.
async fn abort_active_proposal(&mut self) {
self.active_proposal.lock().await.take();
if let Some(proposal_task) = self.active_proposal_task.take() {
proposal_task.abort_signal_sender.send(()).ok();
}
}

pub async fn await_active_proposal(&mut self) {
if let Some(proposal_task) = self.active_proposal_task.take() {
proposal_task.join_handle.await.ok();
}
}
}

pub fn create_batcher(
Expand All @@ -447,15 +545,13 @@ pub fn create_batcher(
});
let storage_reader = Arc::new(storage_reader);
let storage_writer = Box::new(storage_writer);
let proposal_manager = Box::new(ProposalManager::new());
Batcher::new(
config,
storage_reader,
storage_writer,
l1_provider_client,
mempool_client,
block_builder_factory,
proposal_manager,
)
}

Expand Down Expand Up @@ -491,23 +587,4 @@ impl BatcherStorageWriterTrait for papyrus_storage::StorageWriter {
}
}

impl From<GenerateProposalError> for BatcherError {
fn from(err: GenerateProposalError) -> Self {
match err {
GenerateProposalError::AlreadyGeneratingProposal {
current_generating_proposal_id,
new_proposal_id,
} => BatcherError::ServerBusy {
active_proposal_id: current_generating_proposal_id,
new_proposal_id,
},
GenerateProposalError::BlockBuilderError(..) => BatcherError::InternalError,
GenerateProposalError::NoActiveHeight => BatcherError::NoActiveHeight,
GenerateProposalError::ProposalAlreadyExists { proposal_id } => {
BatcherError::ProposalAlreadyExists { proposal_id }
}
}
}
}

impl ComponentStarter for Batcher {}
Loading