Skip to content

Commit

Permalink
Checkpoint working quorum based mutex protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Apr 19, 2024
1 parent 2e197a0 commit 4bf38b4
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 55 deletions.
10 changes: 5 additions & 5 deletions src/bin/boson_cops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use augustus::{
Dispatch, IndexNet,
},
pbft,
worker::{spawn_backend, Submit},
worker::{spawning_backend, Submit},
workload::{CloseLoop, Iter, Json, Upcall, Workload},
};
use boson_control_messages::{CopsClient, CopsServer, Variant};
Expand Down Expand Up @@ -179,7 +179,7 @@ pub async fn pbft_server_session(
let tcp_listener = TcpListener::bind(addr).await?;
let mut dispatch_session = event::Session::new();
let mut replica_session = Session::new();
let (crypto_worker, mut crypto_executor) = spawn_backend();
let (crypto_worker, mut crypto_executor) = spawning_backend();

let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new(
Tcp::new(addr)?,
Expand Down Expand Up @@ -411,7 +411,7 @@ pub async fn quorum_client_session(
let mut dispatch_session = event::Session::new();
let mut client_session = Session::new();
let mut close_loop_session = Session::new();
let (crypto_worker, mut crypto_executor) = spawn_backend();
let (crypto_worker, mut crypto_executor) = spawning_backend();

let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new(
Tcp::new(addr)?,
Expand Down Expand Up @@ -504,8 +504,8 @@ pub async fn quorum_server_session(
let mut clock_dispatch_session = event::Session::new();
let mut replica_session = Session::new();
let mut clock_session = Session::new();
let (client_crypto_worker, mut client_crypto_executor) = spawn_backend();
let (crypto_worker, mut crypto_executor) = spawn_backend();
let (client_crypto_worker, mut client_crypto_executor) = spawning_backend();
let (crypto_worker, mut crypto_executor) = spawning_backend();

let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new(
Tcp::new(addr)?,
Expand Down
61 changes: 45 additions & 16 deletions src/bin/boson_mutex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ use augustus::{
erased::{events::Init, session::Sender, Blanket, Buffered, Session, Unify},
Once, SendEvent,
},
lamport_mutex::{
self, events::RequestOk, Causal, Lamport, LamportClock, Processor, Replicated,
},
lamport_mutex::{self, events::RequestOk, Causal, Lamport, LamportClock, Replicated},
net::{
deserialize, dispatch,
events::Recv,
session::{tcp, Tcp},
Detach, Dispatch, IndexNet, InvokeNet,
},
pbft,
worker::{spawn_backend, Submit, Worker},
worker::{spawning_backend, Submit, Worker},
workload::{events::InvokeOk, Queue},
};
use rand::thread_rng;
Expand All @@ -36,10 +34,13 @@ pub async fn untrusted_session(
upcall: impl SendEvent<RequestOk> + Send + Sync + 'static,
cancel: CancellationToken,
) -> anyhow::Result<()> {
use lamport_mutex::Processor;

let boson_control_messages::Mutex {
id,
addrs,
variant: boson_control_messages::Variant::Untrusted,
..
} = config
else {
anyhow::bail!("unimplemented")
Expand Down Expand Up @@ -115,12 +116,16 @@ pub async fn replicated_session(
upcall: impl SendEvent<RequestOk> + Send + Sync + 'static,
cancel: CancellationToken,
) -> anyhow::Result<()> {
use augustus::crypto::{Crypto, CryptoFlavor};
use augustus::{
crypto::{Crypto, CryptoFlavor},
lamport_mutex::Processor,
};

let boson_control_messages::Mutex {
id,
addrs,
variant: boson_control_messages::Variant::Replicated(config),
..
} = config
else {
anyhow::bail!("unimplemented")
Expand Down Expand Up @@ -242,12 +247,16 @@ pub async fn quorum_session(
upcall: impl SendEvent<RequestOk> + Send + Sync + 'static,
cancel: CancellationToken,
) -> anyhow::Result<()> {
use augustus::crypto::peer::{Crypto, Verifiable};
use augustus::{
crypto::peer::{Crypto, Verifiable},
lamport_mutex::verifiable::Processor,
};

let boson_control_messages::Mutex {
id,
addrs,
variant: boson_control_messages::Variant::Quorum(config),
num_faulty,
} = config
else {
anyhow::bail!("unimplemented")
Expand All @@ -264,14 +273,23 @@ pub async fn quorum_session(
let mut processor_session = Session::new();
let mut causal_net_session = Session::new();
let mut clock_session = Session::new();
let (recv_crypto_worker, mut recv_crypto_executor) = spawn_backend();
let (crypto_worker, mut crypto_executor) = spawn_backend();
// verify clocked messages sent by other processors before they are received causal net
// let (recv_crypto_worker, mut recv_crypto_executor) = spawning_backend();
// owned by quorum client
let (crypto_worker, mut crypto_executor) = spawning_backend();
// sign Ordered messages
let (processor_crypto_worker, mut processor_crypto_executor) = spawning_backend();

let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new(
Tcp::new(addr)?,
{
let mut sender = VerifyQuorumClock::new(config.num_faulty, recv_crypto_worker);
move |buf: &_| lamport_mutex::on_buf(buf, &mut sender)
// let mut clocked_sender = VerifyQuorumClock::new(config.num_faulty, recv_crypto_worker);
let mut clocked_sender = VerifyQuorumClock::new(
config.num_faulty,
Worker::Inline(crypto.clone(), Sender::from(causal_net_session.sender())),
);
let mut sender = Sender::from(processor_session.sender());
move |buf: &_| lamport_mutex::verifiable::on_buf(buf, &mut clocked_sender, &mut sender)
},
Once(dispatch_session.sender()),
)?));
Expand All @@ -286,18 +304,20 @@ pub async fn quorum_session(
let mut processor = Blanket(Unify(Processor::new(
id,
config.addrs.len(),
num_faulty,
|_| QuorumClock::default(),
Detach(Sender::from(causal_net_session.sender())),
lamport_mutex::verifiable::SignOrdered::new(processor_crypto_worker),
upcall,
)));
let mut causal_net = Blanket(Unify(Causal::new(
QuorumClock::default(),
Box::new(Sender::from(processor_session.sender()))
as Box<dyn lamport_mutex::SendRecvEvent<QuorumClock> + Send + Sync>,
boson::Lamport(Sender::from(clock_session.sender()), id as _),
lamport_mutex::MessageNet::<_, QuorumClock>::new(IndexNet::new(
lamport_mutex::verifiable::MessageNet::<_, QuorumClock>::new(IndexNet::new(
dispatch::Net::from(dispatch_session.sender()),
addrs,
addrs.clone(),
// intentionally sending loopback messages as expected by processor protocol
None,
)),
Expand Down Expand Up @@ -343,9 +363,17 @@ pub async fn quorum_session(
let tcp_accept_session = tcp::accept_session(tcp_listener, dispatch_session.sender());
let clock_tcp_accept_session =
tcp::accept_session(clock_tcp_listener, clock_dispatch_session.sender());
let recv_crypto_session =
recv_crypto_executor.run(crypto.clone(), Sender::from(causal_net_session.sender()));
let crypto_session = crypto_executor.run(crypto, Sender::from(clock_session.sender()));
// let recv_crypto_session =
// recv_crypto_executor.run(crypto.clone(), Sender::from(causal_net_session.sender()));
let crypto_session = crypto_executor.run(crypto.clone(), Sender::from(clock_session.sender()));
let processor_crypto_session = processor_crypto_executor.run(
crypto,
lamport_mutex::verifiable::MessageNet::<_, QuorumClock>::new(IndexNet::new(
dispatch::Net::from(dispatch_session.sender()),
addrs,
None,
)),
);
let dispatch_session = dispatch_session.run(&mut dispatch);
let clock_dispatch_session = clock_dispatch_session.run(&mut clock_dispatch);
let processor_session = processor_session.run(&mut processor);
Expand All @@ -361,8 +389,9 @@ pub async fn quorum_session(
result = clock_dispatch_session => result?,
result = processor_session => result?,
result = causal_net_session => result?,
result = recv_crypto_session => result?,
// result = recv_crypto_session => result?,
result = crypto_session => result?,
result = processor_crypto_session => result?,
result = clock_session => result?,
}
anyhow::bail!("unreachable")
Expand Down
6 changes: 3 additions & 3 deletions src/bin/boson_quorum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use augustus::{
session::{tcp, Tcp},
Dispatch, MessageNet,
},
worker::spawn_backend,
worker::spawning_backend,
};
use rand::thread_rng;
use tokio::net::TcpListener;
Expand All @@ -30,8 +30,8 @@ pub async fn session(

let mut dispatch_session = event::Session::new();
let mut server_session = Session::new();
let (recv_crypto_worker, mut recv_crypto_executor) = spawn_backend();
let (send_crypto_worker, mut send_crypto_executor) = spawn_backend();
let (recv_crypto_worker, mut recv_crypto_executor) = spawning_backend();
let (send_crypto_worker, mut send_crypto_executor) = spawning_backend();

let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new(
Tcp::new(addr)?,
Expand Down
4 changes: 2 additions & 2 deletions src/bin/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use augustus::{
},
net::{session::Udp, IndexNet, Payload},
pbft, unreplicated,
worker::{spawn_backend, Submit},
worker::{spawning_backend, Submit},
workload::{
self,
events::{Invoke, InvokeOk},
Expand Down Expand Up @@ -375,7 +375,7 @@ async fn start_replica(State(state): State<AppState>, Json(config): Json<Replica
))
}
Protocol::Pbft => {
let (crypto_worker, mut crypto_executor) = spawn_backend();
let (crypto_worker, mut crypto_executor) = spawning_backend();
let state = Blanket(Buffered::from(pbft::Replica::new(
config.replica_id,
app,
Expand Down
5 changes: 4 additions & 1 deletion src/boson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub struct AnnounceOk {
pub struct QuorumClock {
plain: DefaultVersion, // redundant, just for easier use
#[derive_where(skip)]
id: u64, // to break tie in `arbitrary_cmp`, should not consider by `PartialOrd`
#[derive_where(skip)]
cert: Vec<Verifiable<AnnounceOk>>,
}

Expand All @@ -52,7 +54,7 @@ impl DepOrd for QuorumClock {

impl lamport_mutex::Clock for QuorumClock {
fn arbitrary_cmp(&self, other: &Self) -> std::cmp::Ordering {
self.plain.sum().cmp(&other.plain.sum())
(self.plain.sum(), self.id).cmp(&(other.plain.sum(), other.id))
}
}

Expand Down Expand Up @@ -262,6 +264,7 @@ impl<
let announce_ok = announce_ok.into_inner();
let clock = QuorumClock {
plain: announce_ok.plain,
id: announce_ok.id,
cert: working_state.replies.into_values().collect(),
};
self.upcall.send((announce_ok.id, clock))?
Expand Down
Loading

0 comments on commit 4bf38b4

Please sign in to comment.