diff --git a/Cargo.lock b/Cargo.lock index 5fb608ed5..b693f88ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4736,8 +4736,9 @@ dependencies = [ [[package]] name = "shadowsocks" -version = "1.20.3" -source = "git+https://github.com/Watfaq/shadowsocks-rust?rev=c6cb7fd906fe9f4126f724ae252f8a67cc1926b1#c6cb7fd906fe9f4126f724ae252f8a67cc1926b1" +version = "1.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ecb3780dfbc654de9383758015b9bb95c6e32fecace36ebded09d67e854d130" dependencies = [ "aes", "arc-swap", diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index 76a303576..5bbd9b224 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -112,7 +112,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-oslog = { branch = "main", git = "https://github.com/Absolucy/tracing-oslog.git" } tracing-appender = "0.2" -shadowsocks = { git = "https://github.com/Watfaq/shadowsocks-rust", rev = "c6cb7fd906fe9f4126f724ae252f8a67cc1926b1", optional = true, features=["aead-cipher-2022","stream-cipher"] } +shadowsocks = { version="1.21", optional = true, features=["aead-cipher-2022","stream-cipher"] } maxminddb = "0.24" public-suffix = "0.1" murmur3 = "0.5" diff --git a/clash_lib/src/proxy/shadowsocks/datagram.rs b/clash_lib/src/proxy/shadowsocks/datagram.rs index b20e392bb..4d5cd6cd2 100644 --- a/clash_lib/src/proxy/shadowsocks/datagram.rs +++ b/clash_lib/src/proxy/shadowsocks/datagram.rs @@ -4,8 +4,16 @@ use std::{ task::{Context, Poll}, }; -use futures::{ready, Sink, SinkExt, Stream, StreamExt}; -use shadowsocks::ProxySocket; +use bytes::BytesMut; +use futures::{ + ready, + stream::{SplitSink, SplitStream}, + Sink, SinkExt, Stream, StreamExt, +}; +use shadowsocks::{ + relay::udprelay::{DatagramReceive, DatagramSend}, + ProxySocket, +}; use tokio::io::ReadBuf; use tracing::{debug, instrument, trace}; @@ -17,8 +25,8 @@ use crate::{ }; /// the outbound datagram for that shadowsocks returns to us -pub struct OutboundDatagramShadowsocks { - inner: ProxySocket, +pub struct OutboundDatagramShadowsocks { + inner: ProxySocket, remote_addr: SocksAddr, flushed: bool, pkt: Option, @@ -26,26 +34,27 @@ pub struct OutboundDatagramShadowsocks { resolver: ThreadSafeDNSResolver, } -impl OutboundDatagramShadowsocks { - #[allow(clippy::new_ret_no_self)] +impl OutboundDatagramShadowsocks { pub fn new( - inner: ProxySocket, + inner: ProxySocket, remote_addr: (String, u16), resolver: ThreadSafeDNSResolver, - ) -> AnyOutboundDatagram { - let s = Self { + ) -> Self { + Self { inner, flushed: true, pkt: None, remote_addr: remote_addr.try_into().expect("must into socks addr"), buf: vec![0u8; 65535], resolver, - }; - Box::new(s) as _ + } } } -impl Sink for OutboundDatagramShadowsocks { +impl Sink for OutboundDatagramShadowsocks +where + S: DatagramSend + Unpin, +{ type Error = io::Error; fn poll_ready( @@ -156,19 +165,22 @@ impl Sink for OutboundDatagramShadowsocks { } } -impl Stream for OutboundDatagramShadowsocks { +impl Stream for OutboundDatagramShadowsocks +where + S: DatagramReceive + Unpin, +{ type Item = UdpPacket; #[instrument(skip(self, cx))] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { let Self { ref mut buf, ref inner, .. - } = *self; + } = self.get_mut(); let mut buf = ReadBuf::new(buf); @@ -188,75 +200,96 @@ impl Stream for OutboundDatagramShadowsocks { /// Shadowsocks UDP I/O that is passed to shadowsocks relay pub(crate) struct ShadowsocksUdpIo { - inner: AnyOutboundDatagram, + w: tokio::sync::Mutex>, + r: tokio::sync::Mutex<(SplitStream, BytesMut)>, } impl ShadowsocksUdpIo { pub fn new(inner: AnyOutboundDatagram) -> Self { - Self { inner } + let (w, r) = inner.split(); + Self { + w: tokio::sync::Mutex::new(w), + r: tokio::sync::Mutex::new((r, BytesMut::new())), + } } } -impl Sink - for ShadowsocksUdpIo -{ - type Error = io::Error; - - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.inner.poll_ready_unpin(cx) +impl DatagramSend for ShadowsocksUdpIo { + fn poll_send(&self, _: &mut Context<'_>, _: &[u8]) -> Poll> { + Poll::Ready(Err(new_io_error("not supported for shadowsocks udp io"))) } - fn start_send( - mut self: Pin<&mut Self>, - item: shadowsocks::relay::udprelay::proxy_socket::UdpPacket, - ) -> Result<(), Self::Error> { - self.inner.start_send_unpin(UdpPacket { - data: item.data.to_vec(), - src_addr: item.src.map(|x| x.into()).unwrap_or_default(), - dst_addr: item.dst.map(|x| x.into()).unwrap_or_default(), - }) - } - - fn poll_flush( - mut self: Pin<&mut Self>, + fn poll_send_to( + &self, cx: &mut Context<'_>, - ) -> Poll> { - self.inner.poll_flush_unpin(cx) + buf: &[u8], + target: std::net::SocketAddr, + ) -> Poll> { + let mut w = self.w.try_lock().expect("must acquire"); + match w.start_send_unpin(UdpPacket { + data: buf.to_vec(), + src_addr: SocksAddr::any_ipv4(), + dst_addr: target.into(), + }) { + Ok(_) => {} + Err(e) => return Poll::Ready(Err(new_io_error(e.to_string()))), + } + match w.poll_flush_unpin(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(buf.len())), + Poll::Ready(Err(e)) => { + return Poll::Ready(Err(new_io_error(e.to_string()))) + } + Poll::Pending => return Poll::Pending, + } } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.inner.poll_close_unpin(cx) + fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll> { + let mut w = self.w.try_lock().expect("must acquire"); + w.poll_ready_unpin(cx) + .map_err(|e| new_io_error(e.to_string())) } } -impl Stream for ShadowsocksUdpIo { - type Item = shadowsocks::relay::udprelay::proxy_socket::UdpPacket; - - fn poll_next( - mut self: Pin<&mut Self>, +impl DatagramReceive for ShadowsocksUdpIo { + fn poll_recv( + &self, cx: &mut Context<'_>, - ) -> Poll> { - match ready!(self.inner.poll_next_unpin(cx)) { - Some(pkt) => { - let (src, dst) = ( - pkt.src_addr.must_into_socket_addr(), - pkt.dst_addr.must_into_socket_addr(), - ); - Poll::Ready(Some( - shadowsocks::relay::udprelay::proxy_socket::UdpPacket { - data: pkt.data.into(), - src: src.into(), - dst: dst.into(), - }, - )) + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let mut g = self.r.try_lock().expect("must acquire"); + let (r, remained) = &mut *g; + + if !remained.is_empty() { + let to_consume = buf.remaining().min(remained.len()); + let consume = remained.split_to(to_consume); + buf.put_slice(&consume); + Poll::Ready(Ok(())) + } else { + match r.poll_next_unpin(cx) { + Poll::Ready(Some(pkt)) => { + let to_comsume = buf.remaining().min(pkt.data.len()); + let consume = pkt.data[..to_comsume].to_vec(); + buf.put_slice(&consume); + if to_comsume < pkt.data.len() { + remained.extend_from_slice(&pkt.data[to_comsume..]); + } + Poll::Ready(Ok(())) + } + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(Ok(())), } - None => Poll::Ready(None), } } + + fn poll_recv_from( + &self, + _: &mut Context<'_>, + _: &mut ReadBuf<'_>, + ) -> Poll> { + Poll::Ready(Err(new_io_error("not supported for shadowsocks udp io"))) + } + + fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } } diff --git a/clash_lib/src/proxy/shadowsocks/mod.rs b/clash_lib/src/proxy/shadowsocks/mod.rs index 5e93903c3..de4bf24a7 100644 --- a/clash_lib/src/proxy/shadowsocks/mod.rs +++ b/clash_lib/src/proxy/shadowsocks/mod.rs @@ -271,14 +271,11 @@ impl OutboundHandler for Handler { ) .await?; - let socket = ProxySocket::from_io( + let socket = ProxySocket::from_socket( UdpSocketType::Client, ctx, &cfg, - Box::new(ShadowsocksUdpIo::new(socket)), - None, - #[cfg(unix)] - None, + ShadowsocksUdpIo::new(socket), ); let d = OutboundDatagramShadowsocks::new( socket,