Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed May 3, 2024
1 parent 047ad77 commit 30da062
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 462 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion crates/tx5-connection/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use super::*;

pub(crate) enum ConnCmd {
SigRecv(tx5_signal::SignalMessage),
#[allow(dead_code)]
Close,
}

/// Receive messages from a tx5 connection.
Expand Down Expand Up @@ -32,12 +34,13 @@ impl Conn {
pub(crate) fn priv_new(
pub_key: PubKey,
client: Weak<tx5_signal::SignalConnection>,
) -> (Arc<Self>, ConnRecv, tokio::sync::mpsc::Sender<ConnCmd>) {
) -> (Arc<Self>, ConnRecv, Arc<tokio::sync::mpsc::Sender<ConnCmd>>) {
// zero len semaphore.. we actually just wait for the close
let ready = Arc::new(tokio::sync::Semaphore::new(0));

let (msg_send, msg_recv) = tokio::sync::mpsc::channel(32);
let (cmd_send, mut cmd_recv) = tokio::sync::mpsc::channel(32);
let cmd_send = Arc::new(cmd_send);

let ready2 = ready.clone();
let client2 = client.clone();
Expand Down Expand Up @@ -84,6 +87,11 @@ impl Conn {
}
}
}
ConnCmd::Close => {
return Err(Error::other(
"close during handshake",
))
}
}
if got_peer_res && sent_our_res {
break;
Expand Down Expand Up @@ -121,6 +129,7 @@ impl Conn {
_ => (),
}
}
ConnCmd::Close => break,
}
}
});
Expand Down
41 changes: 35 additions & 6 deletions crates/tx5-connection/src/hub.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
pub use super::*;

type HubMap = HashMap<PubKey, (Weak<Conn>, tokio::sync::mpsc::Sender<ConnCmd>)>;
type HubMapT =
HashMap<PubKey, (Weak<Conn>, Arc<tokio::sync::mpsc::Sender<ConnCmd>>)>;
struct HubMap(HubMapT);

impl std::ops::Deref for HubMap {
type Target = HubMapT;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for HubMap {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl HubMap {
pub fn new() -> Self {
Self(HashMap::new())
}
}

async fn hub_map_assert(
pub_key: PubKey,
Expand All @@ -9,14 +31,15 @@ async fn hub_map_assert(
) -> Result<(
Option<ConnRecv>,
Arc<Conn>,
tokio::sync::mpsc::Sender<ConnCmd>,
Arc<tokio::sync::mpsc::Sender<ConnCmd>>,
)> {
let mut found_during_prune = None;

map.retain(|_, c| {
if let Some(f) = c.0.upgrade() {
if f.pub_key() == &pub_key {
found_during_prune = Some((f, c.1.clone()));
if let Some(conn) = c.0.upgrade() {
let cmd_send = c.1.clone();
if conn.pub_key() == &pub_key {
found_during_prune = Some((conn, cmd_send));
}
true
} else {
Expand All @@ -42,7 +65,7 @@ async fn hub_map_assert(
Ok((Some(recv), conn, cmd_send))
}

enum HubCmd {
pub(crate) enum HubCmd {
CliRecv {
pub_key: PubKey,
msg: tx5_signal::SignalMessage,
Expand Down Expand Up @@ -92,6 +115,8 @@ impl Hub {
tx5_signal::SignalConnection::connect(url, config).await?;
let client = Arc::new(client);

tracing::debug!(%url, pub_key = ?client.pub_key(), "hub connected");

let mut task_list = Vec::new();

let (cmd_send, mut cmd_recv) = tokio::sync::mpsc::channel(32);
Expand All @@ -113,6 +138,8 @@ impl Hub {

let (conn_send, conn_recv) = tokio::sync::mpsc::channel(32);
let weak_client = Arc::downgrade(&client);
let url = url.to_string();
let pub_key = client.pub_key().clone();
task_list.push(tokio::task::spawn(async move {
let mut map = HubMap::new();
while let Some(cmd) = cmd_recv.recv().await {
Expand Down Expand Up @@ -159,6 +186,8 @@ impl Hub {
if let Some(client) = weak_client.upgrade() {
client.close().await;
}

tracing::debug!(%url, ?pub_key, "hub close");
}));

Ok((
Expand Down
79 changes: 79 additions & 0 deletions crates/tx5-connection/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,82 @@ async fn framed_end_when_disconnected() {
assert!(hub1.connect(pk2).await.is_err());
assert!(hub2.connect(pk1).await.is_err());
}

#[tokio::test(flavor = "multi_thread")]
async fn base_con_drop_disconnects() {
init_tracing();

let srv = TestSrv::new().await;

let (hub1, _hubr1) = srv.hub().await;
let pk1 = hub1.pub_key().clone();

let (hub2, mut hubr2) = srv.hub().await;
let pk2 = hub2.pub_key().clone();

println!("connect");
let (c1, mut r1) = hub1.connect(pk2.clone()).await.unwrap();
println!("accept");
let (c2, mut r2) = hubr2.accept().await.unwrap();

println!("await ready");
tokio::join!(c1.ready(), c2.ready());
println!("ready");

assert_eq!(&pk1, c2.pub_key());

c1.send(b"hello".to_vec()).await.unwrap();
assert_eq!(b"hello", r2.recv().await.unwrap().as_slice());

c2.send(b"world".to_vec()).await.unwrap();
assert_eq!(b"world", r1.recv().await.unwrap().as_slice());

println!("drop c1");
drop(c1);

println!("check r1");
assert!(r1.recv().await.is_none());
}

#[tokio::test(flavor = "multi_thread")]
async fn framed_con_drop_disconnects() {
init_tracing();

let srv = TestSrv::new().await;

let (hub1, _hubr1) = srv.hub().await;
let pk1 = hub1.pub_key().clone();

let (hub2, mut hubr2) = srv.hub().await;
let pk2 = hub2.pub_key().clone();

let ((c1, mut r1), (c2, mut r2)) = tokio::join!(
async {
let (c1, r2) = hub1.connect(pk2.clone()).await.unwrap();
let limit =
Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024));
let f = FramedConn::new(c1, r2, limit).await.unwrap();
f
},
async {
let (c2, r2) = hubr2.accept().await.unwrap();
assert_eq!(&pk1, c2.pub_key());
let limit =
Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024));
let f = FramedConn::new(c2, r2, limit).await.unwrap();
f
},
);

c1.send(b"hello".to_vec()).await.unwrap();
assert_eq!(b"hello", r2.recv().await.unwrap().as_slice());

c2.send(b"world".to_vec()).await.unwrap();
assert_eq!(b"world", r1.recv().await.unwrap().as_slice());

println!("drop c1");
drop(c1);

println!("check r1");
assert!(r1.recv().await.is_none());
}
1 change: 1 addition & 0 deletions crates/tx5-signal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ tracing = { workspace = true }
tx5-core = { workspace = true }

[dev-dependencies]
sbd-server = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
tracing-subscriber = { workspace = true }
Loading

0 comments on commit 30da062

Please sign in to comment.