Skip to content

Commit

Permalink
Perf (#102)
Browse files Browse the repository at this point in the history
* wip

* choose chksum algo
  • Loading branch information
ibigbug authored Sep 29, 2023
1 parent f61023c commit cd961d6
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 66 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions clash/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate clash_lib as clash;

use clap::Parser;
use clash::TokioRuntime;
use std::path::PathBuf;

#[derive(Parser)]
Expand All @@ -27,6 +28,7 @@ fn main() {
clash::start(clash::Options {
config: clash::Config::File("".to_string(), cli.config.to_string_lossy().to_string()),
cwd: cli.directory.map(|x| x.to_string_lossy().to_string()),
rt: Some(TokioRuntime::MultiThread),
})
.unwrap();
}
3 changes: 2 additions & 1 deletion clash_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ hyper-boring = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" }
tokio-boring = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" }
ip_network_table-deps-treebitmap = "0.5.0"
once_cell = "1.18.0"
arc-swap = "1.6.0"

# opentelemetry
opentelemetry = "0.20"
Expand All @@ -60,7 +61,7 @@ tower-http = { version = "0.4.0", features = ["fs", "trace", "cors"] }
chrono = { version = "0.4.26", features = ["serde"] }

tun = { git = "https://github.com/Watfaq/rust-tun.git", rev = "28936b6", features = ["async"] }
netstack-lwip = { git = "https://github.com/Watfaq/netstack-lwip.git", rev = "8c8c0b0" }
netstack-lwip = { git = "https://github.com/Watfaq/netstack-lwip.git", rev = "5ad376f" }
boringtun = { version = "0.6.0", features = ["device"] }

serde = { version = "1.0", features=["derive"] }
Expand Down
8 changes: 4 additions & 4 deletions clash_lib/src/app/api/handlers/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn routes(outbound_manager: ThreadSafeOutboundManager) -> Router<Arc<AppStat
}

async fn get_providers(State(state): State<ProviderState>) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
let mut res = HashMap::new();

