diff --git a/Cargo.lock b/Cargo.lock index 65e87091c..c8debbbcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1041,7 +1041,7 @@ dependencies = [ "sha2", "shadowsocks", "smoltcp", - "socket2", + "socket2 0.5.7", "tempfile", "thiserror", "tokio", @@ -1061,6 +1061,7 @@ dependencies = [ "tuic", "tuic-quinn", "tun", + "unix-udp-sock", "url", "uuid", "webpki-roots", @@ -2692,7 +2693,7 @@ dependencies = [ "http-body", "hyper", "pin-project-lite", - "socket2", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -2864,7 +2865,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2", + "socket2 0.5.7", "widestring", "windows-sys 0.48.0", "winreg", @@ -4100,7 +4101,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.0.0", "rustls", - "socket2", + "socket2 0.5.7", "thiserror", "tokio", "tracing", @@ -4131,7 +4132,7 @@ checksum = "4fe68c2e9e1a1234e218683dbdf9f9dfcb094113c5ac2b938dfcb9bab4c4140b" dependencies = [ "libc", "once_cell", - "socket2", + "socket2 0.5.7", "tracing", "windows-sys 0.59.0", ] @@ -4822,7 +4823,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "shadowsocks-crypto", - "socket2", + "socket2 0.5.7", "spin 0.9.8", "thiserror", "tokio", @@ -4953,6 +4954,16 @@ dependencies = [ "managed", ] +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.7" @@ -5271,7 +5282,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.7", "tokio-macros", "tracing", "windows-sys 0.48.0", @@ -5334,7 +5345,7 @@ dependencies = [ "log", "once_cell", "pin-project", - "socket2", + "socket2 0.5.7", "tokio", "windows-sys 0.52.0", ] @@ -5431,7 +5442,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "socket2", + "socket2 0.5.7", "tokio", "tokio-stream", "tower 0.4.13", @@ -6600,6 +6611,22 @@ dependencies = [ "subtle", ] +[[package]] +name = "unix-udp-sock" +version = "0.7.0" +source = "git+https://github.com/Watfaq/unix-udp-sock.git?rev=cd3e4eca43e6f3be82a2703c3d711b7e18fbfd18#cd3e4eca43e6f3be82a2703c3d711b7e18fbfd18" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "libc", + "pin-project-lite", + "socket2 0.4.10", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "unsafe-libyaml" version = "0.2.11" diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index 69332a769..0d767f262 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -27,6 +27,7 @@ hyper-util = "0.1" http = { version = "1" } http-body-util = "0.1.2" socket2 = { version = "0.5", features = ["all"] } +unix-udp-sock = { git = "https://github.com/Watfaq/unix-udp-sock.git", rev = "cd3e4eca43e6f3be82a2703c3d711b7e18fbfd18"} tokio-tungstenite = "0.24.0" # TLS diff --git a/clash_lib/src/proxy/datagram.rs b/clash_lib/src/proxy/datagram.rs index dfd519d35..f452aee5c 100644 --- a/clash_lib/src/proxy/datagram.rs +++ b/clash_lib/src/proxy/datagram.rs @@ -1,20 +1,15 @@ use crate::{ - app::dns::ThreadSafeDNSResolver, - common::errors::new_io_error, - proxy::{socks::Socks5UDPCodec, InboundDatagram}, + app::dns::ThreadSafeDNSResolver, common::errors::new_io_error, session::SocksAddr, }; -use bytes::Bytes; -use futures::{ready, Sink, SinkExt, Stream, StreamExt}; +use futures::{ready, Sink, Stream}; use std::{ fmt::{Debug, Display, Formatter}, io, - net::SocketAddr, pin::Pin, task::{Context, Poll}, }; use tokio::{io::ReadBuf, net::UdpSocket}; -use tokio_util::udp::UdpFramed; #[derive(Clone)] pub struct UdpPacket { @@ -64,90 +59,6 @@ impl UdpPacket { } } -pub struct InboundUdp { - inner: I, -} - -impl InboundUdp -where - I: Stream + Unpin, - I: Sink<((Bytes, SocksAddr), SocketAddr)>, -{ - pub fn new(inner: I) -> Self { - Self { inner } - } -} - -impl Debug for InboundUdp> { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("InboundUdp").finish() - } -} - -impl Stream for InboundUdp> { - type Item = UdpPacket; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let pin = self.get_mut(); - - match pin.inner.poll_next_unpin(cx) { - Poll::Ready(item) => match item { - None => Poll::Ready(None), - Some(item) => match item { - Ok(((dst, pkt), src)) => Poll::Ready(Some(UdpPacket { - data: pkt.to_vec(), - src_addr: SocksAddr::Ip(src), - dst_addr: dst, - })), - Err(_) => Poll::Ready(None), - }, - }, - Poll::Pending => Poll::Pending, - } - } -} - -impl Sink for InboundUdp> { - type Error = std::io::Error; - - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let pin = self.get_mut(); - pin.inner.poll_ready_unpin(cx) - } - - fn start_send(self: Pin<&mut Self>, item: UdpPacket) -> Result<(), Self::Error> { - let pin = self.get_mut(); - pin.inner.start_send_unpin(( - (item.data.into(), item.src_addr), - item.dst_addr.must_into_socket_addr(), - )) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let pin = self.get_mut(); - pin.inner.poll_flush_unpin(cx) - } - - fn poll_close( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let pin = self.get_mut(); - pin.inner.poll_close_unpin(cx) - } -} - -impl InboundDatagram for InboundUdp> {} - #[must_use = "sinks do nothing unless polled"] // TODO: maybe we should use abstract datagram IO interface instead of the // Stream + Sink trait diff --git a/clash_lib/src/proxy/mod.rs b/clash_lib/src/proxy/mod.rs index 2ad38df51..afcf186c0 100644 --- a/clash_lib/src/proxy/mod.rs +++ b/clash_lib/src/proxy/mod.rs @@ -82,6 +82,10 @@ pub trait InboundDatagram: Stream + Sink + Send + Sync + Unpin + Debug { } +impl InboundDatagram for T where + T: Stream + Sink + Send + Sync + Unpin + Debug +{ +} pub type AnyInboundDatagram = Box>; diff --git a/clash_lib/src/proxy/socks/inbound/datagram.rs b/clash_lib/src/proxy/socks/inbound/datagram.rs index 1e45427e3..e8f80a07a 100644 --- a/clash_lib/src/proxy/socks/inbound/datagram.rs +++ b/clash_lib/src/proxy/socks/inbound/datagram.rs @@ -1,7 +1,16 @@ -use crate::session::SocksAddr; +use crate::{proxy::datagram::UdpPacket, session::SocksAddr}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use std::io; -use tokio_util::codec::{Decoder, Encoder}; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use std::{ + io, + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, +}; +use tokio_util::{ + codec::{Decoder, Encoder}, + udp::UdpFramed, +}; // +----+------+------+----------+----------+----------+ // |RSV | FRAG | ATYP | DST.ADDR | DST.PORT | DATA | @@ -65,3 +74,85 @@ impl Decoder for Socks5UDPCodec { Ok(Some((addr, packet))) } } + +pub struct InboundUdp { + inner: I, +} + +impl InboundUdp +where + I: Stream + Unpin, + I: Sink<((Bytes, SocksAddr), SocketAddr)>, +{ + pub fn new(inner: I) -> Self { + Self { inner } + } +} + +impl std::fmt::Debug for InboundUdp> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InboundUdp").finish() + } +} + +impl Stream for InboundUdp> { + type Item = UdpPacket; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let pin = self.get_mut(); + + match pin.inner.poll_next_unpin(cx) { + Poll::Ready(item) => match item { + None => Poll::Ready(None), + Some(item) => match item { + Ok(((dst, pkt), src)) => Poll::Ready(Some(UdpPacket { + data: pkt.to_vec(), + src_addr: SocksAddr::Ip(src), + dst_addr: dst, + })), + Err(_) => Poll::Ready(None), + }, + }, + Poll::Pending => Poll::Pending, + } + } +} + +impl Sink for InboundUdp> { + type Error = std::io::Error; + + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let pin = self.get_mut(); + pin.inner.poll_ready_unpin(cx) + } + + fn start_send(self: Pin<&mut Self>, item: UdpPacket) -> Result<(), Self::Error> { + let pin = self.get_mut(); + pin.inner.start_send_unpin(( + (item.data.into(), item.src_addr), + item.dst_addr.must_into_socket_addr(), + )) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let pin = self.get_mut(); + pin.inner.poll_flush_unpin(cx) + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let pin = self.get_mut(); + pin.inner.poll_close_unpin(cx) + } +} diff --git a/clash_lib/src/proxy/socks/inbound/stream.rs b/clash_lib/src/proxy/socks/inbound/stream.rs index fda35158b..2294fe04e 100644 --- a/clash_lib/src/proxy/socks/inbound/stream.rs +++ b/clash_lib/src/proxy/socks/inbound/stream.rs @@ -1,8 +1,8 @@ use crate::{ common::{auth::ThreadSafeAuthenticator, errors::new_io_error}, proxy::{ - datagram::InboundUdp, socks::{ + inbound::datagram::InboundUdp, socks5::{auth_methods, response_code, socks_command}, Socks5UDPCodec, SOCKS5_VERSION, }, diff --git a/clash_lib/src/proxy/tproxy/iptables.sh b/clash_lib/src/proxy/tproxy/iptables.sh index 082fc08f1..50c1bfa4c 100755 --- a/clash_lib/src/proxy/tproxy/iptables.sh +++ b/clash_lib/src/proxy/tproxy/iptables.sh @@ -16,7 +16,7 @@ iptables -t mangle -N CLASH-TPROXY-INPUT for i in $LOCAL_BY_PASS; do iptables -t mangle -A CLASH-TPROXY-INPUT -d $i -j RETURN done -iptables -t mangle -A CLASH-TPROXY-LOCAL -m mark --mark 0x3332/0x3332 -j RETURN +iptables -t mangle -A CLASH-TPROXY-LOCAL -m owner --uid-owner root -j RETURN iptables -t mangle -A CLASH-TPROXY-INPUT -p tcp -j TPROXY \ --tproxy-mark 0x3333/0x3333 --on-port 8900 --on-ip 127.0.0.1 @@ -27,10 +27,10 @@ for i in $LOCAL_BY_PASS; do iptables -t mangle -A CLASH-TPROXY-LOCAL -d $i -j RETURN done iptables -t mangle -A CLASH-TPROXY-LOCAL -p tcp -m conntrack --ctdir REPLY -j RETURN -iptables -t mangle -A CLASH-TPROXY-LOCAL -m mark --mark 0x3332/0x3332 -j RETURN +iptables -t mangle -A CLASH-TPROXY-LOCAL -m owner --uid-owner root -j RETURN # https://github.com/shadowsocks/shadowsocks-rust/blob/6e6e6948d7fc426c99cc03ef91abae989b6482b4/configs/iptables_tproxy.sh#L187 iptables -t mangle -A CLASH-TPROXY-LOCAL -p tcp -j MARK --set-xmark 0x3333/0xffffffff -iptables -t mangle -A OUTPUT -j CLASH-TPROXY-LOCAL +#iptables -t mangle -A OUTPUT -j CLASH-TPROXY-LOCAL # for routed traffic iptables -t mangle -A PREROUTING -p tcp -j CLASH-TPROXY-INPUT diff --git a/clash_lib/src/proxy/tproxy/mod.rs b/clash_lib/src/proxy/tproxy/mod.rs index 777207767..d5f59e14a 100644 --- a/clash_lib/src/proxy/tproxy/mod.rs +++ b/clash_lib/src/proxy/tproxy/mod.rs @@ -33,21 +33,26 @@ impl InboundListener for Listener { } fn handle_udp(&self) -> bool { - false + if cfg!(target_os = "linux") { + true + } else { + false + } } #[cfg(target_os = "linux")] async fn listen_tcp(&self) -> std::io::Result<()> { - use socket2::{Domain, Socket, Type}; + use socket2::Socket; use tokio::net::TcpListener; use tracing::info; use crate::{ proxy::utils::apply_tcp_options, - session::{Network, Session}, + session::{Network, Session, Type}, }; - let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; + let socket = + Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)?; socket.set_ip_transparent(true)?; socket.bind(&self.addr.into())?; socket.listen(1024)?; @@ -64,9 +69,9 @@ impl InboundListener for Listener { let sess = Session { network: Network::Tcp, + typ: Type::Tproxy, source: src_addr, destination: orig_dst.into(), - so_mark: Some(0x3332), ..Default::default() }; @@ -85,7 +90,129 @@ impl InboundListener for Listener { Ok(()) } + #[cfg(target_os = "linux")] async fn listen_udp(&self) -> std::io::Result<()> { - unimplemented!("don't listen to me :)") + use std::os::fd::{AsFd, AsRawFd}; + + use socket2::{Domain, Socket}; + + let socket = Socket::new(Domain::IPV4, socket2::Type::DGRAM, None)?; + socket.set_ip_transparent(true)?; + socket.set_nonblocking(true)?; + socket.set_broadcast(true)?; + + let enable = 1u32; + let payload = std::ptr::addr_of!(enable).cast(); + unsafe { + libc::setsockopt( + socket.as_fd().as_raw_fd(), + libc::IPPROTO_IP, + libc::IP_RECVORIGDSTADDR, + payload, + std::mem::size_of_val(&enable) as libc::socklen_t, + ) + }; + socket.bind(&self.addr.into())?; + + let listener = unix_udp_sock::UdpSocket::from_std(socket.into())?; + + handle_inbound_datagram(Arc::new(listener), self.dispather.clone()).await } + + #[cfg(not(target_os = "linux"))] + async fn listen_udp(&self) -> std::io::Result<()> { + warn!("tproxy not supported on non Linux"); + Ok(()) + } +} + +#[cfg(target_os = "linux")] +async fn handle_inbound_datagram( + socket: Arc, + dispatcher: Arc, +) -> std::io::Result<()> { + use tracing::trace; + + use crate::{ + proxy::datagram::UdpPacket, + session::{Network, Session, Type}, + }; + + use super::tun::TunDatagram; + + // dispatcher <-> tun communications + let (l_tx, mut l_rx) = tokio::sync::mpsc::channel(32); + + // forward packets from tun to dispatcher + let (d_tx, d_rx) = tokio::sync::mpsc::channel(32); + + // for dispatcher - the dispatcher would receive packets from this channel, + // which is from the stack and send back packets to this channel, which is + // to the tun + let udp_stream = TunDatagram::new(l_tx, d_rx); + + let sess = Session { + network: Network::Udp, + typ: Type::Tproxy, + ..Default::default() + }; + + let closer = dispatcher.dispatch_datagram(sess, Box::new(udp_stream)); + + // dispatcher -> tproxy + let responder = socket.clone(); + let fut1 = tokio::spawn(async move { + while let Some(pkt) = l_rx.recv().await { + trace!("tproxy <- dispatcher: {:?}", pkt); + + // remote -> local + match responder + .send_to(&pkt.data[..], pkt.dst_addr.must_into_socket_addr()) + .await + { + Ok(_) => {} + Err(e) => { + warn!("failed to send udp packet to proxy: {}", e); + } + } + } + }); + + // tproxy -> dispatcher + let fut2 = tokio::spawn(async move { + let mut buf = vec![0_u8; 1024 * 64]; + while let Ok(meta) = socket.recv_msg(&mut buf).await { + match meta.orig_dst { + Some(orig_dst) => { + if orig_dst.ip().is_multicast() { + continue; + } + + trace!("recv msg:{:?} orig_dst:{:?}", meta, orig_dst); + let pkt = UdpPacket { + data: buf[..meta.len].to_vec(), + src_addr: meta.addr.into(), + dst_addr: orig_dst.into(), + }; + trace!("tproxy -> dispatcher: {:?}", pkt); + match d_tx.send(pkt).await { + Ok(_) => {} + Err(e) => { + warn!("failed to send udp packet to proxy: {}", e); + continue; + } + } + } + None => { + warn!("failed to get orig_dst"); + continue; + } + } + } + + closer.send(0).ok(); + }); + + let _ = futures::future::join(fut1, fut2).await; + Ok(()) } diff --git a/clash_lib/src/proxy/tun/datagram.rs b/clash_lib/src/proxy/tun/datagram.rs index 07b223628..6692ace24 100644 --- a/clash_lib/src/proxy/tun/datagram.rs +++ b/clash_lib/src/proxy/tun/datagram.rs @@ -1,11 +1,8 @@ -use std::{net::SocketAddr, task::Poll}; +use std::task::Poll; use futures::{ready, Sink, Stream}; -use crate::{ - common::errors::new_io_error, - proxy::{datagram::UdpPacket, InboundDatagram}, -}; +use crate::{common::errors::new_io_error, proxy::datagram::UdpPacket}; #[derive(Debug)] pub struct TunDatagram { @@ -14,8 +11,6 @@ pub struct TunDatagram { pkt: Option, flushed: bool, - #[allow(unused)] - local_addr: SocketAddr, } impl TunDatagram { @@ -24,21 +19,16 @@ impl TunDatagram { tx: tokio::sync::mpsc::Sender, // receive from tun rx: tokio::sync::mpsc::Receiver, - // the address of the tun udp socket - local_addr: SocketAddr, ) -> Self { Self { rx, tx, pkt: None, flushed: true, - local_addr, } } } -impl InboundDatagram for TunDatagram {} - impl Stream for TunDatagram { type Item = UdpPacket; diff --git a/clash_lib/src/proxy/tun/inbound.rs b/clash_lib/src/proxy/tun/inbound.rs index 466689f8d..ab35677e9 100644 --- a/clash_lib/src/proxy/tun/inbound.rs +++ b/clash_lib/src/proxy/tun/inbound.rs @@ -58,9 +58,7 @@ async fn handle_inbound_datagram( resolver: ThreadSafeDNSResolver, so_mark: u32, ) { - let local_addr = socket.local_addr(); // tun i/o - let (ls, mut lr) = socket.split(); let ls = Arc::new(ls); @@ -73,7 +71,7 @@ async fn handle_inbound_datagram( // for dispatcher - the dispatcher would receive packets from this channel, // which is from the stack and send back packets to this channel, which // is to the tun - let udp_stream = TunDatagram::new(l_tx, d_rx, local_addr); + let udp_stream = TunDatagram::new(l_tx, d_rx); let sess = Session { network: Network::Udp, diff --git a/clash_lib/src/proxy/tun/mod.rs b/clash_lib/src/proxy/tun/mod.rs index 3693292a4..ff7317c8b 100644 --- a/clash_lib/src/proxy/tun/mod.rs +++ b/clash_lib/src/proxy/tun/mod.rs @@ -4,6 +4,8 @@ mod datagram; pub use inbound::get_runner as get_tun_runner; mod routes; +pub use datagram::TunDatagram; + #[cfg(test)] mod tests { use std::thread; diff --git a/clash_lib/src/session.rs b/clash_lib/src/session.rs index d58337361..a50dcc6ad 100644 --- a/clash_lib/src/session.rs +++ b/clash_lib/src/session.rs @@ -369,6 +369,7 @@ pub enum Type { HttpConnect, Socks5, Tun, + Tproxy, Ignore, }