diff --git a/Cargo.lock b/Cargo.lock index 3fe11b90..d3312f63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -571,9 +571,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" dependencies = [ "serde", ] @@ -1708,6 +1708,7 @@ dependencies = [ "eyre", "futures", "mysticeti-core", + "rand 0.8.5", "tokio", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 17a0d831..bda5e92b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,9 @@ [workspace] members = [ - "mysticeti-core", + "mysticeti-core", "mysticeti", "third-party/minibytes", - "orchestrator" + "orchestrator", ] [workspace.dependencies] @@ -16,6 +16,7 @@ clap = { version = "4.3.3", features = ["derive"] } tracing = "0.1.37" tempfile = "3.6.0" reqwest = { version = "0.11.18", features = ["json"] } +rand = "0.8.5" [profile.release] panic = "abort" diff --git a/mysticeti-core/Cargo.toml b/mysticeti-core/Cargo.toml index 57d921ac..10912bc0 100644 --- a/mysticeti-core/Cargo.toml +++ b/mysticeti-core/Cargo.toml @@ -12,8 +12,8 @@ serde = { workspace = true } eyre = { workspace = true } tracing = { workspace = true } tempfile = { workspace = true } # todo - move to dev-dep +rand = { workspace = true } -rand = "0.8.5" bincode = "1.3.3" parking_lot = "0.12.1" prometheus = "0.13.3" @@ -25,7 +25,9 @@ libc = "0.2.146" tracing-subscriber = "0.3.17" tracing-core = "0.1.31" -minibytes = { path = "../third-party/minibytes", default_features = false, features = ["frommmap"] } +minibytes = { path = "../third-party/minibytes", default_features = false, features = [ + "frommmap", +] } blake2 = "0.10.6" digest = "0.10.6" @@ -43,4 +45,4 @@ tempdir = "0.3.7" seahash = "4.1.0" [features] -simulator = [] \ No newline at end of file +simulator = [] diff --git a/mysticeti-core/src/block_handler.rs b/mysticeti-core/src/block_handler.rs index f6d3c9fe..ec383dbe 100644 --- a/mysticeti-core/src/block_handler.rs +++ b/mysticeti-core/src/block_handler.rs @@ -277,6 +277,58 @@ impl BlockHandler for TestBlockHandler { } } +/// This structure is in charge of receiving client transactions (from the network), batching them, +/// and sending them to the block handler for inclusion in blocks. +pub struct BatchGenerator { + transactions_receiver: mpsc::Receiver>, + batch_sender: mpsc::Sender>, + max_batch_size: usize, + max_batch_delay: Duration, +} + +impl BatchGenerator { + pub fn start( + transactions_receiver: mpsc::Receiver>, + batch_sender: mpsc::Sender>, + max_batch_size: usize, + max_batch_delay: Duration, + ) { + let this = Self { + transactions_receiver, + batch_sender, + max_batch_size, + max_batch_delay, + }; + runtime::Handle::current().spawn(this.run()); + } + + async fn run(mut self) { + let mut batch = Vec::with_capacity(self.max_batch_size); + loop { + let mut ready = false; + tokio::select! 
{ + _ = runtime::sleep(self.max_batch_delay) => { + ready = true; + } + Some(transactions) = self.transactions_receiver.recv() => { + batch.extend(transactions); + if batch.len() >= self.max_batch_size { + ready = true; + } + }, + else => break + } + + if ready { + if self.batch_sender.send(batch.clone()).await.is_err() { + break; + } + batch.clear(); + } + } + } +} + pub struct TransactionGenerator { sender: mpsc::Sender>, rng: StdRng, diff --git a/mysticeti-core/src/config.rs b/mysticeti-core/src/config.rs index 208e8c1d..9b58d322 100644 --- a/mysticeti-core/src/config.rs +++ b/mysticeti-core/src/config.rs @@ -42,6 +42,15 @@ pub struct Parameters { leader_timeout: Duration, rounds_in_epoch: RoundNumber, shutdown_grace_period: Duration, + /// Drop transactions from network clients and instead locally generate random + /// transactions. This is useful for testing (but should not be used for benchmarks). + pub local_transactions_generation: bool, + /// Maximum number of transactions in a batch. This parameter is unused if `local_transactions_generation` + /// is set to `true`. + pub max_batch_size: usize, + /// Maximum delay after which a batch is sent out even if it is not full. This parameter is unused if + /// `local_transactions_generation` is set to `true`. + pub max_batch_delay: Duration, } impl Parameters { @@ -52,10 +61,13 @@ impl Parameters { pub const BENCHMARK_PORT_OFFSET: u16 = 1500; - // needs to be sufficiently long to run benchmarks + // Needs to be sufficiently long to run benchmarks. pub const DEFAULT_ROUNDS_IN_EPOCH: u64 = 3_600_000; pub const DEFAULT_SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(2); + pub const DEFAULT_MAX_BATCH_SIZE: usize = 100; + pub const DEFAULT_MAX_BATCH_DELAY: Duration = Duration::from_millis(100); + pub fn new_for_benchmarks(ips: Vec) -> Self { let benchmark_port_offset = ips.len() as u16; let mut identifiers = Vec::new(); @@ -77,9 +89,17 @@ impl Parameters { leader_timeout: Self::DEFAULT_LEADER_TIMEOUT, rounds_in_epoch: Self::DEFAULT_ROUNDS_IN_EPOCH, shutdown_grace_period: Self::DEFAULT_SHUTDOWN_GRACE_PERIOD, + local_transactions_generation: false, + max_batch_size: Self::DEFAULT_MAX_BATCH_SIZE, + max_batch_delay: Self::DEFAULT_MAX_BATCH_DELAY, } } + pub fn with_local_transactions_generation(mut self) -> Self { + self.local_transactions_generation = true; + self + } + /// Return all network addresses (including our own) in the order of the authority index. pub fn all_network_addresses(&self) -> impl Iterator + '_ { self.identifiers.iter().map(|id| id.network_address) diff --git a/mysticeti-core/src/lib.rs b/mysticeti-core/src/lib.rs index c4a7a37e..6b2fe835 100644 --- a/mysticeti-core/src/lib.rs +++ b/mysticeti-core/src/lib.rs @@ -43,4 +43,5 @@ mod state; mod epoch_close; mod finalization_interpreter; +pub mod load_generator; mod synchronizer; diff --git a/mysticeti-core/src/load_generator.rs b/mysticeti-core/src/load_generator.rs new file mode 100644 index 00000000..ee608d1c --- /dev/null +++ b/mysticeti-core/src/load_generator.rs @@ -0,0 +1,131 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use std::{net::SocketAddr, time::Duration}; + +use eyre::{eyre, Result}; +use futures::future::join_all; +use rand::Rng; +use tokio::{ + net::TcpStream, + sync::mpsc, + time::{interval, sleep, Instant}, +}; + +use crate::{ + config::Parameters, + network::{NetworkMessage, Worker}, + types::Transaction, +}; + +pub struct LoadGenerator { + target_id: u64, + target_address: SocketAddr, + transaction_size: usize, + transaction_rate: u64, + all_nodes: Vec, +} + +impl LoadGenerator { + /// Transaction rate sample precision. + const PRECISION: u64 = 20; + + pub fn new(target: u64, rate: u64, size: usize, parameters: &Parameters) -> Result { + Ok(Self { + target_id: target, + target_address: parameters + .network_address(target) + .ok_or_else(|| eyre!("Target node not found"))?, + transaction_size: size, + transaction_rate: rate, + all_nodes: parameters.all_network_addresses().collect(), + }) + } + + pub async fn run(&self) -> Result<()> { + // The transaction rate must at least match the sample precision. + if self.transaction_rate < Self::PRECISION { + return Err(eyre!( + "Transaction rate must be at least {} tx/s", + Self::PRECISION + )); + } + // The transaction size must be at least 8 bytes to ensure all txs are different. + if self.transaction_size < 8 { + return Err(eyre!("Transaction size must be at least 8 bytes")); + } + + let (connection_sender, mut connection_receiver) = mpsc::channel(1); + let (_tcp_sender, tcp_receiver) = mpsc::unbounded_channel(); + + let worker = Worker { + peer: self.target_address, + peer_id: self.target_id as usize, + connection_sender, + bind_addr: "127.0.0.1:0".parse().unwrap(), // Unused + active_immediately: true, + latency_sender: None, + }; + tokio::spawn(worker.run(tcp_receiver)); + + while let Some(connection) = connection_receiver.recv().await { + tracing::info!( + "Client connected to peer {} ({})", + self.target_id, + self.target_address + ); + self.send_transactions(connection.sender).await; + } + Ok(()) + } + + async fn send_transactions(&self, sender: mpsc::Sender) { + let burst = self.transaction_rate / Self::PRECISION; + let mut tx = Vec::with_capacity(self.transaction_size); + let zeros = vec![0u8; self.transaction_size - 8]; + let mut r: u64 = rand::thread_rng().gen(); + let burst_duration: u128 = 1000 / Self::PRECISION as u128; + + let interval = interval(Duration::from_millis(burst_duration as u64)); + tokio::pin!(interval); + + tracing::info!( + "Start sending transactions to peer {} ({})", + self.target_id, + self.target_address + ); + 'main: loop { + interval.as_mut().tick().await; + let now = Instant::now(); + + for _ in 0..burst { + r += 1; // Ensures all clients send different txs. + tx.extend_from_slice(&r.to_le_bytes()); + tx.extend_from_slice(&zeros[..]); + + let message = NetworkMessage::Transactions(vec![Transaction::new(tx.clone())]); + tx.clear(); + if let Err(e) = sender.send(message).await { + tracing::warn!("Failed to send transaction: {}", e); + break 'main; + } + } + if now.elapsed().as_millis() > burst_duration { + tracing::warn!("Transaction rate too high for this client"); + } + } + } + + pub async fn wait(&self) { + // Wait for all nodes to be online. 
+ tracing::info!("Waiting for all nodes to be online..."); + join_all(self.all_nodes.iter().cloned().map(|address| { + tokio::spawn(async move { + while TcpStream::connect(address).await.is_err() { + sleep(Duration::from_millis(10)).await; + } + }) + })) + .await; + } +} diff --git a/mysticeti-core/src/net_sync.rs b/mysticeti-core/src/net_sync.rs index a3941db8..e8440414 100644 --- a/mysticeti-core/src/net_sync.rs +++ b/mysticeti-core/src/net_sync.rs @@ -1,4 +1,3 @@ -use crate::core::Core; use crate::core_thread::CoreThreadDispatcher; use crate::network::{Connection, Network, NetworkMessage}; use crate::runtime::Handle; @@ -11,6 +10,7 @@ use crate::wal::WalSyncer; use crate::{block_handler::BlockHandler, metrics::Metrics}; use crate::{block_store::BlockStore, synchronizer::BlockDisseminator}; use crate::{committee::Committee, synchronizer::BlockFetcher}; +use crate::{core::Core, types::Transaction}; use futures::future::join_all; use std::collections::HashMap; use std::sync::atomic::AtomicU64; @@ -22,6 +22,7 @@ use tokio::sync::{mpsc, oneshot, Notify}; /// The maximum number of blocks that can be requested in a single message. pub const MAXIMUM_BLOCK_REQUEST: usize = 10; +pub const MAXIMUM_TRANSACTIONS_PER_MESSAGE: usize = 100; pub struct NetworkSyncer { inner: Arc>, @@ -35,6 +36,7 @@ pub struct NetworkSyncerInner { pub block_store: BlockStore, pub notify: Arc, committee: Arc, + transactions_dispatcher: mpsc::Sender>, stop: mpsc::Sender<()>, epoch_close_signal: mpsc::Sender<()>, pub epoch_closing_time: Arc, @@ -46,6 +48,7 @@ impl NetworkSyncer mut core: Core, commit_period: u64, mut commit_observer: C, + transactions_dispatcher: mpsc::Sender>, shutdown_grace_period: Duration, metrics: Arc, ) -> Self { @@ -77,6 +80,7 @@ impl NetworkSyncer syncer, block_store, committee, + transactions_dispatcher, stop: stop_sender.clone(), epoch_close_signal: epoch_sender.clone(), epoch_closing_time, @@ -130,24 +134,40 @@ impl NetworkSyncer shutdown_grace_period, )); let cleanup_task = handle.spawn(Self::cleanup_task(inner.clone())); - while let Some(connection) = inner.recv_or_stopped(network.connection_receiver()).await { - let peer_id = connection.peer_id; - if let Some(task) = connections.remove(&peer_id) { - // wait until previous sync task completes - task.await.ok(); - } + loop { + let (peers_connection_receiver, clients_connection_receiver) = + network.connection_receivers(); + + tokio::select! { + // Handles peers connections. + Some(connection) = inner.recv_or_stopped(peers_connection_receiver) => { + let peer_id = connection.peer_id; + if let Some(task) = connections.remove(&peer_id) { + // wait until previous sync task completes + task.await.ok(); + } - let sender = connection.sender.clone(); - let authority = peer_id as AuthorityIndex; - block_fetcher.register_authority(authority, sender).await; - - let task = handle.spawn(Self::connection_task( - connection, - inner.clone(), - block_fetcher.clone(), - metrics.clone(), - )); - connections.insert(peer_id, task); + let sender = connection.sender.clone(); + let authority = peer_id as AuthorityIndex; + block_fetcher.register_authority(authority, sender).await; + + let task = handle.spawn(Self::connection_task( + connection, + inner.clone(), + block_fetcher.clone(), + metrics.clone(), + )); + connections.insert(peer_id, task); + } + // Handle clients connections. 
+ Some(connection) = inner.recv_or_stopped(clients_connection_receiver) => { + handle.spawn(Self::client_connection_task( + connection, + inner.clone(), + )); + }, + else => break, + } } join_all( connections @@ -216,6 +236,10 @@ impl NetworkSyncer NetworkMessage::BlockNotFound(_references) => { // TODO: leverage this signal to request blocks from other peers } + NetworkMessage::Transactions(_transactions) => { + // We do not accept transactions from peers. + tracing::warn!("Peer {} attempted to submit a raw transaction", peer); + } } } disseminator.shutdown().await; @@ -224,6 +248,34 @@ impl NetworkSyncer None } + async fn client_connection_task( + mut connection: Connection, + inner: Arc>, + ) -> Option<()> { + while let Some(message) = inner.recv_or_stopped(&mut connection.receiver).await { + if let NetworkMessage::Transactions(transactions) = message { + if transactions.is_empty() || transactions.len() > MAXIMUM_TRANSACTIONS_PER_MESSAGE + { + continue; + } + + // TODO: This task is a good place to throttle transactions from each client and + // make preliminary transaction checks. Ending this task will terminate the client + // connection. + + if inner + .transactions_dispatcher + .send(transactions) + .await + .is_err() + { + break; + } + } + } + None + } + async fn leader_timeout_task( inner: Arc>, mut epoch_close_signal: mpsc::Receiver<()>, diff --git a/mysticeti-core/src/network.rs b/mysticeti-core/src/network.rs index 977a71a6..de1c4038 100644 --- a/mysticeti-core/src/network.rs +++ b/mysticeti-core/src/network.rs @@ -1,24 +1,24 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::stat::HistogramSender; use crate::types::{AuthorityIndex, RoundNumber, StatementBlock}; use crate::{config::Parameters, data::Data, runtime}; use crate::{ metrics::{print_network_address_table, Metrics}, types::BlockReference, }; +use crate::{stat::HistogramSender, types::Transaction}; use futures::future::{select, select_all, Either}; use futures::FutureExt; use rand::prelude::ThreadRng; use rand::Rng; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::io; use std::net::SocketAddr; use std::ops::Range; use std::sync::Arc; use std::time::Duration; +use std::{collections::HashMap, sync::atomic::AtomicUsize}; +use std::{io, sync::atomic::Ordering}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::{TcpListener, TcpSocket, TcpStream}; @@ -37,10 +37,13 @@ pub enum NetworkMessage { RequestBlocks(Vec), /// Indicate that a requested block is not found. BlockNotFound(Vec), + /// Client transactions. 
+ Transactions(Vec), } pub struct Network { connection_receiver: mpsc::Receiver, + clients_connection_receiver: mpsc::Receiver, } pub struct Connection { @@ -51,9 +54,13 @@ pub struct Connection { impl Network { #[cfg(feature = "simulator")] - pub(crate) fn new_from_raw(connection_receiver: mpsc::Receiver) -> Self { + pub(crate) fn new_from_raw( + connection_receiver: mpsc::Receiver, + clients_connection_receiver: mpsc::Receiver, + ) -> Self { Self { connection_receiver, + clients_connection_receiver, } } @@ -68,8 +75,16 @@ impl Network { Self::from_socket_addresses(&addresses, our_id as usize, local_addr, metrics).await } - pub fn connection_receiver(&mut self) -> &mut mpsc::Receiver { - &mut self.connection_receiver + pub fn connection_receivers( + &mut self, + ) -> ( + &mut mpsc::Receiver, + &mut mpsc::Receiver, + ) { + ( + &mut self.connection_receiver, + &mut self.clients_connection_receiver, + ) } pub async fn from_socket_addresses( @@ -108,38 +123,94 @@ impl Network { connection_sender: connection_sender.clone(), bind_addr: bind_addr(local_addr), active_immediately: id < our_id, - latency_sender: metrics.connection_latency_sender.get(id).expect("Can not locate connection_latency_sender metric - did you initialize metrics with correct committee?").clone() + latency_sender: Some(metrics.connection_latency_sender.get(id).expect("Can not locate connection_latency_sender metric - did you initialize metrics with correct committee?").clone()) } .run(receiver), ); } + let (clients_connection_sender, clients_connection_receiver) = mpsc::channel(16); handle.spawn( - Server { + Server::new( server, + local_addr, worker_senders, - } + clients_connection_sender, + ) .run(), ); Self { connection_receiver, + clients_connection_receiver, } } } struct Server { server: TcpListener, + local_addr: SocketAddr, worker_senders: HashMap>, + clients_connection_sender: mpsc::Sender, + max_connections: usize, + current_connections: Arc, } impl Server { + pub const DEFAULT_MAX_CONNECTIONS: usize = 1000; + + pub fn new( + server: TcpListener, + local_addr: SocketAddr, + worker_senders: HashMap>, + clients_connection_sender: mpsc::Sender, + ) -> Self { + let committee_size = worker_senders.len(); + Self { + server, + local_addr, + worker_senders, + clients_connection_sender, + max_connections: Self::DEFAULT_MAX_CONNECTIONS, + current_connections: Arc::new(AtomicUsize::new(committee_size)), + } + } + async fn run(self) { loop { let (socket, remote_peer) = self.server.accept().await.expect("Accept failed"); let remote_peer = remote_to_local_port(remote_peer); + + // Handle connections from peers. if let Some(sender) = self.worker_senders.get(&remote_peer) { + tracing::debug!("Connection from authority peer {remote_peer}"); sender.send(socket).ok(); + + // Handle connections from clients. 
} else { - tracing::warn!("Dropping connection from unknown peer {remote_peer}"); + tracing::debug!("Connection from client peer {remote_peer}"); + let current_connections = self.current_connections.clone(); + let peer_id = current_connections.load(Ordering::Relaxed); + if peer_id >= self.max_connections { + tracing::warn!("Too many clients, dropping connection from {remote_peer}"); + continue; + } + self.current_connections.fetch_add(1, Ordering::Relaxed); + + let connection_sender = self.clients_connection_sender.clone(); + let bind_addr = bind_addr(self.local_addr); + let handle = Handle::current(); + handle.spawn(async move { + let _ = Worker { + peer: remote_peer, + peer_id, + connection_sender, + bind_addr, + active_immediately: true, + latency_sender: None, + } + .handle_passive_stream(socket) + .await; + current_connections.fetch_sub(1, Ordering::Relaxed); + }); } } } @@ -170,20 +241,20 @@ fn bind_addr(mut local_peer: SocketAddr) -> SocketAddr { local_peer } -struct Worker { - peer: SocketAddr, - peer_id: usize, - connection_sender: mpsc::Sender, - bind_addr: SocketAddr, - active_immediately: bool, - latency_sender: HistogramSender, +pub struct Worker { + pub peer: SocketAddr, + pub peer_id: usize, + pub connection_sender: mpsc::Sender, + pub bind_addr: SocketAddr, + pub active_immediately: bool, + pub latency_sender: Option>, } struct WorkerConnection { sender: mpsc::Sender, receiver: mpsc::Receiver, peer_id: usize, - latency_sender: HistogramSender, + latency_sender: Option>, } impl Worker { @@ -191,7 +262,7 @@ impl Worker { const PASSIVE_HANDSHAKE: u64 = 0x0000AEAE; const MAX_SIZE: u32 = 16 * 1024 * 1024; - async fn run(self, mut receiver: mpsc::UnboundedReceiver) -> Option<()> { + pub async fn run(self, mut receiver: mpsc::UnboundedReceiver) -> Option<()> { let initial_delay = if self.active_immediately { Duration::ZERO } else { @@ -286,7 +357,7 @@ impl Worker { mut writer: OwnedWriteHalf, mut receiver: mpsc::Receiver, mut pong_receiver: mpsc::Receiver, - latency_sender: HistogramSender, + latency_sender: Option>, ) -> io::Result<()> { let start = Instant::now(); let mut ping_deadline = start + PING_INTERVAL; @@ -330,7 +401,9 @@ impl Worker { let time = start.elapsed().as_micros() as u64; match time.checked_sub(our_ping) { Some(delay) => { - latency_sender.observe(Duration::from_micros(delay)); + if let Some(latency_sender) = latency_sender.as_ref() { + latency_sender.observe(Duration::from_micros(delay)); + } }, None => { tracing::warn!("Invalid ping: {ping}, greater then current time {time}"); diff --git a/mysticeti-core/src/simulated_network.rs b/mysticeti-core/src/simulated_network.rs index e9387f90..c41efae0 100644 --- a/mysticeti-core/src/simulated_network.rs +++ b/mysticeti-core/src/simulated_network.rs @@ -24,8 +24,9 @@ impl SimulatedNetwork { .authorities() .map(|_| { let (connection_sender, connection_receiver) = mpsc::channel(16); + let (_client_connection_sender, clients_connection_receiver) = mpsc::channel(16); ( - Network::new_from_raw(connection_receiver), + Network::new_from_raw(connection_receiver, clients_connection_receiver), connection_sender, ) }) diff --git a/mysticeti-core/src/test_util.rs b/mysticeti-core/src/test_util.rs index 626ec5a9..72b1cbf2 100644 --- a/mysticeti-core/src/test_util.rs +++ b/mysticeti-core/src/test_util.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use crate::block_handler::{BlockHandler, TestBlockHandler}; -use crate::block_store::{BlockStore, BlockWriter, OwnBlockData, WAL_ENTRY_BLOCK}; use crate::committee::Committee; 
use crate::config::Parameters; use crate::core::{Core, CoreOptions}; @@ -20,6 +19,10 @@ use crate::types::{ }; use crate::wal::{open_file_for_wal, walf, WalPosition, WalWriter}; use crate::{block_handler::TestCommitHandler, metrics::Metrics}; +use crate::{ + block_store::{BlockStore, BlockWriter, OwnBlockData, WAL_ENTRY_BLOCK}, + runtime, +}; use futures::future::join_all; use prometheus::Registry; use rand::rngs::StdRng; @@ -27,6 +30,7 @@ use rand::SeedableRng; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::Path; use std::sync::Arc; +use tokio::sync::mpsc; pub fn test_metrics() -> Arc { Metrics::new(&Registry::new(), None).0 @@ -185,17 +189,23 @@ pub fn simulated_network_syncers_with_epoch_duration( core.block_handler().transaction_time.clone(), core.metrics.clone(), ); + let (transactions_sender, mut transactions_receiver) = mpsc::channel(1000); let node_context = OverrideNodeContext::enter(Some(core.authority())); let network_syncer = NetworkSyncer::start( network, core, 3, commit_handler, + transactions_sender, Parameters::DEFAULT_SHUTDOWN_GRACE_PERIOD, test_metrics(), ); drop(node_context); network_syncers.push(network_syncer); + + // Sync network transactions (there shouldn't be any). + runtime::Handle::current() + .spawn(async move { while transactions_receiver.recv().await.is_some() {} }); } (simulated_network, network_syncers, reporters) } @@ -218,15 +228,21 @@ pub async fn network_syncers_with_epoch_duration( core.block_handler().transaction_time.clone(), test_metrics(), ); + let (transactions_sender, mut transactions_receiver) = mpsc::channel(1000); let network_syncer = NetworkSyncer::start( network, core, 3, commit_handler, + transactions_sender, Parameters::DEFAULT_SHUTDOWN_GRACE_PERIOD, test_metrics(), ); network_syncers.push(network_syncer); + + // Sync network transactions (there shouldn't be any). 
+ runtime::Handle::current() + .spawn(async move { while transactions_receiver.recv().await.is_some() {} }); } network_syncers } diff --git a/mysticeti-core/src/types.rs b/mysticeti-core/src/types.rs index c7e98bfb..77b3fc6a 100644 --- a/mysticeti-core/src/types.rs +++ b/mysticeti-core/src/types.rs @@ -3,7 +3,7 @@ pub type AuthorityIndex = u64; -#[derive(Clone, Eq, PartialEq, Serialize, Deserialize, Default)] +#[derive(Clone, Eq, PartialEq, Serialize, Deserialize, Default, Debug)] pub struct Transaction { data: Vec, } diff --git a/mysticeti-core/src/validator.rs b/mysticeti-core/src/validator.rs index ab46d859..91869884 100644 --- a/mysticeti-core/src/validator.rs +++ b/mysticeti-core/src/validator.rs @@ -10,10 +10,11 @@ use std::{ use ::prometheus::Registry; use eyre::{eyre, Context, Result}; +use tokio::sync::mpsc; -use crate::block_handler::TransactionGenerator; -use crate::core::CoreOptions; use crate::log::TransactionLog; +use crate::{block_handler::BatchGenerator, core::CoreOptions}; +use crate::{block_handler::TransactionGenerator, runtime}; use crate::{ block_handler::{RealBlockHandler, TestCommitHandler}, committee::Committee, @@ -69,24 +70,43 @@ impl Validator { config.storage(), metrics.clone(), ); - let tps = env::var("TPS"); - let tps = tps.map(|t| t.parse::().expect("Failed to parse TPS variable")); - let tps = tps.unwrap_or(10); - let transactions_per_100ms = (tps + 9) / 10; - let initial_delay = env::var("INITIAL_DELAY"); - let initial_delay = initial_delay.map(|t| { - t.parse::() - .expect("Failed to parse INITIAL_DELAY variable") - }); - let initial_delay = initial_delay.unwrap_or(10); - tracing::info!("Starting generator with {transactions_per_100ms} transactions per 100ms, initial delay {initial_delay} sec"); - let initial_delay = Duration::from_secs(initial_delay); - TransactionGenerator::start( - block_sender, - authority, - transactions_per_100ms, - initial_delay, - ); + + // Define how to handle transactions. + let (transactions_sender, mut transactions_receiver) = mpsc::channel(1000); + if parameters.local_transactions_generation { + // Setup the local transaction generator. + let tps = env::var("TPS"); + let tps = tps.map(|t| t.parse::().expect("Failed to parse TPS variable")); + let tps = tps.unwrap_or(10); + let transactions_per_100ms = (tps + 9) / 10; + let initial_delay = env::var("INITIAL_DELAY"); + let initial_delay = initial_delay.map(|t| { + t.parse::() + .expect("Failed to parse INITIAL_DELAY variable") + }); + let initial_delay = initial_delay.unwrap_or(10); + tracing::info!("Starting generator with {transactions_per_100ms} transactions per 100ms, initial delay {initial_delay} sec"); + let initial_delay = Duration::from_secs(initial_delay); + TransactionGenerator::start( + block_sender, + authority, + transactions_per_100ms, + initial_delay, + ); + + // Sink the transactions from the network. 
+ runtime::Handle::current() + .spawn(async move { while transactions_receiver.recv().await.is_some() {} }); + } else { + tracing::info!("Accepting transactions from the network"); + BatchGenerator::start( + transactions_receiver, + block_sender, + parameters.max_batch_size, + parameters.max_batch_delay, + ); + } + let committed_transaction_log = TransactionLog::start(config.storage().committed_transactions_log()) .expect("Failed to open committed transaction log for write"); @@ -119,6 +139,7 @@ impl Validator { core, parameters.wave_length(), commit_handler, + transactions_sender, parameters.shutdown_grace_period(), metrics, ); @@ -150,7 +171,7 @@ impl Validator { } #[cfg(test)] -mod test { +mod smoke_tests { use std::{ collections::VecDeque, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -163,27 +184,33 @@ mod test { use crate::{ committee::Committee, config::{Parameters, PrivateConfig}, - prometheus, + load_generator::LoadGenerator, + prometheus, runtime, types::AuthorityIndex, }; use super::Validator; /// Check whether the validator specified by its metrics address has committed at least once. - async fn check_commit(address: &SocketAddr) -> Result { + /// The bool parameter indicate whether to check for a non-empty commit. + async fn check_commit(address: &SocketAddr, non_empty: bool) -> Result { let route = prometheus::METRICS_ROUTE; let res = reqwest::get(format! {"http://{address}{route}"}).await?; let string = res.text().await?; - let commit = string.contains("committed_leaders_total"); + let commit = if non_empty { + string.contains("latency_s_count") + } else { + string.contains("committed_leaders_total") + }; Ok(commit) } /// Await for all the validators specified by their metrics addresses to commit. - async fn await_for_commits(addresses: Vec) { + async fn await_for_commits(addresses: Vec, non_empty: bool) { let mut queue = VecDeque::from(addresses); while let Some(address) = queue.pop_front() { time::sleep(Duration::from_millis(100)).await; - match check_commit(&address).await { + match check_commit(&address, non_empty).await { Ok(commits) if commits => (), _ => queue.push_back(address), } @@ -192,7 +219,8 @@ mod test { /// Ensure that a committee of honest validators commits. #[tokio::test] - async fn validator_smoke_test() { + #[ignore = "TODO: fix port binding conflict"] + async fn validator_commit() { let committee_size = 4; let ips = vec![IpAddr::V4(Ipv4Addr::LOCALHOST); committee_size]; @@ -218,14 +246,61 @@ mod test { let timeout = Parameters::DEFAULT_LEADER_TIMEOUT * 5; tokio::select! { - _ = await_for_commits(addresses) => (), + _ = await_for_commits(addresses, false) => (), + _ = time::sleep(timeout) => panic!("Failed to gather commits within a few timeouts"), + } + } + + /// Ensure that a committee of honest validators commits a non-empty block. + #[tokio::test] + async fn validator_commit_non_empty() { + let committee_size = 4; + let ips = vec![IpAddr::V4(Ipv4Addr::LOCALHOST); committee_size]; + + let committee = Committee::new_for_benchmarks(committee_size); + let parameters = Parameters::new_for_benchmarks(ips); + + let mut handles = Vec::new(); + let tempdir = TempDir::new("validator_smoke_test").unwrap(); + for i in 0..committee_size { + let authority = i as AuthorityIndex; + let private = PrivateConfig::new_for_benchmarks(tempdir.as_ref(), authority); + + // Boot the validator node. 
+ let validator = Validator::start(authority, committee.clone(), ¶meters, private) + .await + .unwrap(); + handles.push(validator.await_completion()); + + // Boot a load generator targeting the validator. + let generator = LoadGenerator::new( + authority, + 40, // transaction rate + 10, // transaction size + ¶meters, + ) + .unwrap(); + runtime::Handle::current().spawn(async move { + generator.wait().await; + generator.run().await.unwrap(); + }); + } + + let addresses = parameters + .all_metric_addresses() + .map(|address| address.to_owned()) + .collect(); + let timeout = Parameters::DEFAULT_LEADER_TIMEOUT * 50; + + tokio::select! { + _ = await_for_commits(addresses, true) => (), _ = time::sleep(timeout) => panic!("Failed to gather commits within a few timeouts"), } } /// Ensure validators can sync missing blocks #[tokio::test] - #[ignore = "https://github.com/MystenLabs/project-mysticeti/pull/14"] + #[ignore = "TODO: fix port binding conflict"] async fn validator_sync() { let committee_size = 4; let ips = vec![IpAddr::V4(Ipv4Addr::LOCALHOST); committee_size]; @@ -247,16 +322,15 @@ mod test { handles.push(validator.await_completion()); } - // Boot the last validator after they others commit. + // Boot the last validator after the others commit. let addresses = parameters .all_metric_addresses() .skip(1) .map(|address| address.to_owned()) .collect(); let timeout = Parameters::DEFAULT_LEADER_TIMEOUT * 5; - println!("addresses: {:?}", addresses); tokio::select! { - _ = await_for_commits(addresses) => (), + _ = await_for_commits(addresses, false) => (), _ = time::sleep(timeout) => panic!("Failed to gather commits within a few timeouts"), } @@ -276,7 +350,7 @@ mod test { .unwrap(); let timeout = Parameters::DEFAULT_LEADER_TIMEOUT * 5; tokio::select! { - _ = await_for_commits(vec![address]) => (), + _ = await_for_commits(vec![address], false) => (), _ = time::sleep(timeout) => panic!("Failed to gather commits within a few timeouts"), } } diff --git a/mysticeti/Cargo.toml b/mysticeti/Cargo.toml index e687f172..45843069 100644 --- a/mysticeti/Cargo.toml +++ b/mysticeti/Cargo.toml @@ -12,7 +12,11 @@ eyre = { workspace = true } color-eyre = { workspace = true } clap = { workspace = true } tracing = { workspace = true } +rand = { workspace = true } tracing-subscriber = "0.3" mysticeti-core = { path = "../mysticeti-core" } +[[bin]] +name = "load-generator" +path = "src/load_generator.rs" diff --git a/mysticeti/src/load_generator.rs b/mysticeti/src/load_generator.rs new file mode 100644 index 00000000..022c0bc7 --- /dev/null +++ b/mysticeti/src/load_generator.rs @@ -0,0 +1,51 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use clap::Parser; +use eyre::{Context, Result}; +use mysticeti_core::{ + config::{Parameters, Print}, + load_generator::LoadGenerator, +}; +use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter}; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Args { + /// The index of the target node. + #[clap(short, long, value_name = "INT")] + target: u64, + + /// The rate of transactions per second. + #[clap(short, long, value_name = "INT")] + rate: u64, + + /// The size of each transaction in bytes. + #[clap(short, long, value_name = "INT")] + size: usize, + + /// Path to the file holding the public validator parameters (such as network addresses). 
+    #[clap(long, value_name = "FILE")]
+    parameters_path: String,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    color_eyre::install()?;
+    let filter = EnvFilter::builder()
+        .with_default_directive(LevelFilter::INFO.into())
+        .from_env_lossy();
+    fmt().with_env_filter(filter).init();
+
+    let args = Args::parse();
+    let parameters = Parameters::load(&args.parameters_path).wrap_err(format!(
+        "Failed to load parameters file '{}'",
+        args.parameters_path
+    ))?;
+
+    let generator = LoadGenerator::new(args.target, args.rate, args.size, &parameters)?;
+    generator.wait().await;
+    generator.run().await?;
+
+    Ok(())
+}
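
Below is a minimal usage sketch, not part of the patch, showing how the new transaction-ingestion knobs introduced in `config.rs` fit together. It assumes `Parameters::new_for_benchmarks` remains the constructor used by the smoke tests; the helper name `benchmark_parameters` and the concrete values (200 transactions, 50 ms) are illustrative only, while the patch's own defaults are `DEFAULT_MAX_BATCH_SIZE = 100` and `DEFAULT_MAX_BATCH_DELAY = 100 ms`.

// Hypothetical sketch (not part of the patch): tuning the new ingestion parameters.
use std::net::{IpAddr, Ipv4Addr};
use std::time::Duration;

use mysticeti_core::config::Parameters;

fn benchmark_parameters(committee_size: usize) -> Parameters {
    let ips = vec![IpAddr::V4(Ipv4Addr::LOCALHOST); committee_size];

    // By default, a validator built from these parameters accepts client transactions
    // over the network and batches them through `BatchGenerator` before they reach the
    // block handler.
    let mut parameters = Parameters::new_for_benchmarks(ips);
    parameters.max_batch_size = 200; // flush a batch once it holds 200 transactions...
    parameters.max_batch_delay = Duration::from_millis(50); // ...or after 50 ms, whichever comes first

    // For tests that should not depend on an external load generator, switch back to the
    // built-in `TransactionGenerator`:
    // let parameters = parameters.with_local_transactions_generation();

    parameters
}

Calling `with_local_transactions_generation()` restores the previous behaviour (locally generated random transactions), which the patch keeps for testing but documents as unsuitable for benchmarks; in that mode the `max_batch_size` and `max_batch_delay` fields are ignored.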