let mut providers = HashMap::new();
Expand All @@ -76,7 +76,7 @@ async fn find_proxy_provider_by_name<B>(
mut req: Request<B>,
next: Next<B>,
) -> Response {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
if let Some(provider) = outbound_manager.get_proxy_provider(&name) {
req.extensions_mut().insert(provider);
next.run(req).await
Expand Down Expand Up @@ -154,7 +154,7 @@ async fn get_proxy(
Extension(proxy): Extension<AnyOutboundHandler>,
State(state): State<ProviderState>,
) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
axum::response::Json(outbound_manager.get_proxy(&proxy).await)
}

Expand All @@ -168,7 +168,7 @@ async fn get_proxy_delay(
Extension(proxy): Extension<AnyOutboundHandler>,
Query(q): Query<DelayRequest>,
) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
let timeout = Duration::from_millis(q.timeout.into());
let n = proxy.name().to_owned();
match outbound_manager.url_test(proxy, &q.url, timeout).await {
Expand Down
10 changes: 5 additions & 5 deletions clash_lib/src/app/api/handlers/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn routes(
}

async fn get_proxies(State(state): State<ProxyState>) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
let mut res = HashMap::new();
let proxies = outbound_manager.get_proxies().await;
res.insert("proxies".to_owned(), proxies);
Expand All @@ -63,7 +63,7 @@ async fn find_proxy_by_name<B>(
mut req: Request<B>,
next: Next<B>,
) -> Response {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
if let Some(proxy) = outbound_manager.get_outbound(&name) {
req.extensions_mut().insert(proxy);
next.run(req).await
Expand All @@ -76,7 +76,7 @@ async fn get_proxy(
Extension(proxy): Extension<AnyOutboundHandler>,
State(state): State<ProxyState>,
) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
axum::response::Json(outbound_manager.get_proxy(&proxy).await)
}

Expand All @@ -91,7 +91,7 @@ async fn update_proxy(
Extension(proxy): Extension<AnyOutboundHandler>,
Json(payload): Json<UpdateProxyRequest>,
) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
if let Some(ctrl) = outbound_manager.get_selector_control(proxy.name()) {
match ctrl.lock().await.select(&payload.name).await {
Ok(_) => {
Expand Down Expand Up @@ -130,7 +130,7 @@ async fn get_proxy_delay(
Extension(proxy): Extension<AnyOutboundHandler>,
Query(q): Query<DelayRequest>,
) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
let timeout = Duration::from_millis(q.timeout.into());
let n = proxy.name().to_owned();
match outbound_manager.url_test(proxy, &q.url, timeout).await {
Expand Down
37 changes: 18 additions & 19 deletions clash_lib/src/app/dispatcher/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::config::internal::proxy::PROXY_GLOBAL;
use crate::proxy::datagram::UdpPacket;
use crate::proxy::AnyInboundDatagram;
use crate::session::Session;
use arc_swap::ArcSwap;
use futures::SinkExt;
use futures::StreamExt;
use std::collections::HashMap;
Expand All @@ -17,7 +18,6 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::io::{copy_bidirectional, AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::info_span;
Expand All @@ -34,7 +34,7 @@ pub struct Dispatcher {
outbound_manager: ThreadSafeOutboundManager,
router: ThreadSafeRouter,
resolver: ThreadSafeDNSResolver,
mode: Arc<RwLock<RunMode>>,
mode: ArcSwap<RunMode>,

manager: Arc<Manager>,
}
Expand All @@ -58,20 +58,19 @@ impl Dispatcher {
outbound_manager,
router,
resolver,
mode: Arc::new(RwLock::new(mode)),
mode: Arc::new(mode).into(),
manager: statistics_manager,
}
}

pub async fn set_mode(&self, mode: RunMode) {
info!("run mode switched to {}", mode);
let mut m = self.mode.write().await;
*m = mode;

self.mode.store(Arc::new(mode));
}

pub async fn get_mode(&self) -> RunMode {
let mode = self.mode.read().await;
mode.clone()
**self.mode.load()
}

#[instrument(skip(lhs))]
Expand Down Expand Up @@ -107,15 +106,15 @@ impl Dispatcher {
sess
};

let mode = self.mode.read().await;
let mode = **self.mode.load();
debug!("dispatching {} with mode {}", sess, mode);
let (outbound_name, rule) = match *mode {
let (outbound_name, rule) = match mode {
RunMode::Global => (PROXY_GLOBAL, None),
RunMode::Rule => self.router.match_route(&sess).await,
RunMode::Direct => (PROXY_DIRECT, None),
};

let mgr = self.outbound_manager.read().await;
let mgr = self.outbound_manager.clone();
let handler = mgr.get_outbound(outbound_name).unwrap_or_else(|| {
debug!("unknown rule: {}, fallback to direct", outbound_name);
mgr.get_outbound(PROXY_DIRECT).unwrap()
Expand Down Expand Up @@ -186,7 +185,7 @@ impl Dispatcher {
let router = self.router.clone();
let outbound_manager = self.outbound_manager.clone();
let resolver = self.resolver.clone();
let mode = self.mode.clone();
let mode = **self.mode.load();
let manager = self.manager.clone();

let (mut local_w, mut local_r) = udp_inbound.split();
Expand Down Expand Up @@ -236,10 +235,10 @@ impl Dispatcher {
let mut packet = packet;
packet.dst_addr = sess.destination.clone();

let mode = mode.read().await;
let mode = mode.clone();
trace!("dispatching {} with mode {}", sess, mode);

let (outbound_name, rule) = match *mode {
let (outbound_name, rule) = match mode {
RunMode::Global => (PROXY_GLOBAL, None),
RunMode::Rule => router.match_route(&sess).await,
RunMode::Direct => (PROXY_DIRECT, None),
Expand All @@ -249,7 +248,7 @@ impl Dispatcher {

let remote_receiver_w = remote_receiver_w.clone();

let mgr = outbound_manager.read().await;
let mgr = outbound_manager.clone();
let handler = mgr.get_outbound(&outbound_name).unwrap_or_else(|| {
debug!("unknown rule: {}, fallback to direct", outbound_name);
mgr.get_outbound(PROXY_DIRECT).unwrap()
Expand Down Expand Up @@ -381,7 +380,7 @@ impl Dispatcher {
type OutboundPacketSender = tokio::sync::mpsc::Sender<UdpPacket>; // outbound packet sender

struct TimeoutUdpSessionManager {
map: Arc<Mutex<OutboundHandleMap>>,
map: Arc<RwLock<OutboundHandleMap>>,

cleaner: Option<JoinHandle<()>>,
}
Expand All @@ -395,7 +394,7 @@ impl Drop for TimeoutUdpSessionManager {

impl TimeoutUdpSessionManager {
fn new() -> Self {
let map = Arc::new(Mutex::new(OutboundHandleMap::new()));
let map = Arc::new(RwLock::new(OutboundHandleMap::new()));
let timeout = Duration::from_secs(10);

let map_cloned = map.clone();
Expand All @@ -405,7 +404,7 @@ impl TimeoutUdpSessionManager {
tokio::time::sleep(Duration::from_secs(10)).await;

trace!("timeout udp session cleaner scanning");
let mut g = map_cloned.lock().await;
let mut g = map_cloned.write().await;
let mut alived = 0;
let mut expired = 0;
g.0.retain(|k, x| {
Expand Down Expand Up @@ -445,7 +444,7 @@ impl TimeoutUdpSessionManager {
send_handle: JoinHandle<()>,
sender: OutboundPacketSender,
) {
let mut map = self.map.lock().await;
let mut map = self.map.write().await;
map.insert(outbound_name, src_addr, recv_handle, send_handle, sender);
}

Expand All @@ -454,7 +453,7 @@ impl TimeoutUdpSessionManager {
outbound_name: &str,
src_addr: SocketAddr,
) -> Option<OutboundPacketSender> {
let mut map = self.map.lock().await;
let mut map = self.map.write().await;
map.get_outbound_sender_mut(outbound_name, src_addr)
}
}
Expand Down
2 changes: 1 addition & 1 deletion clash_lib/src/app/outbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct OutboundManager {

static DEFAULT_LATENCY_TEST_URL: &str = "http://www.gstatic.com/generate_204";

pub type ThreadSafeOutboundManager = Arc<RwLock<OutboundManager>>;
pub type ThreadSafeOutboundManager = Arc<OutboundManager>;

impl OutboundManager {
pub async fn new(
Expand Down
11 changes: 10 additions & 1 deletion clash_lib/src/app/remote_content_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,16 @@ struct ProxyState {
pub struct ProxyManager {
proxy_state: Arc<RwLock<HashMap<String, ProxyState>>>,
dns_resolver: ThreadSafeDNSResolver,

connector_map: Arc<RwLock<HashMap<String, HttpsConnector<LocalConnector>>>>,
}

impl ProxyManager {
pub fn new(dns_resolver: ThreadSafeDNSResolver) -> Self {
Self {
dns_resolver,
proxy_state: Arc::new(RwLock::new(HashMap::new())),
connector_map: Arc::new(RwLock::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -165,7 +168,13 @@ impl ProxyManager {
ssl.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(map_io_error)?;

let connector = HttpsConnector::with_connector(connector, ssl).map_err(map_io_error)?;
let mut g = self.connector_map.write().await;
let connector = g
.entry(name.clone())
.or_insert(HttpsConnector::with_connector(connector, ssl).map_err(map_io_error)?);

let connector = connector.clone();

let client = hyper::Client::builder().build::<_, hyper::Body>(connector);

let req = Request::get(url)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use erased_serde::Serialize;
use tokio::sync::Mutex;
use tracing::debug;

use crate::{
Expand All @@ -16,14 +15,10 @@ use crate::{

use super::proxy_provider::ProxyProvider;

struct Inner {
hc: Arc<HealthCheck>,
}

pub struct PlainProvider {
name: String,
proxies: Vec<AnyOutboundHandler>,
inner: Arc<Mutex<Inner>>,
hc: Arc<HealthCheck>,
}

impl PlainProvider {
Expand All @@ -46,11 +41,7 @@ impl PlainProvider {
});
}

Ok(Self {
name,
proxies,
inner: Arc::new(Mutex::new(Inner { hc })),
})
Ok(Self { name, proxies, hc })
}
}

Expand Down Expand Up @@ -93,10 +84,10 @@ impl ProxyProvider for PlainProvider {
}

async fn touch(&self) {
self.inner.lock().await.hc.touch().await;
self.hc.touch().await;
}

async fn healthcheck(&self) {
self.inner.lock().await.hc.check().await;
self.hc.check().await;
}
}
Loading

0 comments on commit cd961d6

Please sign in to comment.