Skip to content

Commit

Permalink
refactor: iroh-share sender
Browse files Browse the repository at this point in the history
  • Loading branch information
ramfox committed Jan 11, 2023
1 parent f23b895 commit 3f9bbc9
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 26 deletions.
1 change: 0 additions & 1 deletion iroh-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ iroh-metrics.workspace = true
iroh-resolver.workspace = true
iroh-rpc-client.workspace = true
iroh-rpc-types.workspace = true
iroh-p2p.workspace = true
iroh-unixfs.workspace = true
iroh-util.workspace = true
libp2p.workspace = true
Expand Down
5 changes: 0 additions & 5 deletions iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,6 @@ impl Api {
Self { client, resolver }
}

/// Returns a [`Resolver`] you can use to resolve data from the Iroh store or the network.
pub async fn resolver(&self) -> Resolver<FullLoader> {
self.resolver.clone()
}

/// Announces to the DHT that this node can offer the given [`Cid`].
///
/// This publishes a provider record for the [`Cid`] to the DHT, establishing the local
Expand Down
4 changes: 3 additions & 1 deletion iroh-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ pub use crate::p2p::{peer_id_from_multiaddr, PeerIdOrAddr};
pub use bytes::Bytes;
pub use cid::Cid;
pub use iroh_resolver::resolver::Path as IpfsPath;
pub use iroh_rpc_client::{ClientStatus, Lookup, ServiceStatus, ServiceType, StatusType};
pub use iroh_rpc_client::{
ClientStatus, GossipsubEvent, Lookup, ServiceStatus, ServiceType, StatusType,
};
pub use iroh_unixfs::builder::{
Config as UnixfsConfig, DirectoryBuilder, Entry as UnixfsEntry, FileBuilder, SymlinkBuilder,
};
Expand Down
13 changes: 5 additions & 8 deletions iroh-api/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use std::collections::HashMap;

use anyhow::Result;
use bytes::Bytes;
use futures::stream::{BoxStream, StreamExt};
use iroh_p2p::GossipsubEvent;
use iroh_rpc_client::{Lookup, P2pClient};
use futures::stream::Stream;
use iroh_rpc_client::{GossipsubEvent, Lookup, P2pClient};
use libp2p::{
gossipsub::{MessageId, TopicHash},
multiaddr::Protocol,
Expand Down Expand Up @@ -93,14 +92,12 @@ impl P2p {
pub async fn subscribe(
&self,
topic: String,
) -> Result<BoxStream<'static, Result<GossipsubEvent>>> {
) -> Result<impl Stream<Item = Result<GossipsubEvent>>> {
let topic = TopicHash::from_raw(topic);
Ok(self
.client
self.client
.gossipsub_subscribe(topic)
.await
.map_err(|e| map_service_error("p2p", e))?
.boxed())
.map_err(|e| map_service_error("p2p", e))
}

/// Publish a message on a pub/sub Topic.
Expand Down
2 changes: 1 addition & 1 deletion iroh-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ mod swarm;
pub use self::config::*;
pub use self::keys::{DiskStorage, Keychain, MemoryStorage};
pub use self::node::*;
pub use iroh_rpc_types::GossipsubEvent;
pub use iroh_rpc_types::{GossipsubEvent, GossipsubEventStream};

pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");
7 changes: 2 additions & 5 deletions iroh-p2p/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use tokio::sync::oneshot;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info, trace};

use crate::{GossipsubEvent, NetworkEvent, DEFAULT_PROVIDER_LIMIT, VERSION};
use crate::{GossipsubEvent, GossipsubEventStream, NetworkEvent, DEFAULT_PROVIDER_LIMIT, VERSION};

