Skip to content

Commit

Permalink
Spawn instead of select
Browse files Browse the repository at this point in the history
  • Loading branch information
nyonson committed May 16, 2024
1 parent 5462c17 commit 1843555
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 65 deletions.
2 changes: 1 addition & 1 deletion protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl PacketHandler {
}

/// Split the handler into separate reader and a writer.
pub fn split(self) -> (PacketReader, PacketWriter) {
pub fn into_split(self) -> (PacketReader, PacketWriter) {
(self.packet_reader, self.packet_writer)
}

Expand Down
79 changes: 47 additions & 32 deletions proxy/src/bin/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ use bitcoin::Network;
use bytes::BytesMut;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::select;

/// Validate and bootstrap proxy connection.
async fn proxy_conn(mut client: TcpStream) -> Result<(), bip324_proxy::Error> {
async fn proxy_conn(client: TcpStream) -> Result<(), bip324_proxy::Error> {
let remote_ip = bip324_proxy::peek_addr(&client)
.await
.expect("peek address");
Expand Down Expand Up @@ -80,47 +79,63 @@ async fn proxy_conn(mut client: TcpStream) -> Result<(), bip324_proxy::Error> {
bip324::Error::MessageLengthTooSmall => continue,
e => panic!("unable to authenticate garbage {}", e),
},
_ => break,
_ => {
println!("Channel authenticated.");
break;
}
}
}
}
}

println!("Channel authenticated.");

println!("Splitting channels.");
let packet_handler = handshake.finalize().expect("finished handshake");
let (mut client_reader, mut client_writer) = client.split();
let (mut remote_reader, mut remote_writer) = remote.split();
let (mut decrypter, mut encrypter) = packet_handler.split();
let (mut client_reader, mut client_writer) = client.into_split();
let (mut remote_reader, mut remote_writer) = remote.into_split();
let (mut decrypter, mut encrypter) = packet_handler.into_split();

println!("Setting up proxy loop.");
loop {
select! {
res = read_v1(&mut client_reader) => {
match res {
Ok(msg) => {
println!("Read {} message from client, writing to remote.", msg.command());
write_v2(&mut remote_writer, &mut encrypter, msg).await.expect("write v2 message");
},
Err(err) => {
panic!("unable to read v1 from client {}", err);
},
println!("Setting up proxy loops.");
tokio::spawn(async move {
loop {
let res = read_v1(&mut client_reader).await;
match res {
Ok(msg) => {
println!(
"Read {} message from client, writing to remote.",
msg.command()
);
write_v2(&mut remote_writer, &mut encrypter, msg)
.await
.expect("write to remote");
}
},
res = read_v2(&mut remote_reader, &mut decrypter) => {
match res {
Ok(msg) => {
println!("Read {} message from remote, writing to client.", msg.command());
write_v1(&mut client_writer, msg).await.expect("write v1 message");
},
Err(err) => {
panic!("unable to read v2 from client {}", err);
},
Err(e) => {
panic!("unable to read from client {}", e);
}
}
}
});

tokio::spawn(async move {
loop {
let res = read_v2(&mut remote_reader, &mut decrypter).await;
match res {
Ok(msg) => {
println!(
"Read {} message from remote, writing to client.",
msg.command()
);
write_v1(&mut client_writer, msg)
.await
.expect("write to client");
}
Err(e) => {
panic!("unable to read from remote {}", e);
}
},
}
}
}
});

Ok(())
}

#[tokio::main]
Expand Down
76 changes: 48 additions & 28 deletions proxy/src/bin/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,65 @@
use bip324_proxy::{read_v1, write_v1};
use tokio::net::{TcpListener, TcpStream};
use tokio::select;

/// Validate and bootstrap proxy connection.
async fn proxy_conn(mut client: TcpStream) -> Result<(), bip324_proxy::Error> {
async fn proxy_conn(client: TcpStream) -> Result<(), bip324_proxy::Error> {
let remote_ip = bip324_proxy::peek_addr(&client).await?;

println!("Initialing remote connection {}.", remote_ip);
let mut remote = TcpStream::connect(remote_ip).await?;
let remote = TcpStream::connect(remote_ip).await?;

let (mut client_reader, mut client_writer) = client.split();
let (mut remote_reader, mut remote_writer) = remote.split();
let (mut client_reader, mut client_writer) = client.into_split();
let (mut remote_reader, mut remote_writer) = remote.into_split();

println!("Setting up proxy loop.");
loop {
select! {
res = read_v1(&mut client_reader) => {
match res {
Ok(msg) => {
println!("Read {} message from client, writing to remote.", msg.command());
write_v1(&mut remote_writer, msg).await?;
},
Err(e) => {
return Err(e);
},

// Spawning two threads instead of selecting on one due
// to the read calls not being cancelable. A select
// drops other futures when one is ready, so it is
// possible that it drops one with half read state.

tokio::spawn(async move {
loop {
let res = read_v1(&mut client_reader).await;
match res {
Ok(msg) => {
println!(
"Read {} message from client, writing to remote.",
msg.command()
);
write_v1(&mut remote_writer, msg)
.await
.expect("write to remote");
}
},
res = read_v1(&mut remote_reader) => {
match res {
Ok(msg) => {
println!("Read {} message from remote, writing to client.", msg.command());
write_v1(&mut client_writer, msg).await?;
},
Err(e) => {
return Err(e);
},
Err(e) => {
panic!("unable to read from client {}", e);
}
},
}
}
}
});

tokio::spawn(async move {
loop {
let res = read_v1(&mut remote_reader).await;
match res {
Ok(msg) => {
println!(
"Read {} message from remote, writing to client.",
msg.command()
);
write_v1(&mut client_writer, msg)
.await
.expect("write to client");
}
Err(e) => {
panic!("unable to read from remote {}", e);
}
}
}
});

Ok(())
}

#[tokio::main]
Expand Down
13 changes: 9 additions & 4 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ pub async fn peek_addr(client: &TcpStream) -> Result<SocketAddr, Error> {
}

/// Read a v1 message off of the input stream.
///
/// This future is not cancelable since state is read multiple times.
pub async fn read_v1<T: AsyncRead + Unpin>(input: &mut T) -> Result<NetworkMessage, Error> {
let mut header_bytes = [0; V1_HEADER_BYTES];
input.read_exact(&mut header_bytes).await?;
Expand All @@ -116,17 +118,20 @@ pub async fn read_v1<T: AsyncRead + Unpin>(input: &mut T) -> Result<NetworkMessa
.expect("4 header length bytes"),
);

let mut payload = vec![0u8; V1_HEADER_BYTES + payload_len as usize];
payload[0..V1_HEADER_BYTES].copy_from_slice(&header_bytes[..]);
input.read_exact(&mut payload[V1_HEADER_BYTES..]).await?;
let mut full_bytes = vec![0u8; V1_HEADER_BYTES + payload_len as usize];
full_bytes[0..V1_HEADER_BYTES].copy_from_slice(&header_bytes[..]);
let payload_bytes = &mut full_bytes[V1_HEADER_BYTES..];
input.read_exact(payload_bytes).await?;

let message = RawNetworkMessage::consensus_decode(&mut &payload[..]).expect("decode v1");
let message = RawNetworkMessage::consensus_decode(&mut &full_bytes[..]).expect("decode v1");

// todo: drop this clone?
Ok(message.payload().clone())
}

/// Read a v2 message off the input stream.
///
/// This future is not cancelable since state is read multiple times.
pub async fn read_v2<T: AsyncRead + Unpin>(
input: &mut T,
decrypter: &mut PacketReader,
Expand Down

0 comments on commit 1843555

Please sign in to comment.