Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed May 3, 2024
1 parent 3c3b6ae commit 047ad77
Show file tree
Hide file tree
Showing 10 changed files with 588 additions and 360 deletions.
41 changes: 23 additions & 18 deletions crates/tx5-connection/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,35 @@ pub(crate) enum ConnCmd {
SigRecv(tx5_signal::SignalMessage),
}

/// Receive messages from a tx5 connection.
pub struct ConnRecv(tokio::sync::mpsc::Receiver<Vec<u8>>);

impl ConnRecv {
/// Receive up to 16KiB of message data.
pub async fn recv(&mut self) -> Option<Vec<u8>> {
self.0.recv().await
}
}

/// A tx5 connection.
pub struct Tx5Connection {
pub struct Conn {
ready: Arc<tokio::sync::Semaphore>,
pub_key: PubKey,
client: Weak<tx5_signal::SignalConnection>,
pub(crate) cmd_send: tokio::sync::mpsc::Sender<ConnCmd>,
msg_recv: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Vec<u8>>>,
conn_task: tokio::task::JoinHandle<()>,
}

impl Drop for Tx5Connection {
impl Drop for Conn {
fn drop(&mut self) {
self.conn_task.abort();
}
}

impl Tx5Connection {
impl Conn {
pub(crate) fn priv_new(
pub_key: PubKey,
client: Weak<tx5_signal::SignalConnection>,
) -> Arc<Self> {
) -> (Arc<Self>, ConnRecv, tokio::sync::mpsc::Sender<ConnCmd>) {
// zero len semaphore.. we actually just wait for the close
let ready = Arc::new(tokio::sync::Semaphore::new(0));

Expand Down Expand Up @@ -117,14 +125,16 @@ impl Tx5Connection {
}
});

Arc::new(Self {
ready,
pub_key,
client,
(
Arc::new(Self {
ready,
pub_key,
client,
conn_task,
}),
ConnRecv(msg_recv),
cmd_send,
msg_recv: tokio::sync::Mutex::new(msg_recv),
conn_task,
})
)
}

/// Wait until this connection is ready to send / receive data.
Expand All @@ -146,9 +156,4 @@ impl Tx5Connection {
Err(Error::other("closed"))
}
}

/// Receive up to 16KiB of message data.
pub async fn recv(&self) -> Option<Vec<u8>> {
self.msg_recv.lock().await.recv().await
}
}
60 changes: 33 additions & 27 deletions crates/tx5-connection/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,43 @@ enum Cmd {
got_permit: tokio::sync::oneshot::Sender<()>,
},
RemotePermit(tokio::sync::OwnedSemaphorePermit, u32),
Close,
}

/// Receive a framed message on the connection.
pub struct FramedConnRecv(tokio::sync::mpsc::Receiver<Vec<u8>>);

impl FramedConnRecv {
/// Receive a framed message on the connection.
pub async fn recv(&mut self) -> Option<Vec<u8>> {
self.0.recv().await
}
}

/// A framed wrapper that can send and receive larger messages than
/// the base connection.
pub struct Tx5ConnFramed {
pub struct FramedConn {
pub_key: PubKey,
conn: tokio::sync::Mutex<Arc<Tx5Connection>>,
conn: tokio::sync::Mutex<Arc<Conn>>,
cmd_send: tokio::sync::mpsc::Sender<Cmd>,
recv_task: tokio::task::JoinHandle<()>,
cmd_task: tokio::task::JoinHandle<()>,
msg_recv: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Vec<u8>>>,
}

impl Drop for Tx5ConnFramed {
impl Drop for FramedConn {
fn drop(&mut self) {
self.recv_task.abort();
self.cmd_task.abort();
}
}

impl Tx5ConnFramed {
impl FramedConn {
/// Construct a new framed wrapper around the base connection.
pub async fn new(
conn: Arc<Tx5Connection>,
conn: Arc<Conn>,
mut conn_recv: ConnRecv,
recv_limit: Arc<tokio::sync::Semaphore>,
) -> Result<Self> {
) -> Result<(Self, FramedConnRecv)> {
conn.ready().await;

let (a, b, c, d) = crate::proto::PROTO_VER_2.encode()?;
Expand All @@ -42,17 +53,14 @@ impl Tx5ConnFramed {
let (msg_send, msg_recv) = tokio::sync::mpsc::channel(32);

let cmd_send2 = cmd_send.clone();
let weak_conn = Arc::downgrade(&conn);
let recv_task = tokio::task::spawn(async move {
while let Some(conn) = weak_conn.upgrade() {
if let Some(msg) = conn.recv().await {
if cmd_send2.send(Cmd::Recv(msg)).await.is_err() {
break;
}
} else {
while let Some(msg) = conn_recv.recv().await {
if cmd_send2.send(Cmd::Recv(msg)).await.is_err() {
break;
}
}

let _ = cmd_send2.send(Cmd::Close).await;
});

let cmd_send2 = cmd_send.clone();
Expand Down Expand Up @@ -125,32 +133,30 @@ impl Tx5ConnFramed {
break;
}
}
Cmd::Close => break,
}
}
});

let pub_key = conn.pub_key().clone();

Ok(Self {
pub_key,
conn: tokio::sync::Mutex::new(conn),
cmd_send,
recv_task,
cmd_task,
msg_recv: tokio::sync::Mutex::new(msg_recv),
})
Ok((
Self {
pub_key,
conn: tokio::sync::Mutex::new(conn),
cmd_send,
recv_task,
cmd_task,
},
FramedConnRecv(msg_recv),
))
}

/// The pub key of the remote peer this is connected to.
pub fn pub_key(&self) -> &PubKey {
&self.pub_key
}

/// Receive a message on the connection.
pub async fn recv(&self) -> Option<Vec<u8>> {
self.msg_recv.lock().await.recv().await
}

/// Send a message on the connection.
pub async fn send(&self, msg: Vec<u8>) -> Result<()> {
let conn = self.conn.lock().await;
Expand Down
Loading

0 comments on commit 047ad77

Please sign in to comment.