Skip to content

Commit

Permalink
feat(rpc, api): add gossipsub methods to Api
Browse files Browse the repository at this point in the history
This commit represents the state of n0-computer#669
before the repo changes.
  • Loading branch information
ramfox authored and nathanielc committed Mar 7, 2023
1 parent 48748c4 commit 3f39db1
Show file tree
Hide file tree
Showing 26 changed files with 778 additions and 811 deletions.
2 changes: 1 addition & 1 deletion examples/embed/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main() -> Result<()> {
"/ip4/0.0.0.0/tcp/0".parse().unwrap(),
"/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(),
];
let p2p = P2pService::new(p2p_config, dir, store.addr()).await?;
let (p2p,_network_events) = P2pService::new(p2p_config, dir, store.addr()).await?;

// Note by default this is configured with an indexer, but not with http resolvers.
let iroh = IrohBuilder::new().store(store).p2p(p2p).build().await?;
Expand Down
43 changes: 36 additions & 7 deletions iroh-api/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::path::{Path, PathBuf};

Expand All @@ -9,17 +9,17 @@ use anyhow::{ensure, Context, Result};
use cid::Cid;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use iroh_resolver::resolver::Resolver;
use iroh_rpc_client::{Client, ClientStatus};
use iroh_unixfs::{
builder::Entry as UnixfsEntry,
content_loader::{FullLoader, FullLoaderConfig},
use iroh_resolver::{
resolver::Resolver, ContentLoader, FullLoader, FullLoaderConfig, LoaderFromProviders,
};
use iroh_rpc_client::{Client, ClientStatus};
use iroh_unixfs::builder::Entry as UnixfsEntry;
use iroh_util::{iroh_config_path, make_config};
use relative_path::RelativePathBuf;
use tokio::io::{AsyncRead, AsyncReadExt};

use crate::store::add_blocks_to_store;
use crate::PeerId;

/// API to interact with an iroh system.
///
Expand Down Expand Up @@ -128,7 +128,36 @@ impl Api {
);

tracing::debug!("get {:?}", ipfs_path);
let resolver = self.resolver.clone();
self.get_with_resolver(ipfs_path, self.resolver.clone())
}

pub fn get_from(
&self,
ipfs_path: &IpfsPath,
peers: HashSet<PeerId>,
) -> Result<BoxStream<'static, Result<(RelativePathBuf, OutType)>>> {
ensure!(
ipfs_path.cid().is_some(),
"IPFS path does not refer to a CID"
);

tracing::debug!("get {:?} from peers {:#?}", ipfs_path, peers);
// TODO(ramfox): path of least resistance to hack this together as POC. Might be the way
// was want to go, or, if it's bad news bears to have (potentially) more than one resolver,
// we can bake in passing down providers through a LoadConfig (or some similar name) that allows you to tune
// your actions down in the loader
let loader = LoaderFromProviders::new(self.client.clone(), peers);
// TODO(ramfox): is it okay to create these ad-hoc. If so, we probably shouldn't be storing
// one in general
let resolver = Resolver::new(loader);
self.get_with_resolver(ipfs_path, resolver)
}

fn get_with_resolver<T: ContentLoader + std::marker::Unpin>(
&self,
ipfs_path: &IpfsPath,
resolver: Resolver<T>,
) -> Result<BoxStream<'static, Result<(RelativePathBuf, OutType)>>> {
let results = resolver.resolve_recursive_with_paths(ipfs_path.clone());
let sub_path = ipfs_path.to_relative_string();

