Skip to content

Commit

Permalink
feat: Add Gossipsub config setting (#389)
Browse files Browse the repository at this point in the history
# Description

This PR implements the following features:

- [x] Add `enable_gossipsub` config
- [x] Add `PubSubError` with `NotEnabled` and wrapped Gossipsub
`SubscriptionError` and `PublishE
- [x] Add runtime network `Error` with `PubSubError`
- [x] Handle Gossipsub on/off cases for `gossip_subscribe` and
`gossip_publish`

## Link to issue

Closes #388

## Type of change

- [x] New feature (non-breaking change that adds functionality)
- [x] Refactor (non-breaking change that updates existing functionality)

## Test plan (required)

All existing tests should pass. We will test Gossipsub on/off in 
upcoming multi-node workflow integration tests.

---------

Co-authored-by: Zeeshan Lakhani <zeeshan.lakhani@gmail.com>
  • Loading branch information
bgins and zeeshanlakhani authored Oct 19, 2023
1 parent 3508861 commit fc542ef
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 41 deletions.
4 changes: 4 additions & 0 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub(crate) struct EventHandler<DB: Database> {
connected_peers: FnvHashMap<PeerId, ConnectedPoint>,
request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
rendezvous_cookies: FnvHashMap<PeerId, Cookie>,
pubsub_enabled: bool,
ws_msg_sender: ws::Notifier,
node_addresses: Vec<libp2p::Multiaddr>,
announce_addresses: Vec<libp2p::Multiaddr>,
Expand All @@ -81,6 +82,7 @@ pub(crate) struct EventHandler<DB: Database> {
connected_peers: FnvHashMap<PeerId, ConnectedPoint>,
request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
rendezvous_cookies: FnvHashMap<PeerId, Cookie>,
pubsub_enabled: bool,
node_addresses: Vec<libp2p::Multiaddr>,
announce_addresses: Vec<libp2p::Multiaddr>,
external_address_limit: u32,
Expand Down Expand Up @@ -120,6 +122,7 @@ where
request_response_senders: FnvHashMap::default(),
connected_peers: FnvHashMap::default(),
rendezvous_cookies: FnvHashMap::default(),
pubsub_enabled: settings.network.enable_pubsub,
ws_msg_sender,
node_addresses: settings.network.node_addresses.clone(),
announce_addresses: settings.network.announce_addresses.clone(),
Expand All @@ -143,6 +146,7 @@ where
connected_peers: FnvHashMap::default(),
request_response_senders: FnvHashMap::default(),
rendezvous_cookies: FnvHashMap::default(),
pubsub_enabled: settings.network.enable_pubsub,
node_addresses: settings.network.node_addresses.clone(),
announce_addresses: settings.network.announce_addresses.clone(),
external_address_limit: settings.network.max_announce_addresses,
Expand Down
26 changes: 14 additions & 12 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,21 @@ impl Captured {
}
}

match event_handler.swarm.behaviour_mut().gossip_publish(
pubsub::RECEIPTS_TOPIC,
TopicMessage::CapturedReceipt(receipt),
) {
Ok(msg_id) => info!(
"message {msg_id} published on {} topic for receipt with cid: {receipt_cid}",
pubsub::RECEIPTS_TOPIC
),
Err(_err) => {
error!(
"message not published on {} topic for receipt with cid: {receipt_cid}",
if event_handler.pubsub_enabled {
match event_handler.swarm.behaviour_mut().gossip_publish(
pubsub::RECEIPTS_TOPIC,
TopicMessage::CapturedReceipt(receipt),
) {
Ok(msg_id) => info!(
"message {msg_id} published on {} topic for receipt with cid: {receipt_cid}",
pubsub::RECEIPTS_TOPIC
)
),
Err(_err) => {
error!(
"message not published on {} topic for receipt with cid: {receipt_cid}",
pubsub::RECEIPTS_TOPIC
)
}
}
}

Expand Down
21 changes: 21 additions & 0 deletions homestar-runtime/src/network/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[error("pubsub error: {0}")]
PubSubError(#[from] PubSubError),
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum PubSubError {
#[error("insufficient peers subscribed to topic {0} for publishing")]
InsufficientPeers(String),
#[error("not enabled")]
NotEnabled,
#[error(transparent)]
Publish(#[from] libp2p::gossipsub::PublishError),
#[error(transparent)]
Subscription(#[from] libp2p::gossipsub::SubscriptionError),
// TODO: We may be able to remove this error type once we clean-up erroring
// through the runtime.
#[error("error on conversion: {0}")]
Conversion(#[from] anyhow::Error),
}
1 change: 1 addition & 0 deletions homestar-runtime/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! [websocket]: axum::extract::ws
//! [ipfs]: ipfs_api
pub(crate) mod error;
#[cfg(feature = "ipfs")]
pub(crate) mod ipfs;
pub(crate) mod pubsub;
Expand Down
78 changes: 49 additions & 29 deletions homestar-runtime/src/network/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
//! [libp2p]: libp2p
//! [Swarm]: libp2p::Swarm
use crate::{network::pubsub, settings, Receipt, RECEIPT_TAG, WORKFLOW_TAG};
use anyhow::{anyhow, Context, Result};
use crate::{
network::{error::PubSubError, pubsub},
settings, Receipt, RECEIPT_TAG, WORKFLOW_TAG,
};
use anyhow::{Context, Result};
use enum_assoc::Assoc;
use faststr::FastStr;
use libp2p::{
core::upgrade,
gossipsub::{self, MessageId, SubscriptionError, TopicHash},
gossipsub::{self, MessageId, TopicHash},
identify,
kad::{
self,
Expand Down Expand Up @@ -53,7 +56,11 @@ pub(crate) async fn new(settings: &settings::Node) -> Result<Swarm<ComposedBehav
let mut swarm = SwarmBuilder::with_tokio_executor(
transport,
ComposedBehaviour {
gossipsub: pubsub::new(keypair.clone(), settings)?,
gossipsub: Toggle::from(if settings.network.enable_pubsub {
Some(pubsub::new(keypair.clone(), settings)?)
} else {
None
}),
kademlia: Kademlia::with_config(
peer_id,
MemoryStore::with_config(
Expand Down Expand Up @@ -171,10 +178,12 @@ pub(crate) fn init(
}
}

// join `receipts` topic
swarm
.behaviour_mut()
.gossip_subscribe(pubsub::RECEIPTS_TOPIC)?;
if settings.enable_pubsub {
// join `receipts` topic
swarm
.behaviour_mut()
.gossip_subscribe(pubsub::RECEIPTS_TOPIC)?;
}

Ok(())
}
Expand Down Expand Up @@ -248,7 +257,7 @@ pub(crate) enum TopicMessage {
#[behaviour(to_swarm = "ComposedEvent")]
pub(crate) struct ComposedBehaviour {
/// [gossipsub::Behaviour] behaviour.
pub(crate) gossipsub: gossipsub::Behaviour,
pub(crate) gossipsub: Toggle<gossipsub::Behaviour>,
/// In-memory [kademlia: Kademlia] behaviour.
pub(crate) kademlia: Kademlia<MemoryStore>,
/// [request_response::Behaviour] CBOR-flavored behaviour.
Expand All @@ -265,30 +274,41 @@ pub(crate) struct ComposedBehaviour {

impl ComposedBehaviour {
/// Subscribe to [gossipsub] topic.
pub(crate) fn gossip_subscribe(&mut self, topic: &str) -> Result<bool, SubscriptionError> {
let topic = gossipsub::IdentTopic::new(topic);
self.gossipsub.subscribe(&topic)
pub(crate) fn gossip_subscribe(&mut self, topic: &str) -> Result<bool, PubSubError> {
if let Some(gossipsub) = self.gossipsub.as_mut() {
let topic = gossipsub::IdentTopic::new(topic);
let subscribed = gossipsub.subscribe(&topic)?;

Ok(subscribed)
} else {
Err(PubSubError::NotEnabled)
}
}

/// Serialize [TopicMessage] and publish to [gossipsub] topic.
pub(crate) fn gossip_publish(&mut self, topic: &str, msg: TopicMessage) -> Result<MessageId> {
let id_topic = gossipsub::IdentTopic::new(topic);
// Make this a match once we have other topics.
let TopicMessage::CapturedReceipt(receipt) = msg;
let msg_bytes: Vec<u8> = receipt.try_into()?;
if self
.gossipsub
.mesh_peers(&TopicHash::from_raw(topic))
.peekable()
.peek()
.is_some()
{
let msg_id = self.gossipsub.publish(id_topic, msg_bytes)?;
Ok(msg_id)
pub(crate) fn gossip_publish(
&mut self,
topic: &str,
msg: TopicMessage,
) -> Result<MessageId, PubSubError> {
if let Some(gossipsub) = self.gossipsub.as_mut() {
let id_topic = gossipsub::IdentTopic::new(topic);
// Make this a match once we have other topics.
let TopicMessage::CapturedReceipt(receipt) = msg;
let msg_bytes: Vec<u8> = receipt.try_into()?;
if gossipsub
.mesh_peers(&TopicHash::from_raw(topic))
.peekable()
.peek()
.is_some()
{
let msg_id = gossipsub.publish(id_topic, msg_bytes)?;
Ok(msg_id)
} else {
Err(PubSubError::InsufficientPeers(topic.to_owned()))
}
} else {
Err(anyhow!(
"insufficient peers subscribed to topic {topic} for publishing"
))
Err(PubSubError::NotEnabled)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions homestar-runtime/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ pub struct Network {
/// Timeout for p2p requests for a provided record.
#[serde_as(as = "DurationSeconds<u64>")]
pub(crate) p2p_provider_timeout: Duration,
/// Enable pub/sub.
pub(crate) enable_pubsub: bool,
/// Pub/sub duplicate cache time.
#[serde_as(as = "DurationSeconds<u64>")]
pub(crate) pubsub_duplication_cache_time: Duration,
Expand Down Expand Up @@ -191,6 +193,7 @@ impl Default for Network {
mdns_query_interval: Duration::from_secs(5 * 60),
mdns_ttl: Duration::from_secs(60 * 9),
p2p_provider_timeout: Duration::new(30, 0),
enable_pubsub: true,
pubsub_duplication_cache_time: Duration::new(1, 0),
pubsub_heartbeat: Duration::new(60, 0),
pubsub_idle_timeout: Duration::new(60 * 60 * 24, 0),
Expand Down

0 comments on commit fc542ef

Please sign in to comment.