Skip to content

Commit

Permalink
chore(batcher): rename build_proposal => propose_block and validate_p…
Browse files Browse the repository at this point in the history
…roposal => validate_block
  • Loading branch information
Yael-Starkware committed Nov 19, 2024
1 parent cbb54e6 commit bd4f174
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ use papyrus_protobuf::consensus::{
use starknet_api::block::{BlockHash, BlockHashAndNumber, BlockNumber};
use starknet_api::executable_transaction::Transaction;
use starknet_batcher_types::batcher_types::{
BuildProposalInput,
DecisionReachedInput,
GetProposalContent,
GetProposalContentInput,
ProposalId,
ProposalStatus,
ProposeBlockInput,
SendProposalContent,
SendProposalContentInput,
StartHeightInput,
ValidateProposalInput,
ValidateBlockInput,
};
use starknet_batcher_types::communication::BatcherClient;
use tracing::{debug, debug_span, error, info, trace, warn, Instrument};
Expand Down Expand Up @@ -106,7 +106,7 @@ impl ConsensusContext for SequencerConsensusContext {
self.proposal_id += 1;
let timeout =
chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration");
let build_proposal_input = BuildProposalInput {
let build_proposal_input = ProposeBlockInput {
proposal_id,
// TODO: Discuss with batcher team passing std Duration instead.
deadline: chrono::Utc::now() + timeout,
Expand All @@ -122,7 +122,7 @@ impl ConsensusContext for SequencerConsensusContext {
// here also.
debug!("Initiating proposal build: {build_proposal_input:?}");
batcher
.build_proposal(build_proposal_input)
.propose_block(build_proposal_input)
.await
.expect("Failed to initiate proposal build");
debug!("Broadcasting proposal init: {proposal_init:?}");
Expand Down Expand Up @@ -164,7 +164,7 @@ impl ConsensusContext for SequencerConsensusContext {

let chrono_timeout =
chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration");
let input = ValidateProposalInput {
let input = ValidateBlockInput {
proposal_id,
deadline: chrono::Utc::now() + chrono_timeout,
// TODO(Matan 3/11/2024): Add the real value of the retrospective block hash.
Expand All @@ -174,7 +174,7 @@ impl ConsensusContext for SequencerConsensusContext {
}),
};
self.maybe_start_height(height).await;
batcher.validate_proposal(input).await.expect("Failed to initiate proposal validation");
batcher.validate_block(input).await.expect("Failed to initiate proposal validation");
tokio::spawn(
async move {
let validate_fut = stream_validate_proposal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ use starknet_api::hash::PoseidonHash;
use starknet_api::test_utils::invoke::{executable_invoke_tx, InvokeTxArgs};
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::batcher_types::{
BuildProposalInput,
GetProposalContent,
GetProposalContentResponse,
ProposalCommitment,
ProposalId,
ProposalStatus,
ProposeBlockInput,
SendProposalContent,
SendProposalContentInput,
SendProposalContentResponse,
StartHeightInput,
ValidateProposalInput,
ValidateBlockInput,
};
use starknet_batcher_types::communication::MockBatcherClient;
use starknet_types_core::felt::Felt;
Expand Down Expand Up @@ -57,7 +57,7 @@ async fn build_proposal() {
let mut batcher = MockBatcherClient::new();
let proposal_id = Arc::new(OnceLock::new());
let proposal_id_clone = Arc::clone(&proposal_id);
batcher.expect_build_proposal().returning(move |input: BuildProposalInput| {
batcher.expect_propose_block().returning(move |input: ProposeBlockInput| {
proposal_id_clone.set(input.proposal_id).unwrap();
Ok(())
});
Expand Down Expand Up @@ -101,7 +101,7 @@ async fn validate_proposal_success() {
let mut batcher = MockBatcherClient::new();
let proposal_id: Arc<OnceLock<ProposalId>> = Arc::new(OnceLock::new());
let proposal_id_clone = Arc::clone(&proposal_id);
batcher.expect_validate_proposal().returning(move |input: ValidateProposalInput| {
batcher.expect_validate_block().returning(move |input: ValidateBlockInput| {
proposal_id_clone.set(input.proposal_id).unwrap();
Ok(())
});
Expand Down Expand Up @@ -149,7 +149,7 @@ async fn validate_proposal_success() {
async fn repropose() {
// Receive a proposal. Then re-retrieve it.
let mut batcher = MockBatcherClient::new();
batcher.expect_validate_proposal().returning(move |_| Ok(()));
batcher.expect_validate_block().returning(move |_| Ok(()));
batcher.expect_start_height().return_once(|input: StartHeightInput| {
assert_eq!(input.height, BlockNumber(0));
Ok(())
Expand Down
46 changes: 23 additions & 23 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ use starknet_api::executable_transaction::Transaction;
use starknet_api::state::ThinStateDiff;
use starknet_batcher_types::batcher_types::{
BatcherResult,
BuildProposalInput,
DecisionReachedInput,
GetProposalContent,
GetProposalContentInput,
GetProposalContentResponse,
ProposalId,
ProposalStatus,
ProposeBlockInput,
SendProposalContent,
SendProposalContentInput,
SendProposalContentResponse,
StartHeightInput,
ValidateProposalInput,
ValidateBlockInput,
};
use starknet_batcher_types::errors::BatcherError;
use starknet_mempool_types::communication::SharedMempoolClient;
Expand Down Expand Up @@ -56,8 +56,8 @@ pub struct Batcher {
pub storage_writer: Box<dyn BatcherStorageWriterTrait>,
pub mempool_client: SharedMempoolClient,
proposal_manager: Box<dyn ProposalManagerTrait>,
build_proposals: HashMap<ProposalId, OutputStreamReceiver>,
validate_proposals: HashMap<ProposalId, InputStreamSender>,
propose_tx_streams: HashMap<ProposalId, OutputStreamReceiver>,
validate_tx_streams: HashMap<ProposalId, InputStreamSender>,
}

impl Batcher {
Expand All @@ -74,24 +74,24 @@ impl Batcher {
storage_writer,
mempool_client,
proposal_manager,
build_proposals: HashMap::new(),
validate_proposals: HashMap::new(),
propose_tx_streams: HashMap::new(),
validate_tx_streams: HashMap::new(),
}
}

pub async fn start_height(&mut self, input: StartHeightInput) -> BatcherResult<()> {
self.build_proposals.clear();
self.validate_proposals.clear();
self.propose_tx_streams.clear();
self.validate_tx_streams.clear();
self.proposal_manager.start_height(input.height).await.map_err(BatcherError::from)
}

#[instrument(skip(self), err)]
pub async fn build_proposal(
pub async fn propose_block(
&mut self,
build_proposal_input: BuildProposalInput,
propose_block_input: ProposeBlockInput,
) -> BatcherResult<()> {
let proposal_id = build_proposal_input.proposal_id;
let deadline = deadline_as_instant(build_proposal_input.deadline)?;
let proposal_id = propose_block_input.proposal_id;
let deadline = deadline_as_instant(propose_block_input.deadline)?;

let (output_tx_sender, output_tx_receiver) = tokio::sync::mpsc::unbounded_channel();
let tx_provider = ProposeTransactionProvider::new(
Expand All @@ -102,23 +102,23 @@ impl Batcher {
);

self.proposal_manager
.build_block_proposal(
.propose_block(
proposal_id,
build_proposal_input.retrospective_block_hash,
propose_block_input.retrospective_block_hash,
deadline,
output_tx_sender,
tx_provider,
)
.await?;

self.build_proposals.insert(proposal_id, output_tx_receiver);
self.propose_tx_streams.insert(proposal_id, output_tx_receiver);
Ok(())
}

#[instrument(skip(self), err)]
pub async fn validate_proposal(
pub async fn validate_block(
&mut self,
validate_proposal_input: ValidateProposalInput,
validate_proposal_input: ValidateBlockInput,
) -> BatcherResult<()> {
let proposal_id = validate_proposal_input.proposal_id;
let deadline = deadline_as_instant(validate_proposal_input.deadline)?;
Expand All @@ -132,15 +132,15 @@ impl Batcher {
};

self.proposal_manager
.validate_block_proposal(
.validate_block(
proposal_id,
validate_proposal_input.retrospective_block_hash,
deadline,
tx_provider,
)
.await?;

self.validate_proposals.insert(proposal_id, input_tx_sender);
self.validate_tx_streams.insert(proposal_id, input_tx_sender);
Ok(())
}

Expand Down Expand Up @@ -173,7 +173,7 @@ impl Batcher {
match self.proposal_manager.get_proposal_status(proposal_id).await {
InternalProposalStatus::Processing => {
let tx_provider_sender = &self
.validate_proposals
.validate_tx_streams
.get(&proposal_id)
.expect("Expecting tx_provider_sender to exist during batching.");
for tx in txs {
Expand Down Expand Up @@ -219,7 +219,7 @@ impl Batcher {
}

fn close_input_transaction_stream(&mut self, proposal_id: ProposalId) -> BatcherResult<()> {
self.validate_proposals
self.validate_tx_streams
.remove(&proposal_id)
.ok_or(BatcherError::ProposalNotFound { proposal_id })?;
Ok(())
Expand All @@ -233,7 +233,7 @@ impl Batcher {
let proposal_id = get_proposal_content_input.proposal_id;

let tx_stream = &mut self
.build_proposals
.propose_tx_streams
.get_mut(&proposal_id)
.ok_or(BatcherError::ProposalNotFound { proposal_id })?;

Expand All @@ -250,7 +250,7 @@ impl Batcher {
// Finished streaming all the transactions.
// TODO: Consider removing the proposal from the proposal manager and keep it in the batcher
// for decision reached.
self.build_proposals.remove(&proposal_id);
self.propose_tx_streams.remove(&proposal_id);
let proposal_commitment =
self.proposal_manager.await_proposal_commitment(proposal_id).await?;
Ok(GetProposalContentResponse {
Expand Down
Loading

0 comments on commit bd4f174

Please sign in to comment.