From f2f42fd1f7629a324f6cd02744b9eb4a0407ad24 Mon Sep 17 00:00:00 2001 From: Kasey Date: Thu, 5 Jan 2023 00:39:08 -0500 Subject: [PATCH] add more gossipsub methods to api Add `api.store()` method as way to get `StoreApi` from the API. This exposes the `put` method, which is needed in `iroh-share`. also moves CLI specific `connect` method to `iroh` and adds more general `connect` method to `iroh-api` --- iroh-api/src/api.rs | 14 +++++++-- iroh-api/src/lib.rs | 2 +- iroh-api/src/p2p.rs | 71 ++++++++++++++++++++++++++++++++----------- iroh-api/src/store.rs | 38 ++++++++++++++++++----- iroh-p2p/src/node.rs | 27 ---------------- iroh/src/p2p.rs | 21 +++++++++---- 6 files changed, 113 insertions(+), 60 deletions(-) diff --git a/iroh-api/src/api.rs b/iroh-api/src/api.rs index 7cb6952fdeb..63c212490bd 100644 --- a/iroh-api/src/api.rs +++ b/iroh-api/src/api.rs @@ -21,7 +21,7 @@ use mockall::automock; use relative_path::RelativePathBuf; use tokio::io::{AsyncRead, AsyncReadExt}; -use crate::store::add_blocks_to_store; +use crate::store::{add_blocks_to_store, StoreApi}; /// API to interact with an iroh system. /// @@ -104,6 +104,11 @@ impl Api { Self { client, resolver } } + /// Returns a [`Resolver`] you can use to resolve data from the Iroh store or the network. + pub async fn resolver(&self) -> Resolver { + self.resolver.clone() + } + /// Announces to the DHT that this node can offer the given [`Cid`]. /// /// This publishes a provider record for the [`Cid`] to the DHT, establishing the local @@ -117,6 +122,11 @@ impl Api { Ok(P2pApi::new(p2p_client)) } + pub fn store(&self) -> Result { + let store_client = self.client.try_store()?; + Ok(StoreApi::new(store_client)) + } + /// High level get, equivalent of CLI `iroh get`. /// /// Returns a stream of items, where items can be either blobs or UnixFs components. @@ -192,7 +202,7 @@ impl Api { }; Ok(Box::pin( - add_blocks_to_store(Some(self.client.clone()), blocks).await, + add_blocks_to_store(Some(self.store()?), blocks).await, )) } diff --git a/iroh-api/src/lib.rs b/iroh-api/src/lib.rs index 05a9e3662f6..78d525bcfe8 100644 --- a/iroh-api/src/lib.rs +++ b/iroh-api/src/lib.rs @@ -9,7 +9,7 @@ pub use crate::error::ApiError; pub use crate::p2p::MockP2p as P2pApi; #[cfg(not(feature = "testing"))] pub use crate::p2p::P2p as P2pApi; -pub use crate::p2p::PeerIdOrAddr; +pub use crate::p2p::{peer_id_from_multiaddr, PeerIdOrAddr}; pub use bytes::Bytes; pub use cid::Cid; pub use iroh_resolver::resolver::Path as IpfsPath; diff --git a/iroh-api/src/p2p.rs b/iroh-api/src/p2p.rs index 3b0c08a936f..e6be7e9058d 100644 --- a/iroh-api/src/p2p.rs +++ b/iroh-api/src/p2p.rs @@ -34,7 +34,27 @@ impl P2p { } pub async fn lookup_local(&self) -> Result { - self.client.lookup_local().await + self.client + .lookup_local() + .await + .map_err(|e| map_service_error("p2p", e)) + } + + /// The [`PeerId`] for this Iroh p2p nod + pub async fn peer_id(&self) -> Result { + self.client + .local_peer_id() + .await + .map_err(|e| map_service_error("p2p", e)) + } + + /// The list of [`Multiaddr`] that the Iroh p2p node is listening on + pub async fn addrs(&self) -> Result> { + self.client + .get_listening_addrs() + .await + .map(|(_, addrs)| addrs) + .map_err(|e| map_service_error("p2p", e)) } pub async fn lookup(&self, addr: &PeerIdOrAddr) -> Result { @@ -48,15 +68,15 @@ impl P2p { .map_err(|e| map_service_error("p2p", e)) } - pub async fn connect(&self, addr: &PeerIdOrAddr) -> Result<()> { - match addr { - PeerIdOrAddr::PeerId(peer_id) => self.client.connect(*peer_id, vec![]).await, - PeerIdOrAddr::Multiaddr(addr) => { - let peer_id = peer_id_from_multiaddr(addr)?; - self.client.connect(peer_id, vec![addr.clone()]).await - } - } - .map_err(|e| map_service_error("p2p", e)) + /// Connect to a peer using a [`PeerId`] and `Vec` of [`Multiaddr`] + /// + /// If there is an empty `Vec` of `Multiaddr`s, Iroh will attempt to find + /// the peer on the DHT using the `PeerId`. + pub async fn connect(&self, peer_id: PeerId, addrs: Vec) -> Result<()> { + self.client + .connect(peer_id, addrs) + .await + .map_err(|e| map_service_error("p2p", e)) } pub async fn peers(&self) -> Result>> { @@ -94,23 +114,40 @@ impl P2p { // a stream of only the gossipsub messages on that topic pub async fn gossipsub_subscribe(&self, topic: String) -> Result { let topic = TopicHash::from_raw(topic); - self.client.gossipsub_subscribe(topic).await + self.client + .gossipsub_subscribe(topic) + .await + .map_err(|e| map_service_error("p2p", e)) } /// Publish a message on a Gossipsub Topic. /// - /// This allows you to publish a message on a given topic to anyone in your - /// network that is subscribed to that topic. + /// This allows you to publish a message on a given topic to anyone in your network that is + /// subscribed to that topic. /// - /// Read the [`P2p::gossipsub_subscribe`] documentation for how to subscribe - /// and receive Gossipsub messages. + /// Read the [`gossipsub_subscribe`] documentation for how to subscribe and receive + /// Gossipsub messages. pub async fn gossipsub_publish(&self, topic: String, data: Bytes) -> Result { let topic = TopicHash::from_raw(topic); - self.client.gossipsub_publish(topic, data).await + self.client + .gossipsub_publish(topic, data) + .await + .map_err(|e| map_service_error("p2p", e)) + } + + /// Add a peer to the list of Gossipsub peers we are explicitly connected to. + /// + /// We will attempt to stay connected and forward all relevant Gossipsub messages + /// to this peer. + pub async fn gossipsub_add_peer(&self, peer_id: PeerId) -> Result<()> { + self.client + .gossipsub_add_explicit_peer(peer_id) + .await + .map_err(|e| map_service_error("p2p", e)) } } -fn peer_id_from_multiaddr(addr: &Multiaddr) -> Result { +pub fn peer_id_from_multiaddr(addr: &Multiaddr) -> Result { match addr.iter().find(|p| matches!(*p, Protocol::P2p(_))) { Some(Protocol::P2p(peer_id)) => { PeerId::from_multihash(peer_id).map_err(|m| anyhow::anyhow!("Multiaddress contains invalid p2p multihash {:?}. Cannot derive a PeerId from this address.", m )) diff --git a/iroh-api/src/store.rs b/iroh-api/src/store.rs index 921f39e1a80..e59cafcf630 100644 --- a/iroh-api/src/store.rs +++ b/iroh-api/src/store.rs @@ -6,8 +6,9 @@ use async_trait::async_trait; use bytes::Bytes; use cid::Cid; use futures::{Stream, StreamExt}; -use iroh_rpc_client::Client; +use iroh_rpc_client::StoreClient; use iroh_unixfs::Block; +#[cfg(feature = "testing")] /// How many chunks to buffer up when adding content. const _ADD_PAR: usize = 24; @@ -19,20 +20,43 @@ pub trait Store: 'static + Send + Sync + Clone { async fn put_many(&self, blocks: Vec) -> Result<()>; } +#[derive(Debug, Clone)] +pub struct StoreApi { + client: StoreClient, +} + +impl StoreApi { + pub fn new(client: StoreClient) -> Self { + Self { client } + } + + pub async fn has(&self, cid: Cid) -> Result { + self.client.has(cid).await + } + + pub async fn put(&self, cid: Cid, blob: Bytes, links: Vec) -> Result<()> { + self.client.put(cid, blob, links).await + } + + pub async fn put_many(&self, blocks: Vec) -> Result<()> { + self.client + .put_many(blocks.into_iter().map(|x| x.into_parts()).collect()) + .await + } +} + #[async_trait] -impl Store for Client { +impl Store for StoreApi { async fn has(&self, cid: Cid) -> Result { - self.try_store()?.has(cid).await + self.has(cid).await } async fn put(&self, cid: Cid, blob: Bytes, links: Vec) -> Result<()> { - self.try_store()?.put(cid, blob, links).await + self.put(cid, blob, links).await } async fn put_many(&self, blocks: Vec) -> Result<()> { - self.try_store()? - .put_many(blocks.into_iter().map(|x| x.into_parts()).collect()) - .await + self.put_many(blocks).await } } diff --git a/iroh-p2p/src/node.rs b/iroh-p2p/src/node.rs index 969abad5292..3fc66356e6d 100644 --- a/iroh-p2p/src/node.rs +++ b/iroh-p2p/src/node.rs @@ -10,7 +10,6 @@ use iroh_metrics::{core::MRecorder, inc, libp2p_metrics, p2p::P2PMetrics}; use iroh_rpc_client::Client as RpcClient; use iroh_rpc_types::p2p::P2pAddr; use libp2p::core::Multiaddr; -// use libp2p::gossipsub::{GossipsubMessage, MessageId, TopicHash}; pub use libp2p::gossipsub::{IdentTopic, Topic}; use libp2p::identify::{Event as IdentifyEvent, Info as IdentifyInfo}; use libp2p::identity::Keypair; @@ -45,32 +44,6 @@ use crate::{ }; use crate::{GossipsubEvent, NetworkEvent}; -// #[allow(clippy::large_enum_variant)] -// #[derive(Debug, Clone)] -// pub enum NetworkEvent { -// PeerConnected(PeerId), -// PeerDisconnected(PeerId), -// Gossipsub(GossipsubEvent), -// CancelLookupQuery(PeerId), -// } - -// #[derive(Debug, Clone)] -// pub enum GossipsubEvent { -// Subscribed { -// peer_id: PeerId, -// topic: TopicHash, -// }, -// Unsubscribed { -// peer_id: PeerId, -// topic: TopicHash, -// }, -// Message { -// from: PeerId, -// id: MessageId, -// message: GossipsubMessage, -// }, -// } - pub struct Node { swarm: Swarm, net_receiver_in: Receiver, diff --git a/iroh/src/p2p.rs b/iroh/src/p2p.rs index f466d7f32c0..358592f4b2f 100644 --- a/iroh/src/p2p.rs +++ b/iroh/src/p2p.rs @@ -2,7 +2,7 @@ use crate::doc; use anyhow::{Error, Result}; use clap::{Args, Subcommand}; use crossterm::style::Stylize; -use iroh_api::{Lookup, Multiaddr, P2pApi, PeerId, PeerIdOrAddr}; +use iroh_api::{peer_id_from_multiaddr, Lookup, Multiaddr, P2pApi, PeerId, PeerIdOrAddr}; use std::{collections::HashMap, fmt::Display, str::FromStr}; #[derive(Args, Debug, Clone)] @@ -63,12 +63,21 @@ impl Display for PeerIdOrAddrArg { pub async fn run_command(p2p: &P2pApi, cmd: &P2p) -> Result<()> { match &cmd.command { - P2pCommands::Connect { addr } => match p2p.connect(&addr.0).await { - Ok(_) => { - println!("Connected to {addr}!"); + P2pCommands::Connect { addr } => { + let res = match &addr.0 { + PeerIdOrAddr::PeerId(peer_id) => p2p.connect(*peer_id, vec![]).await, + PeerIdOrAddr::Multiaddr(addr) => { + let peer_id = peer_id_from_multiaddr(addr)?; + p2p.connect(peer_id, vec![addr.clone()]).await + } + }; + match res { + Ok(_) => { + println!("Connected to {addr}!"); + } + Err(e) => return Err(e), } - Err(e) => return Err(e), - }, + } P2pCommands::Lookup { addr } => { let lookup = match addr { Some(addr) => p2p.lookup(&addr.0).await?,