Skip to content

Commit

Permalink
Add nitro enclaves clock cops
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed May 4, 2024
1 parent 5e8c052 commit e000ade
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 56 deletions.
207 changes: 193 additions & 14 deletions src/bin/boson/cops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{future::Future, mem::take, net::SocketAddr, ops::Range, pin::Pin, time

use augustus::{
app::{self, ycsb, App},
boson::{self, QuorumClient, QuorumClock, VerifyClock},
boson::{self, nitro_enclaves_portal_session, QuorumClient, QuorumClock, VerifyClock},
cops::{self, OrdinaryVersion, OrdinaryVersionService},
event::{
self,
Expand All @@ -22,7 +22,7 @@ use augustus::{
use boson_control_messages::{CopsClient, CopsServer, Variant};
use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng};
use rand_distr::Uniform;
use tokio::{net::TcpListener, task::JoinSet, time::sleep};
use tokio::{net::TcpListener, sync::mpsc::unbounded_channel, task::JoinSet, time::sleep};
use tokio_util::sync::CancellationToken;

fn create_workload(
Expand All @@ -43,18 +43,27 @@ fn create_workload(
settings0.request_distr = ycsb::SettingsDistr::Uniform;
settings0.field_count = 1;
let mut workload0 = ycsb::Workload::new(StdRng::from_rng(&mut rng)?, settings0)?;
workload0.key_num = ycsb::Gen::Uniform(Uniform::from(put_range));
// workload0.key_num = ycsb::Gen::Uniform(Uniform::from(put_range));
workload0.key_num = ycsb::Gen::Uniform(Uniform::from(put_range.clone()));

let mut settings1 = ycsb::WorkloadSettings::new_c(record_count);
settings1.request_distr = ycsb::SettingsDistr::Uniform;
settings1.field_count = 1;
let workload1 = ycsb::Workload::new(StdRng::from_rng(&mut rng)?, settings1)?;
// let workload1 = ycsb::Workload::new(StdRng::from_rng(&mut rng)?, settings1)?;
let mut workload1 = ycsb::Workload::new(StdRng::from_rng(&mut rng)?, settings1)?;
workload1.key_num = ycsb::Gen::Uniform(Uniform::from(put_range));

anyhow::ensure!(put_ratio <= 0.5);
let workload = Weighted2(workload0, workload1, StdRng::from_rng(rng)?, put_ratio * 2.);
Ok(workload)
Ok(Weighted2(
workload0,
workload1,
StdRng::from_rng(rng)?,
put_ratio * 2.,
))
}

// this is a humor function :) a typical consequence of "i am willing to pay any
// cost as long as i can get rid of redundant code"
// this is a humorous function :) a typical consequence of "i am willing to pay
// any cost as long as i can get rid of redundant code"
// maybe its seemingly ridiculous just comes from the fact that i have to write
// down all the types, instead of letting compiler to infer them out, as the
// case of Haskell and OCaml
Expand Down Expand Up @@ -126,7 +135,7 @@ pub async fn pbft_client_session(
let addr = SocketAddr::from((ip, tcp_listener.local_addr()?.port()));
// println!("client {addr} listening {:?}", tcp_listener.local_addr());
let workload = create_workload(
&mut StdRng::seed_from_u64(117418 + index as u64),
&mut thread_rng(),
record_count,
put_range.clone(),
put_ratio,
Expand Down Expand Up @@ -294,12 +303,12 @@ pub async fn untrusted_client_session(
anyhow::ensure!(addrs.is_empty());

let mut sessions = JoinSet::new();
for index in 0..num_concurrent {
for _ in 0..num_concurrent {
// let tcp_listener = TcpListener::bind((ip, 0)).await?;
let tcp_listener = TcpListener::bind(SocketAddr::from(([0; 4], 0))).await?;
let addr = SocketAddr::from((ip, tcp_listener.local_addr()?.port()));
let workload = create_workload(
&mut StdRng::seed_from_u64(117418 + index as u64),
&mut thread_rng(),
record_count,
put_range.clone(),
put_ratio,
Expand Down Expand Up @@ -447,12 +456,12 @@ pub async fn quorum_client_session(
// let crypto = Crypto::new_random(&mut thread_rng());

let mut sessions = JoinSet::new();
for index in 0..num_concurrent {
for _ in 0..num_concurrent {
// let tcp_listener = TcpListener::bind((ip, 0)).await?;
let tcp_listener = TcpListener::bind(SocketAddr::from(([0; 4], 0))).await?;
let addr = SocketAddr::from((ip, tcp_listener.local_addr()?.port()));
let workload = create_workload(
&mut StdRng::seed_from_u64(117418 + index as u64),
&mut thread_rng(),
record_count,
put_range.clone(),
put_ratio,
Expand Down Expand Up @@ -536,7 +545,6 @@ pub async fn quorum_server_session(
anyhow::bail!("unimplemented")
};
let addr = addrs[id as usize];
// replica also don't sign any clock
let crypto = Crypto::new_random(&mut thread_rng());

// let tcp_listener = TcpListener::bind(addr).await?;
Expand Down Expand Up @@ -635,4 +643,175 @@ pub async fn quorum_server_session(
anyhow::bail!("unreachable")
}

pub async fn nitro_enclaves_client_session(
config: CopsClient,
upcall: impl Clone + SendEvent<(f32, Duration)> + Send + Sync + 'static,
) -> anyhow::Result<()> {
// use augustus::crypto::peer::Crypto;

let CopsClient {
mut addrs,
ip,
num_concurrent,
put_ratio,
record_count,
put_range,
variant: Variant::NitroEnclaves,
} = config
else {
anyhow::bail!("unimplemented")
};
let replica_addr = addrs.remove(0);
anyhow::ensure!(addrs.is_empty());
// let crypto = Crypto::new_random(&mut thread_rng());

let mut sessions = JoinSet::new();
for _ in 0..num_concurrent {
// let tcp_listener = TcpListener::bind((ip, 0)).await?;
let tcp_listener = TcpListener::bind(SocketAddr::from(([0; 4], 0))).await?;
let addr = SocketAddr::from((ip, tcp_listener.local_addr()?.port()));
let workload = create_workload(
&mut thread_rng(),
record_count,
put_range.clone(),
put_ratio,
)?;

let mut dispatch_session = event::Session::new();
let mut client_session = Session::new();
let mut close_loop_session = Session::new();
// let (crypto_worker, mut crypto_executor) = spawning_backend();

let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new(
Tcp::new(addr)?,
{
// let mut sender = boson::VerifyQuorumClock::new(config.num_faulty, crypto_worker);
let mut sender = Sender::from(client_session.sender());
move |buf: &_| cops::to_client_on_buf(buf, &mut sender)
},
Once(dispatch_session.sender()),
)?));
let mut client = Blanket(Buffered::from(cops::Client::<
_,
_,
boson::NitroEnclavesClock,
_,
>::new(
addr,
replica_addr,
cops::ToReplicaMessageNet::new(dispatch::Net::from(dispatch_session.sender())),
Box::new(Sender::from(close_loop_session.sender()))
as Box<dyn cops::Upcall + Send + Sync>,
)));
let mut close_loop = Blanket(Unify(CloseLoop::new(
Sender::from(client_session.sender()),
ycsb::Destruct::from(OpLatency::new(workload)),
)));

let mut upcall = upcall.clone();
// let crypto = crypto.clone();
sessions.spawn(async move {
let tcp_accept_session = tcp::accept_session(tcp_listener, dispatch_session.sender());
// let crypto_session = crypto_executor.run(crypto, Sender::from(client_session.sender()));
let dispatch_session = dispatch_session.run(&mut dispatch);
let client_session = client_session.run(&mut client);
let close_loop_session = async move {
Sender::from(close_loop_session.sender()).send(Init)?;
let (throughput, latency) = benchmark_session(
&mut close_loop_session,
&mut close_loop,
|session, state| Box::pin(session.run(state)),
|state| state.workload.as_mut(),
)
.await?;
upcall.send((throughput, latency))
};
tokio::select! {
result = tcp_accept_session => result?,
// result = crypto_session => result?,
result = dispatch_session => result?,
result = client_session => result?,
result = close_loop_session => return result,
}
anyhow::bail!("unreachable")
});
}
while let Some(result) = sessions.join_next().await {
result??
}
Ok(())
}

pub async fn nitro_enclaves_server_session(
config: CopsServer,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let CopsServer {
addrs,
id,
record_count,
variant: Variant::NitroEnclaves,
} = config
else {
anyhow::bail!("unimplemented")
};
let addr = addrs[id as usize];

// let tcp_listener = TcpListener::bind(addr).await?;
let tcp_listener = TcpListener::bind(SocketAddr::from(([0; 4], addr.port()))).await?;

let mut dispatch_session = event::Session::new();
let mut replica_session = Session::new();
let (clock_sender, clock_receiver) = unbounded_channel();
let (client_crypto_worker, mut client_crypto_executor) = spawning_backend();

let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new(
Tcp::new(addr)?,
{
let mut sender = VerifyClock::new(0, client_crypto_worker);
move |buf: &_| cops::to_replica_on_buf(buf, &mut sender)
},
Once(dispatch_session.sender()),
)?));
let mut replica = Blanket(Buffered::from(
cops::Replica::<_, _, _, _, SocketAddr>::new(
boson::NitroEnclavesClock::try_from(OrdinaryVersion::default())?,
cops::ToReplicaMessageNet::<_, _, SocketAddr>::new(IndexNet::new(
dispatch::Net::from(dispatch_session.sender()),
addrs,
id as usize,
)),
cops::ToClientMessageNet::new(dispatch::Net::from(dispatch_session.sender())),
boson::Cops(clock_sender),
),
));
let mut settings = ycsb::WorkloadSettings::new(record_count);
settings.field_count = 1;
let mut workload = ycsb::Workload::new(StdRng::seed_from_u64(117418), settings)?;
let mut workload = Iter::<_, _, ()>::from(workload.startup_ops());
while let Some((op, ())) = workload.next_op()? {
replica.startup_insert(op)?
}

let tcp_accept_session = tcp::accept_session(tcp_listener, dispatch_session.sender());
let client_crypto_session =
client_crypto_executor.run((), Sender::from(replica_session.sender()));
let dispatch_session = dispatch_session.run(&mut dispatch);
let clock_session = nitro_enclaves_portal_session(
16,
clock_receiver,
boson::Cops(Sender::from(replica_session.sender())),
);
let replica_session = replica_session.run(&mut replica);
tokio::select! {
() = cancel.cancelled() => return Ok(()),
result = tcp_accept_session => result?,
result = client_crypto_session => result?,
result = dispatch_session => result?,
result = clock_session => result?,
result = replica_session => result?,
}
anyhow::bail!("unreachable")
}

// cSpell:words pbft upcall
8 changes: 6 additions & 2 deletions src/bin/boson/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ async fn cops_start_client(
Quorum(_) => state
.handle
.spawn(cops::quorum_client_session(config, upcall_sender)),
NitroEnclaves => todo!(),
NitroEnclaves => state
.handle
.spawn(cops::nitro_enclaves_client_session(config, upcall_sender)),
};
*session = Some(AppSession { handle, cancel });
let replaced = state.channel.lock().await.replace(AppChannel {
Expand Down Expand Up @@ -324,7 +326,9 @@ async fn cops_start_server(
Quorum(_) => state
.handle
.spawn(cops::quorum_server_session(config, cancel.clone())),
NitroEnclaves => todo!(),
NitroEnclaves => state
.handle
.spawn(cops::nitro_enclaves_server_session(config, cancel.clone())),
};
*session = Some(AppSession { handle, cancel });
let replaced = state.channel.lock().await.replace(AppChannel {
Expand Down
1 change: 1 addition & 0 deletions src/boson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl<E: SendEvent<lamport_mutex::events::UpdateOk<C>>, C> SendEvent<UpdateOk<C>>
}
}

#[derive(Debug, Clone)]
pub struct Cops<E>(pub E);

impl<E: SendEvent<Update<C>>, C> SendEvent<cops::events::Update<C>> for Cops<E> {
Expand Down
Loading

0 comments on commit e000ade

Please sign in to comment.