Expand Down
1 change: 1 addition & 0 deletions iroh-api/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async fn save_get_stream(
while let Some(block) = blocks.next().await {
let (path, out) = block?;
let full_path = path.to_path(root_path);
println!("full path: {:?}", full_path);
match out {
OutType::Dir => {
tokio::fs::create_dir_all(full_path).await?;
Expand Down
6 changes: 4 additions & 2 deletions iroh-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ pub use crate::api::OutType;
pub use crate::config::Config;
pub use crate::error::ApiError;
pub use crate::p2p::P2p as P2pApi;
pub use crate::p2p::PeerIdOrAddr;
pub use crate::p2p::{peer_id_from_multiaddr, PeerIdOrAddr};
pub use bytes::Bytes;
pub use cid::Cid;
pub use iroh_resolver::resolver::Path as IpfsPath;
pub use iroh_rpc_client::{ClientStatus, Lookup, ServiceStatus, ServiceType, StatusType};
pub use iroh_rpc_client::{
ClientStatus, GossipsubEvent, Lookup, ServiceStatus, ServiceType, StatusType,
};
pub use iroh_unixfs::builder::{
Config as UnixfsConfig, DirectoryBuilder, Entry as UnixfsEntry, FileBuilder, SymlinkBuilder,
};
Expand Down
109 changes: 93 additions & 16 deletions iroh-api/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use crate::error::map_service_error;
use anyhow::Result;
use iroh_rpc_client::{Lookup, P2pClient};
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
use std::collections::HashMap;

#[derive(Debug)]
use anyhow::Result;
use bytes::Bytes;
use futures::stream::Stream;
use iroh_rpc_client::{GossipsubEvent, Lookup, P2pClient};
use libp2p::{
gossipsub::{MessageId, TopicHash},
multiaddr::Protocol,
Multiaddr, PeerId,
};

use crate::error::map_service_error;

#[derive(Debug, Clone)]
pub struct P2p {
client: P2pClient,
}
Expand All @@ -21,7 +29,27 @@ impl P2p {
}

pub async fn lookup_local(&self) -> Result<Lookup> {
self.client.lookup_local().await
self.client
.lookup_local()
.await
.map_err(|e| map_service_error("p2p", e))
}

/// The [`PeerId`] for this Iroh p2p nod
pub async fn peer_id(&self) -> Result<PeerId> {
self.client
.local_peer_id()
.await
.map_err(|e| map_service_error("p2p", e))
}

/// The list of [`Multiaddr`] that the Iroh p2p node is listening on
pub async fn addrs(&self) -> Result<Vec<Multiaddr>> {
self.client
.get_listening_addrs()
.await
.map(|(_, addrs)| addrs)
.map_err(|e| map_service_error("p2p", e))
}

pub async fn lookup(&self, addr: &PeerIdOrAddr) -> Result<Lookup> {
Expand All @@ -35,15 +63,15 @@ impl P2p {
.map_err(|e| map_service_error("p2p", e))
}

pub async fn connect(&self, addr: &PeerIdOrAddr) -> Result<()> {
match addr {
PeerIdOrAddr::PeerId(peer_id) => self.client.connect(*peer_id, vec![]).await,
PeerIdOrAddr::Multiaddr(addr) => {
let peer_id = peer_id_from_multiaddr(addr)?;
self.client.connect(peer_id, vec![addr.clone()]).await
}
}
.map_err(|e| map_service_error("p2p", e))
/// Connect to a peer using a [`PeerId`] and `Vec` of [`Multiaddr`]
///
/// If there is an empty `Vec` of `Multiaddr`s, Iroh will attempt to find
/// the peer on the DHT using the `PeerId`.
pub async fn connect(&self, peer_id: PeerId, addrs: Vec<Multiaddr>) -> Result<()> {
self.client
.connect(peer_id, addrs)
.await
.map_err(|e| map_service_error("p2p", e))
}

pub async fn peers(&self) -> Result<HashMap<PeerId, Vec<Multiaddr>>> {
Expand All @@ -52,9 +80,58 @@ impl P2p {
.await
.map_err(|e| map_service_error("p2p", e))
}

/// Subscribe to a pub/sub Topic
///
/// We use Gossipsub as the pub/sub protocol. This method will subscribe you
/// to a Gossipsub topic and return a stream of [`GossipsubEvent`]s relevant
/// to that topic.
///
/// Learn more about the Gossipsub protocol in the `libp2p-gossipsub`
/// [documentation](https://docs.rs/libp2p-gossipsub/latest/libp2p_gossipsub/).
pub async fn subscribe(
&self,
topic: String,
) -> Result<impl Stream<Item = Result<GossipsubEvent>>> {
let topic = TopicHash::from_raw(topic);
self.client
.gossipsub_subscribe(topic)
.await
.map_err(|e| map_service_error("p2p", e))
}

/// Publish a message on a pub/sub Topic.
///
/// We use Gossipsub as the pub/sub protocol. This method allows you to publish
/// a message on a given topic to anyone in your network that is subscribed to
/// that topic.
///
/// Read the [`P2p::subscribe`] documentation for how to subscribe and receive
/// Gossipsub messages.
pub async fn publish(&self, topic: String, data: Bytes) -> Result<MessageId> {
let topic = TopicHash::from_raw(topic);
self.client
.gossipsub_publish(topic, data)
.await
.map_err(|e| map_service_error("p2p", e))
}

/// Explicitly add a peer to our pub/sub network.
///
/// We use Gossipsub as our pub/sub protocol.
///
/// We will attempt to stay connected and forward all relevant Gossipsub messages
/// to this peer. Read the [`P2p::subscribe`] and [`P2p::publish`] documentation
/// for how to subscribe, read, and publish messages.
pub async fn add_pubsub_peer(&self, peer_id: PeerId) -> Result<()> {
self.client
.gossipsub_add_explicit_peer(peer_id)
.await
.map_err(|e| map_service_error("p2p", e))
}
}

fn peer_id_from_multiaddr(addr: &Multiaddr) -> Result<PeerId> {
pub fn peer_id_from_multiaddr(addr: &Multiaddr) -> Result<PeerId> {
match addr.iter().find(|p| matches!(*p, Protocol::P2p(_))) {
Some(Protocol::P2p(peer_id)) => {
PeerId::from_multihash(peer_id).map_err(|m| anyhow::anyhow!("Multiaddress contains invalid p2p multihash {:?}. Cannot derive a PeerId from this address.", m ))
Expand Down
2 changes: 1 addition & 1 deletion iroh-one/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn main() -> Result<()> {
}
}

let (store_rpc, p2p_rpc) = {
let (store_rpc, (_network_events, p2p_rpc)) = {
let store_recv = Addr::new_mem();
let store_sender = store_recv.clone();
let p2p_recv = Addr::new_mem();
Expand Down
1 change: 1 addition & 0 deletions iroh-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ mod swarm;
pub use self::config::*;
pub use self::keys::{DiskStorage, Keychain, MemoryStorage};
pub use self::node::*;
pub use iroh_rpc_types::{GossipsubEvent, GossipsubEventStream};

pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");
Loading

0 comments on commit 3f39db1

Please sign in to comment.