From 685ae2df988e985cfef4242cfc19dbe3d9cf770b Mon Sep 17 00:00:00 2001 From: Yuwei Ba Date: Fri, 10 May 2024 15:11:11 +1000 Subject: [PATCH] feat: relay refactor - support UDP relay (UoT only) (#374) * wip * add proxy connector * fmt * wip * wip * wip * wip * apply patch from #213 * clippy * t * t --- clash/tests/data/config/rules.yaml | 56 ++++- clash/tests/data/config/uot.yaml | 37 ++++ .../src/app/dispatcher/dispatcher_impl.rs | 9 +- clash_lib/src/app/outbound/manager.rs | 2 +- clash_lib/src/app/router/mod.rs | 2 +- clash_lib/src/app/router/rules/geoip.rs | 2 +- clash_lib/src/config/internal/config.rs | 6 +- clash_lib/src/proxy/converters/vmess.rs | 6 +- clash_lib/src/proxy/direct/mod.rs | 66 ++++-- clash_lib/src/proxy/fallback/mod.rs | 34 +-- clash_lib/src/proxy/loadbalance/mod.rs | 34 +-- clash_lib/src/proxy/mocks.rs | 18 +- clash_lib/src/proxy/mod.rs | 73 +++++-- clash_lib/src/proxy/reject/mod.rs | 23 +-- clash_lib/src/proxy/relay/mod.rs | 193 +++++++++++++----- clash_lib/src/proxy/selector/mod.rs | 42 ++-- clash_lib/src/proxy/shadowsocks/mod.rs | 128 +++++++----- clash_lib/src/proxy/socks/inbound/stream.rs | 2 +- clash_lib/src/proxy/tor/mod.rs | 21 +- clash_lib/src/proxy/transport/ws/websocket.rs | 1 + clash_lib/src/proxy/trojan/mod.rs | 78 +++++-- clash_lib/src/proxy/tuic/mod.rs | 24 +-- clash_lib/src/proxy/urltest/mod.rs | 56 +++-- clash_lib/src/proxy/utils/mod.rs | 3 + clash_lib/src/proxy/utils/proxy_connector.rs | 166 +++++++++++++++ clash_lib/src/proxy/utils/socket_helpers.rs | 8 +- clash_lib/src/proxy/vmess/mod.rs | 72 +++++-- .../src/proxy/vmess/vmess_impl/datagram.rs | 8 +- .../src/proxy/vmess/vmess_impl/header.rs | 2 +- clash_lib/src/proxy/wg/mod.rs | 22 +- clash_lib/src/session.rs | 2 + 31 files changed, 859 insertions(+), 337 deletions(-) create mode 100644 clash/tests/data/config/uot.yaml create mode 100644 clash_lib/src/proxy/utils/proxy_connector.rs diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index cba74e195..f5ea5e55c 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -65,11 +65,32 @@ proxy-groups: proxies: - "plain-vmess" - "ws-vmess" + - "ss-simple" + - "trojan" - "auto" - "fallback-auto" - "load-balance" - "select" + - "wg" - DIRECT + + - name: "udp-relay" + type: relay + proxies: + # - "plain-vmess" + - "ws-vmess" + # - "h2-vmess" + # - "tls-vmess" + # - "grpc-vmess" + # - "ss-simple" + # - "trojan" + # - "auto" + # - "fallback-auto" + # - "load-balance" + # - "select" + # - "wg" + # - DIRECT + - name: "relay-one" type: relay @@ -79,7 +100,7 @@ proxy-groups: - name: "auto" type: url-test use: - - "file-provider" + - "file-provider-uot" proxies: - DIRECT url: "http://www.gstatic.com/generate_204" @@ -88,7 +109,7 @@ proxy-groups: - name: "fallback-auto" type: fallback use: - - "file-provider" + - "file-provider-uot" proxies: - DIRECT url: "http://www.gstatic.com/generate_204" @@ -97,7 +118,7 @@ proxy-groups: - name: "load-balance" type: load-balance use: - - "file-provider" + - "file-provider-uot" proxies: - DIRECT strategy: round-robin @@ -107,7 +128,7 @@ proxy-groups: - name: select type: select use: - - "file-provider" + - "file-provider-uot" - name: test 🌏 type: select @@ -215,6 +236,20 @@ proxies: grpc-opts: grpc-service-name: def + - name: "wg" + type: wireguard + server: engage.cloudflareclient.com + port: 2408 + private-key: uIwDn4c7656E/1pHkJu23ZOe/4SuCnL+vL+jE2s4MHE= + ip: 172.16.0.2/32 + ipv6: 2606:4700:110:8e5e:fa1:3f30:c077:e17c/128 + public-key: bmXOC+F1FxEMF9dyiK2H5/1SUtzH0JuVo51h2wPfgyo= + allowed-ips: ['0.0.0.0/0', '::/0'] + remote-dns-resolve: true + dns: + - 1.1.1.1 + udp: true + proxy-providers: file-provider: type: file @@ -225,6 +260,15 @@ proxy-providers: url: http://www.gstatic.com/generate_204 interval: 300 + file-provider-uot: + type: file + path: ./uot.yaml + interval: 300 + health-check: + enable: true + url: http://www.gstatic.com/generate_204 + interval: 300 + rule-providers: file-provider: type: file @@ -233,7 +277,7 @@ rule-providers: behavior: domain rules: - - DOMAIN,google.com,relay + - DOMAIN,google.com,ws-vmess - DOMAIN-KEYWORD,httpbin,trojan-grpc - DOMAIN,ipinfo.io,trojan-grpc # - RULE-SET,file-provider,trojan @@ -244,7 +288,7 @@ rules: - SRC-IP-CIDR,192.168.1.1/24,DIRECT - GEOIP,CN,DIRECT - IP-CIDR,10.0.0.11/32,select - - DST-PORT,53,trojan + - DST-PORT,53,ws-vmess - SRC-PORT,7777,DIRECT - MATCH, DIRECT ... diff --git a/clash/tests/data/config/uot.yaml b/clash/tests/data/config/uot.yaml new file mode 100644 index 000000000..6ef430319 --- /dev/null +++ b/clash/tests/data/config/uot.yaml @@ -0,0 +1,37 @@ +proxies: + - name: plain-vmess + type: vmess + server: 10.0.0.13 + port: 16823 + uuid: b831381d-6324-4d53-ad4f-8cda48b30811 + alterId: 0 + cipher: auto + udp: true + skip-cert-verify: true + + - name: ws-vmess + type: vmess + server: 10.0.0.13 + port: 16824 + uuid: b831381d-6324-4d53-ad4f-8cda48b30811 + alterId: 0 + cipher: auto + udp: true + skip-cert-verify: true + network: ws + ws-opts: + path: /api/v3/download.getFile + headers: + Host: www.amazon.com + + - name: "trojan" + type: trojan + server: 10.0.0.13 + port: 9443 + password: password1 + udp: true + # sni: example.com # aka server name + alpn: + - h2 + - http/1.1 + skip-cert-verify: true diff --git a/clash_lib/src/app/dispatcher/dispatcher_impl.rs b/clash_lib/src/app/dispatcher/dispatcher_impl.rs index 78427a63c..edb4124f2 100644 --- a/clash_lib/src/app/dispatcher/dispatcher_impl.rs +++ b/clash_lib/src/app/dispatcher/dispatcher_impl.rs @@ -74,7 +74,7 @@ impl Dispatcher { *self.mode.lock().unwrap() } - #[instrument(skip(lhs))] + #[instrument(skip(self, sess, lhs))] pub async fn dispatch_stream(&self, sess: Session, mut lhs: S) where S: AsyncRead + AsyncWrite + Unpin + Send, @@ -124,11 +124,7 @@ impl Dispatcher { match handler .connect_stream(&sess, self.resolver.clone()) - .instrument(info_span!( - "connect_stream", - outbound_name = outbound_name, - session = %sess, - )) + .instrument(info_span!("connect_stream", outbound_name = outbound_name,)) .await { Ok(rhs) => { @@ -145,7 +141,6 @@ impl Dispatcher { .instrument(info_span!( "copy_bidirectional", outbound_name = outbound_name, - session = %sess, )) .await { diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index 14fa49a50..86990fc21 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -240,7 +240,7 @@ impl OutboundManager { ) -> Result { if name == PROXY_DIRECT || name == PROXY_REJECT { return Err(Error::InvalidConfig(format!( - "proxy group {} is reserved", + "proxy group name `{}` is reserved", name ))); } diff --git a/clash_lib/src/app/router/mod.rs b/clash_lib/src/app/router/mod.rs index efc51bd36..9fe89a8a7 100644 --- a/clash_lib/src/app/router/mod.rs +++ b/clash_lib/src/app/router/mod.rs @@ -79,7 +79,7 @@ impl Router { for r in self.rules.iter() { if sess.destination.is_domain() && r.should_resolve_ip() && !sess_resolved { debug!( - "rule {r} local resolving domain {}", + "rule `{r}` resolving domain {} locally", sess.destination.domain().unwrap() ); if let Ok(Some(ip)) = self diff --git a/clash_lib/src/app/router/rules/geoip.rs b/clash_lib/src/app/router/rules/geoip.rs index a69adfe12..86752ece9 100644 --- a/clash_lib/src/app/router/rules/geoip.rs +++ b/clash_lib/src/app/router/rules/geoip.rs @@ -16,7 +16,7 @@ pub struct GeoIP { impl std::fmt::Display for GeoIP { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} geoip {}", self.target, self.country_code) + write!(f, "GeoIP({} - {})", self.target, self.country_code) } } diff --git a/clash_lib/src/config/internal/config.rs b/clash_lib/src/config/internal/config.rs index 5e159682d..6cd527d28 100644 --- a/clash_lib/src/config/internal/config.rs +++ b/clash_lib/src/config/internal/config.rs @@ -111,7 +111,8 @@ impl TryFrom for Config { .rule_provider .map(|m| { m.into_iter() - .try_fold(HashMap::new(), |mut rv, (name, body)| { + .try_fold(HashMap::new(), |mut rv, (name, mut body)| { + body.insert("name".to_owned(), serde_yaml::Value::String(name.clone())); let provider = RuleProviderDef::try_from(body).map_err(|x| { Error::InvalidConfig(format!( "invalid rule provider {}: {}", @@ -186,7 +187,8 @@ impl TryFrom for Config { .proxy_provider .map(|m| { m.into_iter() - .try_fold(HashMap::new(), |mut rv, (name, body)| { + .try_fold(HashMap::new(), |mut rv, (name, mut body)| { + body.insert("name".to_owned(), serde_yaml::Value::String(name.clone())); let provider = OutboundProxyProviderDef::try_from(body).map_err(|x| { Error::InvalidConfig(format!( diff --git a/clash_lib/src/proxy/converters/vmess.rs b/clash_lib/src/proxy/converters/vmess.rs index e04606e97..7a49565dd 100644 --- a/clash_lib/src/proxy/converters/vmess.rs +++ b/clash_lib/src/proxy/converters/vmess.rs @@ -68,7 +68,11 @@ impl TryFrom<&OutboundVmess> for AnyOutboundHandler { .as_ref() .map(|x| { VmessTransport::H2(Http2Option { - host: x.host.as_ref().map(|x| x.to_owned()).unwrap_or_default(), + host: x + .host + .as_ref() + .map(|x| x.to_owned()) + .unwrap_or(vec![s.server.to_owned()]), path: x.path.as_ref().map(|x| x.to_owned()).unwrap_or_default(), }) }) diff --git a/clash_lib/src/proxy/direct/mod.rs b/clash_lib/src/proxy/direct/mod.rs index 8abf99ff1..1a813e013 100644 --- a/clash_lib/src/proxy/direct/mod.rs +++ b/clash_lib/src/proxy/direct/mod.rs @@ -6,14 +6,15 @@ use crate::app::dns::ThreadSafeDNSResolver; use crate::config::internal::proxy::PROXY_DIRECT; use crate::proxy::datagram::OutboundDatagramImpl; use crate::proxy::utils::{new_tcp_stream, new_udp_socket}; -use crate::proxy::{AnyOutboundHandler, AnyStream, OutboundHandler}; -use crate::session::{Session, SocksAddr}; +use crate::proxy::{AnyOutboundHandler, OutboundHandler}; +use crate::session::Session; use async_trait::async_trait; use serde::Serialize; use std::sync::Arc; -use super::OutboundType; +use super::utils::RemoteConnector; +use super::{ConnectorType, OutboundType}; #[derive(Serialize)] pub struct Handler; @@ -35,10 +36,6 @@ impl OutboundHandler for Handler { OutboundType::Direct } - async fn remote_addr(&self) -> Option { - None - } - async fn support_udp(&self) -> bool { true } @@ -63,15 +60,6 @@ impl OutboundHandler for Handler { Ok(Box::new(s)) } - async fn proxy_stream( - &self, - s: AnyStream, - #[allow(unused_variables)] sess: &Session, - #[allow(unused_variables)] _resolver: ThreadSafeDNSResolver, - ) -> std::io::Result { - Ok(s) - } - async fn connect_datagram( &self, sess: &Session, @@ -90,4 +78,50 @@ impl OutboundHandler for Handler { d.append_to_chain(self.name()).await; Ok(Box::new(d)) } + + async fn support_connector(&self) -> ConnectorType { + ConnectorType::Tcp + } + + async fn connect_stream_with_connector( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, + ) -> std::io::Result { + let s = connector + .connect_stream( + resolver, + sess.destination.host().as_str(), + sess.destination.port(), + None, + #[cfg(any(target_os = "linux", target_os = "android"))] + None, + ) + .await?; + let s = ChainedStreamWrapper::new(s); + s.append_to_chain(self.name()).await; + Ok(Box::new(s)) + } + + async fn connect_datagram_with_connector( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, + ) -> std::io::Result { + let d = connector + .connect_datagram( + resolver, + None, + &sess.destination, + sess.iface.as_ref(), + #[cfg(any(target_os = "linux", target_os = "android"))] + None, + ) + .await?; + let d = ChainedDatagramWrapper::new(d); + d.append_to_chain(self.name()).await; + Ok(Box::new(d)) + } } diff --git a/clash_lib/src/proxy/fallback/mod.rs b/clash_lib/src/proxy/fallback/mod.rs index df4dd72e9..07f10856f 100644 --- a/clash_lib/src/proxy/fallback/mod.rs +++ b/clash_lib/src/proxy/fallback/mod.rs @@ -11,12 +11,12 @@ use crate::{ providers::proxy_provider::ThreadSafeProxyProvider, ProxyManager, }, }, - session::{Session, SocksAddr}, + session::Session, }; use super::{ - utils::provider_helper::get_proxies_from_providers, AnyOutboundHandler, AnyStream, - CommonOption, OutboundHandler, OutboundType, + utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, + AnyOutboundHandler, CommonOption, ConnectorType, OutboundHandler, OutboundType, }; #[derive(Default, Clone)] @@ -75,11 +75,6 @@ impl OutboundHandler for Handler { OutboundType::Fallback } - /// The proxy remote address - async fn remote_addr(&self) -> Option { - self.find_alive_proxy(false).await.remote_addr().await - } - /// whether the outbound handler support UDP async fn support_udp(&self) -> bool { self.opts.udp || self.find_alive_proxy(false).await.support_udp().await @@ -101,25 +96,30 @@ impl OutboundHandler for Handler { } } - /// wraps a stream with outbound handler - async fn proxy_stream( + /// connect to remote target via UDP + async fn connect_datagram( &self, - s: AnyStream, sess: &Session, resolver: ThreadSafeDNSResolver, - ) -> io::Result { + ) -> io::Result { let proxy = self.find_alive_proxy(true).await; - proxy.proxy_stream(s, sess, resolver).await + proxy.connect_datagram(sess, resolver).await } - /// connect to remote target via UDP - async fn connect_datagram( + async fn support_connector(&self) -> ConnectorType { + ConnectorType::Tcp + } + + async fn connect_stream_with_connector( &self, sess: &Session, resolver: ThreadSafeDNSResolver, - ) -> io::Result { + connector: &dyn RemoteConnector, + ) -> io::Result { let proxy = self.find_alive_proxy(true).await; - proxy.connect_datagram(sess, resolver).await + proxy + .connect_stream_with_connector(sess, resolver, connector) + .await } async fn as_map(&self) -> HashMap> { diff --git a/clash_lib/src/proxy/loadbalance/mod.rs b/clash_lib/src/proxy/loadbalance/mod.rs index fb65f7825..d4c2de435 100644 --- a/clash_lib/src/proxy/loadbalance/mod.rs +++ b/clash_lib/src/proxy/loadbalance/mod.rs @@ -13,14 +13,14 @@ use crate::{ remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, }, config::internal::proxy::LoadBalanceStrategy, - session::{Session, SocksAddr}, + session::Session, }; use self::helpers::{strategy_consistent_hashring, strategy_rr, StrategyFn}; use super::{ - utils::provider_helper::get_proxies_from_providers, AnyOutboundHandler, AnyStream, - CommonOption, OutboundHandler, OutboundType, + utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, + AnyOutboundHandler, CommonOption, ConnectorType, OutboundHandler, OutboundType, }; #[derive(Default, Clone)] @@ -76,11 +76,6 @@ impl OutboundHandler for Handler { OutboundType::LoadBalance } - /// The proxy remote address - async fn remote_addr(&self) -> Option { - None - } - /// whether the outbound handler support UDP async fn support_udp(&self) -> bool { self.opts.udp @@ -104,29 +99,34 @@ impl OutboundHandler for Handler { } } - /// wraps a stream with outbound handler - async fn proxy_stream( + /// connect to remote target via UDP + async fn connect_datagram( &self, - s: AnyStream, sess: &Session, resolver: ThreadSafeDNSResolver, - ) -> io::Result { + ) -> io::Result { let proxies = self.get_proxies(false).await; let proxy = (self.inner.lock().await.strategy_fn)(proxies, sess).await?; debug!("{} use proxy {}", self.name(), proxy.name()); - proxy.proxy_stream(s, sess, resolver).await + proxy.connect_datagram(sess, resolver).await } - /// connect to remote target via UDP - async fn connect_datagram( + async fn support_connector(&self) -> ConnectorType { + ConnectorType::Tcp + } + + async fn connect_stream_with_connector( &self, sess: &Session, resolver: ThreadSafeDNSResolver, - ) -> io::Result { + connector: &dyn RemoteConnector, + ) -> io::Result { let proxies = self.get_proxies(false).await; let proxy = (self.inner.lock().await.strategy_fn)(proxies, sess).await?; debug!("{} use proxy {}", self.name(), proxy.name()); - proxy.connect_datagram(sess, resolver).await + proxy + .connect_stream_with_connector(sess, resolver, connector) + .await } async fn as_map(&self) -> HashMap> { diff --git a/clash_lib/src/proxy/mocks.rs b/clash_lib/src/proxy/mocks.rs index b0a6404a5..875f03f85 100644 --- a/clash_lib/src/proxy/mocks.rs +++ b/clash_lib/src/proxy/mocks.rs @@ -11,10 +11,10 @@ use crate::{ proxy_provider::ProxyProvider, Provider, ProviderType, ProviderVehicleType, }, }, - session::{Session, SocksAddr}, + session::Session, }; -use super::{AnyOutboundHandler, AnyStream, OutboundHandler, OutboundType}; +use super::{AnyOutboundHandler, OutboundHandler, OutboundType}; mock! { pub DummyProxyProvider {} @@ -51,9 +51,6 @@ mock! { /// only contains Type information, do not rely on the underlying value fn proto(&self) -> OutboundType; - /// The proxy remote address - async fn remote_addr(&self) -> Option; - /// whether the outbound handler support UDP async fn support_udp(&self) -> bool; @@ -64,13 +61,6 @@ mock! { resolver: ThreadSafeDNSResolver, ) -> io::Result; - /// wraps a stream with outbound handler - async fn proxy_stream( - &self, - s: AnyStream, - sess: &Session, - resolver: ThreadSafeDNSResolver, - ) -> io::Result; /// connect to remote target via UDP async fn connect_datagram( @@ -79,7 +69,7 @@ mock! { resolver: ThreadSafeDNSResolver, ) -> io::Result; - /// for API - async fn as_map(&self) -> HashMap>; + /// relay related + async fn support_connector(&self) -> crate::proxy::ConnectorType; } } diff --git a/clash_lib/src/proxy/mod.rs b/clash_lib/src/proxy/mod.rs index 96c76b826..16ec0ae41 100644 --- a/clash_lib/src/proxy/mod.rs +++ b/clash_lib/src/proxy/mod.rs @@ -2,20 +2,23 @@ use crate::app::dispatcher::{BoxedChainedDatagram, BoxedChainedStream}; use crate::app::dns::ThreadSafeDNSResolver; use crate::proxy::datagram::UdpPacket; use crate::proxy::utils::Interface; -use crate::session::{Session, SocksAddr}; +use crate::session::Session; use async_trait::async_trait; use erased_serde::Serialize as ESerialize; use futures::{Sink, Stream}; use serde::{Deserialize, Serialize}; +use tracing::error; use std::collections::HashMap; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use std::io; use std::sync::Arc; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use self::utils::RemoteConnector; + pub mod direct; pub mod reject; @@ -124,6 +127,33 @@ pub enum OutboundType { Reject, } +impl Display for OutboundType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OutboundType::Shadowsocks => write!(f, "Shadowsocks"), + OutboundType::Vmess => write!(f, "Vmess"), + OutboundType::Trojan => write!(f, "Trojan"), + OutboundType::WireGuard => write!(f, "WireGuard"), + OutboundType::Tor => write!(f, "Tor"), + OutboundType::Tuic => write!(f, "Tuic"), + OutboundType::UrlTest => write!(f, "URLTest"), + OutboundType::Selector => write!(f, "Selector"), + OutboundType::Relay => write!(f, "Relay"), + OutboundType::LoadBalance => write!(f, "LoadBalance"), + OutboundType::Fallback => write!(f, "Fallback"), + OutboundType::Direct => write!(f, "Direct"), + OutboundType::Reject => write!(f, "Reject"), + } + } +} + +pub enum ConnectorType { + Tcp, + Udp, + All, + None, +} + #[async_trait] pub trait OutboundHandler: Sync + Send + Unpin { /// The name of the outbound handler @@ -133,9 +163,6 @@ pub trait OutboundHandler: Sync + Send + Unpin { /// only contains Type information, do not rely on the underlying value fn proto(&self) -> OutboundType; - /// The proxy remote address - async fn remote_addr(&self) -> Option; - /// whether the outbound handler support UDP async fn support_udp(&self) -> bool; @@ -146,14 +173,6 @@ pub trait OutboundHandler: Sync + Send + Unpin { resolver: ThreadSafeDNSResolver, ) -> io::Result; - /// wraps a stream with outbound handler - async fn proxy_stream( - &self, - s: AnyStream, - sess: &Session, - resolver: ThreadSafeDNSResolver, - ) -> io::Result; - /// connect to remote target via UDP async fn connect_datagram( &self, @@ -161,6 +180,34 @@ pub trait OutboundHandler: Sync + Send + Unpin { resolver: ThreadSafeDNSResolver, ) -> io::Result; + /// relay related + async fn support_connector(&self) -> ConnectorType; + + async fn connect_stream_with_connector( + &self, + _sess: &Session, + _resolver: ThreadSafeDNSResolver, + _connector: &dyn RemoteConnector, + ) -> io::Result { + error!("tcp relay not supported for {}", self.proto()); + Err(io::Error::new( + io::ErrorKind::Other, + format!("tcp relay not supported for {}", self.proto()), + )) + } + + async fn connect_datagram_with_connector( + &self, + _sess: &Session, + _resolver: ThreadSafeDNSResolver, + _connector: &dyn RemoteConnector, + ) -> io::Result { + Err(io::Error::new( + io::ErrorKind::Other, + format!("udp relay not supported for {}", self.proto()), + )) + } + /// for API /// the map only contains basic information /// to populate history/liveness information, use the proxy_manager diff --git a/clash_lib/src/proxy/reject/mod.rs b/clash_lib/src/proxy/reject/mod.rs index 1b914cbb1..bb4b23978 100644 --- a/clash_lib/src/proxy/reject/mod.rs +++ b/clash_lib/src/proxy/reject/mod.rs @@ -1,14 +1,14 @@ use crate::app::dispatcher::{BoxedChainedDatagram, BoxedChainedStream}; use crate::app::dns::ThreadSafeDNSResolver; use crate::config::internal::proxy::PROXY_REJECT; -use crate::proxy::{AnyOutboundHandler, AnyStream, OutboundHandler}; -use crate::session::{Session, SocksAddr}; +use crate::proxy::{AnyOutboundHandler, OutboundHandler}; +use crate::session::Session; use async_trait::async_trait; use serde::Serialize; use std::io; use std::sync::Arc; -use super::OutboundType; +use super::{ConnectorType, OutboundType}; #[derive(Serialize)] pub struct Handler; @@ -31,10 +31,6 @@ impl OutboundHandler for Handler { OutboundType::Reject } - async fn remote_addr(&self) -> Option { - None - } - async fn support_udp(&self) -> bool { false } @@ -47,15 +43,6 @@ impl OutboundHandler for Handler { Err(io::Error::new(io::ErrorKind::Other, "REJECT")) } - async fn proxy_stream( - &self, - _s: AnyStream, - #[allow(unused_variables)] sess: &Session, - #[allow(unused_variables)] _resolver: ThreadSafeDNSResolver, - ) -> std::io::Result { - Err(io::Error::new(io::ErrorKind::Other, "REJECT")) - } - async fn connect_datagram( &self, #[allow(unused_variables)] sess: &Session, @@ -63,4 +50,8 @@ impl OutboundHandler for Handler { ) -> io::Result { Err(io::Error::new(io::ErrorKind::Other, "REJECT")) } + + async fn support_connector(&self) -> ConnectorType { + ConnectorType::All + } } diff --git a/clash_lib/src/proxy/relay/mod.rs b/clash_lib/src/proxy/relay/mod.rs index 39bb69539..e304997cd 100644 --- a/clash_lib/src/proxy/relay/mod.rs +++ b/clash_lib/src/proxy/relay/mod.rs @@ -3,23 +3,27 @@ use std::{collections::HashMap, io, sync::Arc}; use async_trait::async_trait; use erased_serde::Serialize; use futures::stream::{self, StreamExt}; +use tracing::debug; use crate::{ app::{ dispatcher::{ - BoxedChainedDatagram, BoxedChainedStream, ChainedStream, ChainedStreamWrapper, + BoxedChainedDatagram, BoxedChainedStream, ChainedDatagram, ChainedDatagramWrapper, + ChainedStream, ChainedStreamWrapper, }, dns::ThreadSafeDNSResolver, remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, }, common::errors::new_io_error, - proxy::utils::new_tcp_stream, - session::{Session, SocksAddr}, + session::Session, }; use super::{ - utils::provider_helper::get_proxies_from_providers, AnyOutboundHandler, AnyStream, - CommonOption, OutboundHandler, OutboundType, + utils::{ + provider_helper::get_proxies_from_providers, DirectConnector, ProxyConnector, + RemoteConnector, + }, + AnyOutboundHandler, CommonOption, ConnectorType, OutboundHandler, OutboundType, }; #[derive(Default)] @@ -57,10 +61,6 @@ impl OutboundHandler for Handler { OutboundType::Relay } - async fn remote_addr(&self) -> Option { - None - } - async fn support_udp(&self) -> bool { false } @@ -70,44 +70,29 @@ impl OutboundHandler for Handler { sess: &Session, resolver: ThreadSafeDNSResolver, ) -> io::Result { - let proxies: Vec = stream::iter(self.get_proxies(true).await) - .filter_map(|x| async { x.remote_addr().await.map(|_| x) }) - .collect() - .await; + let proxies: Vec = + stream::iter(self.get_proxies(true).await).collect().await; match proxies.len() { 0 => Err(new_io_error("no proxy available")), 1 => { let proxy = proxies[0].clone(); + debug!("tcp relay `{}` via proxy `{}`", self.name(), proxy.name()); proxy.connect_stream(sess, resolver).await } _ => { - let mut first = proxies[0].clone(); - let last = proxies[proxies.len() - 1].clone(); - - let remote_addr = first.remote_addr().await.unwrap(); - - let mut s = new_tcp_stream( - resolver.clone(), - remote_addr.host().as_str(), - remote_addr.port(), - None, - #[cfg(any(target_os = "linux", target_os = "android"))] - None, - ) - .await?; - - let mut next_sess = sess.clone(); - for proxy in proxies.iter().skip(1) { - let proxy = proxy.clone(); - next_sess.destination = - proxy.remote_addr().await.expect("must have remote addr"); - s = first.proxy_stream(s, &next_sess, resolver.clone()).await?; - - first = proxy; + let mut connector: Box = Box::new(DirectConnector::new()); + let (proxies, last) = proxies.split_at(proxies.len() - 1); + for proxy in proxies { + debug!("tcp relay `{}` via proxy `{}`", self.name(), proxy.name()); + connector = Box::new(ProxyConnector::new(proxy.clone(), connector)); } - s = last.proxy_stream(s, sess, resolver).await?; + debug!("relay `{}` via proxy `{}`", self.name(), last[0].name()); + let s = last[0] + .connect_stream_with_connector(sess, resolver, connector.as_ref()) + .await?; + let chained = ChainedStreamWrapper::new(s); chained.append_to_chain(self.name()).await; Ok(Box::new(chained)) @@ -115,21 +100,41 @@ impl OutboundHandler for Handler { } } - async fn proxy_stream( - &self, - #[allow(unused_variables)] _s: AnyStream, - #[allow(unused_variables)] sess: &Session, - #[allow(unused_variables)] _resolver: ThreadSafeDNSResolver, - ) -> std::io::Result { - Err(new_io_error("not implemented for Relay")) - } - async fn connect_datagram( &self, - _sess: &Session, - _resolver: ThreadSafeDNSResolver, + sess: &Session, + resolver: ThreadSafeDNSResolver, ) -> io::Result { - Err(new_io_error("not implemented for Relay")) + let proxies: Vec = + stream::iter(self.get_proxies(true).await).collect().await; + + match proxies.len() { + 0 => Err(new_io_error("no proxy available")), + 1 => { + let proxy = proxies[0].clone(); + debug!("udp relay `{}` via proxy `{}`", self.name(), proxy.name()); + proxy.connect_datagram(sess, resolver).await + } + _ => { + let mut connector: Box = Box::new(DirectConnector::new()); + let (proxies, last) = proxies.split_at(proxies.len() - 1); + for proxy in proxies { + debug!("udp relay `{}` via proxy `{}`", self.name(), proxy.name()); + connector = Box::new(ProxyConnector::new(proxy.clone(), connector)); + } + let d = last[0] + .connect_datagram_with_connector(sess, resolver, connector.as_ref()) + .await?; + + let chained = ChainedDatagramWrapper::new(d); + chained.append_to_chain(self.name()).await; + Ok(Box::new(chained)) + } + } + } + + async fn support_connector(&self) -> ConnectorType { + ConnectorType::None } async fn as_map(&self) -> HashMap> { @@ -145,3 +150,93 @@ impl OutboundHandler for Handler { m } } + +#[cfg(all(test, not(ci)))] +mod tests { + + use tokio::sync::RwLock; + + use crate::proxy::mocks::MockDummyProxyProvider; + use crate::proxy::utils::test_utils::{consts::*, docker_runner::DockerTestRunner}; + use crate::proxy::utils::test_utils::{ + docker_runner::DockerTestRunnerBuilder, run_default_test_suites_and_cleanup, + }; + + use super::*; + + const PASSWORD: &str = "FzcLbKs2dY9mhL"; + const CIPHER: &str = "aes-256-gcm"; + + async fn get_ss_runner(port: u16) -> anyhow::Result { + let host = format!("0.0.0.0:{}", port); + DockerTestRunnerBuilder::new() + .image(IMAGE_SS_RUST) + .entrypoint(&["ssserver"]) + .cmd(&["-s", &host, "-m", CIPHER, "-k", PASSWORD, "-U"]) + .build() + .await + } + + #[tokio::test] + #[serial_test::serial] + async fn test_relay_1_tcp() -> anyhow::Result<()> { + let ss_opts = crate::proxy::shadowsocks::HandlerOptions { + name: "test-ss".to_owned(), + common_opts: Default::default(), + server: LOCAL_ADDR.to_owned(), + port: 10002, + password: PASSWORD.to_owned(), + cipher: CIPHER.to_owned(), + plugin_opts: Default::default(), + udp: false, + }; + let port = ss_opts.port; + let ss_handler = crate::proxy::shadowsocks::Handler::new(ss_opts); + + let mut provider = MockDummyProxyProvider::new(); + + provider.expect_touch().returning(|| ()); + provider.expect_healthcheck().returning(|| ()); + + provider.expect_proxies().returning(move || { + let mut proxies = Vec::new(); + proxies.push(ss_handler.clone()); + proxies + }); + + let handler = Handler::new(Default::default(), vec![Arc::new(RwLock::new(provider))]); + run_default_test_suites_and_cleanup(handler, get_ss_runner(port).await?).await + } + + #[tokio::test] + #[serial_test::serial] + async fn test_relay_2_tcp() -> anyhow::Result<()> { + let ss_opts = crate::proxy::shadowsocks::HandlerOptions { + name: "test-ss".to_owned(), + common_opts: Default::default(), + server: LOCAL_ADDR.to_owned(), + port: 10002, + password: PASSWORD.to_owned(), + cipher: CIPHER.to_owned(), + plugin_opts: Default::default(), + udp: false, + }; + let port = ss_opts.port; + let ss_handler = crate::proxy::shadowsocks::Handler::new(ss_opts); + + let mut provider = MockDummyProxyProvider::new(); + + provider.expect_touch().returning(|| ()); + provider.expect_healthcheck().returning(|| ()); + + provider.expect_proxies().returning(move || { + let mut proxies = Vec::new(); + proxies.push(ss_handler.clone()); + proxies.push(ss_handler.clone()); + proxies + }); + + let handler = Handler::new(Default::default(), vec![Arc::new(RwLock::new(provider))]); + run_default_test_suites_and_cleanup(handler, get_ss_runner(port).await?).await + } +} diff --git a/clash_lib/src/proxy/selector/mod.rs b/clash_lib/src/proxy/selector/mod.rs index c7a541a37..c24b5b09c 100644 --- a/clash_lib/src/proxy/selector/mod.rs +++ b/clash_lib/src/proxy/selector/mod.rs @@ -11,13 +11,13 @@ use crate::{ dns::ThreadSafeDNSResolver, remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, }, - session::{Session, SocksAddr}, + session::Session, Error, }; use super::{ - utils::provider_helper::get_proxies_from_providers, AnyOutboundHandler, AnyStream, - CommonOption, OutboundHandler, OutboundType, + utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, + AnyOutboundHandler, CommonOption, ConnectorType, OutboundHandler, OutboundType, }; #[async_trait] @@ -108,10 +108,6 @@ impl OutboundHandler for Handler { OutboundType::Selector } - async fn remote_addr(&self) -> Option { - self.selected_proxy(false).await.remote_addr().await - } - async fn support_udp(&self) -> bool { self.opts.udp && self.selected_proxy(false).await.support_udp().await } @@ -136,26 +132,46 @@ impl OutboundHandler for Handler { } } - async fn proxy_stream( + async fn connect_datagram( &self, - s: AnyStream, sess: &Session, resolver: ThreadSafeDNSResolver, - ) -> io::Result { + ) -> io::Result { self.selected_proxy(true) .await - .proxy_stream(s, sess, resolver) + .connect_datagram(sess, resolver) .await } - async fn connect_datagram( + async fn support_connector(&self) -> ConnectorType { + ConnectorType::Tcp + } + + async fn connect_stream_with_connector( &self, sess: &Session, resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, + ) -> io::Result { + let s = self + .selected_proxy(true) + .await + .connect_stream_with_connector(sess, resolver, connector) + .await?; + + s.append_to_chain(self.name()).await; + Ok(s) + } + + async fn connect_datagram_with_connector( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, ) -> io::Result { self.selected_proxy(true) .await - .connect_datagram(sess, resolver) + .connect_datagram_with_connector(sess, resolver, connector) .await } diff --git a/clash_lib/src/proxy/shadowsocks/mod.rs b/clash_lib/src/proxy/shadowsocks/mod.rs index 86f14d083..c217759c2 100644 --- a/clash_lib/src/proxy/shadowsocks/mod.rs +++ b/clash_lib/src/proxy/shadowsocks/mod.rs @@ -20,7 +20,7 @@ use crate::{ dns::ThreadSafeDNSResolver, }, proxy::{CommonOption, OutboundHandler}, - session::{Session, SocksAddr}, + session::Session, Error, }; use std::{collections::HashMap, io, sync::Arc}; @@ -28,8 +28,8 @@ use std::{collections::HashMap, io, sync::Arc}; use self::{datagram::OutboundDatagramShadowsocks, stream::ShadowSocksStream}; use super::{ - utils::{new_tcp_stream, new_udp_socket}, - AnyOutboundHandler, AnyStream, OutboundType, + utils::{new_tcp_stream, new_udp_socket, RemoteConnector}, + AnyOutboundHandler, AnyStream, ConnectorType, OutboundType, }; pub enum SimpleOBFSMode { @@ -188,55 +188,6 @@ impl Handler { pub fn new(opts: HandlerOptions) -> AnyOutboundHandler { Arc::new(Self { opts }) } -} - -#[async_trait] -impl OutboundHandler for Handler { - fn name(&self) -> &str { - self.opts.name.as_str() - } - - fn proto(&self) -> OutboundType { - OutboundType::Shadowsocks - } - - async fn remote_addr(&self) -> Option { - Some(SocksAddr::Domain(self.opts.server.clone(), self.opts.port)) - } - - async fn support_udp(&self) -> bool { - self.opts.udp - } - - async fn connect_stream( - &self, - sess: &Session, - resolver: ThreadSafeDNSResolver, - ) -> io::Result { - let stream = new_tcp_stream( - resolver.clone(), - self.opts.server.as_str(), - self.opts.port, - self.opts.common_opts.iface.as_ref(), - #[cfg(any(target_os = "linux", target_os = "android"))] - None, - ) - .map_err(|x| { - io::Error::new( - io::ErrorKind::Other, - format!( - "dial outbound {}:{}: {}", - self.opts.server, self.opts.port, x - ), - ) - }) - .await?; - - let s = self.proxy_stream(stream, sess, resolver).await?; - let chained = ChainedStreamWrapper::new(s); - chained.append_to_chain(self.name()).await; - Ok(Box::new(chained)) - } async fn proxy_stream( &self, @@ -291,6 +242,51 @@ impl OutboundHandler for Handler { Ok(Box::new(ShadowSocksStream(stream))) } +} + +#[async_trait] +impl OutboundHandler for Handler { + fn name(&self) -> &str { + self.opts.name.as_str() + } + + fn proto(&self) -> OutboundType { + OutboundType::Shadowsocks + } + + async fn support_udp(&self) -> bool { + self.opts.udp + } + + async fn connect_stream( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + ) -> io::Result { + let stream = new_tcp_stream( + resolver.clone(), + self.opts.server.as_str(), + self.opts.port, + self.opts.common_opts.iface.as_ref(), + #[cfg(any(target_os = "linux", target_os = "android"))] + None, + ) + .map_err(|x| { + io::Error::new( + io::ErrorKind::Other, + format!( + "dial outbound {}:{}: {}", + self.opts.server, self.opts.port, x + ), + ) + }) + .await?; + + let s = self.proxy_stream(stream, sess, resolver).await?; + let chained = ChainedStreamWrapper::new(s); + chained.append_to_chain(self.name()).await; + Ok(Box::new(chained)) + } async fn connect_datagram( &self, @@ -315,6 +311,7 @@ impl OutboundHandler for Handler { None, ) .await?; + let socket = ProxySocket::from_socket(UdpSocketType::Client, ctx, &cfg, socket); let d = OutboundDatagramShadowsocks::new( socket, @@ -325,6 +322,33 @@ impl OutboundHandler for Handler { d.append_to_chain(self.name()).await; Ok(Box::new(d)) } + + async fn support_connector(&self) -> ConnectorType { + ConnectorType::Tcp + } + + async fn connect_stream_with_connector( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, + ) -> io::Result { + let stream = connector + .connect_stream( + resolver.clone(), + self.opts.server.as_str(), + self.opts.port, + self.opts.common_opts.iface.as_ref(), + #[cfg(any(target_os = "linux", target_os = "android"))] + None, + ) + .await?; + + let s = self.proxy_stream(stream, sess, resolver).await?; + let chained = ChainedStreamWrapper::new(s); + chained.append_to_chain(self.name()).await; + Ok(Box::new(chained)) + } } #[cfg(all(test, not(ci)))] diff --git a/clash_lib/src/proxy/socks/inbound/stream.rs b/clash_lib/src/proxy/socks/inbound/stream.rs index d518c8ca9..2597f01ac 100644 --- a/clash_lib/src/proxy/socks/inbound/stream.rs +++ b/clash_lib/src/proxy/socks/inbound/stream.rs @@ -16,7 +16,7 @@ use tokio::net::TcpStream; use tokio_util::udp::UdpFramed; use tracing::{instrument, trace, warn}; -#[instrument(skip(s, dispatcher, authenticator))] +#[instrument(skip(sess, s, dispatcher, authenticator))] pub async fn handle_tcp<'a>( sess: &'a mut Session, s: &'a mut TcpStream, diff --git a/clash_lib/src/proxy/tor/mod.rs b/clash_lib/src/proxy/tor/mod.rs index 2647b9b01..e0dfb96e0 100644 --- a/clash_lib/src/proxy/tor/mod.rs +++ b/clash_lib/src/proxy/tor/mod.rs @@ -13,12 +13,12 @@ use crate::{ dns::ThreadSafeDNSResolver, }, common::errors::new_io_error, - session::{Session, SocksAddr}, + session::Session, }; use self::stream::StreamWrapper; -use super::{AnyOutboundHandler, AnyStream, OutboundHandler, OutboundType}; +use super::{AnyOutboundHandler, ConnectorType, OutboundHandler, OutboundType}; pub struct HandlerOptions { pub name: String, @@ -53,10 +53,6 @@ impl OutboundHandler for Handler { OutboundType::Tor } - async fn remote_addr(&self) -> Option { - None - } - async fn support_udp(&self) -> bool { false } @@ -84,15 +80,6 @@ impl OutboundHandler for Handler { Ok(Box::new(s)) } - async fn proxy_stream( - &self, - s: AnyStream, - _sess: &Session, - _resolver: ThreadSafeDNSResolver, - ) -> std::io::Result { - Ok(s) - } - async fn connect_datagram( &self, _sess: &Session, @@ -100,4 +87,8 @@ impl OutboundHandler for Handler { ) -> std::io::Result { Err(new_io_error("Tor outbound handler does not support UDP")) } + + async fn support_connector(&self) -> ConnectorType { + ConnectorType::None + } } diff --git a/clash_lib/src/proxy/transport/ws/websocket.rs b/clash_lib/src/proxy/transport/ws/websocket.rs index c2e9f4430..87b087533 100644 --- a/clash_lib/src/proxy/transport/ws/websocket.rs +++ b/clash_lib/src/proxy/transport/ws/websocket.rs @@ -85,6 +85,7 @@ impl AsyncWrite for WebsocketConn { Pin::new(&mut self.inner) .start_send(message) .map_err(map_io_error)?; + ready!(self.poll_flush(cx)?); std::task::Poll::Ready(Ok(buf.len())) } diff --git a/clash_lib/src/proxy/trojan/mod.rs b/clash_lib/src/proxy/trojan/mod.rs index 6a00e1727..63dc86cf9 100644 --- a/clash_lib/src/proxy/trojan/mod.rs +++ b/clash_lib/src/proxy/trojan/mod.rs @@ -16,13 +16,15 @@ use crate::app::dispatcher::ChainedStreamWrapper; use crate::common::utils; use crate::{ app::{dispatcher::BoxedChainedStream, dns::ThreadSafeDNSResolver}, - session::{Session, SocksAddr}, + session::Session, }; use self::datagram::OutboundDatagramTrojan; use super::transport; use super::transport::TLSOptions; +use super::utils::RemoteConnector; +use super::ConnectorType; use super::{ options::{GrpcOption, WsOption}, utils::new_tcp_stream, @@ -67,7 +69,7 @@ impl Handler { &self, s: AnyStream, sess: &Session, - tcp: bool, + udp: bool, ) -> io::Result { let tls_opt = TLSOptions { skip_cert_verify: self.opts.skip_cert_verify, @@ -119,7 +121,7 @@ impl Handler { let password = utils::encode_hex(&password[..]); buf.put_slice(password.as_bytes()); buf.put_slice(b"\r\n"); - buf.put_u8(if tcp { 0x01 } else { 0x03 }); // tcp + buf.put_u8(if udp { 0x03 } else { 0x01 }); sess.destination.write_buf(&mut buf); buf.put_slice(b"\r\n"); s.write_all(&buf).await?; @@ -138,10 +140,6 @@ impl OutboundHandler for Handler { OutboundType::Trojan } - async fn remote_addr(&self) -> Option { - Some(SocksAddr::Domain(self.opts.server.clone(), self.opts.port)) - } - async fn support_udp(&self) -> bool { self.opts.udp } @@ -170,22 +168,13 @@ impl OutboundHandler for Handler { }) .await?; - let stream = self.proxy_stream(stream, sess, resolver).await?; + let stream = self.inner_proxy_stream(stream, sess, false).await?; let chained = ChainedStreamWrapper::new(stream); chained.append_to_chain(self.name()).await; Ok(Box::new(chained)) } - async fn proxy_stream( - &self, - s: AnyStream, - sess: &Session, - _: ThreadSafeDNSResolver, - ) -> io::Result { - self.inner_proxy_stream(s, sess, true).await - } - async fn connect_datagram( &self, sess: &Session, @@ -210,7 +199,60 @@ impl OutboundHandler for Handler { }) .await?; - let stream = self.inner_proxy_stream(stream, sess, false).await?; + let stream = self.inner_proxy_stream(stream, sess, true).await?; + + let d = OutboundDatagramTrojan::new(stream, sess.destination.clone()); + + let chained = ChainedDatagramWrapper::new(d); + chained.append_to_chain(self.name()).await; + Ok(Box::new(chained)) + } + + async fn support_connector(&self) -> ConnectorType { + ConnectorType::All + } + + async fn connect_stream_with_connector( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, + ) -> io::Result { + let stream = connector + .connect_stream( + resolver, + self.opts.server.as_str(), + self.opts.port, + self.opts.common_opts.iface.as_ref(), + #[cfg(any(target_os = "linux", target_os = "android"))] + None, + ) + .await?; + + let s = self.inner_proxy_stream(stream, sess, false).await?; + let chained = ChainedStreamWrapper::new(s); + chained.append_to_chain(self.name()).await; + Ok(Box::new(chained)) + } + + async fn connect_datagram_with_connector( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, + ) -> io::Result { + let stream = connector + .connect_stream( + resolver, + self.opts.server.as_str(), + self.opts.port, + self.opts.common_opts.iface.as_ref(), + #[cfg(any(target_os = "linux", target_os = "android"))] + None, + ) + .await?; + + let stream = self.inner_proxy_stream(stream, sess, true).await?; let d = OutboundDatagramTrojan::new(stream, sess.destination.clone()); diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index 572e414f2..51caa78f3 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -29,7 +29,7 @@ use crate::{ }, common::tls::GLOBAL_ROOT_STORE, proxy::tuic::types::{ServerAddr, TuicEndpoint}, - session::{Session, SocksAddr}, + session::Session, }; use crate::session::SocksAddr as ClashSocksAddr; @@ -43,9 +43,9 @@ use rustls::client::ClientConfig as TlsConfig; use self::types::{CongestionControl, TuicConnection, UdpSession}; +use super::ConnectorType; use super::{ - datagram::UdpPacket, AnyOutboundDatagram, AnyOutboundHandler, AnyStream, OutboundHandler, - OutboundType, + datagram::UdpPacket, AnyOutboundDatagram, AnyOutboundHandler, OutboundHandler, OutboundType, }; #[derive(Debug, Clone)] @@ -92,24 +92,10 @@ impl OutboundHandler for Handler { OutboundType::Tuic } - async fn remote_addr(&self) -> Option { - None - } - async fn support_udp(&self) -> bool { true } - async fn proxy_stream( - &self, - s: AnyStream, - _sess: &Session, - _resolver: ThreadSafeDNSResolver, - ) -> std::io::Result { - tracing::warn!("Proxy stream currently is direcrt connect"); - Ok(s) - } - async fn connect_stream( &self, sess: &Session, @@ -130,6 +116,10 @@ impl OutboundHandler for Handler { std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) }) } + + async fn support_connector(&self) -> ConnectorType { + ConnectorType::None + } } impl Handler { diff --git a/clash_lib/src/proxy/urltest/mod.rs b/clash_lib/src/proxy/urltest/mod.rs index da3a423be..16424ab95 100644 --- a/clash_lib/src/proxy/urltest/mod.rs +++ b/clash_lib/src/proxy/urltest/mod.rs @@ -13,12 +13,12 @@ use crate::{ providers::proxy_provider::ThreadSafeProxyProvider, ProxyManager, }, }, - session::{Session, SocksAddr}, + session::Session, }; use super::{ - utils::provider_helper::get_proxies_from_providers, AnyOutboundHandler, AnyStream, - CommonOption, OutboundHandler, OutboundType, + utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, + AnyOutboundHandler, CommonOption, ConnectorType, OutboundHandler, OutboundType, }; #[derive(Default)] @@ -133,11 +133,6 @@ impl OutboundHandler for Handler { OutboundType::UrlTest } - /// The proxy remote address - async fn remote_addr(&self) -> Option { - self.fastest(false).await.remote_addr().await - } - /// whether the outbound handler support UDP async fn support_udp(&self) -> bool { self.opts.udp || self.fastest(false).await.support_udp().await @@ -158,19 +153,6 @@ impl OutboundHandler for Handler { Ok(s) } - /// wraps a stream with outbound handler - async fn proxy_stream( - &self, - s: AnyStream, - sess: &Session, - resolver: ThreadSafeDNSResolver, - ) -> io::Result { - self.fastest(true) - .await - .proxy_stream(s, sess, resolver) - .await - } - /// connect to remote target via UDP async fn connect_datagram( &self, @@ -186,6 +168,38 @@ impl OutboundHandler for Handler { Ok(d) } + async fn support_connector(&self) -> ConnectorType { + self.fastest(false).await.support_connector().await + } + + async fn connect_stream_with_connector( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, + ) -> io::Result { + let s = self + .fastest(true) + .await + .connect_stream_with_connector(sess, resolver, connector) + .await?; + + s.append_to_chain(self.name()).await; + Ok(s) + } + + async fn connect_datagram_with_connector( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, + ) -> io::Result { + self.fastest(true) + .await + .connect_datagram_with_connector(sess, resolver, connector) + .await + } + async fn as_map(&self) -> HashMap> { let all = get_proxies_from_providers(&self.providers, false).await; diff --git a/clash_lib/src/proxy/utils/mod.rs b/clash_lib/src/proxy/utils/mod.rs index 0c8d00027..92d86c190 100644 --- a/clash_lib/src/proxy/utils/mod.rs +++ b/clash_lib/src/proxy/utils/mod.rs @@ -7,8 +7,11 @@ use std::{ pub mod test_utils; pub mod provider_helper; +mod proxy_connector; mod socket_helpers; +pub use proxy_connector::*; + use serde::{Deserialize, Serialize}; pub use socket_helpers::*; diff --git a/clash_lib/src/proxy/utils/proxy_connector.rs b/clash_lib/src/proxy/utils/proxy_connector.rs new file mode 100644 index 000000000..ba227740f --- /dev/null +++ b/clash_lib/src/proxy/utils/proxy_connector.rs @@ -0,0 +1,166 @@ +use std::net::SocketAddr; + +use async_trait::async_trait; +use tracing::trace; + +use crate::{ + app::{ + dispatcher::{ + ChainedDatagram, ChainedDatagramWrapper, ChainedStream, ChainedStreamWrapper, + }, + dns::ThreadSafeDNSResolver, + }, + proxy::{datagram::OutboundDatagramImpl, AnyOutboundDatagram, AnyOutboundHandler, AnyStream}, + session::{Network, Session, SocksAddr, Type}, +}; + +use super::{new_tcp_stream, new_udp_socket, Interface}; + +/// allows a proxy to get a connection to a remote server +#[async_trait] +pub trait RemoteConnector: Send + Sync { + async fn connect_stream( + &self, + resolver: ThreadSafeDNSResolver, + address: &str, + port: u16, + iface: Option<&Interface>, + #[cfg(any(target_os = "linux", target_os = "android"))] packet_mark: Option, + ) -> std::io::Result; + + async fn connect_datagram( + &self, + resolver: ThreadSafeDNSResolver, + src: Option<&SocketAddr>, + destination: &SocksAddr, + iface: Option<&Interface>, + #[cfg(any(target_os = "linux", target_os = "android"))] packet_mark: Option, + ) -> std::io::Result; +} + +pub struct DirectConnector; + +impl DirectConnector { + pub fn new() -> Self { + Self + } +} + +#[async_trait] +impl RemoteConnector for DirectConnector { + async fn connect_stream( + &self, + resolver: ThreadSafeDNSResolver, + address: &str, + port: u16, + iface: Option<&Interface>, + #[cfg(any(target_os = "linux", target_os = "android"))] packet_mark: Option, + ) -> std::io::Result { + new_tcp_stream( + resolver, + address, + port, + iface, + #[cfg(any(target_os = "linux", target_os = "android"))] + packet_mark, + ) + .await + } + + async fn connect_datagram( + &self, + resolver: ThreadSafeDNSResolver, + src: Option<&SocketAddr>, + _destination: &SocksAddr, + iface: Option<&Interface>, + #[cfg(any(target_os = "linux", target_os = "android"))] packet_mark: Option, + ) -> std::io::Result { + let dgram = new_udp_socket( + src, + iface, + #[cfg(any(target_os = "linux", target_os = "android"))] + packet_mark, + ) + .await + .map(|x| OutboundDatagramImpl::new(x, resolver))?; + + let dgram = ChainedDatagramWrapper::new(dgram); + Ok(Box::new(dgram)) + } +} + +pub struct ProxyConnector { + proxy: AnyOutboundHandler, + connector: Box, +} + +impl ProxyConnector { + pub fn new(proxy: AnyOutboundHandler, connector: Box) -> Self { + Self { proxy, connector } + } +} + +#[async_trait] +impl RemoteConnector for ProxyConnector { + async fn connect_stream( + &self, + resolver: ThreadSafeDNSResolver, + address: &str, + port: u16, + iface: Option<&Interface>, + #[cfg(any(target_os = "linux", target_os = "android"))] packet_mark: Option, + ) -> std::io::Result { + let sess = Session { + network: Network::Tcp, + typ: Type::Ignore, + destination: crate::session::SocksAddr::Domain(address.to_owned(), port), + iface: iface.cloned(), + #[cfg(any(target_os = "linux", target_os = "android"))] + packet_mark, + ..Default::default() + }; + + trace!( + "proxy connector `{}` connecting to {}:{}", + self.proxy.name(), + address, + port + ); + + let s = self + .proxy + .connect_stream_with_connector(&sess, resolver, self.connector.as_ref()) + .await?; + + let stream = ChainedStreamWrapper::new(s); + stream.append_to_chain(self.proxy.name()).await; + Ok(Box::new(stream)) + } + + async fn connect_datagram( + &self, + resolver: ThreadSafeDNSResolver, + _src: Option<&SocketAddr>, + destination: &SocksAddr, + iface: Option<&Interface>, + #[cfg(any(target_os = "linux", target_os = "android"))] packet_mark: Option, + ) -> std::io::Result { + let sess = Session { + network: Network::Udp, + typ: Type::Ignore, + iface: iface.cloned(), + destination: destination.clone(), + #[cfg(any(target_os = "linux", target_os = "android"))] + packet_mark, + ..Default::default() + }; + let s = self + .proxy + .connect_datagram_with_connector(&sess, resolver, self.connector.as_ref()) + .await?; + + let stream = ChainedDatagramWrapper::new(s); + stream.append_to_chain(self.proxy.name()).await; + Ok(Box::new(stream)) + } +} diff --git a/clash_lib/src/proxy/utils/socket_helpers.rs b/clash_lib/src/proxy/utils/socket_helpers.rs index 1f163db5c..0c984e333 100644 --- a/clash_lib/src/proxy/utils/socket_helpers.rs +++ b/clash_lib/src/proxy/utils/socket_helpers.rs @@ -10,6 +10,7 @@ use tokio::{ time::timeout, }; +use tracing::debug; #[cfg(target_os = "windows")] use tracing::warn; @@ -82,7 +83,11 @@ pub async fn new_tcp_stream<'a>( io::ErrorKind::Other, format!("can't resolve dns: {}", address), ))?; - tracing::debug!("resolved addr of {:?}:{:?}", address, dial_addr); + + debug!( + "dialing {}[{}]:{} via {:?}", + address, dial_addr, port, iface + ); let socket = match (dial_addr, resolver.ipv6()) { (IpAddr::V4(_), _) => { @@ -118,6 +123,7 @@ pub async fn new_tcp_stream<'a>( ) .await??; + debug!("connected to {}[{}]:{}", address, dial_addr, port); Ok(Box::new(stream)) } diff --git a/clash_lib/src/proxy/vmess/mod.rs b/clash_lib/src/proxy/vmess/mod.rs index 178ddc1c7..d6332cc8a 100644 --- a/clash_lib/src/proxy/vmess/mod.rs +++ b/clash_lib/src/proxy/vmess/mod.rs @@ -23,8 +23,8 @@ use self::vmess_impl::OutboundDatagramVmess; use super::{ options::{GrpcOption, Http2Option, HttpOption, WsOption}, transport::{self, Http2Config}, - utils::new_tcp_stream, - AnyOutboundHandler, AnyStream, CommonOption, OutboundHandler, OutboundType, + utils::{new_tcp_stream, RemoteConnector}, + AnyOutboundHandler, AnyStream, CommonOption, ConnectorType, OutboundHandler, OutboundType, }; pub enum VmessTransport { @@ -154,11 +154,6 @@ impl OutboundHandler for Handler { OutboundType::Vmess } - /// The proxy remote address - async fn remote_addr(&self) -> Option { - Some(SocksAddr::Domain(self.opts.server.clone(), self.opts.port)) - } - /// whether the outbound handler support UDP async fn support_udp(&self) -> bool { self.opts.udp @@ -195,16 +190,6 @@ impl OutboundHandler for Handler { Ok(Box::new(chained)) } - /// wraps a stream with outbound handler - async fn proxy_stream( - &self, - s: AnyStream, - sess: &Session, - _: ThreadSafeDNSResolver, - ) -> io::Result { - self.inner_proxy_stream(s, sess, false).await - } - async fn connect_datagram( &self, sess: &Session, @@ -251,6 +236,59 @@ impl OutboundHandler for Handler { chained.append_to_chain(self.name()).await; Ok(Box::new(chained)) } + + async fn support_connector(&self) -> ConnectorType { + ConnectorType::All + } + + async fn connect_stream_with_connector( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, + ) -> io::Result { + let stream = connector + .connect_stream( + resolver, + self.opts.server.as_str(), + self.opts.port, + self.opts.common_opts.iface.as_ref(), + #[cfg(any(target_os = "linux", target_os = "android"))] + None, + ) + .await?; + + let s = self.inner_proxy_stream(stream, sess, false).await?; + let chained = ChainedStreamWrapper::new(s); + chained.append_to_chain(self.name()).await; + Ok(Box::new(chained)) + } + + async fn connect_datagram_with_connector( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + connector: &dyn RemoteConnector, + ) -> io::Result { + let stream = connector + .connect_stream( + resolver, + self.opts.server.as_str(), + self.opts.port, + self.opts.common_opts.iface.as_ref(), + #[cfg(any(target_os = "linux", target_os = "android"))] + None, + ) + .await?; + + let stream = self.inner_proxy_stream(stream, sess, true).await?; + + let d = OutboundDatagramVmess::new(stream, sess.destination.clone()); + + let chained = ChainedDatagramWrapper::new(d); + chained.append_to_chain(self.name()).await; + Ok(Box::new(chained)) + } } #[cfg(all(test, not(ci)))] diff --git a/clash_lib/src/proxy/vmess/vmess_impl/datagram.rs b/clash_lib/src/proxy/vmess/vmess_impl/datagram.rs index 05f1a8acc..ccc606a80 100644 --- a/clash_lib/src/proxy/vmess/vmess_impl/datagram.rs +++ b/clash_lib/src/proxy/vmess/vmess_impl/datagram.rs @@ -1,7 +1,7 @@ use std::{io, pin::Pin, task::Poll}; use futures::{ready, Sink, Stream}; -use tracing::{debug, instrument}; +use tracing::{debug, error, instrument}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -75,9 +75,9 @@ impl Sink for OutboundDatagramVmess { let pkt_container = pkt; - if let Some(pkt) = pkt_container.take() { + if let Some(pkt) = pkt_container.as_ref() { if &pkt.dst_addr != remote_addr { - debug!( + error!( "udp packet dst_addr not match, pkt.dst_addr: {}, remote_addr: {}", pkt.dst_addr, remote_addr ); @@ -86,7 +86,7 @@ impl Sink for OutboundDatagramVmess { "udp packet dst_addr not match", ))); } - let data = pkt.data; + let data = &pkt.data; let n = ready!(inner.as_mut().poll_write(cx, data.as_ref()))?; diff --git a/clash_lib/src/proxy/vmess/vmess_impl/header.rs b/clash_lib/src/proxy/vmess/vmess_impl/header.rs index 76c553d62..7ccee19b5 100644 --- a/clash_lib/src/proxy/vmess/vmess_impl/header.rs +++ b/clash_lib/src/proxy/vmess/vmess_impl/header.rs @@ -124,7 +124,7 @@ mod tests { let cmd_key = "1234567890123456".as_bytes(); - let pk = kdf::vmess_kdf_1_one_shot(&cmd_key[..], KDF_SALT_CONST_AUTH_ID_ENCRYPTION_KEY); + let pk = kdf::vmess_kdf_1_one_shot(cmd_key, KDF_SALT_CONST_AUTH_ID_ENCRYPTION_KEY); let pk: [u8; 16] = pk[..16].try_into().unwrap(); // That's wired let key = GenericArray::from(pk); let cipher = aes::Aes128::new(&key); diff --git a/clash_lib/src/proxy/wg/mod.rs b/clash_lib/src/proxy/wg/mod.rs index 9cc63a0aa..16ba46433 100644 --- a/clash_lib/src/proxy/wg/mod.rs +++ b/clash_lib/src/proxy/wg/mod.rs @@ -16,12 +16,12 @@ use crate::{ dns::ThreadSafeDNSResolver, }, common::errors::{map_io_error, new_io_error}, - session::{Session, SocksAddr}, + session::Session, }; use self::{keys::KeyBytes, wireguard::Config}; -use super::{AnyOutboundHandler, AnyStream, CommonOption, OutboundHandler, OutboundType}; +use super::{AnyOutboundHandler, CommonOption, ConnectorType, OutboundHandler, OutboundType}; use async_trait::async_trait; use futures::TryFutureExt; @@ -204,10 +204,6 @@ impl OutboundHandler for Handler { OutboundType::WireGuard } - async fn remote_addr(&self) -> Option { - Some(SocksAddr::Domain(self.opts.server.clone(), self.opts.port)) - } - async fn support_udp(&self) -> bool { self.opts.udp } @@ -263,16 +259,6 @@ impl OutboundHandler for Handler { Ok(Box::new(chained)) } - /// wraps a stream with outbound handler - async fn proxy_stream( - &self, - _s: AnyStream, - _sess: &Session, - _resolver: ThreadSafeDNSResolver, - ) -> io::Result { - Err(new_io_error("not supported")) - } - /// connect to remote target via UDP async fn connect_datagram( &self, @@ -289,6 +275,10 @@ impl OutboundHandler for Handler { chained.append_to_chain(self.name()).await; Ok(Box::new(chained)) } + + async fn support_connector(&self) -> ConnectorType { + ConnectorType::None + } } #[cfg(all(test, not(ci)))] diff --git a/clash_lib/src/session.rs b/clash_lib/src/session.rs index be5a17399..ac9725a48 100644 --- a/clash_lib/src/session.rs +++ b/clash_lib/src/session.rs @@ -354,6 +354,8 @@ pub enum Type { HttpConnect, Socks5, Tun, + + Ignore, } impl Display for Network {