Skip to content

Commit

Permalink
feat(proxy): plain hysteria2 (#626)
Browse files Browse the repository at this point in the history
* update

* up

* clippy

* cargo fmt

* f
  • Loading branch information
ibigbug authored Oct 16, 2024
1 parent 6f15468 commit b4a5bac
Show file tree
Hide file tree
Showing 10 changed files with 7,461 additions and 61 deletions.
7,322 changes: 7,322 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions clash/tests/data/config/hysteria2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ proxies:
password: "passwd"
sni: example.com
skip-cert-verify: true
obfs: salamander
obfs-password: "obfs"
# obfs: salamander
# obfs-password: "obfs"

rules:
- MATCH, local
19 changes: 19 additions & 0 deletions clash/tests/data/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,23 @@ services:
- type: bind
source: ./v2ray/key.pem
target: /etc/v2ray/v2ray.key
restart: unless-stopped

hysteria2:
image: tobyxdd/hysteria
network_mode: "host"
command:
- server
- "-c"
- "/etc/hysteria/config.yaml"
volumes:
- type: bind
source: ./hysteria2/config.yaml
target: /etc/hysteria/config.yaml
- type: bind
source: ./v2ray/cert.pem
target: /etc/hysteria/cert.pem
- type: bind
source: ./v2ray/key.pem
target: /etc/hysteria/key.pem
restart: unless-stopped
12 changes: 7 additions & 5 deletions clash_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,22 @@ tor-rtcompat = { version = "0.22", optional = true, default-features = false }
# tuic
tuic = { tag = "v1.3.1", optional = true, git = "https://github.com/Itsusinn/tuic.git" }
tuic-quinn = { tag = "v1.3.1", optional = true, git = "https://github.com/Itsusinn/tuic.git" }
quinn = { version = "0.11", optional = true, default-features = false, features = ["futures-io", "runtime-tokio", "rustls"] }
register-count = { version = "0.1", optional = true }

console-subscriber = { version = "0.4" }
tracing-timing = { version = "0.6" }
criterion = { version = "0.5", features = ["html_reports", "async_tokio"], optional = true }
quinn = { version = "0.11", default-features = false, features = ["futures-io", "runtime-tokio", "rustls"] }

memory-stats = "1.0.0"
# hysteria2
h3 = "0.0.6"
h3-quinn = "0.0.7"
quinn-proto = "0.11.8"
blake2 = "0.10.6"
digest = "0.10.7"

console-subscriber = { version = "0.4" }
tracing-timing = { version = "0.6" }
criterion = { version = "0.5", features = ["html_reports", "async_tokio"], optional = true }
memory-stats = "1.0.0"

[dev-dependencies]
tempfile = "3.13"
mockall = "0.13.0"
Expand Down
1 change: 1 addition & 0 deletions clash_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ async fn create_components(
let dns_listener =
dns::get_dns_listener(dns_listen, dns_resolver.clone(), &cwd).await;

info!("all components initialized");
Ok(RuntimeComponents {
cache_store,
dns_resolver,
Expand Down
18 changes: 13 additions & 5 deletions clash_lib/src/proxy/hysteria2/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ impl Decoder for Hy2TcpCodec {
type Error = std::io::Error;
type Item = Hy2TcpResp;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
fn decode(
&mut self,
src: &mut BytesMut,
) -> Result<Option<Self::Item>, Self::Error> {
if !src.has_remaining() {
return Err(ErrorKind::UnexpectedEof.into());
}
Expand All @@ -42,8 +45,8 @@ impl Decoder for Hy2TcpCodec {
}

let msg: Vec<u8> = src.split_to(msg_len).into();
let msg: String =
String::from_utf8(msg).map_err(|e| std::io::Error::new(ErrorKind::InvalidData, e))?;
let msg: String = String::from_utf8(msg)
.map_err(|e| std::io::Error::new(ErrorKind::InvalidData, e))?;

let padding_len = VarInt::decode(src)
.map_err(|_| ErrorKind::UnexpectedEof)?
Expand All @@ -68,7 +71,12 @@ pub fn padding(range: std::ops::RangeInclusive<u32>) -> Vec<u8> {

impl Encoder<&'_ SocksAddr> for Hy2TcpCodec {
type Error = std::io::Error;
fn encode(&mut self, item: &'_ SocksAddr, buf: &mut BytesMut) -> Result<(), Self::Error> {

fn encode(
&mut self,
item: &'_ SocksAddr,
buf: &mut BytesMut,
) -> Result<(), Self::Error> {
const REQ_ID: VarInt = VarInt::from_u32(0x401);

let padding = padding(64..=512);
Expand Down Expand Up @@ -123,5 +131,5 @@ fn hy2_resp_parse() {
let mut src = BytesMut::from(&[0x01, 0x00, 0x00][..]);
let msg = Hy2TcpCodec.decode(&mut src).unwrap().unwrap();
assert!(msg.status == 0x1);
assert!(msg.msg == "");
assert!(msg.msg.is_empty());
}
1 change: 1 addition & 0 deletions clash_lib/src/proxy/hysteria2/congestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct Burtal {
budget_at_last_sent: u64,
rtt: u64,
in_flight: u64,
#[allow(dead_code)]
send_now: Instant,

sess: quinn::Connection,
Expand Down
132 changes: 88 additions & 44 deletions clash_lib/src/proxy/hysteria2/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
fmt::{Debug, Formatter},
net::SocketAddr,
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
num::ParseIntError,
path::PathBuf,
pin::Pin,
Expand All @@ -23,7 +23,10 @@ use quinn::{
use quinn_proto::TransportConfig;

use rustls::{
client::danger::{ServerCertVerified, ServerCertVerifier},
client::{
danger::{ServerCertVerified, ServerCertVerifier},
WebPkiServerVerifier,
},
ClientConfig as RustlsClientConfig,
};
use tokio::{
Expand All @@ -38,20 +41,23 @@ use crate::{
},
dns::ThreadSafeDNSResolver,
},
common::utils::{encode_hex, sha256},
common::{
tls::GLOBAL_ROOT_STORE,
utils::{encode_hex, sha256},
},
// proxy::hysteria2::congestion::DynCongestion,
session::{Session, SocksAddr},
};
use tracing::debug;
use tracing::{debug, trace, warn};

use self::{
codec::Hy2TcpCodec,
congestion::{Burtal, DynController},
};

use super::{
converters::hysteria2::PortGenrateor, ConnectorType, DialWithConnector,
OutboundHandler, OutboundType,
converters::hysteria2::PortGenrateor, utils::new_udp_socket, ConnectorType,
DialWithConnector, OutboundHandler, OutboundType,
};

#[derive(Clone)]
Expand All @@ -64,26 +70,35 @@ pub struct HystOption {
pub salamander: Option<String>,
pub skip_cert_verify: bool,
pub alpn: Vec<String>,
#[allow(dead_code)]
pub up_down: Option<(u64, u64)>,
pub fingerprint: Option<String>,
pub ca: Option<PathBuf>,
#[allow(dead_code)]
pub ca_str: Option<String>,
#[allow(dead_code)]
pub cwnd: Option<u64>,
}

#[derive(Debug)]
struct CertVerifyOption {
fingerprint: Option<String>,
_ca: Option<PathBuf>,
skip: bool,
pki: Arc<WebPkiServerVerifier>,
}

impl CertVerifyOption {
fn new(fingerprint: Option<String>, ca: Option<PathBuf>, skip: bool) -> Self {
if ca.is_some() {
warn!("hysteria2 custom ca option is not supported yet");
// TODO: add load the ca and put it into a Store
}
Self {
fingerprint,
_ca: ca,
skip,
pki: WebPkiServerVerifier::builder(GLOBAL_ROOT_STORE.clone())
.build()
.unwrap(),
}
}
}
Expand All @@ -92,10 +107,10 @@ impl ServerCertVerifier for CertVerifyOption {
fn verify_server_cert(
&self,
end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
intermediates: &[rustls::pki_types::CertificateDer<'_>],
server_name: &rustls::pki_types::ServerName<'_>,
ocsp_response: &[u8],
now: rustls::pki_types::UnixTime,
) -> Result<ServerCertVerified, rustls::Error> {
if let Some(ref fingerprint) = self.fingerprint {
let cert_hex = encode_hex(&sha256(end_entity.as_ref()));
Expand All @@ -110,36 +125,42 @@ impl ServerCertVerifier for CertVerifyOption {
if self.skip {
return Ok(ServerCertVerified::assertion());
}
// todo
Ok(ServerCertVerified::assertion())

self.pki.verify_server_cert(
end_entity,
intermediates,
server_name,
ocsp_response,
now,
)
}

fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![]
self.pki.supported_verify_schemes()
}

fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
self.pki.verify_tls12_signature(message, cert, dss)
}

fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
self.pki.verify_tls13_signature(message, cert, dss)
}
}

enum CcRx {
Auto,
Fixed(u64),
Fixed(#[allow(dead_code)] u64),
}

impl FromStr for CcRx {
Expand Down Expand Up @@ -222,6 +243,7 @@ impl Handler {

async fn new_authed_session(
&self,
sess: &Session,
resolver: ThreadSafeDNSResolver,
) -> anyhow::Result<(Connection, SendRequest<OpenStreams, Bytes>)> {
// Everytime we enstablish a new session, we should lookup the server
Expand All @@ -239,55 +261,75 @@ impl Handler {

// Here maybe we should use a AsyncUdpSocket which implement salamander obfs
// and port hopping
let mut ep = if self.opts.salamander.is_some() || self.opts.ports.is_some() {
debug!("Hysteria2 use salamander obfs");

let mut ep = if self.opts.salamander.is_some() {
// let udp = salamander::Salamander::new(
// udp_socket,
// self.opts.salamander.as_ref().map(|s| s.as_bytes().to_vec()),
// self.opts.ports.clone(),
// )?;

let port_gen = self.opts.ports.as_ref().unwrap().clone();
let udp_hop =
udp_hop::UdpHop::new(server_socket_addr.port(), port_gen, None)?;
unimplemented!("salamander obfs is not implemented yet");
} else if let Some(port_gen) = self.opts.ports.as_ref() {
let udp_hop = udp_hop::UdpHop::new(
server_socket_addr.port(),
port_gen.clone(),
None,
)?;
quinn::Endpoint::new_with_abstract_socket(
self.ep_config.clone(),
None,
Arc::new(udp_hop),
Arc::new(TokioRuntime),
)?
} else {
let udp = SocketAddr::from(([0, 0, 0, 0], 0));
// bind to port 0, so the OS will choose a random port for us
let udp_socket = std::net::UdpSocket::bind::<SocketAddr>(udp)?;
let socket = {
if resolver.ipv6() {
new_udp_socket(
Some((Ipv6Addr::UNSPECIFIED, 0).into()),
sess.iface.clone(),
#[cfg(any(target_os = "linux", target_os = "android"))]
sess.so_mark,
)
.await?
} else {
new_udp_socket(
Some((Ipv4Addr::UNSPECIFIED, 0).into()),
sess.iface.clone(),
#[cfg(any(target_os = "linux", target_os = "android"))]
sess.so_mark,
)
.await?
}
};

quinn::Endpoint::new(
self.ep_config.clone(),
None,
udp_socket,
socket.into_std()?,
Arc::new(TokioRuntime),
)?
};

ep.set_default_client_config(self.client_config.clone());

let session = ep
.connect(
server_socket_addr,
self.opts.sni.as_ref().map(|s| s.as_str()).unwrap_or(""),
)?
.connect(server_socket_addr, self.opts.sni.as_deref().unwrap_or(""))?
.await?;
let (h3_conn, _rx, udp) = Self::auth(&session, &self.opts.passwd).await?;
*self.support_udp.write().unwrap() = udp;
// todo set congestion controller according to cc_rx

let any = session
match session
.congestion_state()
.into_any()
.downcast::<DynController>()
.unwrap();
any.set_controller(Box::new(Burtal::new(0, session.clone())));
{
Ok(any) => {
any.set_controller(Box::new(Burtal::new(0, session.clone())));
}
Err(_) => {
trace!("congestion controller is not set");
}
}

anyhow::Ok((session, h3_conn))
}
Expand Down Expand Up @@ -383,8 +425,10 @@ impl OutboundHandler for Handler {
}) {
Some(s) => s.clone(),
None => {
let (session, h3_conn) =
self.new_authed_session(resolver).await.map_err(|e| {
let (session, h3_conn) = self
.new_authed_session(sess, resolver)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!(
Expand Down
Loading

0 comments on commit b4a5bac

Please sign in to comment.