Skip to content

Commit

Permalink
Initial quic support
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Aug 2, 2023
1 parent ff9b09d commit 61d4ae1
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 184 deletions.
308 changes: 224 additions & 84 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ prometheus-client = "0.21.0"
unused_port = { path = "../../common/unused_port" }
delay_map = "0.3.0"
void = "1"
libp2p-quic= { version = "0.9.0-alpha", features=["tokio"]}

[dependencies.libp2p]
version = "0.52"
default-features = false
features = ["websocket", "identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa"]
features = ["identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa"]

[dev-dependencies]
slog-term = "2.6.0"
Expand Down
70 changes: 49 additions & 21 deletions beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ pub struct Config {
/// Disables the discovery protocol from starting.
pub disable_discovery: bool,

/// Disables quic support.
pub disable_quic_support: bool,

/// Attempt to construct external port mappings with UPnP.
pub upnp_enabled: bool,

Expand Down Expand Up @@ -154,27 +157,41 @@ impl Config {
/// Sets the listening address to use an ipv4 address. The discv5 ip_mode and table filter are
/// adjusted accordingly to ensure addresses that are present in the enr are globally
/// reachable.
pub fn set_ipv4_listening_address(&mut self, addr: Ipv4Addr, tcp_port: u16, udp_port: u16) {
pub fn set_ipv4_listening_address(
&mut self,
addr: Ipv4Addr,
tcp_port: u16,
disc_port: u16,
quic_port: u16,
) {
self.listen_addresses = ListenAddress::V4(ListenAddr {
addr,
udp_port,
disc_port,
quic_port,
tcp_port,
});
self.discv5_config.listen_config = discv5::ListenConfig::from_ip(addr.into(), udp_port);
self.discv5_config.listen_config = discv5::ListenConfig::from_ip(addr.into(), disc_port);
self.discv5_config.table_filter = |enr| enr.ip4().as_ref().map_or(false, is_global_ipv4)
}

/// Sets the listening address to use an ipv6 address. The discv5 ip_mode and table filter is
/// adjusted accordingly to ensure addresses that are present in the enr are globally
/// reachable.
pub fn set_ipv6_listening_address(&mut self, addr: Ipv6Addr, tcp_port: u16, udp_port: u16) {
pub fn set_ipv6_listening_address(
&mut self,
addr: Ipv6Addr,
tcp_port: u16,
disc_port: u16,
quic_port: u16,
) {
self.listen_addresses = ListenAddress::V6(ListenAddr {
addr,
udp_port,
disc_port,
quic_port,
tcp_port,
});

self.discv5_config.listen_config = discv5::ListenConfig::from_ip(addr.into(), udp_port);
self.discv5_config.listen_config = discv5::ListenConfig::from_ip(addr.into(), disc_port);
self.discv5_config.table_filter = |enr| enr.ip6().as_ref().map_or(false, is_global_ipv6)
}

Expand All @@ -185,26 +202,30 @@ impl Config {
&mut self,
v4_addr: Ipv4Addr,
tcp4_port: u16,
udp4_port: u16,
disc4_port: u16,
quic4_port: u16,
v6_addr: Ipv6Addr,
tcp6_port: u16,
udp6_port: u16,
disc6_port: u16,
quic6_port: u16,
) {
self.listen_addresses = ListenAddress::DualStack(
ListenAddr {
addr: v4_addr,
udp_port: udp4_port,
disc_port: disc4_port,
quic_port: quic4_port,
tcp_port: tcp4_port,
},
ListenAddr {
addr: v6_addr,
udp_port: udp6_port,
disc_port: disc6_port,
quic_port: quic6_port,
tcp_port: tcp6_port,
},
);
self.discv5_config.listen_config = discv5::ListenConfig::default()
.with_ipv4(v4_addr, udp4_port)
.with_ipv6(v6_addr, udp6_port);
.with_ipv4(v4_addr, disc4_port)
.with_ipv6(v6_addr, disc6_port);

self.discv5_config.table_filter = |enr| match (&enr.ip4(), &enr.ip6()) {
(None, None) => false,
Expand All @@ -218,27 +239,32 @@ impl Config {
match listen_addr {
ListenAddress::V4(ListenAddr {
addr,
udp_port,
disc_port,
quic_port,
tcp_port,
}) => self.set_ipv4_listening_address(addr, tcp_port, udp_port),
}) => self.set_ipv4_listening_address(addr, tcp_port, disc_port, quic_port),
ListenAddress::V6(ListenAddr {
addr,
udp_port,
disc_port,
quic_port,
tcp_port,
}) => self.set_ipv6_listening_address(addr, tcp_port, udp_port),
}) => self.set_ipv6_listening_address(addr, tcp_port, disc_port, quic_port),
ListenAddress::DualStack(
ListenAddr {
addr: ip4addr,
udp_port: udp4_port,
disc_port: disc4_port,
quic_port: quic4_port,
tcp_port: tcp4_port,
},
ListenAddr {
addr: ip6addr,
udp_port: udp6_port,
disc_port: disc6_port,
quic_port: quic6_port,
tcp_port: tcp6_port,
},
) => self.set_ipv4_ipv6_listening_addresses(
ip4addr, tcp4_port, udp4_port, ip6addr, tcp6_port, udp6_port,
ip4addr, tcp4_port, disc4_port, quic4_port, ip6addr, tcp6_port, disc6_port,
quic6_port,
),
}
}
Expand Down Expand Up @@ -277,7 +303,8 @@ impl Default for Config {
);
let listen_addresses = ListenAddress::V4(ListenAddr {
addr: Ipv4Addr::UNSPECIFIED,
udp_port: 9000,
disc_port: 9000,
quic_port: 9001,
tcp_port: 9000,
});

Expand Down Expand Up @@ -325,6 +352,7 @@ impl Default for Config {
disable_peer_scoring: false,
client_version: lighthouse_version::version_with_platform(),
disable_discovery: false,
disable_quic_support: false,
upnp_enabled: true,
network_load: 3,
private: false,
Expand Down Expand Up @@ -560,4 +588,4 @@ pub const fn is_global_ipv6(addr: &Ipv6Addr) -> bool {
|| is_documentation(addr)
|| is_unique_local(addr)
|| is_unicast_link_local(addr))
}
}
31 changes: 22 additions & 9 deletions beacon_node/lighthouse_network/src/listen_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,26 @@ use serde::{Deserialize, Serialize};
/// A listening address composed by an Ip, an UDP port and a TCP port.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ListenAddr<Ip> {
/// The IP address we will listen on.
pub addr: Ip,
pub udp_port: u16,
/// The UDP port that discovery will listen on.
pub disc_port: u16,
/// The UDP port that QUIC will listen on.
pub quic_port: u16,
/// The TCP port that libp2p will listen on.
pub tcp_port: u16,
}

