Skip to content

Commit

Permalink
do not do io on the behaviour,
Browse files Browse the repository at this point in the history
use a oneshot channel in the provider instead.
  • Loading branch information
jxs committed Sep 1, 2023
1 parent 194c792 commit e9b5cd6
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 43 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion examples/upnp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ license = "MIT"

[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
futures = "0.3.28"
libp2p = { path = "../../libp2p", features = ["async-std", "dns", "macros", "noise", "ping", "tcp", "websocket", "yamux", "upnp"] }
1 change: 0 additions & 1 deletion protocols/upnp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
async-trait = "0.1.68"
futures = "0.3.28"
futures-timer = "3.0.2"
igd-next = "0.14.2"
Expand Down
46 changes: 24 additions & 22 deletions protocols/upnp/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::{
};

use crate::provider::{is_addr_global, Gateway, Provider};
use futures::{future::BoxFuture, Future, FutureExt, StreamExt};
use futures::{channel::oneshot, Future, StreamExt};
use futures_timer::Delay;
use igd_next::PortMappingProtocol;
use libp2p_core::{multiaddr, transport::ListenerId, Endpoint, Multiaddr};
Expand Down Expand Up @@ -127,7 +127,7 @@ enum MappingState {

/// Current state of the UPnP [`Gateway`].
enum GatewayState {
Searching(BoxFuture<'static, Result<Gateway, Box<dyn std::error::Error>>>),
Searching(oneshot::Receiver<Result<Gateway, Box<dyn std::error::Error + Send + Sync>>>),
Available(Gateway),
GatewayNotFound,
NonRoutableGateway(IpAddr),
Expand Down Expand Up @@ -230,7 +230,7 @@ where
{
fn default() -> Self {
Self {
state: GatewayState::Searching(P::search_gateway().boxed()),
state: GatewayState::Searching(P::search_gateway()),
mappings: Default::default(),
pending_events: VecDeque::new(),
provider: PhantomData,
Expand Down Expand Up @@ -396,27 +396,29 @@ where
loop {
match self.state {
GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) {
Poll::Ready(result) => match result {
Ok(gateway) => {
if !is_addr_global(gateway.external_addr) {
self.state =
GatewayState::NonRoutableGateway(gateway.external_addr);
log::debug!(
"the gateway is not routable, its address is {}",
gateway.external_addr
);
return Poll::Ready(ToSwarm::GenerateEvent(
Event::NonRoutableGateway,
));
Poll::Ready(result) => {
match result.expect("sender shouldn't have been dropped") {
Ok(gateway) => {
if !is_addr_global(gateway.external_addr) {
self.state =
GatewayState::NonRoutableGateway(gateway.external_addr);
log::debug!(
"the gateway is not routable, its address is {}",
gateway.external_addr
);
return Poll::Ready(ToSwarm::GenerateEvent(
Event::NonRoutableGateway,
));
}
self.state = GatewayState::Available(gateway);
}
Err(err) => {
log::debug!("could not find gateway: {err}");
self.state = GatewayState::GatewayNotFound;
return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
}
self.state = GatewayState::Available(gateway);
}
Err(err) => {
log::debug!("could not find gateway: {err}");
self.state = GatewayState::GatewayNotFound;
return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
}
},
}
Poll::Pending => return Poll::Pending,
},
GatewayState::Available(ref mut gateway) => {
Expand Down
58 changes: 41 additions & 17 deletions protocols/upnp/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
use std::{error::Error, net::IpAddr};

use crate::behaviour::{GatewayEvent, GatewayRequest};
use async_trait::async_trait;
use futures::channel::mpsc::{Receiver, Sender};
use futures::channel::{mpsc, oneshot};

//TODO: remove when `IpAddr::is_global` stabilizes.
pub(crate) fn is_addr_global(addr: IpAddr) -> bool {
Expand Down Expand Up @@ -78,38 +77,67 @@ pub(crate) fn is_addr_global(addr: IpAddr) -> bool {

/// Interface that interacts with the inner gateway by messages,
/// `GatewayRequest`s and `GatewayEvent`s.
#[derive(Debug)]
pub struct Gateway {
pub(crate) sender: Sender<GatewayRequest>,
pub(crate) receiver: Receiver<GatewayEvent>,
pub(crate) sender: mpsc::Sender<GatewayRequest>,
pub(crate) receiver: mpsc::Receiver<GatewayEvent>,
pub(crate) external_addr: IpAddr,
}

/// Abstraction to allow for compatibility with various async runtimes.
#[async_trait]
pub trait Provider {
async fn search_gateway() -> Result<Gateway, Box<dyn Error>>;
fn search_gateway() -> oneshot::Receiver<Result<Gateway, Box<dyn Error + Send + Sync>>>;
}

macro_rules! impl_provider {
($impl:ident, $executor: ident, $gateway:ident, $protocol: ident) => {
use super::Gateway;
use crate::behaviour::{GatewayEvent, GatewayRequest};

use async_trait::async_trait;
use futures::{channel::mpsc, SinkExt, StreamExt};
use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
};
use igd_next::SearchOptions;
use std::error::Error;

#[async_trait]
impl super::Provider for $impl {
async fn search_gateway() -> Result<super::Gateway, Box<dyn Error>> {
let gateway = $gateway::search_gateway(SearchOptions::default()).await?;
let external_addr = gateway.get_external_ip().await?;
fn search_gateway(
) -> oneshot::Receiver<Result<super::Gateway, Box<dyn Error + Send + Sync>>> {
let (search_result_sender, search_result_receiver) = oneshot::channel();

let (events_sender, mut task_receiver) = mpsc::channel(10);
let (mut task_sender, events_queue) = mpsc::channel(0);

$executor::spawn(async move {
let gateway = match $gateway::search_gateway(SearchOptions::default()).await {
Ok(gateway) => gateway,
Err(err) => {
search_result_sender
.send(Err(err.into()))
.expect("receiver shouldn't have been dropped");
return;
}
};

let external_addr = match gateway.get_external_ip().await {
Ok(addr) => addr,
Err(err) => {
search_result_sender
.send(Err(err.into()))
.expect("receiver shouldn't have been dropped");
return;
}
};

search_result_sender
.send(Ok(Gateway {
sender: events_sender,
receiver: events_queue,
external_addr,
}))
.expect("receiver shouldn't have been dropped");

loop {
// The task sender has dropped so we can return.
let Some(req) = task_receiver.next().await else {
Expand Down Expand Up @@ -153,11 +181,7 @@ macro_rules! impl_provider {
}
});

Ok(Gateway {
sender: events_sender,
receiver: events_queue,
external_addr,
})
search_result_receiver
}
}
};
Expand Down

0 comments on commit e9b5cd6

Please sign in to comment.