diff --git a/proxy/src/bin/v1.rs b/proxy/src/bin/v1.rs index 6a5549d..04ff294 100644 --- a/proxy/src/bin/v1.rs +++ b/proxy/src/bin/v1.rs @@ -1,30 +1,12 @@ -use bitcoin::consensus::Decodable; -use bitcoin::p2p::{Address, Magic}; -use hex::prelude::*; -use std::io; +//! Simple V1 proxy to test hooking things up end to end. + use tokio::net::{TcpListener, TcpStream}; use tokio::select; -const PROXY: &str = "127.0.0.1:1324"; -const M: Magic = Magic::SIGNET; - -async fn init_outbound_conn(mut client: TcpStream) -> Result<(), Box> { - println!("Validating client connection."); - // Peek the first 70 bytes, 24 for the header and 46 for the first part of the version message. - let mut peek_bytes = [0; 70]; - let n = client.peek(&mut peek_bytes).await?; - println!("Bytes read from local connection: {n}"); - println!("Got magic: {}", &peek_bytes[0..4].to_lower_hex_string()); - if M.to_bytes().ne(&peek_bytes[0..4]) { - return Err(Box::new(io::Error::new( - io::ErrorKind::Other, - "Invalid magic.", - ))); - } +/// Validate and bootstrap proxy connection. +async fn proxy_conn(mut client: TcpStream) -> Result<(), Box> { + let remote_ip = bip324_proxy::peek_addr(&client).await?; - let mut addr_bytes = &peek_bytes[44..]; - let remote_addr = Address::consensus_decode(&mut addr_bytes).expect("network address bytes"); - let remote_ip = remote_addr.socket_addr().expect("IP"); println!("Initialing remote connection {}.", remote_ip); let mut remote = TcpStream::connect(remote_ip).await?; @@ -37,15 +19,15 @@ async fn init_outbound_conn(mut client: TcpStream) -> Result<(), Box { println!("Responded to {} with {bytes} bytes.", remote_ip); if bytes == 0 { - return Err(Box::new(io::Error::new( - io::ErrorKind::Other, + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, "Client closed connection. Disconnecting.", ))); } }, Err(_) => { - return Err(Box::new(io::Error::new( - io::ErrorKind::Other, + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, "Remote closed connection: Disconnecting.", ))); }, @@ -56,15 +38,15 @@ async fn init_outbound_conn(mut client: TcpStream) -> Result<(), Box { println!("Responded to local with {bytes} bytes."); if bytes == 0 { - return Err(Box::new(io::Error::new( - io::ErrorKind::Other, + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, "Client closed connection. Disconnecting.", ))); } }, Err(_) => { - return Err(Box::new(io::Error::new( - io::ErrorKind::Other, + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, "Client closed connection. Disconnecting.", ))); }, @@ -76,17 +58,21 @@ async fn init_outbound_conn(mut client: TcpStream) -> Result<(), Box { println!("Ended connection with no errors."); } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs new file mode 100644 index 0000000..4763268 --- /dev/null +++ b/proxy/src/lib.rs @@ -0,0 +1,33 @@ +use std::{error::Error, net::SocketAddr}; + +use bitcoin::consensus::Decodable; +use bitcoin::p2p::{Address, Magic}; +use hex::prelude::*; +use tokio::net::TcpStream; + +/// Default to local host on port 1324. +pub const DEFAULT_PROXY: &str = "127.0.0.1:1324"; +/// Default to the signet network. +const DEFAULT_MAGIC: Magic = Magic::SIGNET; + +/// Peek the input stream and pluck the remote address based on the version message. +pub async fn peek_addr(client: &TcpStream) -> Result> { + println!("Validating client connection."); + // Peek the first 70 bytes, 24 for the header and 46 for the first part of the version message. + let mut peek_bytes = [0; 70]; + let n = client.peek(&mut peek_bytes).await?; + println!("Bytes read from local connection: {n}"); + println!("Got magic: {}", &peek_bytes[0..4].to_lower_hex_string()); + if DEFAULT_MAGIC.to_bytes().ne(&peek_bytes[0..4]) { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "Invalid magic.", + ))); + } + + let mut addr_bytes = &peek_bytes[44..]; + let remote_addr = Address::consensus_decode(&mut addr_bytes).expect("network address bytes"); + let socket_addr = remote_addr.socket_addr().expect("IP"); + + Ok(socket_addr) +} diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 455d796..4846571 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -1,9 +1,4 @@ use bip324::{initialize_v2_handshake, initiator_complete_v2_handshake, PacketHandler}; -use bitcoin::p2p::Magic; -use bitcoin::{ - consensus::Decodable, - p2p::{message::RawNetworkMessage, message_network::VersionMessage}, -}; use core::fmt; use hex::prelude::*; use std::io; @@ -12,8 +7,6 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::select; use tokio::sync::mpsc; -const PROXY: &str = "127.0.0.1:1324"; -const M: Magic = Magic::SIGNET; type ChannelMessage = Result<(Vec, SendTo), PeerError>; enum SendTo { @@ -37,56 +30,32 @@ impl fmt::Display for PeerError { } } -async fn init_outbound_conn(mut sock: TcpStream) -> Result<(), Box> { - println!("Initialing outbound connection."); - let (mut proxy_reader, mut proxy_writer) = sock.split(); - let mut buf_reader = BufReader::new(&mut proxy_reader); - let mut buffer = Vec::new(); - let n = buf_reader.read_to_end(&mut buffer).await?; - println!("Bytes read from local connection: {n}"); - let recv_magic: [u8; 4] = buffer[..4].try_into()?; - println!("Got magic: {}", recv_magic.to_lower_hex_string()); - if M.to_bytes().ne(&recv_magic) { - return Err(Box::new(io::Error::new( - io::ErrorKind::Other, - "Invalid magic.", - ))); - } - println!("Matches our network."); - let mut cursor = std::io::Cursor::new(buffer.clone()); - let msg = RawNetworkMessage::consensus_decode(&mut cursor)?; - let command = msg.payload().command(); - println!("Message command: {}.", command.to_string()); - if !command.to_string().eq("version") { - return Err(Box::new(io::Error::new( - io::ErrorKind::Other, - "Connections must open with Version message.", - ))); - } - let version = buffer.clone(); - let payload = buffer[24..].to_vec(); - let mut cursor = std::io::Cursor::new(payload); - let ver = VersionMessage::consensus_decode_from_finite_reader(&mut cursor)?; - let remote_addr = ver.receiver.socket_addr()?; - println!("Reaching out to {}.", remote_addr.to_string()); - let mut outbound = TcpStream::connect(remote_addr).await?; - let handshake = initialize_v2_handshake(None)?; - println!("Initiating handshake."); - outbound.write_all(&version).await?; - println!("Sent handshake to remote."); - let (mut remote_reader, mut remote_writer) = outbound.split(); - let mut buf_reader = BufReader::new(&mut remote_reader); - let mut buffer = Vec::new(); - println!("Reading handshake response from remote."); - let n = buf_reader.read_to_end(&mut buffer).await?; - println!("Bytes read from remote host: {n}"); - println!("{}", &buffer.to_lower_hex_string()); - if n < 64 { - return Err(Box::new(io::Error::new( - io::ErrorKind::Other, - "Remote cannot perform V2 handshake. Disconnecting.", - ))); - } +/// Validate and bootstrap proxy connection. +async fn proxy_conn(mut client: TcpStream) -> Result<(), Box> { + let remote_ip = bip324_proxy::peek_addr(&client).await?; + + println!("Reaching out to {}.", remote_ip); + let mut outbound = TcpStream::connect(remote_ip).await?; + + // let handshake = initialize_v2_handshake(None)?; + + // println!("Initiating handshake."); + // outbound.write_all(&version).await?; + // println!("Sent handshake to remote."); + // let (mut remote_reader, mut remote_writer) = outbound.split(); + // let mut buf_reader = BufReader::new(&mut remote_reader); + // let mut buffer = Vec::new(); + // println!("Reading handshake response from remote."); + // let n = buf_reader.read_to_end(&mut buffer).await?; + // println!("Bytes read from remote host: {n}"); + // println!("{}", &buffer.to_lower_hex_string()); + // if n < 64 { + // return Err(Box::new(io::Error::new( + // io::ErrorKind::Other, + // "Remote cannot perform V2 handshake. Disconnecting.", + // ))); + // } + // println!("Completing handshake."); // let finish_handshake = initiator_complete_v2_handshake(buffer, handshake, false)?; // remote_writer.write_all(&finish_handshake.message).await?; @@ -130,84 +99,88 @@ async fn init_outbound_conn(mut sock: TcpStream) -> Result<(), Box, - mut remote: TcpStream, - mut packet_handler: PacketHandler, -) -> Result<(), PeerError> { - loop { - let mut buffer = Vec::new(); - let n = remote - .read_to_end(&mut buffer) - .await - .map_err(|e| PeerError::BytesReadError)?; - println!("Got a message from remote."); - if n == 0 { - println!("Remote node disconnected."); - return Ok(()); - } - println!("Decrypting messages."); - let messages = packet_handler - .receive_v2_packets(buffer, None) - .map_err(|e| PeerError::DecryptionFailure)?; - for message in messages { - if let Some(message) = message.message { - let mut cursor = std::io::Cursor::new(message.clone()); - let msg = RawNetworkMessage::consensus_decode(&mut cursor) - .map_err(|e| PeerError::UnknownMessage)?; - let command = msg.payload().command(); - println!( - "Received a message from remote with command: {}.", - command.to_string() - ); - channel - .send(Ok((message, SendTo::Local))) - .await - .map_err(|e| PeerError::BytesReadError)?; - } - } - } -} +// async fn communicate_outbound( +// channel: tokio::sync::mpsc::Sender, +// mut remote: TcpStream, +// mut packet_handler: PacketHandler, +// ) -> Result<(), PeerError> { +// loop { +// let mut buffer = Vec::new(); +// let n = remote +// .read_to_end(&mut buffer) +// .await +// .map_err(|e| PeerError::BytesReadError)?; +// println!("Got a message from remote."); +// if n == 0 { +// println!("Remote node disconnected."); +// return Ok(()); +// } +// println!("Decrypting messages."); +// let messages = packet_handler +// .receive_v2_packets(buffer, None) +// .map_err(|e| PeerError::DecryptionFailure)?; +// for message in messages { +// if let Some(message) = message.message { +// let mut cursor = std::io::Cursor::new(message.clone()); +// let msg = RawNetworkMessage::consensus_decode(&mut cursor) +// .map_err(|e| PeerError::UnknownMessage)?; +// let command = msg.payload().command(); +// println!( +// "Received a message from remote with command: {}.", +// command.to_string() +// ); +// channel +// .send(Ok((message, SendTo::Local))) +// .await +// .map_err(|e| PeerError::BytesReadError)?; +// } +// } +// } +// } -async fn communicate_local( - channel: tokio::sync::mpsc::Sender, - mut local: TcpStream, - mut packet_handler: PacketHandler, -) -> Result<(), PeerError> { - loop { - let mut buffer = Vec::new(); - let n = local - .read_to_end(&mut buffer) - .await - .map_err(|e| PeerError::BytesReadError)?; - println!("Got a message from local."); - if n == 0 { - println!("Local node disconnected."); - return Ok(()); - } - let message = packet_handler - .prepare_v2_packet(buffer, None, false) - .map_err(|e| PeerError::BytesReadError)?; - channel - .send(Ok((message, SendTo::Remote))) - .await - .map_err(|e| PeerError::BytesReadError)?; - } -} +// async fn communicate_local( +// channel: tokio::sync::mpsc::Sender, +// mut local: TcpStream, +// mut packet_handler: PacketHandler, +// ) -> Result<(), PeerError> { +// loop { +// let mut buffer = Vec::new(); +// let n = local +// .read_to_end(&mut buffer) +// .await +// .map_err(|e| PeerError::BytesReadError)?; +// println!("Got a message from local."); +// if n == 0 { +// println!("Local node disconnected."); +// return Ok(()); +// } +// let message = packet_handler +// .prepare_v2_packet(buffer, None, false) +// .map_err(|e| PeerError::BytesReadError)?; +// channel +// .send(Ok((message, SendTo::Remote))) +// .await +// .map_err(|e| PeerError::BytesReadError)?; +// } +// } #[tokio::main] async fn main() { - let proxy = TcpListener::bind(PROXY) + let proxy = TcpListener::bind(bip324_proxy::DEFAULT_PROXY) .await .expect("Failed to bind to proxy port."); - println!("Listening for connections on {PROXY}"); + println!( + "Listening for connections on {}", + bip324_proxy::DEFAULT_PROXY + ); loop { let (stream, _) = proxy .accept() .await .expect("Failed to accept inbound connection."); + // Spawn a new task per connection. tokio::spawn(async move { - match init_outbound_conn(stream).await { + match proxy_conn(stream).await { Ok(_) => { println!("Ended connection with no errors."); }