Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept transactions from network clients #23

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[workspace]
members = [
"mysticeti-core",
"mysticeti-core",
"mysticeti",
"third-party/minibytes",
"orchestrator"
"orchestrator",
]

[workspace.dependencies]
Expand All @@ -16,6 +16,7 @@ clap = { version = "4.3.3", features = ["derive"] }
tracing = "0.1.37"
tempfile = "3.6.0"
reqwest = { version = "0.11.18", features = ["json"] }
rand = "0.8.5"

[profile.release]
panic = "abort"
Expand Down
8 changes: 5 additions & 3 deletions mysticeti-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ serde = { workspace = true }
eyre = { workspace = true }
tracing = { workspace = true }
tempfile = { workspace = true } # todo - move to dev-dep
rand = { workspace = true }

rand = "0.8.5"
bincode = "1.3.3"
parking_lot = "0.12.1"
prometheus = "0.13.3"
Expand All @@ -25,7 +25,9 @@ libc = "0.2.146"
tracing-subscriber = "0.3.17"
tracing-core = "0.1.31"

minibytes = { path = "../third-party/minibytes", default_features = false, features = ["frommmap"] }
minibytes = { path = "../third-party/minibytes", default_features = false, features = [
"frommmap",
] }

blake2 = "0.10.6"
digest = "0.10.6"
Expand All @@ -43,4 +45,4 @@ tempdir = "0.3.7"
seahash = "4.1.0"

[features]
simulator = []
simulator = []
52 changes: 52 additions & 0 deletions mysticeti-core/src/block_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,58 @@ impl BlockHandler for TestBlockHandler {
}
}

/// This structure is in charge of receiving client transactions (from the network), batching them,
/// and sending them to the block handler for inclusion in blocks.
pub struct BatchGenerator {
transactions_receiver: mpsc::Receiver<Vec<Transaction>>,
batch_sender: mpsc::Sender<Vec<Transaction>>,
max_batch_size: usize,
max_batch_delay: Duration,
}

impl BatchGenerator {
pub fn start(
transactions_receiver: mpsc::Receiver<Vec<Transaction>>,
batch_sender: mpsc::Sender<Vec<Transaction>>,
max_batch_size: usize,
max_batch_delay: Duration,
) {
let this = Self {
transactions_receiver,
batch_sender,
max_batch_size,
max_batch_delay,
};
runtime::Handle::current().spawn(this.run());
}

async fn run(mut self) {
let mut batch = Vec::with_capacity(self.max_batch_size);
loop {
let mut ready = false;
tokio::select! {
_ = runtime::sleep(self.max_batch_delay) => {
ready = true;
}
Some(transactions) = self.transactions_receiver.recv() => {
batch.extend(transactions);
if batch.len() >= self.max_batch_size {
ready = true;
}
},
else => break
}

if ready {
if self.batch_sender.send(batch.clone()).await.is_err() {
break;
}
batch.clear();
}
}
}
}

