diff --git a/clash/tests/data/config/hysteria2.yaml b/clash/tests/data/config/hysteria2.yaml index 9335440ca..aff9a6755 100644 --- a/clash/tests/data/config/hysteria2.yaml +++ b/clash/tests/data/config/hysteria2.yaml @@ -47,11 +47,11 @@ proxies: type: hysteria2 server: 127.0.0.1 port: 10086 - password: "passwd" + password: passwd sni: example.com skip-cert-verify: true - # obfs: salamander - # obfs-password: "obfs" + obfs: salamander + obfs-password: "passwd" rules: - MATCH, local diff --git a/clash_lib/src/proxy/converters/hysteria2.rs b/clash_lib/src/proxy/converters/hysteria2.rs index 69740fb37..cb587881a 100644 --- a/clash_lib/src/proxy/converters/hysteria2.rs +++ b/clash_lib/src/proxy/converters/hysteria2.rs @@ -7,9 +7,9 @@ use std::{ use rand::Rng; use crate::{ - config::internal::proxy::OutboundHysteria2, + config::internal::proxy::{Hysteria2Obfs, OutboundHysteria2}, proxy::{ - hysteria2::{Handler, HystOption}, + hysteria2::{self, Handler, HystOption, SalamanderObfs}, AnyOutboundHandler, }, session::SocksAddr, @@ -85,15 +85,22 @@ impl TryFrom for AnyOutboundHandler { fn try_from(value: OutboundHysteria2) -> Result { let addr = SocksAddr::try_from((value.server, value.port))?; - let obfs_passwd = match value.obfs { - Some(_) => value - .obfs_password - .ok_or(crate::Error::InvalidConfig( + + let obfs = match (value.obfs, value.obfs_password.as_ref()) { + (Some(obfs), Some(passwd)) => match obfs { + Hysteria2Obfs::Salamander => { + Some(hysteria2::Obfs::Salamander(SalamanderObfs { + key: passwd.to_owned().into(), + })) + } + }, + (Some(_), None) => { + return Err(crate::Error::InvalidConfig( "hysteria2 found obfs enable, but obfs password is none" .to_owned(), - ))? - .into(), - None => None, + )) + } + _ => None, }; let ports_gen = if let Some(ports) = value.ports { @@ -120,7 +127,7 @@ impl TryFrom for AnyOutboundHandler { skip_cert_verify: value.skip_cert_verify, passwd: value.password, ports: ports_gen, - salamander: obfs_passwd, + obfs, up_down: value.up.zip(value.down), ca_str: value.ca_str, cwnd: value.cwnd, diff --git a/clash_lib/src/proxy/hysteria2/congestion.rs b/clash_lib/src/proxy/hysteria2/congestion.rs index 6c6d1dc57..a03527569 100644 --- a/clash_lib/src/proxy/hysteria2/congestion.rs +++ b/clash_lib/src/proxy/hysteria2/congestion.rs @@ -206,8 +206,6 @@ impl DynController { } } -unsafe impl Send for DynController {} - impl Controller for DynController { fn initial_window(&self) -> u64 { self.0.read().unwrap().initial_window() diff --git a/clash_lib/src/proxy/hysteria2/mod.rs b/clash_lib/src/proxy/hysteria2/mod.rs index 0cdb0d347..7288b6bcd 100644 --- a/clash_lib/src/proxy/hysteria2/mod.rs +++ b/clash_lib/src/proxy/hysteria2/mod.rs @@ -42,6 +42,7 @@ use crate::{ dns::ThreadSafeDNSResolver, }, common::{ + errors::new_io_error, tls::GLOBAL_ROOT_STORE, utils::{encode_hex, sha256}, }, @@ -60,6 +61,16 @@ use super::{ DialWithConnector, OutboundHandler, OutboundType, }; +#[derive(Clone)] +pub struct SalamanderObfs { + pub key: Vec, +} + +#[derive(Clone)] +pub enum Obfs { + Salamander(SalamanderObfs), +} + #[derive(Clone)] pub struct HystOption { pub name: String, @@ -67,7 +78,7 @@ pub struct HystOption { pub ports: Option, pub sni: Option, pub passwd: String, - pub salamander: Option, + pub obfs: Option, pub skip_cert_verify: bool, pub alpn: Vec, #[allow(dead_code)] @@ -181,11 +192,8 @@ pub struct Handler { ep_config: quinn::EndpointConfig, client_config: quinn::ClientConfig, session: Mutex>>, - // h3_conn is a copy of session, because we need h3 crate to send request, but - // this crate have not a method to into_inner, we have to keep is - // maybe future version of h3 crate will have a method to into_inner, or we send - // h3 request manually, it is too complex - h3_conn: Mutex>>, + // a send request guard to keep the connection alive + guard: Mutex>>, // support udp is decided by server support_udp: RwLock, } @@ -232,11 +240,11 @@ impl Handler { let ep_config = quinn::EndpointConfig::default(); Ok(Self { - opts: opts.clone(), + opts, ep_config, client_config, session: Mutex::new(None), - h3_conn: Mutex::new(None), + guard: Mutex::new(None), support_udp: RwLock::new(true), }) } @@ -261,13 +269,43 @@ impl Handler { // Here maybe we should use a AsyncUdpSocket which implement salamander obfs // and port hopping - let mut ep = if self.opts.salamander.is_some() { - // let udp = salamander::Salamander::new( - // udp_socket, - // self.opts.salamander.as_ref().map(|s| s.as_bytes().to_vec()), - // self.opts.ports.clone(), - // )?; - unimplemented!("salamander obfs is not implemented yet"); + let create_socket = || async { + if resolver.ipv6() { + new_udp_socket( + Some((Ipv6Addr::UNSPECIFIED, 0).into()), + sess.iface.clone(), + #[cfg(any(target_os = "linux", target_os = "android"))] + sess.so_mark, + ) + .await + } else { + new_udp_socket( + Some((Ipv4Addr::UNSPECIFIED, 0).into()), + sess.iface.clone(), + #[cfg(any(target_os = "linux", target_os = "android"))] + sess.so_mark, + ) + .await + } + }; + + let mut ep = if let Some(obfs) = self.opts.obfs.as_ref() { + match obfs { + Obfs::Salamander(salamander_obfs) => { + let socket = create_socket().await?; + let obfs = salamander::Salamander::new( + socket.into_std()?, + salamander_obfs.key.to_vec(), + )?; + + quinn::Endpoint::new_with_abstract_socket( + self.ep_config.clone(), + None, + Arc::new(obfs), + Arc::new(TokioRuntime), + )? + } + } } else if let Some(port_gen) = self.opts.ports.as_ref() { let udp_hop = udp_hop::UdpHop::new( server_socket_addr.port(), @@ -314,7 +352,7 @@ impl Handler { let session = ep .connect(server_socket_addr, self.opts.sni.as_deref().unwrap_or(""))? .await?; - let (h3_conn, _rx, udp) = Self::auth(&session, &self.opts.passwd).await?; + let (guard, _rx, udp) = Self::auth(&session, &self.opts.passwd).await?; *self.support_udp.write().unwrap() = udp; // todo set congestion controller according to cc_rx @@ -331,7 +369,7 @@ impl Handler { } } - anyhow::Ok((session, h3_conn)) + Ok((session, guard)) } async fn auth( @@ -350,6 +388,7 @@ impl Handler { .body(()) .unwrap(); let mut r = sender.send_request(req).await?; + r.finish().await?; let r = r.recv_response().await?; @@ -374,7 +413,7 @@ impl Handler { .to_str()? .parse()?; - anyhow::Ok((sender, cc_rx, support_udp)) + Ok((sender, cc_rx, support_udp)) } } @@ -404,7 +443,7 @@ impl OutboundHandler for Handler { _sess: &Session, _resolver: ThreadSafeDNSResolver, ) -> std::io::Result { - todo!() + Err(new_io_error("hysteria2 udp is not implemented yet")) } async fn connect_stream( @@ -425,7 +464,7 @@ impl OutboundHandler for Handler { }) { Some(s) => s.clone(), None => { - let (session, h3_conn) = self + let (session, guard) = self .new_authed_session(sess, resolver) .await .map_err(|e| { @@ -439,7 +478,7 @@ impl OutboundHandler for Handler { })?; let session = Arc::new(session); *session_lock = Some(session.clone()); - *self.h3_conn.lock().await = Some(h3_conn); + *self.guard.lock().await = Some(guard); session } } diff --git a/clash_lib/src/proxy/hysteria2/salamander.rs b/clash_lib/src/proxy/hysteria2/salamander.rs index 2c122bf98..d004ea6b1 100644 --- a/clash_lib/src/proxy/hysteria2/salamander.rs +++ b/clash_lib/src/proxy/hysteria2/salamander.rs @@ -26,7 +26,6 @@ impl SalamanderObfs { /// /// new() should init a blake2b256 hasher with key to reduce calculation, /// but rust-analyzer can't recognize its type - #[allow(dead_code)] pub fn new(key: Vec) -> Self { Self { key } } @@ -68,7 +67,6 @@ pub struct Salamander { } impl Salamander { - #[allow(dead_code)] pub fn new(socket: std::net::UdpSocket, key: Vec) -> std::io::Result { use quinn::Runtime; let inner = TokioRuntime.wrap_udp_socket(socket)?;