Skip to content

Commit

Permalink
merge upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed Oct 11, 2024
1 parent 9c0ada8 commit ec92e35
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions crates/tx5/src/backend/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ pub async fn connect(
_config: &Arc<Config>,
_url: &str,
_listener: bool,
) -> Result<(DynBackEp, DynBackEpRecv)> {
) -> Result<(DynBackEp, DynBackEpRecvCon)> {
let (ep, ep_recv) = STAT.listen();
let ep: DynBackEp = ep;
let ep_recv: DynBackEpRecv = Box::new(ep_recv);
let ep_recv: DynBackEpRecvCon = Box::new(ep_recv);

Ok((ep, ep_recv))
}

struct ConRecv(tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>);
struct ConRecvData(tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>);

impl BackConRecv for ConRecv {
impl BackConRecvData for ConRecvData {
fn recv(&mut self) -> BoxFuture<'_, Option<Vec<u8>>> {
Box::pin(async { self.0.recv().await })
}
Expand All @@ -33,12 +33,12 @@ struct Con {

impl Con {
#[allow(clippy::new_ret_no_self)]
pub fn new(pub_key: PubKey) -> (DynBackCon, DynBackConRecv) {
pub fn new(pub_key: PubKey) -> (DynBackCon, DynBackConRecvData) {
let (send, recv) = tokio::sync::mpsc::unbounded_channel();

let con: DynBackCon = Arc::new(Self { pub_key, send });

let con_recv: DynBackConRecv = Box::new(ConRecv(recv));
let con_recv: DynBackConRecvData = Box::new(ConRecvData(recv));

(con, con_recv)
}
Expand Down Expand Up @@ -74,14 +74,14 @@ impl BackCon for Con {
struct WaitCon {
pub_key: PubKey,
con: Option<DynBackCon>,
con_recv: Option<DynBackConRecv>,
con_recv: Option<DynBackConRecvData>,
}

impl BackWaitCon for WaitCon {
fn wait(
&mut self,
_recv_limit: Arc<tokio::sync::Semaphore>,
) -> BoxFuture<'static, Result<(DynBackCon, DynBackConRecv)>> {
) -> BoxFuture<'static, Result<(DynBackCon, DynBackConRecvData)>> {
let con = self.con.take().unwrap();
let con_recv = self.con_recv.take().unwrap();
Box::pin(async { Ok((con, con_recv)) })
Expand All @@ -92,9 +92,9 @@ impl BackWaitCon for WaitCon {
}
}

struct EpRecv(tokio::sync::mpsc::UnboundedReceiver<DynBackWaitCon>);
struct EpRecvCon(tokio::sync::mpsc::UnboundedReceiver<DynBackWaitCon>);

impl BackEpRecv for EpRecv {
impl BackEpRecvCon for EpRecvCon {
fn recv(&mut self) -> BoxFuture<'_, Option<DynBackWaitCon>> {
Box::pin(async { self.0.recv().await })
}
Expand Down Expand Up @@ -136,7 +136,7 @@ impl Drop for Ep {
}

impl Ep {
pub fn new(uniq: u64, loc: usize) -> (Arc<Self>, EpRecv) {
pub fn new(uniq: u64, loc: usize) -> (Arc<Self>, EpRecvCon) {
let pub_key = gen_pub_key(uniq, loc);

let (send, recv) = tokio::sync::mpsc::unbounded_channel();
Expand All @@ -148,7 +148,7 @@ impl Ep {
pub_key,
send,
}),
EpRecv(recv),
EpRecvCon(recv),
)
}
}
Expand Down Expand Up @@ -190,7 +190,7 @@ impl Stat {
}
}

pub fn listen(&self) -> (Arc<Ep>, EpRecv) {
pub fn listen(&self) -> (Arc<Ep>, EpRecvCon) {
static UNIQ: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(1);
let uniq = UNIQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Expand Down Expand Up @@ -285,7 +285,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn mem_backend_e2e() {
let config = Arc::new(Config {
backend_module: BackendModule::Mock,
backend_module: BackendModule::Mem,
..Default::default()
});

Expand Down

0 comments on commit ec92e35

Please sign in to comment.