pub struct TransactionGenerator {
sender: mpsc::Sender<Vec<Transaction>>,
rng: StdRng,
Expand Down
22 changes: 21 additions & 1 deletion mysticeti-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ pub struct Parameters {
leader_timeout: Duration,
rounds_in_epoch: RoundNumber,
shutdown_grace_period: Duration,
/// Drop transactions from network clients and instead locally generate random
/// transactions. This is useful for testing (but should not be used for benchmarks).
pub local_transactions_generation: bool,
/// Maximum number of transactions in a batch. This parameter is unused if `local_transactions_generation`
/// is set to `true`.
pub max_batch_size: usize,
/// Maximum delay after which a batch is sent out even if it is not full. This parameter is unused if
/// `local_transactions_generation` is set to `true`.
pub max_batch_delay: Duration,
}

impl Parameters {
Expand All @@ -52,10 +61,13 @@ impl Parameters {

pub const BENCHMARK_PORT_OFFSET: u16 = 1500;

// needs to be sufficiently long to run benchmarks
// Needs to be sufficiently long to run benchmarks.
pub const DEFAULT_ROUNDS_IN_EPOCH: u64 = 3_600_000;
pub const DEFAULT_SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(2);

pub const DEFAULT_MAX_BATCH_SIZE: usize = 100;
pub const DEFAULT_MAX_BATCH_DELAY: Duration = Duration::from_millis(100);

pub fn new_for_benchmarks(ips: Vec<IpAddr>) -> Self {
let benchmark_port_offset = ips.len() as u16;
let mut identifiers = Vec::new();
Expand All @@ -77,9 +89,17 @@ impl Parameters {
leader_timeout: Self::DEFAULT_LEADER_TIMEOUT,
rounds_in_epoch: Self::DEFAULT_ROUNDS_IN_EPOCH,
shutdown_grace_period: Self::DEFAULT_SHUTDOWN_GRACE_PERIOD,
local_transactions_generation: false,
max_batch_size: Self::DEFAULT_MAX_BATCH_SIZE,
max_batch_delay: Self::DEFAULT_MAX_BATCH_DELAY,
}
}

pub fn with_local_transactions_generation(mut self) -> Self {
self.local_transactions_generation = true;
self
}

/// Return all network addresses (including our own) in the order of the authority index.
pub fn all_network_addresses(&self) -> impl Iterator<Item = SocketAddr> + '_ {
self.identifiers.iter().map(|id| id.network_address)
Expand Down
1 change: 1 addition & 0 deletions mysticeti-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ mod state;

mod epoch_close;
mod finalization_interpreter;
pub mod load_generator;
mod synchronizer;
131 changes: 131 additions & 0 deletions mysticeti-core/src/load_generator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{net::SocketAddr, time::Duration};

use eyre::{eyre, Result};
use futures::future::join_all;
use rand::Rng;
use tokio::{
net::TcpStream,
sync::mpsc,
time::{interval, sleep, Instant},
};

use crate::{
config::Parameters,
network::{NetworkMessage, Worker},
types::Transaction,
};

pub struct LoadGenerator {
target_id: u64,
target_address: SocketAddr,
transaction_size: usize,
transaction_rate: u64,
all_nodes: Vec<SocketAddr>,
}

impl LoadGenerator {
/// Transaction rate sample precision.
const PRECISION: u64 = 20;

pub fn new(target: u64, rate: u64, size: usize, parameters: &Parameters) -> Result<Self> {
Ok(Self {
target_id: target,
target_address: parameters
.network_address(target)
.ok_or_else(|| eyre!("Target node not found"))?,
transaction_size: size,
transaction_rate: rate,
all_nodes: parameters.all_network_addresses().collect(),
})
}

pub async fn run(&self) -> Result<()> {
// The transaction rate must at least match the sample precision.
if self.transaction_rate < Self::PRECISION {
return Err(eyre!(
"Transaction rate must be at least {} tx/s",
Self::PRECISION
));
}
// The transaction size must be at least 8 bytes to ensure all txs are different.
if self.transaction_size < 8 {
return Err(eyre!("Transaction size must be at least 8 bytes"));
}

let (connection_sender, mut connection_receiver) = mpsc::channel(1);
let (_tcp_sender, tcp_receiver) = mpsc::unbounded_channel();

let worker = Worker {
peer: self.target_address,
peer_id: self.target_id as usize,
connection_sender,
bind_addr: "127.0.0.1:0".parse().unwrap(), // Unused
active_immediately: true,
latency_sender: None,
};
tokio::spawn(worker.run(tcp_receiver));

while let Some(connection) = connection_receiver.recv().await {
tracing::info!(
"Client connected to peer {} ({})",
self.target_id,
self.target_address
);
self.send_transactions(connection.sender).await;
}
Ok(())
}

async fn send_transactions(&self, sender: mpsc::Sender<NetworkMessage>) {
let burst = self.transaction_rate / Self::PRECISION;
let mut tx = Vec::with_capacity(self.transaction_size);
let zeros = vec![0u8; self.transaction_size - 8];
let mut r: u64 = rand::thread_rng().gen();
let burst_duration: u128 = 1000 / Self::PRECISION as u128;

let interval = interval(Duration::from_millis(burst_duration as u64));
tokio::pin!(interval);

tracing::info!(
"Start sending transactions to peer {} ({})",
self.target_id,
self.target_address
);
'main: loop {
interval.as_mut().tick().await;
let now = Instant::now();

for _ in 0..burst {
r += 1; // Ensures all clients send different txs.
tx.extend_from_slice(&r.to_le_bytes());
tx.extend_from_slice(&zeros[..]);

let message = NetworkMessage::Transactions(vec![Transaction::new(tx.clone())]);
tx.clear();
if let Err(e) = sender.send(message).await {
tracing::warn!("Failed to send transaction: {}", e);
break 'main;
}
}
if now.elapsed().as_millis() > burst_duration {
tracing::warn!("Transaction rate too high for this client");
}
}
}

pub async fn wait(&self) {
// Wait for all nodes to be online.
tracing::info!("Waiting for all nodes to be online...");
join_all(self.all_nodes.iter().cloned().map(|address| {
tokio::spawn(async move {
while TcpStream::connect(address).await.is_err() {
sleep(Duration::from_millis(10)).await;
}
})
}))
.await;
}
}
Loading