Skip to content

Commit

Permalink
Merge pull request #98 from richardschneider/pubsub
Browse files Browse the repository at this point in the history
PubSubApi implementation
  • Loading branch information
richardschneider authored May 14, 2019
2 parents 33a00b7 + 780dc43 commit e7df057
Show file tree
Hide file tree
Showing 10 changed files with 364 additions and 16 deletions.
4 changes: 2 additions & 2 deletions doc/articles/core-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

The [Core API](xref:Ipfs.CoreApi.ICoreApi) is a set of interfaces to the IPFS features and is implemented by the
[engine](xref:Ipfs.Engine.IpfsEngine). The
[FileSystem](filesystem.md) and [PubSub]() features are most often used.
[FileSystem](filesystem.md) and [PubSub](pubsub.md) features are most often used.

```csharp
const string filename = "QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv/about";
Expand All @@ -28,7 +28,7 @@ Each IPFS feature has it's own interface.
| [Name](xref:Ipfs.CoreApi.INameApi) | Manages the Interplanetary Name Space (IPNS) |
| [Object](xref:Ipfs.CoreApi.IObjectApi) | Manages the IPFS Directed Acrylic Graph |
| [Pin](xref:Ipfs.CoreApi.IPinApi) | Manage objects that are locally stored and permanent |
| [PubSub](xref:Ipfs.CoreApi.IPubSubApi) | Publish and subscribe topic messages |
| [PubSub](pubsub.md) | Publish and subscribe topic messages |
| [Swarm](xref:Ipfs.CoreApi.ISwarmApi) | Manages the swarm of peers |
| [Stats](xref:Ipfs.CoreApi.IStatsApi) | Statistics on IPFS components |

52 changes: 52 additions & 0 deletions doc/articles/pubsub.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# IPFS PubSub system

The publish/subscribe system allows a `message` to be sent to a group of peers that
are subsctibed to a `topic` via the [PubAub API](xref:Ipfs.CoreApi.IPubSubApi).
The `topic` is just a name that indicates a group of related messages.
The [message](xref:Ipfs.IPublishedMessage) contains the author, topic(s),
sequence number and content.



### Publishing

[PublishAsync](xref:Ipfs.CoreApi.IPubSubApi.PublishAsync*) sends a
message to a group of peers that are subscribed to a topic.

The following, sends a "hello world" to all peers subscribed to "nz".

```csharp
await ipfs.PubSub.PublishAsync("nz", "tēnā koutou");
```

### Subscribing

[SubscribeAsync](xref:Ipfs.CoreApi.IPubSubApi.SubscribeAsync*) indicates interest
in messages for the specified topic. The `handler` is invoked when a [unique
message](pubsub/dupmsg.md) is received.

```csharp
var cs = new CancellationTokenSource();
await ipfs.PubSub.SubscribeAsync("nz", msg =>
{
// do something with msg.DataBytes
}, cs.Token);
```

To unsubscribe, simply cancel the subscribe

```csharp
cs.Cancel();
```

### Implementation

The peer talk [notification service](xref:PeerTalk.PubSub.NotificationService)
with a [floodsub router](xref:PeerTalk.PubSub.FloodRouter) is currently
used. In the future a [gossip router](https://github.com/richardschneider/peer-talk/issues/25) will be used.

See also
- [PubSub interface for libp2p](https://github.com/libp2p/specs/tree/master/pubsub)
- [In the beginning was floodsub](https://github.com/libp2p/specs/tree/master/pubsub/gossipsub#in-the-beginning-was-floodsub)
- [And then there is gossipsub](https://github.com/libp2p/specs/tree/master/pubsub/gossipsub#the-gossipsub-protocol)
- [Proximity Aware Epidemic PubSub for libp2p](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/episub.md)
17 changes: 17 additions & 0 deletions doc/articles/pubsub/dupmsg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Duplicate Message

The message [sequence number](xref:Ipfs.IPublishedMessage.SequenceNumber) is a monotonically
increasing number that is unique among
[messages](xref:Ipfs.IPublishedMessage) originating from a peer.
No two messages from the same peer have the same sequence number.

However, messages from different peers can have the same sequence number,
so this number alone cannot be used to uniquely identify a message.
A peer id is unique, so the `unique message ID` is the concatenation of the
message [author id](xref:Ipfs.IPublishedMessage.Sender) and [sequence number](xref:Ipfs.IPublishedMessage.SequenceNumber)
fields.

Maintaining a list of all message IDs seen by the peer is not scalable.
A [timed cached](xref:PeerTalk.MessageTracker) is used to detect duplicate messages seen in the last
the 10 minutes.

3 changes: 3 additions & 0 deletions doc/articles/pubsub/encrypt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Encrypting a Message

This is not yet implemented.
3 changes: 3 additions & 0 deletions doc/articles/pubsub/sign.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Signing a Message

This is not yet implemented.
11 changes: 10 additions & 1 deletion doc/articles/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
href: intro-ipfs.md
- name: Questions
href: questions.md
- name: Core API
- name: Features
href: core-api.md
- name: File System
href: filesystem.md
Expand All @@ -22,6 +22,15 @@
href: fs/wrap.md
- name: Format
href: fs/format.md
- name: Message System
href: pubsub.md
items:
- name: Encrypting
href: pubsub/encrypt.md
- name: Signing
href: pubsub/sign.md
- name: Duplicate Message
href: pubsub/dupmsg.md
- name: Repository
href: repository.md
items:
Expand Down
30 changes: 18 additions & 12 deletions src/CoreApi/PubSubApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,40 @@ public PubSubApi(IpfsEngine ipfs)
this.ipfs = ipfs;
}

public Task<IEnumerable<Peer>> PeersAsync(string topic = null, CancellationToken cancel = default(CancellationToken))
public async Task<IEnumerable<Peer>> PeersAsync(string topic = null, CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException();
var pubsub = await ipfs.PubSubService.ConfigureAwait(false);
return await pubsub.PeersAsync(topic, cancel);
}

public Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken))
public async Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException();
var pubsub = await ipfs.PubSubService.ConfigureAwait(false);
await pubsub.PublishAsync(topic, message, cancel);
}

public Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default(CancellationToken))
public async Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException();
var pubsub = await ipfs.PubSubService.ConfigureAwait(false);
await pubsub.PublishAsync(topic, message, cancel);
}

public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken))
public async Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException();
var pubsub = await ipfs.PubSubService.ConfigureAwait(false);
await pubsub.PublishAsync(topic, message, cancel);
}

public Task SubscribeAsync(string topic, Action<IPublishedMessage> handler, CancellationToken cancellationToken)
public async Task SubscribeAsync(string topic, Action<IPublishedMessage> handler, CancellationToken cancellationToken)
{
throw new NotImplementedException();
var pubsub = await ipfs.PubSubService.ConfigureAwait(false);
await pubsub.SubscribeAsync(topic, handler, cancellationToken);
}

public Task<IEnumerable<string>> SubscribedTopicsAsync(CancellationToken cancel = default(CancellationToken))
public async Task<IEnumerable<string>> SubscribedTopicsAsync(CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException();
var pubsub = await ipfs.PubSubService.ConfigureAwait(false);
return await pubsub.SubscribedTopicsAsync(cancel);
}
}
}
25 changes: 25 additions & 0 deletions src/IpfsEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,20 @@ void Init()
log.Debug("Built DHT service");
return dht;
});
PubSubService = new AsyncLazy<PeerTalk.PubSub.NotificationService>(async () =>
{
log.Debug("Building PubSub service");
var pubsub = new PeerTalk.PubSub.NotificationService
{
LocalPeer = await LocalPeer.ConfigureAwait(false)
};
pubsub.Routers.Add(new PeerTalk.PubSub.FloodRouter
{
Swarm = await SwarmService.ConfigureAwait(false)
});
log.Debug("Built PubSub service");
return pubsub;
});
}

/// <summary>
Expand Down Expand Up @@ -356,6 +370,12 @@ public async Task StartAsync()
stopTasks.Add(async () => await dht.StopAsync().ConfigureAwait(false));
await dht.StartAsync().ConfigureAwait(false);
},
async () =>
{
var pubsub = await PubSubService.ConfigureAwait(false);
stopTasks.Add(async () => await pubsub.StopAsync().ConfigureAwait(false));
await pubsub.StartAsync().ConfigureAwait(false);
},
};

log.Debug("waiting for services to start");
Expand Down Expand Up @@ -540,6 +560,11 @@ public void Stop()
/// </summary>
public AsyncLazy<Swarm> SwarmService { get; private set; }

/// <summary>
/// Manages publishng and subscribing to messages.
/// </summary>
public AsyncLazy<PeerTalk.PubSub.NotificationService> PubSubService { get; private set; }

/// <summary>
/// Exchange blocks with other peers.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/IpfsEngine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<PackageReference Include="Makaretu.Dns.Unicast" Version="0.8.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
<PackageReference Include="Nito.AsyncEx.Coordination" Version="5.0.0" />
<PackageReference Include="PeerTalk" Version="0.10.1" />
<PackageReference Include="PeerTalk" Version="0.11.2" />
<PackageReference Include="PeterO.Cbor" Version="3.1.0" />
<PackageReference Include="Portable.BouncyCastle" Version="1.8.5" />
<PackageReference Include="protobuf-net" Version="2.4.0" />
Expand Down
Loading

0 comments on commit e7df057

Please sign in to comment.