Skip to content

Commit

Permalink
minor async cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed Oct 23, 2023
1 parent 3457447 commit 38434c3
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 188 deletions.
82 changes: 19 additions & 63 deletions crates/tx5-core/src/uniq.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,38 @@
use once_cell::sync::Lazy;
use std::sync::{atomic, Arc};

fn gen_node() -> String {
use rand::Rng;
let mut b = [0; 6];
rand::thread_rng().fill(&mut b[..]);
base64::encode_config(b, base64::URL_SAFE_NO_PAD)
}

static NODE_ID: Lazy<Arc<str>> =
Lazy::new(|| gen_node().into_boxed_str().into());

fn gen_cnt() -> u64 {
static CNT_ID: atomic::AtomicU64 = atomic::AtomicU64::new(1);
CNT_ID.fetch_add(1, atomic::Ordering::Relaxed)
}
use std::sync::atomic;

/// Debugging unique identifier helper.
///
/// Construction via `Uniq::default()` will always result in the same
/// "node_id" unique to this process, as well as an incrementing counter.
///
/// Construction via `Uniq::fresh()` will result in a new random
/// "node_id" as well as an incrementing counter.
///
/// ```text
/// Uniq::default() -> "12MUNeh3:1"
/// Uniq::default() -> "12MUNeh3:2"
/// Uniq::fresh() -> "RV1bMaCM:3"
/// Uniq::fresh() -> "3psHcLKE:4"
/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Uniq(pub Arc<str>);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Uniq(pub u64);

