Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(batcher): rename build_proposal => propose_block and validate_proposal => validate_block #2162

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ use papyrus_protobuf::consensus::{
use starknet_api::block::{BlockHash, BlockHashAndNumber, BlockNumber};
use starknet_api::executable_transaction::Transaction;
use starknet_batcher_types::batcher_types::{
BuildProposalInput,
DecisionReachedInput,
GetProposalContent,
GetProposalContentInput,
ProposalId,
ProposalStatus,
ProposeBlockInput,
SendProposalContent,
SendProposalContentInput,
StartHeightInput,
ValidateProposalInput,
ValidateBlockInput,
};
use starknet_batcher_types::communication::BatcherClient;
use tracing::{debug, debug_span, error, info, trace, warn, Instrument};
Expand Down Expand Up @@ -106,7 +106,7 @@ impl ConsensusContext for SequencerConsensusContext {
self.proposal_id += 1;
let timeout =
chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration");
let build_proposal_input = BuildProposalInput {
let build_proposal_input = ProposeBlockInput {
proposal_id,
// TODO: Discuss with batcher team passing std Duration instead.
deadline: chrono::Utc::now() + timeout,
Expand All @@ -122,7 +122,7 @@ impl ConsensusContext for SequencerConsensusContext {
// here also.
debug!("Initiating proposal build: {build_proposal_input:?}");
batcher
.build_proposal(build_proposal_input)
.propose_block(build_proposal_input)
.await
.expect("Failed to initiate proposal build");
debug!("Broadcasting proposal init: {proposal_init:?}");
Expand Down Expand Up @@ -164,7 +164,7 @@ impl ConsensusContext for SequencerConsensusContext {

let chrono_timeout =
chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration");
let input = ValidateProposalInput {
let input = ValidateBlockInput {
proposal_id,
deadline: chrono::Utc::now() + chrono_timeout,
// TODO(Matan 3/11/2024): Add the real value of the retrospective block hash.
Expand All @@ -174,7 +174,7 @@ impl ConsensusContext for SequencerConsensusContext {
}),
};
self.maybe_start_height(height).await;
batcher.validate_proposal(input).await.expect("Failed to initiate proposal validation");
batcher.validate_block(input).await.expect("Failed to initiate proposal validation");
tokio::spawn(
async move {
let validate_fut = stream_validate_proposal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ use starknet_api::hash::PoseidonHash;
use starknet_api::test_utils::invoke::{executable_invoke_tx, InvokeTxArgs};
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::batcher_types::{
BuildProposalInput,
GetProposalContent,
GetProposalContentResponse,
ProposalCommitment,
ProposalId,
ProposalStatus,
ProposeBlockInput,
SendProposalContent,
SendProposalContentInput,
SendProposalContentResponse,
StartHeightInput,
ValidateProposalInput,
ValidateBlockInput,
};
use starknet_batcher_types::communication::MockBatcherClient;
use starknet_types_core::felt::Felt;
Expand Down Expand Up @@ -57,7 +57,7 @@ async fn build_proposal() {
let mut batcher = MockBatcherClient::new();
let proposal_id = Arc::new(OnceLock::new());
let proposal_id_clone = Arc::clone(&proposal_id);
batcher.expect_build_proposal().returning(move |input: BuildProposalInput| {
batcher.expect_propose_block().returning(move |input: ProposeBlockInput| {
proposal_id_clone.set(input.proposal_id).unwrap();
Ok(())
});
Expand Down Expand Up @@ -101,7 +101,7 @@ async fn validate_proposal_success() {
let mut batcher = MockBatcherClient::new();
let proposal_id: Arc<OnceLock<ProposalId>> = Arc::new(OnceLock::new());
let proposal_id_clone = Arc::clone(&proposal_id);
batcher.expect_validate_proposal().returning(move |input: ValidateProposalInput| {
batcher.expect_validate_block().returning(move |input: ValidateBlockInput| {
proposal_id_clone.set(input.proposal_id).unwrap();
Ok(())
});
Expand Down Expand Up @@ -149,7 +149,7 @@ async fn validate_proposal_success() {
async fn repropose() {
// Receive a proposal. Then re-retrieve it.
let mut batcher = MockBatcherClient::new();
batcher.expect_validate_proposal().returning(move |_| Ok(()));
batcher.expect_validate_block().returning(move |_| Ok(()));
batcher.expect_start_height().return_once(|input: StartHeightInput| {
assert_eq!(input.height, BlockNumber(0));
Ok(())
Expand Down
46 changes: 23 additions & 23 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ use starknet_api::executable_transaction::Transaction;
use starknet_api::state::ThinStateDiff;
use starknet_batcher_types::batcher_types::{
BatcherResult,
BuildProposalInput,
DecisionReachedInput,
GetProposalContent,
GetProposalContentInput,
GetProposalContentResponse,
ProposalId,
ProposalStatus,
ProposeBlockInput,
SendProposalContent,
SendProposalContentInput,
SendProposalContentResponse,
StartHeightInput,
ValidateProposalInput,
ValidateBlockInput,
};
use starknet_batcher_types::errors::BatcherError;
use starknet_mempool_types::communication::SharedMempoolClient;
Expand Down Expand Up @@ -56,8 +56,8 @@ pub struct Batcher {
pub storage_writer: Box<dyn BatcherStorageWriterTrait>,
pub mempool_client: SharedMempoolClient,
proposal_manager: Box<dyn ProposalManagerTrait>,
build_proposals: HashMap<ProposalId, OutputStreamReceiver>,
validate_proposals: HashMap<ProposalId, InputStreamSender>,
propose_tx_streams: HashMap<ProposalId, OutputStreamReceiver>,
validate_tx_streams: HashMap<ProposalId, InputStreamSender>,
}

impl Batcher {
Expand All @@ -74,24 +74,24 @@ impl Batcher {
storage_writer,
mempool_client,
proposal_manager,
build_proposals: HashMap::new(),
validate_proposals: HashMap::new(),
propose_tx_streams: HashMap::new(),
validate_tx_streams: HashMap::new(),
}
}

pub async fn start_height(&mut self, input: StartHeightInput) -> BatcherResult<()> {
self.build_proposals.clear();
self.validate_proposals.clear();
self.propose_tx_streams.clear();
self.validate_tx_streams.clear();
self.proposal_manager.start_height(input.height).await.map_err(BatcherError::from)
}

#[instrument(skip(self), err)]
pub async fn build_proposal(
pub async fn propose_block(
&mut self,
build_proposal_input: BuildProposalInput,
propose_block_input: ProposeBlockInput,
) -> BatcherResult<()> {
let proposal_id = build_proposal_input.proposal_id;
let deadline = deadline_as_instant(build_proposal_input.deadline)?;
let proposal_id = propose_block_input.proposal_id;
let deadline = deadline_as_instant(propose_block_input.deadline)?;

let (output_tx_sender, output_tx_receiver) = tokio::sync::mpsc::unbounded_channel();
let tx_provider = ProposeTransactionProvider::new(
Expand All @@ -102,23 +102,23 @@ impl Batcher {
);

self.proposal_manager
.build_block_proposal(
.propose_block(
proposal_id,
build_proposal_input.retrospective_block_hash,
propose_block_input.retrospective_block_hash,
deadline,
output_tx_sender,
tx_provider,
)
.await?;

self.build_proposals.insert(proposal_id, output_tx_receiver);
self.propose_tx_streams.insert(proposal_id, output_tx_receiver);
Ok(())
}

#[instrument(skip(self), err)]
pub async fn validate_proposal(
pub async fn validate_block(
&mut self,
validate_proposal_input: ValidateProposalInput,
validate_proposal_input: ValidateBlockInput,
) -> BatcherResult<()> {
let proposal_id = validate_proposal_input.proposal_id;
let deadline = deadline_as_instant(validate_proposal_input.deadline)?;
Expand All @@ -132,15 +132,15 @@ impl Batcher {
};

self.proposal_manager
.validate_block_proposal(
.validate_block(
proposal_id,
validate_proposal_input.retrospective_block_hash,
deadline,
tx_provider,
)
.await?;

self.validate_proposals.insert(proposal_id, input_tx_sender);
self.validate_tx_streams.insert(proposal_id, input_tx_sender);
Ok(())
}

Expand Down Expand Up @@ -173,7 +173,7 @@ impl Batcher {
match self.proposal_manager.get_proposal_status(proposal_id).await {
InternalProposalStatus::Processing => {
let tx_provider_sender = &self
.validate_proposals
.validate_tx_streams
.get(&proposal_id)
.expect("Expecting tx_provider_sender to exist during batching.");
for tx in txs {
Expand Down Expand Up @@ -219,7 +219,7 @@ impl Batcher {
}

fn close_input_transaction_stream(&mut self, proposal_id: ProposalId) -> BatcherResult<()> {
self.validate_proposals
self.validate_tx_streams
.remove(&proposal_id)
.ok_or(BatcherError::ProposalNotFound { proposal_id })?;
Ok(())
Expand All @@ -233,7 +233,7 @@ impl Batcher {
let proposal_id = get_proposal_content_input.proposal_id;

let tx_stream = &mut self
.build_proposals
.propose_tx_streams
.get_mut(&proposal_id)
.ok_or(BatcherError::ProposalNotFound { proposal_id })?;

Expand All @@ -250,7 +250,7 @@ impl Batcher {
// Finished streaming all the transactions.
// TODO: Consider removing the proposal from the proposal manager and keep it in the batcher
// for decision reached.
self.build_proposals.remove(&proposal_id);
self.propose_tx_streams.remove(&proposal_id);
let proposal_commitment =
self.proposal_manager.await_proposal_commitment(proposal_id).await?;
Ok(GetProposalContentResponse {
Expand Down
Loading
Loading