Skip to content

Commit

Permalink
Merge pull request #18 from nyonson/proxy-v2
Browse files Browse the repository at this point in the history
Break off peek address function in prep for v2 proxy
  • Loading branch information
nyonson authored Apr 4, 2024
2 parents c876316 + 6df4495 commit 9fff53a
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 157 deletions.
54 changes: 20 additions & 34 deletions proxy/src/bin/v1.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,12 @@
use bitcoin::consensus::Decodable;
use bitcoin::p2p::{Address, Magic};
use hex::prelude::*;
use std::io;
//! Simple V1 proxy to test hooking things up end to end.
use tokio::net::{TcpListener, TcpStream};
use tokio::select;

const PROXY: &str = "127.0.0.1:1324";
const M: Magic = Magic::SIGNET;

async fn init_outbound_conn(mut client: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
println!("Validating client connection.");
// Peek the first 70 bytes, 24 for the header and 46 for the first part of the version message.
let mut peek_bytes = [0; 70];
let n = client.peek(&mut peek_bytes).await?;
println!("Bytes read from local connection: {n}");
println!("Got magic: {}", &peek_bytes[0..4].to_lower_hex_string());
if M.to_bytes().ne(&peek_bytes[0..4]) {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Invalid magic.",
)));
}
/// Validate and bootstrap proxy connection.
async fn proxy_conn(mut client: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
let remote_ip = bip324_proxy::peek_addr(&client).await?;

let mut addr_bytes = &peek_bytes[44..];
let remote_addr = Address::consensus_decode(&mut addr_bytes).expect("network address bytes");
let remote_ip = remote_addr.socket_addr().expect("IP");
println!("Initialing remote connection {}.", remote_ip);
let mut remote = TcpStream::connect(remote_ip).await?;

Expand All @@ -37,15 +19,15 @@ async fn init_outbound_conn(mut client: TcpStream) -> Result<(), Box<dyn std::er
Ok(bytes) => {
println!("Responded to {} with {bytes} bytes.", remote_ip);
if bytes == 0 {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"Client closed connection. Disconnecting.",
)));
}
},
Err(_) => {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"Remote closed connection: Disconnecting.",
)));
},
Expand All @@ -56,15 +38,15 @@ async fn init_outbound_conn(mut client: TcpStream) -> Result<(), Box<dyn std::er
Ok(bytes) => {
println!("Responded to local with {bytes} bytes.");
if bytes == 0 {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"Client closed connection. Disconnecting.",
)));
}
},
Err(_) => {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"Client closed connection. Disconnecting.",
)));
},
Expand All @@ -76,17 +58,21 @@ async fn init_outbound_conn(mut client: TcpStream) -> Result<(), Box<dyn std::er