impl<Ip: Into<IpAddr> + Clone> ListenAddr<Ip> {
pub fn udp_socket_addr(&self) -> SocketAddr {
(self.addr.clone().into(), self.udp_port).into()
pub fn discovery_socket_addr(&self) -> SocketAddr {
(self.addr.clone().into(), self.disc_port).into()
}

pub fn tcp_socket_addr(&self) -> SocketAddr {
pub fn quic_socket_addr(&self) -> SocketAddr {
(self.addr.clone().into(), self.quic_port).into()
}

pub fn libp2p_socket_addr(&self) -> SocketAddr {
(self.addr.clone().into(), self.tcp_port).into()
}
}
Expand Down Expand Up @@ -61,7 +70,8 @@ impl ListenAddress {
pub fn unused_v4_ports() -> Self {
ListenAddress::V4(ListenAddr {
addr: Ipv4Addr::UNSPECIFIED,
udp_port: unused_port::unused_udp4_port().unwrap(),
disc_port: unused_port::unused_udp4_port().unwrap(),
quic_port: unused_port::unused_udp4_port().unwrap(),
tcp_port: unused_port::unused_tcp4_port().unwrap(),
})
}
Expand All @@ -70,7 +80,8 @@ impl ListenAddress {
pub fn unused_v6_ports() -> Self {
ListenAddress::V6(ListenAddr {
addr: Ipv6Addr::UNSPECIFIED,
udp_port: unused_port::unused_udp6_port().unwrap(),
disc_port: unused_port::unused_udp6_port().unwrap(),
quic_port: unused_port::unused_udp6_port().unwrap(),
tcp_port: unused_port::unused_tcp6_port().unwrap(),
})
}
Expand All @@ -84,14 +95,16 @@ impl slog::KV for ListenAddress {
) -> slog::Result {
if let Some(v4_addr) = self.v4() {
serializer.emit_arguments("ip4_address", &format_args!("{}", v4_addr.addr))?;
serializer.emit_u16("udp4_port", v4_addr.udp_port)?;
serializer.emit_u16("disc4_port", v4_addr.disc_port)?;
serializer.emit_u16("quic4_port", v4_addr.quic_port)?;
serializer.emit_u16("tcp4_port", v4_addr.tcp_port)?;
}
if let Some(v6_addr) = self.v6() {
serializer.emit_arguments("ip6_address", &format_args!("{}", v6_addr.addr))?;
serializer.emit_u16("udp6_port", v6_addr.udp_port)?;
serializer.emit_u16("disc6_port", v6_addr.disc_port)?;
serializer.emit_u16("quic6_port", v6_addr.quic_port)?;
serializer.emit_u16("tcp6_port", v6_addr.tcp_port)?;
}
slog::Result::Ok(())
}
}
}
27 changes: 17 additions & 10 deletions beacon_node/lighthouse_network/src/service/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxe
use libp2p::gossipsub;
use libp2p::identity::{secp256k1, Keypair};
use libp2p::{core, noise, yamux, PeerId, Transport, TransportExt};
use libp2p_quic;
use prometheus_client::registry::Registry;
use slog::{debug, warn};
use ssz::Decode;
Expand Down Expand Up @@ -37,18 +38,15 @@ pub struct Context<'a> {

type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;

/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, and
/// mplex as the multiplexing layer.
/// The implementation supports TCP/IP, QUIC (experimental) over UDP, noise as the encryption layer, and
/// mplex/yamux as the multiplexing layer (when using TCP).
pub fn build_transport(
local_private_key: Keypair,
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
// Creates the TCP transport layer
let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true));
// Enables DNS over the TCP transport.
let transport = libp2p::dns::TokioDnsConfig::system(tcp)?;
#[cfg(feature = "libp2p-websocket")]
let transport = {
let trans_clone = transport.clone();
transport.or_transport(libp2p::websocket::WsConfig::new(trans_clone))
};

// yamux config
let mut yamux_config = yamux::Config::default();
Expand All @@ -58,10 +56,19 @@ pub fn build_transport(
.authenticate(generate_noise_config(&local_private_key))
.multiplex(yamux_config)
.timeout(Duration::from_secs(10))
.boxed()
.with_bandwidth_logging();

// Authentication
// Enables Quic
/*
// The default quic configuration suits us for now.
let quic_config = libp2p_quic::Config::new(&local_private_key);
let transport = transport.or_transport(libp2p_quic::tokio::Transport::new(quic_config));
// TODO: Get quick to support bandwidth measurements.
*/

let transport = transport.boxed();

Ok((transport, bandwidth))
}

Expand Down Expand Up @@ -267,4 +274,4 @@ pub(crate) fn save_metadata_to_disk<E: EthSpec>(
);
}
}
}
}
Loading

0 comments on commit 61d4ae1

Please sign in to comment.