Skip to content

Commit

Permalink
Merge pull request #73 from nyonson/one-loop-proxy
Browse files Browse the repository at this point in the history
Add network flag to proxy and refactor back to one loop
  • Loading branch information
rustaceanrob authored Oct 3, 2024
2 parents 4f1dee7 + 6b7c080 commit 1d97c9b
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 134 deletions.
4 changes: 2 additions & 2 deletions proxy/README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# V2 Proxy

A proxy process which allows V1-only clients to communicate over a V2 protocol. The process listens on port `1324` for V1 connections and requires the V1 client to send along the remote peer's IP address in the `addr_recv` field.
A proxy sidecar process which allows V1-only clients to communicate over the V2 protocol. The process listens on port `1324` for V1 connections and requires the V1 client to send along the remote peer's IP address in the `addr_recv` field.

## Running the Proxy

`cargo run --bin proxy`

## Testing with Nakamoto

[Nakamoto](https://github.com/cloudhead/nakamoto) is a BIP-157/BIP-158 Light Client that communicates over the Bitcoin P2P network. With a single change, Nakamoto may be modified to use the proxy.
[Nakamoto](https://github.com/cloudhead/nakamoto) is a BIP-157/BIP-158 Light Client that communicates over the Bitcoin P2P network. With a single change, Nakamoto may be modified to use the proxy. This patch hardcodes Nakamoto to connect to the localhost on port 1324 where the proxy should be running.

```diff
diff --git a/net/poll/src/reactor.rs b/net/poll/src/reactor.rs
Expand Down
6 changes: 6 additions & 0 deletions proxy/config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@ name = "bind_host"
type = "String"
default = "\"127.0.0.1\".into()"
doc = "The address to listen on"

[[param]]
name = "network"
type = "String"
default = "\"bitcoin\".into()"
doc = "The bitcoin network to operate on"
101 changes: 45 additions & 56 deletions proxy/src/bin/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
// SPDX-License-Identifier: MIT OR Apache-2.0

use std::str::FromStr;

use bip324::{
serde::{deserialize, serialize},
AsyncProtocol, PacketType, Role,
};
use bip324_proxy::{read_v1, write_v1};
use bip324_proxy::{V1ProtocolReader, V1ProtocolWriter};
use bitcoin::Network;
use tokio::net::{TcpListener, TcpStream};
use tokio::{
net::{TcpListener, TcpStream},
select,
};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

configure_me::include_config!();

/// Validate and bootstrap proxy connection.
async fn proxy_conn(client: TcpStream) -> Result<(), bip324_proxy::Error> {
let remote_ip = bip324_proxy::peek_addr(&client)
async fn proxy_conn(client: TcpStream, network: Network) -> Result<(), bip324_proxy::Error> {
let remote_ip = bip324_proxy::peek_addr(&client, network)
.await
.expect("peek address");

Expand All @@ -28,70 +33,54 @@ async fn proxy_conn(client: TcpStream) -> Result<(), bip324_proxy::Error> {
let remote_reader = remote_reader.compat();
let remote_writer = remote_writer.compat_write();

let protocol = AsyncProtocol::new(
Network::Bitcoin,
Role::Initiator,
None,
remote_reader,
remote_writer,
)
.await
.expect("protocol establishment");
let protocol = AsyncProtocol::new(network, Role::Initiator, None, remote_reader, remote_writer)
.await
.expect("protocol establishment");

let (client_reader, client_writer) = client.into_split();
let mut v1_client_reader = V1ProtocolReader::new(client_reader);
let mut v1_client_writer = V1ProtocolWriter::new(network, client_writer);

let (mut client_reader, mut client_writer) = client.into_split();
let (mut remote_reader, mut remote_writer) = protocol.into_split();

println!("Setting up proxy.");

// Spawning two threads instead of selecting on one due
// to the IO calls not being cancellation safe. A select
// drops other futures when one is ready, so it is
// possible that it drops one with half read state.

tokio::spawn(async move {
loop {
let msg = read_v1(&mut client_reader).await.expect("read from client");
println!(
"Read {} message from client, writing to remote.",
msg.command()
);
let contents = serialize(msg).expect("serialize-able contents into network message");
remote_writer
.encrypt(&contents)
.await
.expect("write to remote");
}
});

tokio::spawn(async move {
loop {
// Ignore any decoy packets.
let payload = loop {
let payload = remote_reader.decrypt().await.expect("read packet");

loop {
select! {
result = v1_client_reader.read() => {
let msg = result?;
println!(
"Read {} message from client, writing to remote.",
msg.command()
);

let contents = serialize(msg).expect("serialize-able contents into network message");
remote_writer
.encrypt(&contents)
.await
.expect("write to remote");
},
result = remote_reader.decrypt() => {
let payload = result.expect("read packet");
// Ignore decoy packets.
if payload.packet_type() == PacketType::Genuine {
break payload;
let msg = deserialize(payload.contents())
.expect("deserializable contents into network message");
println!(
"Read {} message from remote, writing to client.",
msg.command()
);
v1_client_writer.write(msg).await.expect("write to client");
}
};

let msg = deserialize(payload.contents())
.expect("deserializable contents into network message");
println!(
"Read {} message from remote, writing to client.",
msg.command()
);
write_v1(&mut client_writer, msg)
.await
.expect("write to client");
},
}
});

Ok(())
}
}

#[tokio::main]
async fn main() {
let (config, _) = Config::including_optional_config_files::<&[&str]>(&[]).unwrap_or_exit();
let network = Network::from_str(&config.network).expect("parse-able network");

let proxy = TcpListener::bind((&*config.bind_host, config.bind_port))
.await
Expand All @@ -107,7 +96,7 @@ async fn main() {
.expect("Failed to accept inbound connection.");
// Spawn a new task per connection.
tokio::spawn(async move {
match proxy_conn(stream).await {
match proxy_conn(stream, network).await {
Ok(_) => {
println!("Proxy establilshed.");
}
Expand Down
76 changes: 38 additions & 38 deletions proxy/src/bin/v1.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,60 @@
// SPDX-License-Identifier: MIT OR Apache-2.0
//! Simple V1 proxy to test hooking things up end to end.
use bip324_proxy::{read_v1, write_v1};
use tokio::net::{TcpListener, TcpStream};
use std::str::FromStr;

use bip324_proxy::{V1ProtocolReader, V1ProtocolWriter};
use bitcoin::Network;
use tokio::{
net::{TcpListener, TcpStream},
select,
};

configure_me::include_config!();

/// Validate and bootstrap proxy connection.
async fn proxy_conn(client: TcpStream) -> Result<(), bip324_proxy::Error> {
let remote_ip = bip324_proxy::peek_addr(&client).await?;
async fn proxy_conn(client: TcpStream, network: Network) -> Result<(), bip324_proxy::Error> {
let remote_ip = bip324_proxy::peek_addr(&client, network).await?;

println!("Initialing remote connection {}.", remote_ip);
let remote = TcpStream::connect(remote_ip).await?;

let (mut client_reader, mut client_writer) = client.into_split();
let (mut remote_reader, mut remote_writer) = remote.into_split();

println!("Setting up proxy loop.");
let (client_reader, client_writer) = client.into_split();
let (remote_reader, remote_writer) = remote.into_split();

// Spawning two threads instead of selecting on one due
// to the IO calls not being cancellation safe. A select
// drops other futures when one is ready, so it is
// possible that it drops one with half read state.
let mut v1_client_reader = V1ProtocolReader::new(client_reader);
let mut v1_client_writer = V1ProtocolWriter::new(network, client_writer);
let mut v1_remote_reader = V1ProtocolReader::new(remote_reader);
let mut v1_remote_writer = V1ProtocolWriter::new(network, remote_writer);

tokio::spawn(async move {
loop {
let msg = read_v1(&mut client_reader).await.expect("read from client");
println!(
"Read {} message from client, writing to remote.",
msg.command()
);
write_v1(&mut remote_writer, msg)
.await
.expect("write to remote");
}
});
println!("Setting up proxy loop.");

tokio::spawn(async move {
loop {
let msg = read_v1(&mut remote_reader).await.expect("read from remote");
println!(
"Read {} message from remote, writing to client.",
msg.command()
);
write_v1(&mut client_writer, msg)
.await
.expect("write to client");
loop {
select! {
result = v1_client_reader.read() => {
let msg = result?;
println!(
"Read {} message from client, writing to remote.",
msg.command()
);
v1_remote_writer.write(msg).await?;
},
result = v1_remote_reader.read() => {
let msg = result?;
println!(
"Read {} message from remote, writing to client.",
msg.command()
);
v1_client_writer.write(msg).await?;
},
}
});

Ok(())
}
}

#[tokio::main]
async fn main() {
let (config, _) = Config::including_optional_config_files::<&[&str]>(&[]).unwrap_or_exit();
let network = Network::from_str(&config.network).expect("parse-able network");

let proxy = TcpListener::bind((&*config.bind_host, config.bind_port))
.await
Expand All @@ -70,7 +70,7 @@ async fn main() {
.expect("Failed to accept inbound connection.");
// Spawn a new task per connection.
tokio::spawn(async move {
match proxy_conn(stream).await {
match proxy_conn(stream, network).await {
Ok(_) => {
println!("Proxy establilshed.");
}
Expand Down
Loading

0 comments on commit 1d97c9b

Please sign in to comment.