From 9adc9c65317f478ddb8aede15b5fd1a58515ff2e Mon Sep 17 00:00:00 2001 From: Kasey Date: Tue, 10 Jan 2023 21:23:06 -0500 Subject: [PATCH] refactor: iroh-share sender --- iroh-api/Cargo.toml | 1 - iroh-api/src/lib.rs | 4 +- iroh-api/src/p2p.rs | 13 ++- iroh-p2p/src/lib.rs | 2 +- iroh-p2p/src/rpc.rs | 7 +- iroh-rpc-client/src/lib.rs | 1 + iroh-rpc-types/src/gossipsub_event.rs | 5 ++ iroh-rpc-types/src/lib.rs | 2 +- iroh-share/src/data.rs | 4 +- iroh-share/src/lib.rs | 27 +++++- iroh-share/src/sender.rs | 114 ++++++++++++++++++++++++++ 11 files changed, 159 insertions(+), 21 deletions(-) diff --git a/iroh-api/Cargo.toml b/iroh-api/Cargo.toml index 4dc10879035..8d750c0c817 100644 --- a/iroh-api/Cargo.toml +++ b/iroh-api/Cargo.toml @@ -20,7 +20,6 @@ iroh-metrics.workspace = true iroh-resolver.workspace = true iroh-rpc-client.workspace = true iroh-rpc-types.workspace = true -iroh-p2p.workspace = true iroh-unixfs.workspace = true iroh-util.workspace = true libp2p.workspace = true diff --git a/iroh-api/src/lib.rs b/iroh-api/src/lib.rs index 4af1b8db5b3..14ef34f2f54 100644 --- a/iroh-api/src/lib.rs +++ b/iroh-api/src/lib.rs @@ -7,7 +7,9 @@ pub use crate::p2p::{peer_id_from_multiaddr, PeerIdOrAddr}; pub use bytes::Bytes; pub use cid::Cid; pub use iroh_resolver::resolver::Path as IpfsPath; -pub use iroh_rpc_client::{ClientStatus, Lookup, ServiceStatus, ServiceType, StatusType}; +pub use iroh_rpc_client::{ + ClientStatus, GossipsubEvent, Lookup, ServiceStatus, ServiceType, StatusType, +}; pub use iroh_unixfs::builder::{ Config as UnixfsConfig, DirectoryBuilder, Entry as UnixfsEntry, FileBuilder, SymlinkBuilder, }; diff --git a/iroh-api/src/p2p.rs b/iroh-api/src/p2p.rs index 9aa19aad1ff..8c5f918c84f 100644 --- a/iroh-api/src/p2p.rs +++ b/iroh-api/src/p2p.rs @@ -2,9 +2,8 @@ use std::collections::HashMap; use anyhow::Result; use bytes::Bytes; -use futures::stream::{BoxStream, StreamExt}; -use iroh_p2p::GossipsubEvent; -use iroh_rpc_client::{Lookup, P2pClient}; +use futures::stream::Stream; +use iroh_rpc_client::{GossipsubEvent, Lookup, P2pClient}; use libp2p::{ gossipsub::{MessageId, TopicHash}, multiaddr::Protocol, @@ -93,14 +92,12 @@ impl P2p { pub async fn subscribe( &self, topic: String, - ) -> Result>> { + ) -> Result>> { let topic = TopicHash::from_raw(topic); - Ok(self - .client + self.client .gossipsub_subscribe(topic) .await - .map_err(|e| map_service_error("p2p", e))? - .boxed()) + .map_err(|e| map_service_error("p2p", e)) } /// Publish a message on a pub/sub Topic. diff --git a/iroh-p2p/src/lib.rs b/iroh-p2p/src/lib.rs index 293736fd38e..dbac614c9ec 100644 --- a/iroh-p2p/src/lib.rs +++ b/iroh-p2p/src/lib.rs @@ -11,6 +11,6 @@ mod swarm; pub use self::config::*; pub use self::keys::{DiskStorage, Keychain, MemoryStorage}; pub use self::node::*; -pub use iroh_rpc_types::GossipsubEvent; +pub use iroh_rpc_types::{GossipsubEvent, GossipsubEventStream}; pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/iroh-p2p/src/rpc.rs b/iroh-p2p/src/rpc.rs index d6a40c7b8da..e568e79d32d 100644 --- a/iroh-p2p/src/rpc.rs +++ b/iroh-p2p/src/rpc.rs @@ -25,7 +25,7 @@ use tokio::sync::oneshot; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, info, trace}; -use crate::{GossipsubEvent, NetworkEvent, DEFAULT_PROVIDER_LIMIT, VERSION}; +use crate::{GossipsubEvent, GossipsubEventStream, NetworkEvent, DEFAULT_PROVIDER_LIMIT, VERSION}; #[derive(Clone)] pub(crate) struct P2p { @@ -392,10 +392,7 @@ impl P2p { } #[tracing::instrument(skip(self))] - fn gossipsub_subscribe( - self, - req: GossipsubSubscribeRequest, - ) -> BoxStream<'static, Box> { + fn gossipsub_subscribe(self, req: GossipsubSubscribeRequest) -> GossipsubEventStream { async move { self.gossipsub_subscribe_0(req) .await diff --git a/iroh-rpc-client/src/lib.rs b/iroh-rpc-client/src/lib.rs index b4181d5c0dd..ec99005c599 100644 --- a/iroh-rpc-client/src/lib.rs +++ b/iroh-rpc-client/src/lib.rs @@ -6,6 +6,7 @@ pub mod status; pub mod store; pub use self::config::Config; pub use client::Client; +pub use iroh_rpc_types::GossipsubEvent; use iroh_rpc_types::{gateway::GatewayService, p2p::P2pService, store::StoreService, Addr}; pub use network::{Lookup, P2pClient}; use quic_rpc::{ diff --git a/iroh-rpc-types/src/gossipsub_event.rs b/iroh-rpc-types/src/gossipsub_event.rs index 8afc2967c98..21044d2f59d 100644 --- a/iroh-rpc-types/src/gossipsub_event.rs +++ b/iroh-rpc-types/src/gossipsub_event.rs @@ -1,9 +1,14 @@ +use futures::stream::BoxStream; use libp2p::{ gossipsub::{GossipsubMessage, MessageId, TopicHash}, PeerId, }; use serde::{Deserialize, Serialize}; +use crate::p2p::GossipsubSubscribeResponse; + +pub type GossipsubEventStream = BoxStream<'static, Box>; + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum GossipsubEvent { Subscribed { diff --git a/iroh-rpc-types/src/lib.rs b/iroh-rpc-types/src/lib.rs index d935849b91e..bc0a0f49a0b 100644 --- a/iroh-rpc-types/src/lib.rs +++ b/iroh-rpc-types/src/lib.rs @@ -7,7 +7,7 @@ pub mod store; use std::fmt; pub use addr::Addr; -pub use gossipsub_event::GossipsubEvent; +pub use gossipsub_event::{GossipsubEvent, GossipsubEventStream}; use serde::{Deserialize, Serialize}; diff --git a/iroh-share/src/data.rs b/iroh-share/src/data.rs index 269750fe523..7c9a19e7f42 100644 --- a/iroh-share/src/data.rs +++ b/iroh-share/src/data.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use futures::Stream; use iroh_resolver::{ resolver::{Out, OutPrettyReader, Resolver, UnixfsType}, @@ -37,7 +37,7 @@ where self.root.pretty(self.resolver, Default::default(), None) } - pub async fn read_file(&self, link: &Link) -> Result { + pub async fn read_file(&self, link: &Link) -> Result> { let root = self .resolver .resolve(Path::from_cid(link.cid)) diff --git a/iroh-share/src/lib.rs b/iroh-share/src/lib.rs index 0a2aac90c1d..8991eba02ad 100644 --- a/iroh-share/src/lib.rs +++ b/iroh-share/src/lib.rs @@ -7,9 +7,16 @@ use cid::Cid; use libp2p::{Multiaddr, PeerId}; use serde::{Deserialize, Serialize}; +// pub use crate::receiver::{ProgressEvent, Receiver}; +// pub use crate::sender::Sender; +pub use crate::receiver::ProgressEvent; + +// TODO(ramfox): remove re export pub use crate::iroh::build as build_iroh; -pub use crate::receiver::{ProgressEvent, Receiver}; -pub use crate::sender::Sender; + +use anyhow::Result; +use libp2p::gossipsub::{Sha256Topic, TopicHash}; +use rand::Rng; /// Ticket describing the peer, their addresses, and the topic /// on which to discuss the data transfer @@ -21,6 +28,22 @@ pub struct Ticket { } impl Ticket { + pub fn new(peer_id: PeerId, addrs: Vec) -> Self { + let id: u64 = rand::thread_rng().gen(); + let topic = Sha256Topic::new(format!("iroh-share-{id}")) + .hash() + .to_string(); + Self { + peer_id, + addrs, + topic, + } + } + + pub fn topic_hash(&self) -> TopicHash { + TopicHash::from_raw(self.topic.clone()) + } + pub fn as_bytes(&self) -> Vec { bincode::serialize(self).expect("failed to serialize") } diff --git a/iroh-share/src/sender.rs b/iroh-share/src/sender.rs index 8b137891791..58d182f5e56 100644 --- a/iroh-share/src/sender.rs +++ b/iroh-share/src/sender.rs @@ -1 +1,115 @@ +use std::path::Path; +use anyhow::{anyhow, Result}; +use futures::{channel::oneshot, stream::BoxStream, StreamExt}; +use iroh_api::{Cid, UnixfsEntry}; +use iroh_api::{GossipsubEvent, P2pApi}; +use iroh_embed::Iroh; +use tokio::task::JoinHandle; +use tracing::{debug, info, warn}; + +use crate::{iroh::build as build_iroh, ReceiverMessage, SenderMessage, Ticket}; + +pub struct Sender { + iroh: Iroh, +} + +type EventStream = BoxStream<'static, Result>; +type ProgressStream = BoxStream<'static, Result<(Cid, u64)>>; + +impl Sender { + pub async fn new(database_path: &Path) -> Result { + let iroh = build_iroh(9990, database_path).await?; + Ok(Self { iroh }) + } + + pub async fn make_available(&self, entry: UnixfsEntry) -> Result { + self.iroh.api().add_stream(entry).await + } + + pub async fn transfer(&self, root: Cid, num_parts: usize) -> Result { + Transfer::new(self.iroh.api().p2p()?.clone(), root, num_parts).await + } +} + +struct Transfer { + api: P2pApi, + ticket: Ticket, + // progress: TODO + event_task: JoinHandle<()>, + done: oneshot::Receiver>, +} + +// make available progress +// transfer started +// transfer succeeded +// transfer failed + +impl Transfer { + pub async fn new(api: P2pApi, root: Cid, num_parts: usize) -> Result { + let peer_id = api.peer_id().await?; + let addrs = api.addrs().await?; + let ticket = Ticket::new(peer_id, addrs); + let mut events = api.subscribe(ticket.topic.clone()).await?; + let th = ticket.topic_hash(); + let (done_sender, done_receiver) = futures::channel::oneshot::channel(); + let p2p = api.clone(); + let event_task = tokio::task::spawn(async move { + let mut current_peer = None; + while let Some(Ok(e)) = events.next().await { + match e { + GossipsubEvent::Subscribed { peer_id, topic } => { + if topic == th && current_peer.is_none() { + info!("connected to {}", peer_id); + current_peer = Some(peer_id); + + let start = + bincode::serialize(&SenderMessage::Start { root, num_parts }) + .expect("serialize failure"); + p2p.publish(topic.to_string(), start.into()).await.unwrap(); + } + } + GossipsubEvent::Message { from, message, .. } => { + println!("received message from {}", from); + debug!("received message from {}", from); + if let Some(current_peer) = current_peer { + if from == current_peer { + match bincode::deserialize(&message.data) { + Ok(ReceiverMessage::FinishOk) => { + println!("finished transfer"); + info!("finished transfer"); + done_sender.send(Ok(())).ok(); + break; + } + Ok(ReceiverMessage::FinishError(err)) => { + println!("transfer failed: {}", err); + info!("transfer failed: {}", err); + done_sender.send(Err(anyhow!("{}", err))).ok(); + break; + } + Err(err) => { + warn!("unexpected message: {:?}", err); + } + } + } + } + } + _ => {} + } + } + }); + + Ok(Self { + api, + ticket, + event_task, + done: done_receiver, + }) + } + + pub async fn done(self) -> Result<()> { + self.done.await??; + self.event_task.await?; + Ok(()) + } +}