#[derive(Clone)]
pub(crate) struct P2p {
Expand Down Expand Up @@ -392,10 +392,7 @@ impl P2p {
}

#[tracing::instrument(skip(self))]
fn gossipsub_subscribe(
self,
req: GossipsubSubscribeRequest,
) -> BoxStream<'static, Box<GossipsubSubscribeResponse>> {
fn gossipsub_subscribe(self, req: GossipsubSubscribeRequest) -> GossipsubEventStream {
async move {
self.gossipsub_subscribe_0(req)
.await
Expand Down
1 change: 1 addition & 0 deletions iroh-rpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod status;
pub mod store;
pub use self::config::Config;
pub use client::Client;
pub use iroh_rpc_types::GossipsubEvent;
use iroh_rpc_types::{gateway::GatewayService, p2p::P2pService, store::StoreService, Addr};
pub use network::{Lookup, P2pClient};
use quic_rpc::{
Expand Down
5 changes: 5 additions & 0 deletions iroh-rpc-types/src/gossipsub_event.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use futures::stream::BoxStream;
use libp2p::{
gossipsub::{GossipsubMessage, MessageId, TopicHash},
PeerId,
};
use serde::{Deserialize, Serialize};

use crate::p2p::GossipsubSubscribeResponse;

pub type GossipsubEventStream = BoxStream<'static, Box<GossipsubSubscribeResponse>>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GossipsubEvent {
Subscribed {
Expand Down
2 changes: 1 addition & 1 deletion iroh-rpc-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod store;
use std::fmt;

pub use addr::Addr;
pub use gossipsub_event::GossipsubEvent;
pub use gossipsub_event::{GossipsubEvent, GossipsubEventStream};

use serde::{Deserialize, Serialize};

Expand Down
4 changes: 2 additions & 2 deletions iroh-share/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{Context, Result};
use futures::Stream;
use iroh_resolver::{
resolver::{Out, OutPrettyReader, Resolver, UnixfsType},
Expand Down Expand Up @@ -37,7 +37,7 @@ where
self.root.pretty(self.resolver, Default::default(), None)
}

pub async fn read_file(&self, link: &Link) -> Result<Data> {
pub async fn read_file(&self, link: &Link) -> Result<Data<C>> {
let root = self
.resolver
.resolve(Path::from_cid(link.cid))
Expand Down
27 changes: 25 additions & 2 deletions iroh-share/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ use cid::Cid;
use libp2p::{Multiaddr, PeerId};
use serde::{Deserialize, Serialize};

// pub use crate::receiver::{ProgressEvent, Receiver};
// pub use crate::sender::Sender;
pub use crate::receiver::ProgressEvent;

// TODO(ramfox): remove re export
pub use crate::iroh::build as build_iroh;
pub use crate::receiver::{ProgressEvent, Receiver};
pub use crate::sender::Sender;

use anyhow::Result;
use libp2p::gossipsub::{Sha256Topic, TopicHash};
use rand::Rng;

/// Ticket describing the peer, their addresses, and the topic
/// on which to discuss the data transfer
Expand All @@ -21,6 +28,22 @@ pub struct Ticket {
}

impl Ticket {
pub fn new(peer_id: PeerId, addrs: Vec<Multiaddr>) -> Self {
let id: u64 = rand::thread_rng().gen();
let topic = Sha256Topic::new(format!("iroh-share-{id}"))
.hash()
.to_string();
Self {
peer_id,
addrs,
topic,
}
}

pub fn topic_hash(&self) -> TopicHash {
TopicHash::from_raw(self.topic.clone())
}

pub fn as_bytes(&self) -> Vec<u8> {
bincode::serialize(self).expect("failed to serialize")
}
Expand Down
114 changes: 114 additions & 0 deletions iroh-share/src/sender.rs
Original file line number Diff line number Diff line change
@@ -1 +1,115 @@
use std::path::Path;

use anyhow::{anyhow, Result};
use futures::{channel::oneshot, stream::BoxStream, StreamExt};
use iroh_api::{Cid, UnixfsEntry};
use iroh_api::{GossipsubEvent, P2pApi};
use iroh_embed::Iroh;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};

use crate::{iroh::build as build_iroh, ReceiverMessage, SenderMessage, Ticket};

pub struct Sender {
iroh: Iroh,
}

type EventStream = BoxStream<'static, Result<GossipsubEvent>>;
type ProgressStream = BoxStream<'static, Result<(Cid, u64)>>;

impl Sender {
pub async fn new(database_path: &Path) -> Result<Self> {
let iroh = build_iroh(9990, database_path).await?;
Ok(Self { iroh })
}

pub async fn make_available(&self, entry: UnixfsEntry) -> Result<ProgressStream> {
self.iroh.api().add_stream(entry).await
}

pub async fn transfer(&self, root: Cid, num_parts: usize) -> Result<Transfer> {
Transfer::new(self.iroh.api().p2p()?.clone(), root, num_parts).await
}
}

struct Transfer {
api: P2pApi,
ticket: Ticket,
// progress: TODO
event_task: JoinHandle<()>,
done: oneshot::Receiver<Result<()>>,
}

// make available progress
// transfer started
// transfer succeeded
// transfer failed

impl Transfer {
pub async fn new(api: P2pApi, root: Cid, num_parts: usize) -> Result<Self> {
let peer_id = api.peer_id().await?;
let addrs = api.addrs().await?;
let ticket = Ticket::new(peer_id, addrs);
let mut events = api.subscribe(ticket.topic.clone()).await?;
let th = ticket.topic_hash();
let (done_sender, done_receiver) = futures::channel::oneshot::channel();
let p2p = api.clone();
let event_task = tokio::task::spawn(async move {
let mut current_peer = None;
while let Some(Ok(e)) = events.next().await {
match e {
GossipsubEvent::Subscribed { peer_id, topic } => {
if topic == th && current_peer.is_none() {
info!("connected to {}", peer_id);
current_peer = Some(peer_id);

let start =
bincode::serialize(&SenderMessage::Start { root, num_parts })
.expect("serialize failure");
p2p.publish(topic.to_string(), start.into()).await.unwrap();
}
}
GossipsubEvent::Message { from, message, .. } => {
println!("received message from {}", from);
debug!("received message from {}", from);
if let Some(current_peer) = current_peer {
if from == current_peer {
match bincode::deserialize(&message.data) {
Ok(ReceiverMessage::FinishOk) => {
println!("finished transfer");
info!("finished transfer");
done_sender.send(Ok(())).ok();
break;
}
Ok(ReceiverMessage::FinishError(err)) => {
println!("transfer failed: {}", err);
info!("transfer failed: {}", err);
done_sender.send(Err(anyhow!("{}", err))).ok();
break;
}
Err(err) => {
warn!("unexpected message: {:?}", err);
}
}
}
}
}
_ => {}
}
}
});

Ok(Self {
api,
ticket,
event_task,
done: done_receiver,
})
}

pub async fn done(self) -> Result<()> {
self.done.await??;
self.event_task.await?;
Ok(())
}
}

0 comments on commit 3f9bbc9

Please sign in to comment.