Skip to content

Commit

Permalink
replace UnoderedFutures on the main behaviour
Browse files Browse the repository at this point in the history
for a channel that polls the futures on an async task.
  • Loading branch information
jxs committed Jul 6, 2023
1 parent 4172755 commit 6fc4cfc
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 46 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion protocols/upnp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
log = "0.4.19"
void = "1.0.2"
tokio = { version = "1.29", default-features = false, features = ["rt", "rt-multi-thread"], optional = true }
async-std = { version = "1.12.0"}

[features]
tokio = ["dep:igd"]
tokio = ["dep:igd", "dep:tokio"]
async-std = ["dep:igd_async_std"]

97 changes: 71 additions & 26 deletions protocols/upnp/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
use std::{
borrow::Borrow,
collections::HashMap,
error::Error,
hash::{Hash, Hasher},
net::{Ipv4Addr, SocketAddrV4},
pin::Pin,
Expand All @@ -36,7 +35,13 @@ use crate::{
gateway::{Gateway, Protocol},
Config,
};
use futures::{future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt};
use futures::{
channel::mpsc::{self, Receiver, UnboundedSender},
future::BoxFuture,
select,
stream::FuturesUnordered,
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use libp2p_core::{multiaddr, transport::ListenerId, Endpoint, Multiaddr};
use libp2p_swarm::{
Expand Down Expand Up @@ -85,11 +90,11 @@ enum Event {
/// Port was successfully mapped.
Mapped(Mapping),
/// There was a failure mapping port.
MapFailure(Mapping, Box<dyn Error>),
MapFailure(Mapping, String),
/// Port was successfully removed.
Removed(Mapping),
/// There was a failure removing the mapping port.
RemovalFailure(Mapping, Box<dyn Error>),
RemovalFailure(Mapping, String),
}

/// Mapping of a Protocol and Port on the gateway.
Expand Down Expand Up @@ -155,7 +160,10 @@ where
mappings: HashMap<Mapping, MappingState>,

/// Pending gateway events.
pending_events: FuturesUnordered<BoxFuture<'static, Event>>,
events_queue: Receiver<Event>,

/// Events sender.
events_sender: UnboundedSender<BoxFuture<'static, Event>>,
}

impl<P> Default for Behaviour<P>
Expand All @@ -173,11 +181,29 @@ where
{
/// Builds a new `UPnP` behaviour.
pub fn new(config: Config) -> Self {
let (events_sender, mut task_receiver) = mpsc::unbounded();
let (mut task_sender, events_queue) = mpsc::channel(0);
P::spawn(async move {
let mut futs = FuturesUnordered::new();
loop {
select! {
fut = task_receiver.select_next_some() => {
futs.push(fut);
},
event = futs.select_next_some() => {
task_sender.send(event).await.expect("receiver should be available");
}
complete => break,
}
}
});

Self {
config,
state: GatewayState::Searching(P::search(config).boxed()),
mappings: Default::default(),
pending_events: Default::default(),
events_queue,
events_sender,
}
}
}
Expand Down Expand Up @@ -255,10 +281,16 @@ where
multiaddr: multiaddr.clone(),
};

self.pending_events.push(
map_port::<P>(gateway.clone(), mapping.clone(), self.config.permanent)
self.events_sender
.unbounded_send(
map_port::<P>(
gateway.clone(),
mapping.clone(),
self.config.permanent,
)
.boxed(),
);
)
.expect("receiver should be available");

self.mappings.insert(mapping, MappingState::Pending);
}
Expand All @@ -275,9 +307,11 @@ where
}) => {
if let GatewayState::Available((gateway, _external_addr)) = &self.state {
if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) {
self.pending_events.push(
remove_port_mapping::<P>(gateway.clone(), mapping.clone()).boxed(),
);
self.events_sender
.unbounded_send(
remove_port_mapping::<P>(gateway.clone(), mapping.clone()).boxed(),
)
.expect("receiver should be available");
self.mappings.insert(mapping, MappingState::Pending);
}
}
Expand Down Expand Up @@ -328,7 +362,7 @@ where
},
GatewayState::Available((ref gateway, external_addr)) => {
// Check pending mappings.
if let Poll::Ready(Some(result)) = self.pending_events.poll_next_unpin(cx) {
if let Poll::Ready(Some(result)) = self.events_queue.poll_next_unpin(cx) {
match result {
Event::Mapped(mapping) => {
let state = self
Expand Down Expand Up @@ -430,9 +464,12 @@ where
mapping.internal_addr,
mapping.protocol
);
self.pending_events.push(
remove_port_mapping::<P>(gateway.clone(), mapping).boxed(),
);
self.events_sender
.unbounded_send(
remove_port_mapping::<P>(gateway.clone(), mapping.clone())
.boxed(),
)
.expect("receiver should be available");
}
}
}
Expand All @@ -441,22 +478,30 @@ where
for (mapping, state) in self.mappings.iter_mut() {
match state {
MappingState::Inactive => {
self.pending_events.push(
map_port::<P>(
gateway.clone(),
mapping.clone(),
self.config.permanent,
self.events_sender
.unbounded_send(
map_port::<P>(
gateway.clone(),
mapping.clone(),
self.config.permanent,
)
.boxed(),
)
.boxed(),
);
.expect("receiver should be available");
*state = MappingState::Pending;
}
MappingState::Active(timeout) => {
if Pin::new(timeout).poll(cx).is_ready() {
self.pending_events.push(
map_port::<P>(gateway.clone(), mapping.clone(), false)
self.events_sender
.unbounded_send(
map_port::<P>(
gateway.clone(),
mapping.clone(),
self.config.permanent,
)
.boxed(),
);
)
.expect("receiver should be available");
}
}
MappingState::Pending | MappingState::Permanent => {}
Expand Down
12 changes: 9 additions & 3 deletions protocols/upnp/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::{

use crate::Config;
use async_trait::async_trait;
use futures::Future;

#[cfg(feature = "async-std")]
pub mod async_std;
Expand Down Expand Up @@ -63,9 +64,14 @@ pub trait Gateway: Sized + Send + Sync {
protocol: Protocol,
addr: SocketAddrV4,
duration: Duration,
) -> Result<(), Box<dyn Error>>;
) -> Result<(), String>;

/// Remove port mapping on the gateway.
async fn remove_port(_: Arc<Self>, protocol: Protocol, port: u16)
-> Result<(), Box<dyn Error>>;
async fn remove_port(_: Arc<Self>, protocol: Protocol, port: u16) -> Result<(), String>;

// /// Spawn a future on the executor.
fn spawn<F>(f: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static;
}
20 changes: 12 additions & 8 deletions protocols/upnp/src/gateway/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use super::Protocol;
use crate::Config;

use async_trait::async_trait;
use futures::Future;
use igd_async_std::{
aio::{self, Gateway},
PortMappingProtocol, SearchOptions,
Expand All @@ -52,7 +53,7 @@ impl super::Gateway for Gateway {
protocol: Protocol,
addr: SocketAddrV4,
duration: Duration,
) -> Result<(), Box<dyn Error>> {
) -> Result<(), String> {
let protocol = match protocol {
Protocol::Tcp => PortMappingProtocol::TCP,
Protocol::Udp => PortMappingProtocol::UDP,
Expand All @@ -66,22 +67,25 @@ impl super::Gateway for Gateway {
"rust-libp2p mapping",
)
.await
.map_err(|err| err.into())
.map_err(|err| err.to_string())
}

async fn remove_port(
gateway: Arc<Self>,
protocol: Protocol,
port: u16,
) -> Result<(), Box<dyn Error>> {
async fn remove_port(gateway: Arc<Self>, protocol: Protocol, port: u16) -> Result<(), String> {
let protocol = match protocol {
Protocol::Tcp => PortMappingProtocol::TCP,
Protocol::Udp => PortMappingProtocol::UDP,
};
gateway
.remove_port(protocol, port)
.await
.map_err(|err| err.into())
.map_err(|err| err.to_string())
}
fn spawn<F>(f: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
async_std::task::spawn(f);
}
}

Expand Down
21 changes: 13 additions & 8 deletions protocols/upnp/src/gateway/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use super::Protocol;
use crate::Config;

use async_trait::async_trait;
use futures::Future;
use igd::{
aio::{self, Gateway},
PortMappingProtocol, SearchOptions,
Expand All @@ -52,7 +53,7 @@ impl super::Gateway for Gateway {
protocol: Protocol,
addr: SocketAddrV4,
duration: Duration,
) -> Result<(), Box<dyn Error>> {
) -> Result<(), String> {
let protocol = match protocol {
Protocol::Tcp => PortMappingProtocol::TCP,
Protocol::Udp => PortMappingProtocol::UDP,
Expand All @@ -66,22 +67,26 @@ impl super::Gateway for Gateway {
"rust-libp2p mapping",
)
.await
.map_err(|err| err.into())
.map_err(|err| err.to_string())
}

async fn remove_port(
gateway: Arc<Self>,
protocol: Protocol,
port: u16,
) -> Result<(), Box<dyn Error>> {
async fn remove_port(gateway: Arc<Self>, protocol: Protocol, port: u16) -> Result<(), String> {
let protocol = match protocol {
Protocol::Tcp => PortMappingProtocol::TCP,
Protocol::Udp => PortMappingProtocol::UDP,
};
gateway
.remove_port(protocol, port)
.await
.map_err(|err| err.into())
.map_err(|err| err.to_string())
}

fn spawn<F>(f: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::spawn(f);
}
}

Expand Down

0 comments on commit 6fc4cfc

Please sign in to comment.