impl std::fmt::Display for Uniq {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl std::ops::Deref for Uniq {
type Target = Arc<str>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Default for Uniq {
fn default() -> Self {
Self(
format!("{}:{}", *NODE_ID, gen_cnt())
.into_boxed_str()
.into(),
)
static CNT_ID: atomic::AtomicU64 = atomic::AtomicU64::new(1);
Self(CNT_ID.fetch_add(1, atomic::Ordering::Relaxed))
}
}

impl Uniq {
/// Get a fresh node uniq.
pub fn fresh() -> Self {
Self(
format!("{}:{}", gen_node(), gen_cnt())
.into_boxed_str()
.into(),
)
/// Get a sub-uniq from this uniq.
pub fn sub(&self) -> SubUniq {
SubUniq(self.0, Uniq::default().0)
}
}

/// Get a sub-uniq from this uniq.
pub fn sub(&self) -> Self {
Self(format!("{}:{}", self, gen_cnt()).into_boxed_str().into())
/// Debugging unique identifier helper.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SubUniq(pub u64, pub u64);

impl std::fmt::Display for SubUniq {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)?;
f.write_str(":")?;
self.1.fmt(f)
}
}

Expand All @@ -81,16 +44,9 @@ mod tests {
fn sanity() {
let u1 = Uniq::default();
let u2 = Uniq::default();
let s1 = u2.sub();
let s2 = u2.sub();
let u3 = Uniq::fresh();

println!("{u1} {u2} {s1} {s2} {u3}");
println!("{u1} {u2}");

assert_ne!(u1, u2);
assert_ne!(u2, s1);
assert_ne!(s1, s2);
assert_ne!(u1, u3);
assert_ne!(u2, u3);
}
}
4 changes: 2 additions & 2 deletions crates/tx5/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,8 @@ async fn new_conn_task(
Ok(r) => r,
};

let state_uniq = conn_state.meta().state_uniq.clone();
let conn_uniq = conn_state.meta().conn_uniq.clone();
let state_uniq = conn_state.meta().state_uniq;
let conn_uniq = conn_state.meta().conn_uniq;
let rem_id = conn_state.meta().cli_url.id().unwrap();

struct Unregister(
Expand Down
2 changes: 1 addition & 1 deletion crates/tx5/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ pub mod deps {

use deps::{serde, serde_json};

use tx5_core::Uniq;
pub use tx5_core::{Error, ErrorExt, Id, Result, Tx5InitConfig, Tx5Url};
use tx5_core::{SubUniq, Uniq};

pub mod actor;

Expand Down
80 changes: 35 additions & 45 deletions crates/tx5/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl StateEvtSnd {
}

pub(crate) struct SendData {
msg_uniq: Uniq,
msg_uniq: SubUniq,
data: BackBuf,
timestamp: std::time::Instant,
resp: Option<tokio::sync::oneshot::Sender<Result<()>>>,
Expand Down Expand Up @@ -228,10 +228,10 @@ impl StateData {
match cmd {
StateCmd::Tick1s => self.tick_1s().await,
StateCmd::TrackSig { rem_id, ty, bytes } => {
self.track_sig(rem_id, ty, bytes).await
self.track_sig(rem_id, ty, bytes)
}
StateCmd::SndDemo => self.snd_demo().await,
StateCmd::ListConnected(resp) => self.list_connected(resp).await,
StateCmd::SndDemo => self.snd_demo(),
StateCmd::ListConnected(resp) => self.list_connected(resp),
StateCmd::AssertListenerSig { sig_url, resp } => {
self.assert_listener_sig(sig_url, resp).await
}
Expand All @@ -255,35 +255,29 @@ impl StateData {
)
.await
}
StateCmd::Ban { rem_id, span } => self.ban(rem_id, span).await,
StateCmd::Ban { rem_id, span } => self.ban(rem_id, span),
StateCmd::Stats(resp) => self.stats(resp).await,
StateCmd::Publish { evt } => self.publish(evt).await,
StateCmd::SigConnected { cli_url } => {
self.sig_connected(cli_url).await
}
StateCmd::Publish { evt } => self.publish(evt),
StateCmd::SigConnected { cli_url } => self.sig_connected(cli_url),
StateCmd::FetchForSend { conn, rem_id } => {
self.fetch_for_send(conn, rem_id).await
self.fetch_for_send(conn, rem_id)
}
StateCmd::InOffer {
sig_url,
rem_id,
data,
} => self.in_offer(sig_url, rem_id, data).await,
StateCmd::InDemo { sig_url, rem_id } => {
self.in_demo(sig_url, rem_id).await
}
StateCmd::CacheIce { rem_id, ice } => {
self.cache_ice(rem_id, ice).await
}
StateCmd::GetCachedIce { rem_id } => {
self.get_cached_ice(rem_id).await
self.in_demo(sig_url, rem_id)
}
StateCmd::CacheIce { rem_id, ice } => self.cache_ice(rem_id, ice),
StateCmd::GetCachedIce { rem_id } => self.get_cached_ice(rem_id),
StateCmd::CloseSig { sig_url, sig, err } => {
self.close_sig(sig_url, sig, err).await
self.close_sig(sig_url, sig, err)
}
StateCmd::ConnReady { cli_url } => self.conn_ready(cli_url).await,
StateCmd::ConnReady { cli_url } => self.conn_ready(cli_url),
StateCmd::CloseConn { rem_id, conn, err } => {
self.close_conn(rem_id, conn, err).await
self.close_conn(rem_id, conn, err)
}
}
}
Expand Down Expand Up @@ -339,7 +333,7 @@ impl StateData {
let meta = conn.meta();

let args = drop_consider::DropConsiderArgs {
conn_uniq: meta.conn_uniq.clone(),
conn_uniq: meta.conn_uniq,
cfg_conn_max_cnt: meta.config.max_conn_count() as i64,
cfg_conn_max_init: meta.config.max_conn_init().as_secs_f64(),
tot_conn_cnt,
Expand Down Expand Up @@ -396,7 +390,7 @@ impl StateData {
Ok(())
}

async fn track_sig(
fn track_sig(
&mut self,
rem_id: Id,
ty: &'static str,
Expand All @@ -410,7 +404,7 @@ impl StateData {
Ok(())
}

async fn snd_demo(&mut self) -> Result<()> {
fn snd_demo(&mut self) -> Result<()> {
for (_, sig) in self.signal_map.iter() {
if let Some(sig) = sig.upgrade() {
sig.snd_demo();
Expand All @@ -420,7 +414,7 @@ impl StateData {
Ok(())
}

async fn list_connected(
fn list_connected(
&mut self,
resp: tokio::sync::oneshot::Sender<Result<Vec<Tx5Url>>>,
) -> Result<()> {
Expand Down Expand Up @@ -477,7 +471,7 @@ impl StateData {
sig_url: Tx5Url,
rem_id: Id,
maybe_offer: Option<BackBuf>,
maybe_msg_uniq: Option<Uniq>,
maybe_msg_uniq: Option<SubUniq>,
) -> Result<()> {
if self.is_banned(rem_id) {
tracing::warn!(
Expand Down Expand Up @@ -514,7 +508,7 @@ impl StateData {
self.meta.metric_conn_count.clone(),
self.this.clone(),
sig,
self.state_uniq.clone(),
self.state_uniq,
conn_uniq,
this_id,
cli_url.clone(),
Expand All @@ -540,7 +534,7 @@ impl StateData {
#[allow(clippy::too_many_arguments)]
async fn send_data(
&mut self,
msg_uniq: Uniq,
msg_uniq: SubUniq,
rem_id: Id,
data: BackBuf,
timestamp: std::time::Instant,
Expand All @@ -561,7 +555,7 @@ impl StateData {
.entry(rem_id)
.or_default()
.push_back(SendData {
msg_uniq: msg_uniq.clone(),
msg_uniq,
data,
timestamp,
resp: Some(data_sent),
Expand All @@ -584,11 +578,7 @@ impl StateData {
.await
}

async fn ban(
&mut self,
rem_id: Id,
span: std::time::Duration,
) -> Result<()> {
fn ban(&mut self, rem_id: Id, span: std::time::Duration) -> Result<()> {
let expires_at = std::time::Instant::now() + span;
self.ban_map.insert(rem_id, expires_at);
self.send_map.remove(&rem_id);
Expand Down Expand Up @@ -645,12 +635,12 @@ impl StateData {
}
}

async fn publish(&mut self, evt: StateEvt) -> Result<()> {
fn publish(&mut self, evt: StateEvt) -> Result<()> {
let _ = self.evt.publish(evt);
Ok(())
}

async fn sig_connected(&mut self, cli_url: Tx5Url) -> Result<()> {
fn sig_connected(&mut self, cli_url: Tx5Url) -> Result<()> {
let loc_id = cli_url.id().unwrap();
if let Some(this_id) = &self.this_id {
if this_id != &loc_id {
Expand All @@ -663,7 +653,7 @@ impl StateData {
Ok(())
}

async fn fetch_for_send(
fn fetch_for_send(
&mut self,
want_conn: ConnStateWeak,
rem_id: Id,
Expand Down Expand Up @@ -743,12 +733,12 @@ impl StateData {
.await
}

async fn in_demo(&mut self, sig_url: Tx5Url, rem_id: Id) -> Result<()> {
fn in_demo(&mut self, sig_url: Tx5Url, rem_id: Id) -> Result<()> {
let cli_url = sig_url.to_client(rem_id);
self.evt.publish(StateEvt::Demo(cli_url))
}

async fn cache_ice(&mut self, rem_id: Id, ice: BackBuf) -> Result<()> {
fn cache_ice(&mut self, rem_id: Id, ice: BackBuf) -> Result<()> {
let list = self.ice_cache.entry(rem_id).or_default();
list.push_back(IceData {
timestamp: std::time::Instant::now(),
Expand All @@ -757,7 +747,7 @@ impl StateData {
Ok(())
}

async fn get_cached_ice(&mut self, rem_id: Id) -> Result<()> {
fn get_cached_ice(&mut self, rem_id: Id) -> Result<()> {
let StateData {
conn_map,
ice_cache,
Expand All @@ -775,7 +765,7 @@ impl StateData {
Ok(())
}

async fn close_sig(
fn close_sig(
&mut self,
sig_url: Tx5Url,
sig: SigStateWeak,
Expand All @@ -794,11 +784,11 @@ impl StateData {
Ok(())
}

async fn conn_ready(&mut self, cli_url: Tx5Url) -> Result<()> {
fn conn_ready(&mut self, cli_url: Tx5Url) -> Result<()> {
self.evt.publish(StateEvt::Connected(cli_url))
}

async fn close_conn(
fn close_conn(
&mut self,
rem_id: Id,
conn: ConnStateWeak,
Expand Down Expand Up @@ -832,7 +822,7 @@ enum StateCmd {
resp: tokio::sync::oneshot::Sender<Result<Tx5Url>>,
},
SendData {
msg_uniq: Uniq,
msg_uniq: SubUniq,
rem_id: Id,
data: BackBuf,
timestamp: std::time::Instant,
Expand Down Expand Up @@ -988,7 +978,7 @@ impl State {
.0;

let meta = StateMeta {
state_uniq: state_uniq.clone(),
state_uniq,
config,
conn_limit,
snd_limit,
Expand Down Expand Up @@ -1136,7 +1126,7 @@ impl State {
let (s_sent, r_sent) = tokio::sync::oneshot::channel();

if let Err(err) = this.0.send(Ok(StateCmd::SendData {
msg_uniq: msg_uniq.clone(),
msg_uniq: *msg_uniq,
rem_id,
data: buf,
timestamp,
Expand Down
Loading

0 comments on commit 38434c3

Please sign in to comment.