Skip to content

Commit

Permalink
Add pbft to workload binary
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Jul 6, 2024
1 parent 136a691 commit 0ff8372
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 16 deletions.
1 change: 1 addition & 0 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"neatworks",
"pbft",
"prehashed",
"rustup",
"Schnorrkel",
"secp",
"secp256k1",
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ jobs:
- uses: actions/checkout@v3
- run: rustup update ${{ matrix.toolchain }} && rustup default ${{ matrix.toolchain }}
- run: cargo build --verbose --all-targets
- run: cargo test --verbose --lib
- run: cargo test --verbose --lib
- run: cargo run --verbose --release --bin workload-standalone -- unreplicated
- run: cargo run --verbose --release --bin workload-standalone -- pbft
36 changes: 31 additions & 5 deletions src/bin/workload-standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
time::{Duration, Instant},
};

use neatworks::workload::events::Invoke;
use neatworks::{pbft::PublicParameters, workload::events::Invoke};
use tokio::{select, time::sleep};

pub mod workload {
Expand All @@ -22,6 +22,7 @@ impl workload::clients::InvokeTask for InvokeTask {
neatworks::workload::events::InvokeOk<bytes::Bytes>,
>,
) -> anyhow::Result<()> {
sleep(Duration::from_millis(100)).await;
for _ in 0..10 {
let start = Instant::now();
sender.send(Invoke(Default::default()))?;
Expand All @@ -40,15 +41,40 @@ async fn main() -> anyhow::Result<()> {
match mode.as_deref().unwrap_or("unreplicated") {
"unreplicated" => {
let server_task = workload::servers::unreplicated();
let client_task = async {
sleep(Duration::from_millis(100)).await;
workload::clients::unreplicated(InvokeTask).await
};
let client_task = workload::clients::unreplicated(InvokeTask);
select! {
result = client_task => break 'client result?,
result = server_task => result?,
}
}
"pbft" => {
let config = PublicParameters {
num_replica: 4,
num_faulty: 1,
num_concurrent: 1,
max_batch_size: 1,
..PublicParameters::durations(if cfg!(debug_assertions) {
Duration::from_millis(300)
} else {
Duration::from_millis(100)
})
};
let addrs = (0..4)
.map(|index| ([127, 0, 0, 1 + index], 3000).into())
.collect::<Vec<_>>();
let server_task0 = workload::servers::pbft(config.clone(), 0, addrs.clone());
let server_task1 = workload::servers::pbft(config.clone(), 1, addrs.clone());
let server_task2 = workload::servers::pbft(config.clone(), 2, addrs.clone());
let server_task3 = workload::servers::pbft(config.clone(), 3, addrs.clone());
let client_task = workload::clients::pbft(InvokeTask, config, addrs);
select! {
result = client_task => break 'client result?,
result = server_task0 => result?,
result = server_task1 => result?,
result = server_task2 => result?,
result = server_task3 => result?,
}
}
_ => anyhow::bail!("unimplemented"),
}
anyhow::bail!("unexpected termination of server task")
Expand Down
15 changes: 11 additions & 4 deletions src/bin/workload/clients.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, sync::Arc};
use std::{future::Future, net::SocketAddr, sync::Arc};

use bytes::Bytes;
use neatworks::{
Expand Down Expand Up @@ -71,7 +71,11 @@ pub async fn unreplicated(invoke_task: impl InvokeTask) -> anyhow::Result<()> {
.await
}

pub async fn pbft(invoke_task: impl InvokeTask, config: PublicParameters) -> anyhow::Result<()> {
pub async fn pbft(
invoke_task: impl InvokeTask,
config: PublicParameters,
replica_addrs: Vec<SocketAddr>,
) -> anyhow::Result<()> {
let socket = Arc::new(UdpSocket::bind("localhost:0").await?);
let addr = socket.local_addr()?;
let (upcall_sender, upcall_receiver) = unbounded_channel::<InvokeOk<_>>();
Expand All @@ -84,8 +88,11 @@ pub async fn pbft(invoke_task: impl InvokeTask, config: PublicParameters) -> any
);

let mut context = pbft::client::context::Context::<_, _, Of<_>> {
// TODO
net: pbft::messages::codec::to_replica_encode(IndexNet::new(vec![], None, socket.clone())),
net: pbft::messages::codec::to_replica_encode(IndexNet::new(
replica_addrs,
None,
socket.clone(),
)),
upcall: upcall_sender,
schedule: Erase::new(ScheduleState::new(schedule_sender)),
};
Expand Down
13 changes: 8 additions & 5 deletions src/bin/workload/servers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};

use neatworks::{
crypto::{Crypto, CryptoFlavor},
Expand Down Expand Up @@ -35,17 +35,20 @@ pub async fn unreplicated() -> anyhow::Result<()> {
anyhow::bail!("unexpected termination of infinite task")
}

pub async fn pbft(config: pbft::PublicParameters, index: usize) -> anyhow::Result<()> {
let socket = Arc::new(UdpSocket::bind("localhost:3000").await?);
pub async fn pbft(
config: pbft::PublicParameters,
index: usize,
addrs: Vec<SocketAddr>,
) -> anyhow::Result<()> {
let socket = Arc::new(UdpSocket::bind(addrs[index]).await?);

let (crypto_sender, mut crypto_receiver) = unbounded_channel();
let (schedule_sender, mut schedule_receiver) = unbounded_channel();
let (sender, mut receiver) = unbounded_channel();

let mut context = pbft::replica::context::Context::<_, _, Of<_>, _> {
// TODO
peer_net: pbft::messages::codec::to_replica_encode(IndexNet::new(
vec![],
addrs,
index,
socket.clone(),
)),
Expand Down
2 changes: 1 addition & 1 deletion src/pbft/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl<S: App, A: Addr, C: Context<Self, A>> OnErasedEvent<Recv<Request<A>>, C> fo
request,
)?;
let view_num = self.view_num;
self.do_view_change_timer.set(
self.do_view_change_timer.ensure_set(
move || events::DoViewChange(view_num + 1),
context.schedule(),
)?;
Expand Down

0 comments on commit 0ff8372

Please sign in to comment.