#[tokio::main]
async fn main() {
let proxy = TcpListener::bind(PROXY)
let proxy = TcpListener::bind(bip324_proxy::DEFAULT_PROXY)
.await
.expect("Failed to bind to proxy port.");
println!("Listening for connections on {PROXY}");
println!(
"Listening for connections on {}",
bip324_proxy::DEFAULT_PROXY
);
loop {
let (stream, _) = proxy
.accept()
.await
.expect("Failed to accept inbound connection.");
// Spawn a new task per connection.
tokio::spawn(async move {
match init_outbound_conn(stream).await {
match proxy_conn(stream).await {
Ok(_) => {
println!("Ended connection with no errors.");
}
Expand Down
33 changes: 33 additions & 0 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::{error::Error, net::SocketAddr};

use bitcoin::consensus::Decodable;
use bitcoin::p2p::{Address, Magic};
use hex::prelude::*;
use tokio::net::TcpStream;

/// Default to local host on port 1324.
pub const DEFAULT_PROXY: &str = "127.0.0.1:1324";
/// Default to the signet network.
const DEFAULT_MAGIC: Magic = Magic::SIGNET;

/// Peek the input stream and pluck the remote address based on the version message.
pub async fn peek_addr(client: &TcpStream) -> Result<SocketAddr, Box<dyn Error>> {
println!("Validating client connection.");
// Peek the first 70 bytes, 24 for the header and 46 for the first part of the version message.
let mut peek_bytes = [0; 70];
let n = client.peek(&mut peek_bytes).await?;
println!("Bytes read from local connection: {n}");
println!("Got magic: {}", &peek_bytes[0..4].to_lower_hex_string());
if DEFAULT_MAGIC.to_bytes().ne(&peek_bytes[0..4]) {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"Invalid magic.",
)));
}

let mut addr_bytes = &peek_bytes[44..];
let remote_addr = Address::consensus_decode(&mut addr_bytes).expect("network address bytes");
let socket_addr = remote_addr.socket_addr().expect("IP");

Ok(socket_addr)
}
219 changes: 96 additions & 123 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
use bip324::{initialize_v2_handshake, initiator_complete_v2_handshake, PacketHandler};
use bitcoin::p2p::Magic;
use bitcoin::{
consensus::Decodable,
p2p::{message::RawNetworkMessage, message_network::VersionMessage},
};
use core::fmt;
use hex::prelude::*;
use std::io;
Expand All @@ -12,8 +7,6 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::select;
use tokio::sync::mpsc;

const PROXY: &str = "127.0.0.1:1324";
const M: Magic = Magic::SIGNET;
type ChannelMessage = Result<(Vec<u8>, SendTo), PeerError>;

enum SendTo {
Expand All @@ -37,56 +30,32 @@ impl fmt::Display for PeerError {
}
}

async fn init_outbound_conn(mut sock: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
println!("Initialing outbound connection.");
let (mut proxy_reader, mut proxy_writer) = sock.split();
let mut buf_reader = BufReader::new(&mut proxy_reader);
let mut buffer = Vec::new();
let n = buf_reader.read_to_end(&mut buffer).await?;
println!("Bytes read from local connection: {n}");
let recv_magic: [u8; 4] = buffer[..4].try_into()?;
println!("Got magic: {}", recv_magic.to_lower_hex_string());
if M.to_bytes().ne(&recv_magic) {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Invalid magic.",
)));
}
println!("Matches our network.");
let mut cursor = std::io::Cursor::new(buffer.clone());
let msg = RawNetworkMessage::consensus_decode(&mut cursor)?;
let command = msg.payload().command();
println!("Message command: {}.", command.to_string());
if !command.to_string().eq("version") {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Connections must open with Version message.",
)));
}
let version = buffer.clone();
let payload = buffer[24..].to_vec();
let mut cursor = std::io::Cursor::new(payload);
let ver = VersionMessage::consensus_decode_from_finite_reader(&mut cursor)?;
let remote_addr = ver.receiver.socket_addr()?;
println!("Reaching out to {}.", remote_addr.to_string());
let mut outbound = TcpStream::connect(remote_addr).await?;
let handshake = initialize_v2_handshake(None)?;
println!("Initiating handshake.");
outbound.write_all(&version).await?;
println!("Sent handshake to remote.");
let (mut remote_reader, mut remote_writer) = outbound.split();
let mut buf_reader = BufReader::new(&mut remote_reader);
let mut buffer = Vec::new();
println!("Reading handshake response from remote.");
let n = buf_reader.read_to_end(&mut buffer).await?;
println!("Bytes read from remote host: {n}");
println!("{}", &buffer.to_lower_hex_string());
if n < 64 {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Remote cannot perform V2 handshake. Disconnecting.",
)));
}
/// Validate and bootstrap proxy connection.
async fn proxy_conn(mut client: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
let remote_ip = bip324_proxy::peek_addr(&client).await?;

println!("Reaching out to {}.", remote_ip);
let mut outbound = TcpStream::connect(remote_ip).await?;

// let handshake = initialize_v2_handshake(None)?;

// println!("Initiating handshake.");
// outbound.write_all(&version).await?;
// println!("Sent handshake to remote.");
// let (mut remote_reader, mut remote_writer) = outbound.split();
// let mut buf_reader = BufReader::new(&mut remote_reader);
// let mut buffer = Vec::new();
// println!("Reading handshake response from remote.");
// let n = buf_reader.read_to_end(&mut buffer).await?;
// println!("Bytes read from remote host: {n}");
// println!("{}", &buffer.to_lower_hex_string());
// if n < 64 {
// return Err(Box::new(io::Error::new(
// io::ErrorKind::Other,
// "Remote cannot perform V2 handshake. Disconnecting.",
// )));
// }

// println!("Completing handshake.");
// let finish_handshake = initiator_complete_v2_handshake(buffer, handshake, false)?;
// remote_writer.write_all(&finish_handshake.message).await?;
Expand Down Expand Up @@ -130,84 +99,88 @@ async fn init_outbound_conn(mut sock: TcpStream) -> Result<(), Box<dyn std::erro
// }
}

