Skip to content

Commit

Permalink
test: add consensus to e2e flow test (#1811)
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-starkware authored Nov 12, 2024
1 parent f9b18ea commit 6a1305e
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 86 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/batcher_types/src/batcher_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct BuildProposalInput {
pub deadline: chrono::DateTime<Utc>,
pub retrospective_block_hash: Option<BlockHashAndNumber>,
// TODO: Should we get the gas price here?
// TODO: add proposer address.
// TODO: add whether the kzg mechanism is used for DA.
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -56,6 +58,7 @@ pub enum GetProposalContent {
}

#[derive(Clone, Debug, Serialize, Deserialize)]
// TODO(Dan): Consider unifying with BuildProposalInput as they have the same fields.
pub struct ValidateProposalInput {
pub proposal_id: ProposalId,
pub deadline: chrono::DateTime<Utc>,
Expand Down
2 changes: 1 addition & 1 deletion crates/blockifier/cairo_native
Submodule cairo_native updated 48 files
+5 −5 .github/workflows/bench-hyperfine.yml
+12 −12 .github/workflows/ci.yml
+2 −2 .github/workflows/publish.yml
+2 −2 .github/workflows/release.yml
+2 −2 .github/workflows/rustdoc.yml
+0 −132 .github/workflows/starknet-blocks.yml
+51 −51 Cargo.lock
+80 −6 benches/benches.rs
+0 −17 benches/compile_time.rs
+3 −54 benches/libfuncs.rs
+45 −33 docs/implementing_libfuncs.md
+5 −0 programs/benches/factorial_2M.c
+5 −0 programs/benches/fib_2M.c
+5 −0 programs/benches/logistic_map.c
+48 −42 programs/compile_benches/dijkstra.cairo
+17 −8 programs/compile_benches/extended_euclidean_algorithm.cairo
+27 −78 programs/compile_benches/fast_power.cairo
+285 −0 programs/compile_benches/sha256.cairo
+0 −539 programs/compile_benches/sha512.cairo
+2 −5 runtime/Cargo.toml
+6 −236 runtime/src/lib.rs
+1 −1 rust-toolchain.toml
+0 −32 scripts/diff-check.sh
+6 −6 src/arch.rs
+26 −2 src/compiler.rs
+1 −15 src/context.rs
+3 −0 src/error.rs
+58 −30 src/executor.rs
+12 −0 src/executor/aot.rs
+66 −13 src/executor/contract.rs
+13 −0 src/executor/jit.rs
+0 −25 src/ffi.rs
+6 −5 src/libfuncs/array.rs
+3 −5 src/libfuncs/circuit.rs
+121 −18 src/libfuncs/gas.rs
+595 −290 src/libfuncs/starknet.rs
+1,508 −677 src/libfuncs/starknet/secp256.rs
+9 −6 src/metadata/gas.rs
+6 −4 src/types.rs
+24 −19 src/types/array.rs
+6 −2 src/types/builtin_costs.rs
+45 −3 src/utils.rs
+1 −87 src/utils/block_ext.rs
+2 −2 src/values.rs
+6 −6 tests/alexandria/Scarb.lock
+4 −4 tests/alexandria/Scarb.toml
+12 −6 tests/common.rs
+2 −0 tests/tests/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use papyrus_consensus::types::{
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::consensus::{
ConsensusMessage,
ProposalFin,
ProposalInit,
ProposalPart,
TransactionBatch,
Expand Down Expand Up @@ -318,6 +319,11 @@ async fn stream_build_proposal(
content.len(),
height
);
debug!("Broadcasting proposal fin: {proposal_content_id:?}");
broadcast_client
.broadcast_message(ProposalPart::Fin(ProposalFin { proposal_content_id }))
.await
.expect("Failed to broadcast proposal fin");
// Update valid_proposals before sending fin to avoid a race condition
// with `repropose` being called before `valid_proposals` is updated.
let mut valid_proposals = valid_proposals.lock().expect("Lock was poisoned");
Expand Down
4 changes: 3 additions & 1 deletion crates/tests_integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ assert_matches.workspace = true
axum.workspace = true
blockifier.workspace = true
cairo-lang-starknet-classes.workspace = true
chrono.workspace = true
indexmap.workspace = true
mempool_test_utils.workspace = true
papyrus_common.workspace = true
papyrus_consensus.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_protobuf.workspace = true
papyrus_rpc.workspace = true
papyrus_storage = { workspace = true, features = ["testing"] }
reqwest.workspace = true
Expand All @@ -42,6 +43,7 @@ tokio.workspace = true
tracing.workspace = true

[dev-dependencies]
futures.workspace = true
pretty_assertions.workspace = true
rstest.workspace = true
starknet_sequencer_infra.workspace = true
10 changes: 8 additions & 2 deletions crates/tests_integration/src/flow_test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::net::SocketAddr;

use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::communication::SharedBatcherClient;
Expand Down Expand Up @@ -32,6 +34,9 @@ pub struct FlowTestSetup {

// Handle of the sequencer node.
pub sequencer_node_handle: JoinHandle<Result<(), anyhow::Error>>,

// Channels for consensus proposals, used for asserting the right transactions are proposed.
pub consensus_proposals_channels: BroadcastTopicChannels<ProposalPart>,
}

impl FlowTestSetup {
Expand All @@ -52,8 +57,8 @@ impl FlowTestSetup {
)
.await;

// Derive the configuration for the mempool node.
let (config, _required_params) =
// Derive the configuration for the sequencer node.
let (config, _required_params, consensus_proposals_channels) =
create_config(rpc_server_addr, storage_for_test.batcher_storage_config).await;

let (clients, servers) = create_node_modules(&config);
Expand All @@ -77,6 +82,7 @@ impl FlowTestSetup {
batcher_client: clients.get_batcher_client().unwrap(),
rpc_storage_file_handle: storage_for_test.rpc_storage_handle,
sequencer_node_handle,
consensus_proposals_channels,
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/tests_integration/src/integration_test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl IntegrationTestSetup {
.await;

// Derive the configuration for the sequencer node.
let (config, required_params) =
let (config, required_params, _) =
create_config(rpc_server_addr, storage_for_test.batcher_storage_config).await;

let node_config_dir_handle = tempdir().unwrap();
Expand Down
49 changes: 28 additions & 21 deletions crates/tests_integration/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::future::Future;
use std::net::SocketAddr;
use std::time::Duration;

use axum::body::Body;
use blockifier::context::ChainInfo;
Expand All @@ -11,6 +12,9 @@ use mempool_test_utils::starknet_api_test_utils::{
MultiAccountTransactionGenerator,
};
use papyrus_consensus::config::ConsensusConfig;
use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_storage::StorageConfig;
use reqwest::{Client, Response};
use starknet_api::block::BlockNumber;
Expand All @@ -28,29 +32,14 @@ use starknet_gateway::config::{
};
use starknet_gateway_types::errors::GatewaySpecError;
use starknet_http_server::config::HttpServerConfig;
use starknet_sequencer_node::config::component_config::ComponentConfig;
use starknet_sequencer_node::config::test_utils::RequiredParams;
use starknet_sequencer_node::config::{
ComponentExecutionConfig,
ComponentExecutionMode,
SequencerNodeConfig,
};
use starknet_sequencer_node::config::SequencerNodeConfig;
use tokio::net::TcpListener;

pub async fn create_config(
rpc_server_addr: SocketAddr,
batcher_storage_config: StorageConfig,
) -> (SequencerNodeConfig, RequiredParams) {
// TODO(Arni/ Matan): Enable the consensus in the end to end test.
let components = ComponentConfig {
consensus_manager: ComponentExecutionConfig {
execution_mode: ComponentExecutionMode::Disabled,
local_server_config: None,
..Default::default()
},
..Default::default()
};

) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<ProposalPart>) {
let chain_id = batcher_storage_config.db_config.chain_id.clone();
// TODO(Tsabary): create chain_info in setup, and pass relevant values throughout.
let mut chain_info = ChainInfo::create_for_testing();
Expand All @@ -60,12 +49,10 @@ pub async fn create_config(
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let consensus_manager_config = ConsensusManagerConfig {
consensus_config: ConsensusConfig { start_height: BlockNumber(1), ..Default::default() },
};
let (consensus_manager_config, consensus_proposals_channels) =
create_consensus_manager_config_and_channels();
(
SequencerNodeConfig {
components,
batcher_config,
consensus_manager_config,
gateway_config,
Expand All @@ -78,9 +65,29 @@ pub async fn create_config(
eth_fee_token_address: fee_token_addresses.eth_fee_token_address,
strk_fee_token_address: fee_token_addresses.strk_fee_token_address,
},
consensus_proposals_channels,
)
}

fn create_consensus_manager_config_and_channels()
-> (ConsensusManagerConfig, BroadcastTopicChannels<ProposalPart>) {
let (network_config, broadcast_channels) =
create_network_config_connected_to_broadcast_channels(
papyrus_network::gossipsub_impl::Topic::new(
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC,
),
);
let consensus_manager_config = ConsensusManagerConfig {
consensus_config: ConsensusConfig {
start_height: BlockNumber(1),
consensus_delay: Duration::from_secs(1),
network_config,
..Default::default()
},
};
(consensus_manager_config, broadcast_channels)
}

pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig {
// TODO(Tsabary): get the latest version from the RPC crate.
const RPC_SPEC_VERSION: &str = "V0_8";
Expand Down
117 changes: 58 additions & 59 deletions crates/tests_integration/tests/end_to_end_flow_test.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
use futures::StreamExt;
use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, ProposalPart};
use pretty_assertions::assert_eq;
use rstest::{fixture, rstest};
use starknet_api::block::BlockNumber;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::{ChainId, ContractAddress};
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::batcher_types::{
BuildProposalInput,
DecisionReachedInput,
GetProposalContent,
GetProposalContentInput,
ProposalId,
StartHeightInput,
};
use starknet_batcher_types::communication::SharedBatcherClient;
use starknet_integration_tests::flow_test_setup::FlowTestSetup;
use starknet_integration_tests::utils::{
create_integration_test_tx_generator,
run_integration_test_scenario,
};
use starknet_types_core::felt::Felt;

#[fixture]
fn tx_generator() -> MultiAccountTransactionGenerator {
Expand All @@ -25,7 +21,9 @@ fn tx_generator() -> MultiAccountTransactionGenerator {

#[rstest]
#[tokio::test]
async fn test_end_to_end(tx_generator: MultiAccountTransactionGenerator) {
async fn end_to_end(tx_generator: MultiAccountTransactionGenerator) {
const LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT: std::time::Duration =
std::time::Duration::from_secs(5);
// Setup.
let mock_running_system = FlowTestSetup::new_from_tx_generator(&tx_generator).await;

Expand All @@ -34,60 +32,61 @@ async fn test_end_to_end(tx_generator: MultiAccountTransactionGenerator) {
mock_running_system.assert_add_tx_success(tx)
})
.await;

// Test.
run_consensus_for_end_to_end_test(
&mock_running_system.batcher_client,
&expected_batched_tx_hashes,
// TODO(Dan, Itay): Consider adding a utility function that waits for something to happen.
tokio::time::timeout(
LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT,
listen_to_broadcasted_messages(
mock_running_system.consensus_proposals_channels,
&expected_batched_tx_hashes,
),
)
.await;
.await
.expect("listen to broadcasted messages should finish in time");
}

/// This function should mirror
/// [`run_consensus`](papyrus_consensus::manager::run_consensus). It makes requests
/// from the batcher client and asserts the expected responses were received.
pub async fn run_consensus_for_end_to_end_test(
batcher_client: &SharedBatcherClient,
async fn listen_to_broadcasted_messages(
consensus_proposals_channels: BroadcastTopicChannels<ProposalPart>,
expected_batched_tx_hashes: &[TransactionHash],
) {
// Start height.
// TODO(Arni): Get the current height and retrospective_block_hash from the rpc storage or use
// consensus directly.
let current_height = BlockNumber(1);
batcher_client.start_height(StartHeightInput { height: current_height }).await.unwrap();

// Build proposal.
let proposal_id = ProposalId(0);
let retrospective_block_hash = None;
let build_proposal_duaration = chrono::TimeDelta::new(1, 0).unwrap();
batcher_client
.build_proposal(BuildProposalInput {
proposal_id,
deadline: chrono::Utc::now() + build_proposal_duaration,
retrospective_block_hash,
})
.await
.unwrap();

// Get proposal content.
let mut executed_tx_hashes: Vec<TransactionHash> = vec![];
let _proposal_commitment = loop {
let response = batcher_client
.get_proposal_content(GetProposalContentInput { proposal_id })
.await
.unwrap();
match response.content {
GetProposalContent::Txs(batched_txs) => {
executed_tx_hashes.append(&mut batched_txs.iter().map(|tx| tx.tx_hash()).collect());
// TODO(Dan, Guy): retrieve chain ID. Maybe by modifying IntegrationTestSetup to hold it as a
// member, and instantiate the value using StorageTestSetup.
const CHAIN_ID_NAME: &str = "CHAIN_ID_SUBDIR";
let chain_id = ChainId::Other(CHAIN_ID_NAME.to_string());
let mut broadcasted_messages_receiver =
consensus_proposals_channels.broadcasted_messages_receiver;
let mut received_tx_hashes = vec![];
// TODO (Dan, Guy): retrieve / calculate the expected proposal init and fin.
let expected_proposal_init = ProposalInit {
height: BlockNumber(1),
round: 0,
valid_round: None,
proposer: ContractAddress::default(),
};
let expected_proposal_fin = ProposalFin {
proposal_content_id: BlockHash(Felt::from_hex_unchecked(
"0x4597ceedbef644865917bf723184538ef70d43954d63f5b7d8cb9d1bd4c2c32",
)),
};
assert_eq!(
broadcasted_messages_receiver.next().await.unwrap().0.unwrap(),
ProposalPart::Init(expected_proposal_init)
);
loop {
match broadcasted_messages_receiver.next().await.unwrap().0.unwrap() {
ProposalPart::Init(init) => panic!("Unexpected init: {:?}", init),
ProposalPart::Fin(proposal_fin) => {
assert_eq!(proposal_fin, expected_proposal_fin);
break;
}
GetProposalContent::Finished(proposal_commitment) => {
break proposal_commitment;
ProposalPart::Transactions(transactions) => {
received_tx_hashes.extend(
transactions
.transactions
.iter()
.map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap()),
);
}
}
};

// Decision reached.
batcher_client.decision_reached(DecisionReachedInput { proposal_id }).await.unwrap();

assert_eq!(expected_batched_tx_hashes, executed_tx_hashes);
}
assert_eq!(received_tx_hashes, expected_batched_tx_hashes);
}

0 comments on commit 6a1305e

Please sign in to comment.