From 4c1a7534fd98453b383edaf12ed9fffe9c1e3970 Mon Sep 17 00:00:00 2001 From: Yuwei Ba Date: Tue, 1 Oct 2024 17:54:25 +1000 Subject: [PATCH] feat(tproxy): support tproxy on Linux (#615) --- Cargo.lock | 45 ++++- clash/tests/data/config/rules.yaml | 1 + clash/tests/data/config/tproxy.yaml | 10 +- clash_lib/Cargo.toml | 3 + clash_lib/src/app/inbound/manager.rs | 18 +- clash_lib/src/app/inbound/network_listener.rs | 18 +- clash_lib/src/proxy/datagram.rs | 93 +--------- clash_lib/src/proxy/mod.rs | 5 + clash_lib/src/proxy/socks/inbound/datagram.rs | 97 +++++++++- clash_lib/src/proxy/socks/inbound/stream.rs | 2 +- clash_lib/src/proxy/tproxy/iptables.sh | 51 ++++++ clash_lib/src/proxy/tproxy/mod.rs | 168 ++++++++++++++++-- clash_lib/src/proxy/tun/datagram.rs | 14 +- clash_lib/src/proxy/tun/inbound.rs | 4 +- clash_lib/src/proxy/tun/mod.rs | 3 + clash_lib/src/session.rs | 9 +- 16 files changed, 397 insertions(+), 144 deletions(-) create mode 100755 clash_lib/src/proxy/tproxy/iptables.sh diff --git a/Cargo.lock b/Cargo.lock index 65e87091c..c8debbbcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1041,7 +1041,7 @@ dependencies = [ "sha2", "shadowsocks", "smoltcp", - "socket2", + "socket2 0.5.7", "tempfile", "thiserror", "tokio", @@ -1061,6 +1061,7 @@ dependencies = [ "tuic", "tuic-quinn", "tun", + "unix-udp-sock", "url", "uuid", "webpki-roots", @@ -2692,7 +2693,7 @@ dependencies = [ "http-body", "hyper", "pin-project-lite", - "socket2", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -2864,7 +2865,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2", + "socket2 0.5.7", "widestring", "windows-sys 0.48.0", "winreg", @@ -4100,7 +4101,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.0.0", "rustls", - "socket2", + "socket2 0.5.7", "thiserror", "tokio", "tracing", @@ -4131,7 +4132,7 @@ checksum = "4fe68c2e9e1a1234e218683dbdf9f9dfcb094113c5ac2b938dfcb9bab4c4140b" dependencies = [ "libc", "once_cell", - "socket2", + "socket2 0.5.7", "tracing", "windows-sys 0.59.0", ] @@ -4822,7 +4823,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "shadowsocks-crypto", - "socket2", + "socket2 0.5.7", "spin 0.9.8", "thiserror", "tokio", @@ -4953,6 +4954,16 @@ dependencies = [ "managed", ] +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.7" @@ -5271,7 +5282,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.7", "tokio-macros", "tracing", "windows-sys 0.48.0", @@ -5334,7 +5345,7 @@ dependencies = [ "log", "once_cell", "pin-project", - "socket2", + "socket2 0.5.7", "tokio", "windows-sys 0.52.0", ] @@ -5431,7 +5442,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "socket2", + "socket2 0.5.7", "tokio", "tokio-stream", "tower 0.4.13", @@ -6600,6 +6611,22 @@ dependencies = [ "subtle", ] +[[package]] +name = "unix-udp-sock" +version = "0.7.0" +source = "git+https://github.com/Watfaq/unix-udp-sock.git?rev=cd3e4eca43e6f3be82a2703c3d711b7e18fbfd18#cd3e4eca43e6f3be82a2703c3d711b7e18fbfd18" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "libc", + "pin-project-lite", + "socket2 0.4.10", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "unsafe-libyaml" version = "0.2.11" diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index 64ebf54a5..67ea642f4 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -2,6 +2,7 @@ port: 8888 socks-port: 8889 mixed-port: 8899 +tproxy-port: 8900 tun: enable: false diff --git a/clash/tests/data/config/tproxy.yaml b/clash/tests/data/config/tproxy.yaml index 52d26e3f2..4f6ce9253 100644 --- a/clash/tests/data/config/tproxy.yaml +++ b/clash/tests/data/config/tproxy.yaml @@ -1,17 +1,9 @@ --- -port: 8888 -socks-port: 8889 -mixed-port: 8899 tproxy-port: 8900 mode: rule log-level: debug - -proxies: - - name: "tor" - type: tor - rules: - - MATCH, tor + - MATCH, DIRECT ... diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index 69332a769..7d3b8303a 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -144,6 +144,9 @@ env_logger = "0.11" [build-dependencies] prost-build = "0.13" +[target.'cfg(target_os="linux")'.dependencies] +unix-udp-sock = { git = "https://github.com/Watfaq/unix-udp-sock.git", rev = "cd3e4eca43e6f3be82a2703c3d711b7e18fbfd18"} + [target.'cfg(macos)'.dependencies] security-framework = "3.0.0" diff --git a/clash_lib/src/app/inbound/manager.rs b/clash_lib/src/app/inbound/manager.rs index ab9e785f0..215712f3c 100644 --- a/clash_lib/src/app/inbound/manager.rs +++ b/clash_lib/src/app/inbound/manager.rs @@ -6,7 +6,7 @@ use crate::{ dispatcher::Dispatcher, inbound::network_listener::{ListenerType, NetworkInboundListener}, }, - common::auth::ThreadSafeAuthenticator, + common::{auth::ThreadSafeAuthenticator, errors::new_io_error}, config::internal::config::{BindAddress, Inbound}, Error, Runner, }; @@ -68,7 +68,21 @@ impl InboundManager { } Ok(Box::pin(async move { - futures::future::select_all(runners).await.0 + let mut errors = Vec::new(); + let _ = futures::future::join_all(runners) + .await + .into_iter() + .filter_map(|r| r.map_err(|e| errors.push(e)).ok()) + .collect::>(); + if errors.is_empty() { + Ok(()) + } else { + Err(new_io_error(format!( + "failed to start inbound listeners: {:?}", + errors + )) + .into()) + } })) } diff --git a/clash_lib/src/app/inbound/network_listener.rs b/clash_lib/src/app/inbound/network_listener.rs index 31bc7b178..9b1032647 100644 --- a/clash_lib/src/app/inbound/network_listener.rs +++ b/clash_lib/src/app/inbound/network_listener.rs @@ -2,7 +2,10 @@ use crate::{ common::auth::ThreadSafeAuthenticator, config::internal::config::BindAddress, }; -use crate::proxy::{http, mixed, socks, tproxy, AnyInboundListener}; +use crate::proxy::{http, mixed, socks, AnyInboundListener}; + +#[cfg(target_os = "linux")] +use crate::proxy::tproxy; use crate::{proxy::utils::Interface, Dispatcher, Error, Runner}; use futures::FutureExt; @@ -124,7 +127,18 @@ impl NetworkInboundListener { self.authenticator.clone(), )), ListenerType::Tproxy => { - Arc::new(tproxy::Listener::new((ip, self.port).into())) + #[cfg(target_os = "linux")] + { + Arc::new(tproxy::Listener::new( + (ip, self.port).into(), + self.dispatcher.clone(), + )) + } + #[cfg(not(target_os = "linux"))] + { + warn!("tproxy is not supported on this platform"); + return; + } } }; diff --git a/clash_lib/src/proxy/datagram.rs b/clash_lib/src/proxy/datagram.rs index dfd519d35..f452aee5c 100644 --- a/clash_lib/src/proxy/datagram.rs +++ b/clash_lib/src/proxy/datagram.rs @@ -1,20 +1,15 @@ use crate::{ - app::dns::ThreadSafeDNSResolver, - common::errors::new_io_error, - proxy::{socks::Socks5UDPCodec, InboundDatagram}, + app::dns::ThreadSafeDNSResolver, common::errors::new_io_error, session::SocksAddr, }; -use bytes::Bytes; -use futures::{ready, Sink, SinkExt, Stream, StreamExt}; +use futures::{ready, Sink, Stream}; use std::{ fmt::{Debug, Display, Formatter}, io, - net::SocketAddr, pin::Pin, task::{Context, Poll}, }; use tokio::{io::ReadBuf, net::UdpSocket}; -use tokio_util::udp::UdpFramed; #[derive(Clone)] pub struct UdpPacket { @@ -64,90 +59,6 @@ impl UdpPacket { } } -pub struct InboundUdp { - inner: I, -} - -impl InboundUdp -where - I: Stream + Unpin, - I: Sink<((Bytes, SocksAddr), SocketAddr)>, -{ - pub fn new(inner: I) -> Self { - Self { inner } - } -} - -impl Debug for InboundUdp> { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("InboundUdp").finish() - } -} - -impl Stream for InboundUdp> { - type Item = UdpPacket; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let pin = self.get_mut(); - - match pin.inner.poll_next_unpin(cx) { - Poll::Ready(item) => match item { - None => Poll::Ready(None), - Some(item) => match item { - Ok(((dst, pkt), src)) => Poll::Ready(Some(UdpPacket { - data: pkt.to_vec(), - src_addr: SocksAddr::Ip(src), - dst_addr: dst, - })), - Err(_) => Poll::Ready(None), - }, - }, - Poll::Pending => Poll::Pending, - } - } -} - -impl Sink for InboundUdp> { - type Error = std::io::Error; - - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let pin = self.get_mut(); - pin.inner.poll_ready_unpin(cx) - } - - fn start_send(self: Pin<&mut Self>, item: UdpPacket) -> Result<(), Self::Error> { - let pin = self.get_mut(); - pin.inner.start_send_unpin(( - (item.data.into(), item.src_addr), - item.dst_addr.must_into_socket_addr(), - )) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let pin = self.get_mut(); - pin.inner.poll_flush_unpin(cx) - } - - fn poll_close( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let pin = self.get_mut(); - pin.inner.poll_close_unpin(cx) - } -} - -impl InboundDatagram for InboundUdp> {} - #[must_use = "sinks do nothing unless polled"] // TODO: maybe we should use abstract datagram IO interface instead of the // Stream + Sink trait diff --git a/clash_lib/src/proxy/mod.rs b/clash_lib/src/proxy/mod.rs index 2ad38df51..684b803d3 100644 --- a/clash_lib/src/proxy/mod.rs +++ b/clash_lib/src/proxy/mod.rs @@ -28,6 +28,7 @@ pub mod reject; pub mod http; pub mod mixed; +#[cfg(target_os = "linux")] pub mod tproxy; pub(crate) mod datagram; @@ -82,6 +83,10 @@ pub trait InboundDatagram: Stream + Sink + Send + Sync + Unpin + Debug { } +impl InboundDatagram for T where + T: Stream + Sink + Send + Sync + Unpin + Debug +{ +} pub type AnyInboundDatagram = Box>; diff --git a/clash_lib/src/proxy/socks/inbound/datagram.rs b/clash_lib/src/proxy/socks/inbound/datagram.rs index 1e45427e3..e8f80a07a 100644 --- a/clash_lib/src/proxy/socks/inbound/datagram.rs +++ b/clash_lib/src/proxy/socks/inbound/datagram.rs @@ -1,7 +1,16 @@ -use crate::session::SocksAddr; +use crate::{proxy::datagram::UdpPacket, session::SocksAddr}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use std::io; -use tokio_util::codec::{Decoder, Encoder}; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use std::{ + io, + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, +}; +use tokio_util::{ + codec::{Decoder, Encoder}, + udp::UdpFramed, +}; // +----+------+------+----------+----------+----------+ // |RSV | FRAG | ATYP | DST.ADDR | DST.PORT | DATA | @@ -65,3 +74,85 @@ impl Decoder for Socks5UDPCodec { Ok(Some((addr, packet))) } } + +pub struct InboundUdp { + inner: I, +} + +impl InboundUdp +where + I: Stream + Unpin, + I: Sink<((Bytes, SocksAddr), SocketAddr)>, +{ + pub fn new(inner: I) -> Self { + Self { inner } + } +} + +impl std::fmt::Debug for InboundUdp> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InboundUdp").finish() + } +} + +impl Stream for InboundUdp> { + type Item = UdpPacket; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let pin = self.get_mut(); + + match pin.inner.poll_next_unpin(cx) { + Poll::Ready(item) => match item { + None => Poll::Ready(None), + Some(item) => match item { + Ok(((dst, pkt), src)) => Poll::Ready(Some(UdpPacket { + data: pkt.to_vec(), + src_addr: SocksAddr::Ip(src), + dst_addr: dst, + })), + Err(_) => Poll::Ready(None), + }, + }, + Poll::Pending => Poll::Pending, + } + } +} + +impl Sink for InboundUdp> { + type Error = std::io::Error; + + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let pin = self.get_mut(); + pin.inner.poll_ready_unpin(cx) + } + + fn start_send(self: Pin<&mut Self>, item: UdpPacket) -> Result<(), Self::Error> { + let pin = self.get_mut(); + pin.inner.start_send_unpin(( + (item.data.into(), item.src_addr), + item.dst_addr.must_into_socket_addr(), + )) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let pin = self.get_mut(); + pin.inner.poll_flush_unpin(cx) + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let pin = self.get_mut(); + pin.inner.poll_close_unpin(cx) + } +} diff --git a/clash_lib/src/proxy/socks/inbound/stream.rs b/clash_lib/src/proxy/socks/inbound/stream.rs index fda35158b..2294fe04e 100644 --- a/clash_lib/src/proxy/socks/inbound/stream.rs +++ b/clash_lib/src/proxy/socks/inbound/stream.rs @@ -1,8 +1,8 @@ use crate::{ common::{auth::ThreadSafeAuthenticator, errors::new_io_error}, proxy::{ - datagram::InboundUdp, socks::{ + inbound::datagram::InboundUdp, socks5::{auth_methods, response_code, socks_command}, Socks5UDPCodec, SOCKS5_VERSION, }, diff --git a/clash_lib/src/proxy/tproxy/iptables.sh b/clash_lib/src/proxy/tproxy/iptables.sh new file mode 100755 index 000000000..61b9e2ff2 --- /dev/null +++ b/clash_lib/src/proxy/tproxy/iptables.sh @@ -0,0 +1,51 @@ +#!/bin/sh + +# ip to bypass tproxy +readonly LOCAL_BY_PASS="\ +127/8 \ +10/8 \ +" + +# declare ip as local for tproxy +ip rule del fwmark 0x3333 lookup 3333 +ip rule add fwmark 0x3333 lookup 3333 +ip route del local 0.0.0.0/0 dev lo table 3333 +ip route add local 0.0.0.0/0 dev lo table 3333 + +# where all traffic enter tproxy and get marked +iptables -t mangle -N CLASH-TPROXY-INPUT +# fill in the chain +for i in $LOCAL_BY_PASS; do + iptables -t mangle -A CLASH-TPROXY-INPUT -d $i -j RETURN +done +iptables -t mangle -A CLASH-TPROXY-INPUT -p tcp -j TPROXY \ + --tproxy-mark 0x3333/0x3333 --on-port 8900 --on-ip 127.0.0.1 +iptables -t mangle -A CLASH-TPROXY-INPUT -p udp -j TPROXY \ + --tproxy-mark 0x3333/0x3333 --on-port 8900 --on-ip 127.0.0.1 + +# for local traffic +iptables -t mangle -N CLASH-TPROXY-LOCAL +for i in $LOCAL_BY_PASS; do + iptables -t mangle -A CLASH-TPROXY-LOCAL -d $i -j RETURN +done +iptables -t mangle -A CLASH-TPROXY-LOCAL -p tcp -m conntrack --ctdir REPLY -j RETURN +iptables -t mangle -A CLASH-TPROXY-LOCAL -p udp -m conntrack --ctdir REPLY -j RETURN + +iptables -t mangle -A CLASH-TPROXY-LOCAL -m owner --uid-owner root -j RETURN +# https://github.com/shadowsocks/shadowsocks-rust/blob/6e6e6948d7fc426c99cc03ef91abae989b6482b4/configs/iptables_tproxy.sh#L187 +iptables -t mangle -A CLASH-TPROXY-LOCAL -p tcp -j MARK --set-xmark 0x3333/0xffffffff # needs to match the ip rule fwmark +iptables -t mangle -A CLASH-TPROXY-LOCAL -p udp -j MARK --set-xmark 0x3333/0xffffffff + +iptables -t mangle -A OUTPUT -p tcp -d 104.21.58.154 -j CLASH-TPROXY-LOCAL +iptables -t mangle -A OUTPUT -p tcp -d 172.67.161.121 -j CLASH-TPROXY-LOCAL +iptables -t mangle -A OUTPUT -p udp -d 1.1.1.1 -j CLASH-TPROXY-LOCAL +iptables -t mangle -A OUTPUT -p udp -d 8.8.8.8 -j CLASH-TPROXY-LOCAL + +# for routed traffic +iptables -t mangle -A PREROUTING -p tcp -j CLASH-TPROXY-INPUT +iptables -t mangle -A PREROUTING -p udp -j CLASH-TPROXY-INPUT + + + +# ipv6 +# TODO \ No newline at end of file diff --git a/clash_lib/src/proxy/tproxy/mod.rs b/clash_lib/src/proxy/tproxy/mod.rs index fd00c9c65..083c33f5f 100644 --- a/clash_lib/src/proxy/tproxy/mod.rs +++ b/clash_lib/src/proxy/tproxy/mod.rs @@ -1,11 +1,22 @@ -use crate::proxy::InboundListener; +use super::tun::TunDatagram; +use crate::{ + app::dispatcher::Dispatcher, + proxy::{datagram::UdpPacket, utils::apply_tcp_options, InboundListener}, + session::{Network, Session, Type}, +}; use async_trait::async_trait; -use std::net::SocketAddr; - -use tracing::warn; +use socket2::{Domain, Socket}; +use std::{ + net::SocketAddr, + os::fd::{AsFd, AsRawFd}, + sync::Arc, +}; +use tokio::net::TcpListener; +use tracing::{trace, warn}; pub struct Listener { addr: SocketAddr, + dispather: Arc, } impl Drop for Listener { @@ -15,26 +26,163 @@ impl Drop for Listener { } impl Listener { - pub fn new(addr: SocketAddr) -> Self { - Self { addr } + pub fn new(addr: SocketAddr, dispather: Arc) -> Self { + Self { addr, dispather } } } #[async_trait] impl InboundListener for Listener { fn handle_tcp(&self) -> bool { - false + true } fn handle_udp(&self) -> bool { - false + true } async fn listen_tcp(&self) -> std::io::Result<()> { - unimplemented!("don't listen to me :)") + let socket = + Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)?; + socket.set_ip_transparent(true)?; + socket.set_nonblocking(true)?; + socket.bind(&self.addr.into())?; + socket.listen(1024)?; + + let listener = TcpListener::from_std(socket.into())?; + + loop { + let (socket, src_addr) = listener.accept().await?; + + let socket = apply_tcp_options(socket)?; + + // local_addr is getsockname + let orig_dst = socket.local_addr()?; + + let sess = Session { + network: Network::Tcp, + typ: Type::Tproxy, + source: src_addr, + destination: orig_dst.into(), + ..Default::default() + }; + + trace!("tproxy new tcp conn {}", sess); + + let dispatcher = self.dispather.clone(); + tokio::spawn(async move { + dispatcher.dispatch_stream(sess, socket).await; + }); + } } async fn listen_udp(&self) -> std::io::Result<()> { - unimplemented!("don't listen to me :)") + let socket = Socket::new(Domain::IPV4, socket2::Type::DGRAM, None)?; + socket.set_ip_transparent(true)?; + socket.set_nonblocking(true)?; + socket.set_broadcast(true)?; + + let enable = 1u32; + let payload = std::ptr::addr_of!(enable).cast(); + unsafe { + libc::setsockopt( + socket.as_fd().as_raw_fd(), + libc::IPPROTO_IP, + libc::IP_RECVORIGDSTADDR, + payload, + std::mem::size_of_val(&enable) as libc::socklen_t, + ) + }; + socket.bind(&self.addr.into())?; + + let listener = unix_udp_sock::UdpSocket::from_std(socket.into())?; + + handle_inbound_datagram(Arc::new(listener), self.dispather.clone()).await } } + +async fn handle_inbound_datagram( + socket: Arc, + dispatcher: Arc, +) -> std::io::Result<()> { + // dispatcher <-> tproxy communications + let (l_tx, mut l_rx) = tokio::sync::mpsc::channel(32); + + // forward packets from tproxy to dispatcher + let (d_tx, d_rx) = tokio::sync::mpsc::channel(32); + + // for dispatcher - the dispatcher would receive packets from this channel, + // which is from the stack and send back packets to this channel, which is + // to the tproxy + let udp_stream = TunDatagram::new(l_tx, d_rx); + + let sess = Session { + network: Network::Udp, + typ: Type::Tproxy, + ..Default::default() + }; + + let closer = dispatcher.dispatch_datagram(sess, Box::new(udp_stream)); + + // dispatcher -> tproxy + let responder = socket.clone(); + let fut1 = tokio::spawn(async move { + while let Some(pkt) = l_rx.recv().await { + trace!("tproxy <- dispatcher: {:?}", pkt); + + // remote -> local + match responder + .send_to(&pkt.data[..], pkt.dst_addr.must_into_socket_addr()) + .await + { + Ok(_) => {} + Err(e) => { + warn!("failed to send udp packet to proxy: {}", e); + } + } + } + }); + + // tproxy -> dispatcher + let fut2 = tokio::spawn(async move { + let mut buf = vec![0_u8; 1024 * 64]; + while let Ok(meta) = socket.recv_msg(&mut buf).await { + match meta.orig_dst { + Some(orig_dst) => { + if orig_dst.ip().is_multicast() + || match orig_dst.ip() { + std::net::IpAddr::V4(ip) => ip.is_broadcast(), + std::net::IpAddr::V6(_) => false, + } + { + continue; + } + + trace!("recv msg:{:?} orig_dst:{:?}", meta, orig_dst); + let pkt = UdpPacket { + data: buf[..meta.len].to_vec(), + src_addr: meta.addr.into(), + dst_addr: orig_dst.into(), + }; + trace!("tproxy -> dispatcher: {:?}", pkt); + match d_tx.send(pkt).await { + Ok(_) => {} + Err(e) => { + warn!("failed to send udp packet to proxy: {}", e); + continue; + } + } + } + None => { + warn!("failed to get orig_dst"); + continue; + } + } + } + + closer.send(0).ok(); + }); + + let _ = futures::future::join(fut1, fut2).await; + Ok(()) +} diff --git a/clash_lib/src/proxy/tun/datagram.rs b/clash_lib/src/proxy/tun/datagram.rs index 07b223628..6692ace24 100644 --- a/clash_lib/src/proxy/tun/datagram.rs +++ b/clash_lib/src/proxy/tun/datagram.rs @@ -1,11 +1,8 @@ -use std::{net::SocketAddr, task::Poll}; +use std::task::Poll; use futures::{ready, Sink, Stream}; -use crate::{ - common::errors::new_io_error, - proxy::{datagram::UdpPacket, InboundDatagram}, -}; +use crate::{common::errors::new_io_error, proxy::datagram::UdpPacket}; #[derive(Debug)] pub struct TunDatagram { @@ -14,8 +11,6 @@ pub struct TunDatagram { pkt: Option, flushed: bool, - #[allow(unused)] - local_addr: SocketAddr, } impl TunDatagram { @@ -24,21 +19,16 @@ impl TunDatagram { tx: tokio::sync::mpsc::Sender, // receive from tun rx: tokio::sync::mpsc::Receiver, - // the address of the tun udp socket - local_addr: SocketAddr, ) -> Self { Self { rx, tx, pkt: None, flushed: true, - local_addr, } } } -impl InboundDatagram for TunDatagram {} - impl Stream for TunDatagram { type Item = UdpPacket; diff --git a/clash_lib/src/proxy/tun/inbound.rs b/clash_lib/src/proxy/tun/inbound.rs index 466689f8d..ab35677e9 100644 --- a/clash_lib/src/proxy/tun/inbound.rs +++ b/clash_lib/src/proxy/tun/inbound.rs @@ -58,9 +58,7 @@ async fn handle_inbound_datagram( resolver: ThreadSafeDNSResolver, so_mark: u32, ) { - let local_addr = socket.local_addr(); // tun i/o - let (ls, mut lr) = socket.split(); let ls = Arc::new(ls); @@ -73,7 +71,7 @@ async fn handle_inbound_datagram( // for dispatcher - the dispatcher would receive packets from this channel, // which is from the stack and send back packets to this channel, which // is to the tun - let udp_stream = TunDatagram::new(l_tx, d_rx, local_addr); + let udp_stream = TunDatagram::new(l_tx, d_rx); let sess = Session { network: Network::Udp, diff --git a/clash_lib/src/proxy/tun/mod.rs b/clash_lib/src/proxy/tun/mod.rs index 3693292a4..a8227a19b 100644 --- a/clash_lib/src/proxy/tun/mod.rs +++ b/clash_lib/src/proxy/tun/mod.rs @@ -4,6 +4,9 @@ mod datagram; pub use inbound::get_runner as get_tun_runner; mod routes; +#[cfg(target_os = "linux")] // for tproxy +pub use datagram::TunDatagram; + #[cfg(test)] mod tests { use std::thread; diff --git a/clash_lib/src/session.rs b/clash_lib/src/session.rs index d58337361..c8d5c07cb 100644 --- a/clash_lib/src/session.rs +++ b/clash_lib/src/session.rs @@ -47,11 +47,14 @@ impl SocksAddrType { impl SocksAddr { pub fn any_ipv4() -> Self { - Self::Ip("0.0.0.0:0".parse().unwrap()) + Self::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)) } pub fn any_ipv6() -> Self { - Self::Ip("[::]:0".parse().unwrap()) + Self::Ip(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), + 0, + )) } pub fn write_buf(&self, buf: &mut T) { @@ -369,6 +372,8 @@ pub enum Type { HttpConnect, Socks5, Tun, + #[cfg(target_os = "linux")] + Tproxy, Ignore, }