Skip to content

Commit

Permalink
feat: stop synchronizing interests
Browse files Browse the repository at this point in the history
With this change the interests ring is no longer synchronized between
peers. A local node will still use the interest-svc to know its own
interests and the beginning of each Recon conversation negotiates shared
interests. Therefore it is no longer necessary to synchronize interests.

In the future we may decide that we want to use Recon to sync interests
instead of a linear sharing of interests before each conversation,
however that is only a performance optimization that is not important at
this stage.

Fixes #610
Fixes #611
  • Loading branch information
nathanielc committed Dec 10, 2024
1 parent 0321da0 commit 56f7c5b
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 100 deletions.
11 changes: 2 additions & 9 deletions one/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use ceramic_sql::sqlite::SqlitePool;
use clap::Args;
use object_store::aws::AmazonS3Builder;
use object_store::local::LocalFileSystem;
use recon::{FullInterests, Recon, ReconInterestProvider};
use recon::{Recon, ReconInterestProvider};
use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals;
use std::sync::Arc;
Expand Down Expand Up @@ -530,13 +530,6 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
// Construct a recon implementation for peers.
let recon_peer = Recon::new(peer_svc.clone(), PeerKeyInterests, recon_metrics.clone());

// Construct a recon implementation for interests.
let recon_interest = Recon::new(
interest_svc.clone(),
FullInterests::default(),
recon_metrics.clone(),
);

// Construct a recon implementation for models.
let recon_model = Recon::new(
model_svc.clone(),
Expand All @@ -545,7 +538,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
recon_metrics,
);

let recons = Some((recon_peer, recon_interest, recon_model));
let recons = Some((recon_peer, recon_model));
let ipfs_metrics =
ceramic_metrics::MetricsHandle::register(ceramic_kubo_rpc::IpfsMetrics::register);
let p2p_metrics = MetricsHandle::register(ceramic_p2p::Metrics::register);
Expand Down
7 changes: 3 additions & 4 deletions one/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use ceramic_core::{EventId, Interest, NodeId, NodeKey, PeerKey};
use ceramic_core::{EventId, NodeId, NodeKey, PeerKey};
use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService};
use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, PeerService};
use iroh_rpc_client::P2pClient;
Expand Down Expand Up @@ -34,18 +34,17 @@ impl BuilderState for WithP2p {}

