diff --git a/src/bin/boson/cops.rs b/src/bin/boson/cops.rs index 337dbde0..03f8014f 100644 --- a/src/bin/boson/cops.rs +++ b/src/bin/boson/cops.rs @@ -2,7 +2,7 @@ use std::{future::Future, mem::take, net::SocketAddr, ops::Range, pin::Pin, time use augustus::{ app::{self, ycsb, App}, - boson::{self, QuorumClient, QuorumClock, VerifyClock}, + boson::{self, nitro_enclaves_portal_session, QuorumClient, QuorumClock, VerifyClock}, cops::{self, OrdinaryVersion, OrdinaryVersionService}, event::{ self, @@ -22,7 +22,7 @@ use augustus::{ use boson_control_messages::{CopsClient, CopsServer, Variant}; use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng}; use rand_distr::Uniform; -use tokio::{net::TcpListener, task::JoinSet, time::sleep}; +use tokio::{net::TcpListener, sync::mpsc::unbounded_channel, task::JoinSet, time::sleep}; use tokio_util::sync::CancellationToken; fn create_workload( @@ -43,18 +43,27 @@ fn create_workload( settings0.request_distr = ycsb::SettingsDistr::Uniform; settings0.field_count = 1; let mut workload0 = ycsb::Workload::new(StdRng::from_rng(&mut rng)?, settings0)?; - workload0.key_num = ycsb::Gen::Uniform(Uniform::from(put_range)); + // workload0.key_num = ycsb::Gen::Uniform(Uniform::from(put_range)); + workload0.key_num = ycsb::Gen::Uniform(Uniform::from(put_range.clone())); + let mut settings1 = ycsb::WorkloadSettings::new_c(record_count); settings1.request_distr = ycsb::SettingsDistr::Uniform; settings1.field_count = 1; - let workload1 = ycsb::Workload::new(StdRng::from_rng(&mut rng)?, settings1)?; + // let workload1 = ycsb::Workload::new(StdRng::from_rng(&mut rng)?, settings1)?; + let mut workload1 = ycsb::Workload::new(StdRng::from_rng(&mut rng)?, settings1)?; + workload1.key_num = ycsb::Gen::Uniform(Uniform::from(put_range)); + anyhow::ensure!(put_ratio <= 0.5); - let workload = Weighted2(workload0, workload1, StdRng::from_rng(rng)?, put_ratio * 2.); - Ok(workload) + Ok(Weighted2( + workload0, + workload1, + StdRng::from_rng(rng)?, + put_ratio * 2., + )) } -// this is a humor function :) a typical consequence of "i am willing to pay any -// cost as long as i can get rid of redundant code" +// this is a humorous function :) a typical consequence of "i am willing to pay +// any cost as long as i can get rid of redundant code" // maybe its seemingly ridiculous just comes from the fact that i have to write // down all the types, instead of letting compiler to infer them out, as the // case of Haskell and OCaml @@ -126,7 +135,7 @@ pub async fn pbft_client_session( let addr = SocketAddr::from((ip, tcp_listener.local_addr()?.port())); // println!("client {addr} listening {:?}", tcp_listener.local_addr()); let workload = create_workload( - &mut StdRng::seed_from_u64(117418 + index as u64), + &mut thread_rng(), record_count, put_range.clone(), put_ratio, @@ -294,12 +303,12 @@ pub async fn untrusted_client_session( anyhow::ensure!(addrs.is_empty()); let mut sessions = JoinSet::new(); - for index in 0..num_concurrent { + for _ in 0..num_concurrent { // let tcp_listener = TcpListener::bind((ip, 0)).await?; let tcp_listener = TcpListener::bind(SocketAddr::from(([0; 4], 0))).await?; let addr = SocketAddr::from((ip, tcp_listener.local_addr()?.port())); let workload = create_workload( - &mut StdRng::seed_from_u64(117418 + index as u64), + &mut thread_rng(), record_count, put_range.clone(), put_ratio, @@ -447,12 +456,12 @@ pub async fn quorum_client_session( // let crypto = Crypto::new_random(&mut thread_rng()); let mut sessions = JoinSet::new(); - for index in 0..num_concurrent { + for _ in 0..num_concurrent { // let tcp_listener = TcpListener::bind((ip, 0)).await?; let tcp_listener = TcpListener::bind(SocketAddr::from(([0; 4], 0))).await?; let addr = SocketAddr::from((ip, tcp_listener.local_addr()?.port())); let workload = create_workload( - &mut StdRng::seed_from_u64(117418 + index as u64), + &mut thread_rng(), record_count, put_range.clone(), put_ratio, @@ -536,7 +545,6 @@ pub async fn quorum_server_session( anyhow::bail!("unimplemented") }; let addr = addrs[id as usize]; - // replica also don't sign any clock let crypto = Crypto::new_random(&mut thread_rng()); // let tcp_listener = TcpListener::bind(addr).await?; @@ -635,4 +643,175 @@ pub async fn quorum_server_session( anyhow::bail!("unreachable") } +pub async fn nitro_enclaves_client_session( + config: CopsClient, + upcall: impl Clone + SendEvent<(f32, Duration)> + Send + Sync + 'static, +) -> anyhow::Result<()> { + // use augustus::crypto::peer::Crypto; + + let CopsClient { + mut addrs, + ip, + num_concurrent, + put_ratio, + record_count, + put_range, + variant: Variant::NitroEnclaves, + } = config + else { + anyhow::bail!("unimplemented") + }; + let replica_addr = addrs.remove(0); + anyhow::ensure!(addrs.is_empty()); + // let crypto = Crypto::new_random(&mut thread_rng()); + + let mut sessions = JoinSet::new(); + for _ in 0..num_concurrent { + // let tcp_listener = TcpListener::bind((ip, 0)).await?; + let tcp_listener = TcpListener::bind(SocketAddr::from(([0; 4], 0))).await?; + let addr = SocketAddr::from((ip, tcp_listener.local_addr()?.port())); + let workload = create_workload( + &mut thread_rng(), + record_count, + put_range.clone(), + put_ratio, + )?; + + let mut dispatch_session = event::Session::new(); + let mut client_session = Session::new(); + let mut close_loop_session = Session::new(); + // let (crypto_worker, mut crypto_executor) = spawning_backend(); + + let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new( + Tcp::new(addr)?, + { + // let mut sender = boson::VerifyQuorumClock::new(config.num_faulty, crypto_worker); + let mut sender = Sender::from(client_session.sender()); + move |buf: &_| cops::to_client_on_buf(buf, &mut sender) + }, + Once(dispatch_session.sender()), + )?)); + let mut client = Blanket(Buffered::from(cops::Client::< + _, + _, + boson::NitroEnclavesClock, + _, + >::new( + addr, + replica_addr, + cops::ToReplicaMessageNet::new(dispatch::Net::from(dispatch_session.sender())), + Box::new(Sender::from(close_loop_session.sender())) + as Box, + ))); + let mut close_loop = Blanket(Unify(CloseLoop::new( + Sender::from(client_session.sender()), + ycsb::Destruct::from(OpLatency::new(workload)), + ))); + + let mut upcall = upcall.clone(); + // let crypto = crypto.clone(); + sessions.spawn(async move { + let tcp_accept_session = tcp::accept_session(tcp_listener, dispatch_session.sender()); + // let crypto_session = crypto_executor.run(crypto, Sender::from(client_session.sender())); + let dispatch_session = dispatch_session.run(&mut dispatch); + let client_session = client_session.run(&mut client); + let close_loop_session = async move { + Sender::from(close_loop_session.sender()).send(Init)?; + let (throughput, latency) = benchmark_session( + &mut close_loop_session, + &mut close_loop, + |session, state| Box::pin(session.run(state)), + |state| state.workload.as_mut(), + ) + .await?; + upcall.send((throughput, latency)) + }; + tokio::select! { + result = tcp_accept_session => result?, + // result = crypto_session => result?, + result = dispatch_session => result?, + result = client_session => result?, + result = close_loop_session => return result, + } + anyhow::bail!("unreachable") + }); + } + while let Some(result) = sessions.join_next().await { + result?? + } + Ok(()) +} + +pub async fn nitro_enclaves_server_session( + config: CopsServer, + cancel: CancellationToken, +) -> anyhow::Result<()> { + let CopsServer { + addrs, + id, + record_count, + variant: Variant::NitroEnclaves, + } = config + else { + anyhow::bail!("unimplemented") + }; + let addr = addrs[id as usize]; + + // let tcp_listener = TcpListener::bind(addr).await?; + let tcp_listener = TcpListener::bind(SocketAddr::from(([0; 4], addr.port()))).await?; + + let mut dispatch_session = event::Session::new(); + let mut replica_session = Session::new(); + let (clock_sender, clock_receiver) = unbounded_channel(); + let (client_crypto_worker, mut client_crypto_executor) = spawning_backend(); + + let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new( + Tcp::new(addr)?, + { + let mut sender = VerifyClock::new(0, client_crypto_worker); + move |buf: &_| cops::to_replica_on_buf(buf, &mut sender) + }, + Once(dispatch_session.sender()), + )?)); + let mut replica = Blanket(Buffered::from( + cops::Replica::<_, _, _, _, SocketAddr>::new( + boson::NitroEnclavesClock::try_from(OrdinaryVersion::default())?, + cops::ToReplicaMessageNet::<_, _, SocketAddr>::new(IndexNet::new( + dispatch::Net::from(dispatch_session.sender()), + addrs, + id as usize, + )), + cops::ToClientMessageNet::new(dispatch::Net::from(dispatch_session.sender())), + boson::Cops(clock_sender), + ), + )); + let mut settings = ycsb::WorkloadSettings::new(record_count); + settings.field_count = 1; + let mut workload = ycsb::Workload::new(StdRng::seed_from_u64(117418), settings)?; + let mut workload = Iter::<_, _, ()>::from(workload.startup_ops()); + while let Some((op, ())) = workload.next_op()? { + replica.startup_insert(op)? + } + + let tcp_accept_session = tcp::accept_session(tcp_listener, dispatch_session.sender()); + let client_crypto_session = + client_crypto_executor.run((), Sender::from(replica_session.sender())); + let dispatch_session = dispatch_session.run(&mut dispatch); + let clock_session = nitro_enclaves_portal_session( + 16, + clock_receiver, + boson::Cops(Sender::from(replica_session.sender())), + ); + let replica_session = replica_session.run(&mut replica); + tokio::select! { + () = cancel.cancelled() => return Ok(()), + result = tcp_accept_session => result?, + result = client_crypto_session => result?, + result = dispatch_session => result?, + result = clock_session => result?, + result = replica_session => result?, + } + anyhow::bail!("unreachable") +} + // cSpell:words pbft upcall diff --git a/src/bin/boson/main.rs b/src/bin/boson/main.rs index 092dcefa..5d8db905 100644 --- a/src/bin/boson/main.rs +++ b/src/bin/boson/main.rs @@ -249,7 +249,9 @@ async fn cops_start_client( Quorum(_) => state .handle .spawn(cops::quorum_client_session(config, upcall_sender)), - NitroEnclaves => todo!(), + NitroEnclaves => state + .handle + .spawn(cops::nitro_enclaves_client_session(config, upcall_sender)), }; *session = Some(AppSession { handle, cancel }); let replaced = state.channel.lock().await.replace(AppChannel { @@ -324,7 +326,9 @@ async fn cops_start_server( Quorum(_) => state .handle .spawn(cops::quorum_server_session(config, cancel.clone())), - NitroEnclaves => todo!(), + NitroEnclaves => state + .handle + .spawn(cops::nitro_enclaves_server_session(config, cancel.clone())), }; *session = Some(AppSession { handle, cancel }); let replaced = state.channel.lock().await.replace(AppChannel { diff --git a/src/boson.rs b/src/boson.rs index 802f9e7a..8d825f28 100644 --- a/src/boson.rs +++ b/src/boson.rs @@ -40,6 +40,7 @@ impl>, C> SendEvent> } } +#[derive(Debug, Clone)] pub struct Cops(pub E); impl>, C> SendEvent> for Cops { diff --git a/src/cops.rs b/src/cops.rs index 2c63bf34..32eaf020 100644 --- a/src/cops.rs +++ b/src/cops.rs @@ -183,7 +183,7 @@ impl Client { pub struct InvokeTimeout; impl InvokeTimeout { - const AFTER: Duration = Duration::from_millis(800); + const AFTER: Duration = Duration::from_millis(1200); } impl, U, V: Version, A: Addr> OnEvent> for Client { @@ -247,15 +247,15 @@ impl>, V: Version, A> OnEvent>, V: Version, A> OnEvent Replica { sync_key.version_deps.partial_cmp(&state.version_deps), Some(Ordering::Greater) ) { - // + warn!("malformed SyncKey"); return Ok(()); } state.value = sync_key.value; @@ -526,18 +526,18 @@ impl OnEvent>> for Replica>, _: &mut impl Timer, ) -> anyhow::Result<()> { - if !self.can_sync(&sync_key) { - self.pending_sync_keys.push(sync_key); - return Ok(()); - } - self.apply_sync(sync_key)?; - // for sync_key in take(&mut self.pending_sync_keys) { - // if !self.can_sync(&sync_key) { - // self.pending_sync_keys.push(sync_key); - // continue; - // } - // self.apply_sync(sync_key)? + // if !self.can_sync(&sync_key) { + // self.pending_sync_keys.push(sync_key); + // return Ok(()); // } + self.apply_sync(sync_key)?; + for sync_key in take(&mut self.pending_sync_keys) { + if !self.can_sync(&sync_key) { + self.pending_sync_keys.push(sync_key); + continue; + } + self.apply_sync(sync_key)? + } Ok(()) } } diff --git a/src/event.rs b/src/event.rs index d0d5003d..641cc67b 100644 --- a/src/event.rs +++ b/src/event.rs @@ -329,7 +329,7 @@ pub mod erased { } } - #[derive(Debug)] + #[derive_where(Debug, Clone; E)] pub struct Erasure(E, std::marker::PhantomData<(S, T)>); impl From for Erasure { @@ -338,12 +338,6 @@ pub mod erased { } } - impl Clone for Erasure { - fn clone(&self) -> Self { - Self::from(self.0.clone()) - } - } - impl< E: SendEvent, T>>, S: OnEventFixTimer, diff --git a/tools/boson-control/src/main.rs b/tools/boson-control/src/main.rs index e4ca2703..9344e937 100644 --- a/tools/boson-control/src/main.rs +++ b/tools/boson-control/src/main.rs @@ -64,8 +64,8 @@ async fn main() -> anyhow::Result<()> { client_instances.clone(), instances.clone(), clock_instances.clone(), - Variant::Untrusted, - 10, + Variant::NitroEnclaves, + 4, 0.1, ) .await?; @@ -586,7 +586,7 @@ async fn cops_client_session( .json::>() .await?; if let Some(result) = result { - println!("{result:?}"); + println!("{result:?} {url}"); out.lock() .map_err(|err| anyhow::format_err!("{err}"))? .push(result);