async fn communicate_outbound(
channel: tokio::sync::mpsc::Sender<ChannelMessage>,
mut remote: TcpStream,
mut packet_handler: PacketHandler,
) -> Result<(), PeerError> {
loop {
let mut buffer = Vec::new();
let n = remote
.read_to_end(&mut buffer)
.await
.map_err(|e| PeerError::BytesReadError)?;
println!("Got a message from remote.");
if n == 0 {
println!("Remote node disconnected.");
return Ok(());
}
println!("Decrypting messages.");
let messages = packet_handler
.receive_v2_packets(buffer, None)
.map_err(|e| PeerError::DecryptionFailure)?;
for message in messages {
if let Some(message) = message.message {
let mut cursor = std::io::Cursor::new(message.clone());
let msg = RawNetworkMessage::consensus_decode(&mut cursor)
.map_err(|e| PeerError::UnknownMessage)?;
let command = msg.payload().command();
println!(
"Received a message from remote with command: {}.",
command.to_string()
);
channel
.send(Ok((message, SendTo::Local)))
.await
.map_err(|e| PeerError::BytesReadError)?;
}
}
}
}
// async fn communicate_outbound(
// channel: tokio::sync::mpsc::Sender<ChannelMessage>,
// mut remote: TcpStream,
// mut packet_handler: PacketHandler,
// ) -> Result<(), PeerError> {
// loop {
// let mut buffer = Vec::new();
// let n = remote
// .read_to_end(&mut buffer)
// .await
// .map_err(|e| PeerError::BytesReadError)?;
// println!("Got a message from remote.");
// if n == 0 {
// println!("Remote node disconnected.");
// return Ok(());
// }
// println!("Decrypting messages.");
// let messages = packet_handler
// .receive_v2_packets(buffer, None)
// .map_err(|e| PeerError::DecryptionFailure)?;
// for message in messages {
// if let Some(message) = message.message {
// let mut cursor = std::io::Cursor::new(message.clone());
// let msg = RawNetworkMessage::consensus_decode(&mut cursor)
// .map_err(|e| PeerError::UnknownMessage)?;
// let command = msg.payload().command();
// println!(
// "Received a message from remote with command: {}.",
// command.to_string()
// );
// channel
// .send(Ok((message, SendTo::Local)))
// .await
// .map_err(|e| PeerError::BytesReadError)?;
// }
// }
// }
// }

async fn communicate_local(
channel: tokio::sync::mpsc::Sender<ChannelMessage>,
mut local: TcpStream,
mut packet_handler: PacketHandler,
) -> Result<(), PeerError> {
loop {
let mut buffer = Vec::new();
let n = local
.read_to_end(&mut buffer)
.await
.map_err(|e| PeerError::BytesReadError)?;
println!("Got a message from local.");
if n == 0 {
println!("Local node disconnected.");
return Ok(());
}
let message = packet_handler
.prepare_v2_packet(buffer, None, false)
.map_err(|e| PeerError::BytesReadError)?;
channel
.send(Ok((message, SendTo::Remote)))
.await
.map_err(|e| PeerError::BytesReadError)?;
}
}
// async fn communicate_local(
// channel: tokio::sync::mpsc::Sender<ChannelMessage>,
// mut local: TcpStream,
// mut packet_handler: PacketHandler,
// ) -> Result<(), PeerError> {
// loop {
// let mut buffer = Vec::new();
// let n = local
// .read_to_end(&mut buffer)
// .await
// .map_err(|e| PeerError::BytesReadError)?;
// println!("Got a message from local.");
// if n == 0 {
// println!("Local node disconnected.");
// return Ok(());
// }
// let message = packet_handler
// .prepare_v2_packet(buffer, None, false)
// .map_err(|e| PeerError::BytesReadError)?;
// channel
// .send(Ok((message, SendTo::Remote)))
// .await
// .map_err(|e| PeerError::BytesReadError)?;
// }
// }

#[tokio::main]
async fn main() {
let proxy = TcpListener::bind(PROXY)
let proxy = TcpListener::bind(bip324_proxy::DEFAULT_PROXY)
.await
.expect("Failed to bind to proxy port.");
println!("Listening for connections on {PROXY}");
println!(
"Listening for connections on {}",
bip324_proxy::DEFAULT_PROXY
);
loop {
let (stream, _) = proxy
.accept()
.await
.expect("Failed to accept inbound connection.");
// Spawn a new task per connection.
tokio::spawn(async move {
match init_outbound_conn(stream).await {
match proxy_conn(stream).await {
Ok(_) => {
println!("Ended connection with no errors.");
}
Expand Down

0 comments on commit 9fff53a

Please sign in to comment.