/// Configure the p2p service
impl Builder<Init> {
pub async fn with_p2p<P, I, M, S>(
pub async fn with_p2p<P, M, S>(
self,
libp2p_config: Libp2pConfig,
node_key: NodeKey,
peer_svc: impl PeerService + 'static,
recons: Option<(P, I, M)>,
recons: Option<(P, M)>,
block_store: Arc<S>,
metrics: ceramic_p2p::Metrics,
) -> anyhow::Result<Builder<WithP2p>>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
S: iroh_bitswap::Store,
{
Expand Down
15 changes: 7 additions & 8 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use anyhow::Result;
use ceramic_core::{EventId, Interest, PeerKey};
use ceramic_core::{EventId, PeerKey};
use iroh_bitswap::{Bitswap, Block, Config as BitswapConfig};
use libp2p::{
autonat,
Expand Down Expand Up @@ -36,7 +36,7 @@ pub const AGENT_VERSION: &str = concat!("ceramic-one/", env!("CARGO_PKG_VERSION"
/// Libp2p behaviour for the node.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "Event")]
pub(crate) struct NodeBehaviour<P, I, M, S>
pub(crate) struct NodeBehaviour<P, M, S>
where
S: iroh_bitswap::Store + Send + Sync,
{
Expand All @@ -56,21 +56,20 @@ where
relay: Toggle<relay::Behaviour>,
relay_client: Toggle<relay::client::Behaviour>,
dcutr: Toggle<dcutr::Behaviour>,
recon: Toggle<recon::libp2p::Behaviour<P, I, M>>,
recon: Toggle<recon::libp2p::Behaviour<P, M>>,
}

impl<P, I, M, S> NodeBehaviour<P, I, M, S>
impl<P, M, S> NodeBehaviour<P, M, S>
where
P: Recon<Key = PeerKey, Hash = Sha256a> + Send + Sync,
I: Recon<Key = Interest, Hash = Sha256a> + Send + Sync,
M: Recon<Key = EventId, Hash = Sha256a> + Send + Sync,
S: iroh_bitswap::Store + Send + Sync,
{
pub async fn new(
local_key: &Keypair,
config: &Libp2pConfig,
relay_client: Option<relay::client::Behaviour>,
recons: Option<(P, I, M)>,
recons: Option<(P, M)>,
block_store: Arc<S>,
peers_tx: tokio::sync::mpsc::Sender<peers::Message>,
metrics: Metrics,
Expand Down Expand Up @@ -186,8 +185,8 @@ where
.with_max_pending_incoming(Some(config.max_conns_pending_in))
.with_max_established_per_peer(Some(config.max_conns_per_peer)),
);
let recon = recons.map(|(peer, interest, model)| {
recon::libp2p::Behaviour::new(peer, interest, model, recon::libp2p::Config::default())
let recon = recons.map(|(peer, model)| {
recon::libp2p::Behaviour::new(peer, model, recon::libp2p::Config::default())
});
Ok(NodeBehaviour {
ping: Ping::default(),
Expand Down
23 changes: 9 additions & 14 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{sync::atomic::Ordering, time::Duration};

use ahash::AHashMap;
use anyhow::{anyhow, bail, Context, Result};
use ceramic_core::{EventId, Interest, NodeKey, PeerKey};
use ceramic_core::{EventId, NodeKey, PeerKey};
use ceramic_metrics::{libp2p_metrics, Recorder};
use cid::Cid;
use futures_util::stream::StreamExt;
Expand Down Expand Up @@ -62,15 +62,14 @@ pub enum NetworkEvent {
/// Node implements a peer to peer node that participates on the Ceramic network.
///
/// Node provides an external API via RpcMessages.
pub struct Node<P, I, M, S>
pub struct Node<P, M, S>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
S: iroh_bitswap::Store,
{
metrics: Metrics,
swarm: Swarm<NodeBehaviour<P, I, M, S>>,
swarm: Swarm<NodeBehaviour<P, M, S>>,
supported_protocols: HashSet<String>,
net_receiver_in: Receiver<RpcMessage>,
dial_queries: AHashMap<PeerId, Vec<OneShotSender<Result<()>>>>,
Expand All @@ -92,10 +91,9 @@ where
active_address_probe: Option<Multiaddr>,
}

impl<P, I, M, S> fmt::Debug for Node<P, I, M, S>
impl<P, M, S> fmt::Debug for Node<P, M, S>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
S: iroh_bitswap::Store,
{
Expand Down Expand Up @@ -128,10 +126,9 @@ const NICE_INTERVAL: Duration = Duration::from_secs(6);
const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60);
const EXPIRY_INTERVAL: Duration = Duration::from_secs(1);

impl<P, I, M, S> Drop for Node<P, I, M, S>
impl<P, M, S> Drop for Node<P, M, S>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
S: iroh_bitswap::Store,
{
Expand All @@ -143,12 +140,10 @@ where

// Allow IntoConnectionHandler deprecated associated type.
// We are not using IntoConnectionHandler directly only referencing the type as part of this event signature.
type NodeSwarmEvent<P, I, M, S> =
SwarmEvent<<NodeBehaviour<P, I, M, S> as NetworkBehaviour>::ToSwarm>;
impl<P, I, M, S> Node<P, I, M, S>
type NodeSwarmEvent<P, M, S> = SwarmEvent<<NodeBehaviour<P, M, S> as NetworkBehaviour>::ToSwarm>;
impl<P, M, S> Node<P, M, S>
where
P: Recon<Key = PeerKey, Hash = Sha256a> + Send + Sync,
I: Recon<Key = Interest, Hash = Sha256a> + Send + Sync,
M: Recon<Key = EventId, Hash = Sha256a> + Send + Sync,
S: iroh_bitswap::Store + Send + Sync,
{
Expand All @@ -157,7 +152,7 @@ where
rpc_addr: P2pAddr,
node_key: NodeKey,
peer_svc: impl PeerService + 'static,
recons: Option<(P, I, M)>,
recons: Option<(P, M)>,
block_store: Arc<S>,
metrics: Metrics,
) -> Result<Self> {
Expand Down Expand Up @@ -494,7 +489,7 @@ where
#[tracing::instrument(skip_all)]
async fn handle_swarm_event(
&mut self,
event: NodeSwarmEvent<P, I, M, S>,
event: NodeSwarmEvent<P, M, S>,
) -> Result<Option<SwarmEventResult>> {
libp2p_metrics().record(&event);
match event {
Expand Down
16 changes: 7 additions & 9 deletions p2p/src/swarm.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use ceramic_core::{EventId, Interest, PeerKey};
use ceramic_core::{EventId, PeerKey};
use libp2p::{dns, noise, relay, tcp, tls, yamux, Swarm, SwarmBuilder};
use libp2p_identity::Keypair;
use recon::{libp2p::Recon, Sha256a};
Expand Down Expand Up @@ -28,17 +28,16 @@ fn get_dns_config() -> (dns::ResolverConfig, dns::ResolverOpts) {
}
}

pub(crate) async fn build_swarm<P, I, M, S>(
pub(crate) async fn build_swarm<P, M, S>(
config: &Libp2pConfig,
keypair: Keypair,
recons: Option<(P, I, M)>,
recons: Option<(P, M)>,
block_store: Arc<S>,
peers_tx: tokio::sync::mpsc::Sender<peers::Message>,
metrics: Metrics,
) -> Result<Swarm<NodeBehaviour<P, I, M, S>>>
) -> Result<Swarm<NodeBehaviour<P, M, S>>>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
S: iroh_bitswap::Store,
{
Expand Down Expand Up @@ -105,18 +104,17 @@ where
}
}

fn new_behavior<P, I, M, S>(
fn new_behavior<P, M, S>(
config: &Libp2pConfig,
keypair: &Keypair,
relay_client: Option<relay::client::Behaviour>,
recons: Option<(P, I, M)>,
recons: Option<(P, M)>,
block_store: Arc<S>,
peers_tx: tokio::sync::mpsc::Sender<peers::Message>,
metrics: Metrics,
) -> Result<NodeBehaviour<P, I, M, S>>
) -> Result<NodeBehaviour<P, M, S>>
where
P: Recon<Key = PeerKey, Hash = Sha256a> + Send,
I: Recon<Key = Interest, Hash = Sha256a> + Send,
M: Recon<Key = EventId, Hash = Sha256a> + Send,
S: iroh_bitswap::Store,
{
Expand Down
33 changes: 9 additions & 24 deletions recon/src/libp2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod upgrade;
pub use crate::protocol::Recon;
pub use stream_set::StreamSet;

use ceramic_core::{EventId, Interest, PeerKey};
use ceramic_core::{EventId, PeerKey};
use futures::{future::BoxFuture, FutureExt};
use libp2p::{
core::ConnectedPoint,
Expand All @@ -43,8 +43,6 @@ use crate::{

/// Name of the Recon protocol for synchronizing peers
pub const PROTOCOL_NAME_PEER: &str = "/ceramic/recon/0.1.0/peer";
/// Name of the Recon protocol for synchronizing interests
pub const PROTOCOL_NAME_INTEREST: &str = "/ceramic/recon/0.1.0/interest";
/// Name of the Recon protocol for synchronizing models
pub const PROTOCOL_NAME_MODEL: &str = "/ceramic/recon/0.1.0/model";

Expand Down Expand Up @@ -76,9 +74,8 @@ impl Default for Config {
/// The Behavior tracks all peers on the network that speak the Recon protocol.
/// It is responsible for starting and stopping syncs with various peers depending on the needs of
/// the application.
pub struct Behaviour<P, I, M> {
pub struct Behaviour<P, M> {
peer: P,
interest: I,
model: M,
config: Config,
peers: BTreeMap<PeerId, PeerInfo>,
Expand All @@ -87,15 +84,13 @@ pub struct Behaviour<P, I, M> {
next_sync: Option<BoxFuture<'static, ()>>,
}

impl<P, I, M> std::fmt::Debug for Behaviour<P, I, M>
impl<P, M> std::fmt::Debug for Behaviour<P, M>
where
P: std::fmt::Debug,
I: std::fmt::Debug,
M: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Behaviour")
.field("interest", &self.interest)
.field("model", &self.model)
.field("config", &self.config)
.field("peers", &self.peers)
Expand Down Expand Up @@ -148,18 +143,16 @@ pub enum PeerStatus {
Stopped,
}

impl<P, I, M> Behaviour<P, I, M> {
impl<P, M> Behaviour<P, M> {
/// Create a new Behavior with the provided Recon implementation.
pub fn new(peer: P, interest: I, model: M, config: Config) -> Self
pub fn new(peer: P, model: M, config: Config) -> Self
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
{
let (tx, rx) = tokio::sync::mpsc::channel(1000);
Self {
peer,
interest,
model,
config,
peers: BTreeMap::new(),
Expand All @@ -178,13 +171,12 @@ impl<P, I, M> Behaviour<P, I, M> {
}
}

impl<P, I, M> NetworkBehaviour for Behaviour<P, I, M>
impl<P, M> NetworkBehaviour for Behaviour<P, M>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
{
type ConnectionHandler = Handler<P, I, M>;
type ConnectionHandler = Handler<P, M>;

type ToSwarm = Event;

Expand All @@ -205,13 +197,8 @@ where
next_sync: BTreeMap::from_iter([
// Schedule all stream_sets initially
(StreamSet::Peer, Instant::now()),
// Schedule interests after peers
(
StreamSet::Interest,
Instant::now() + Duration::from_millis(1),
),
// Schedule models after interests
(StreamSet::Model, Instant::now() + Duration::from_millis(2)),
// Schedule models after peers
(StreamSet::Model, Instant::now() + Duration::from_millis(1)),
]),
sync_delay: Default::default(),
});
Expand Down Expand Up @@ -395,7 +382,6 @@ where
connection_id,
handler::State::WaitingInbound,
self.peer.clone(),
self.interest.clone(),
self.model.clone(),
))
}
Expand All @@ -416,7 +402,6 @@ where
stream_set: StreamSet::Peer,
},
self.peer.clone(),
self.interest.clone(),
self.model.clone(),
))
}
Expand Down
Loading

0 comments on commit 56f7c5b

Please sign in to comment.