diff --git a/doc/articles/core-api.md b/doc/articles/core-api.md index 59d39aa3..dcd60d83 100644 --- a/doc/articles/core-api.md +++ b/doc/articles/core-api.md @@ -2,7 +2,7 @@ The [Core API](xref:Ipfs.CoreApi.ICoreApi) is a set of interfaces to the IPFS features and is implemented by the [engine](xref:Ipfs.Engine.IpfsEngine). The -[FileSystem](filesystem.md) and [PubSub]() features are most often used. +[FileSystem](filesystem.md) and [PubSub](pubsub.md) features are most often used. ```csharp const string filename = "QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv/about"; @@ -28,7 +28,7 @@ Each IPFS feature has it's own interface. | [Name](xref:Ipfs.CoreApi.INameApi) | Manages the Interplanetary Name Space (IPNS) | | [Object](xref:Ipfs.CoreApi.IObjectApi) | Manages the IPFS Directed Acrylic Graph | | [Pin](xref:Ipfs.CoreApi.IPinApi) | Manage objects that are locally stored and permanent | -| [PubSub](xref:Ipfs.CoreApi.IPubSubApi) | Publish and subscribe topic messages | +| [PubSub](pubsub.md) | Publish and subscribe topic messages | | [Swarm](xref:Ipfs.CoreApi.ISwarmApi) | Manages the swarm of peers | | [Stats](xref:Ipfs.CoreApi.IStatsApi) | Statistics on IPFS components | diff --git a/doc/articles/pubsub.md b/doc/articles/pubsub.md new file mode 100644 index 00000000..91ff4049 --- /dev/null +++ b/doc/articles/pubsub.md @@ -0,0 +1,52 @@ +# IPFS PubSub system + +The publish/subscribe system allows a `message` to be sent to a group of peers that +are subsctibed to a `topic` via the [PubAub API](xref:Ipfs.CoreApi.IPubSubApi). +The `topic` is just a name that indicates a group of related messages. +The [message](xref:Ipfs.IPublishedMessage) contains the author, topic(s), +sequence number and content. + + + +### Publishing + +[PublishAsync](xref:Ipfs.CoreApi.IPubSubApi.PublishAsync*) sends a +message to a group of peers that are subscribed to a topic. + +The following, sends a "hello world" to all peers subscribed to "nz". + +```csharp +await ipfs.PubSub.PublishAsync("nz", "tēnā koutou"); +``` + +### Subscribing + +[SubscribeAsync](xref:Ipfs.CoreApi.IPubSubApi.SubscribeAsync*) indicates interest +in messages for the specified topic. The `handler` is invoked when a [unique +message](pubsub/dupmsg.md) is received. + +```csharp +var cs = new CancellationTokenSource(); +await ipfs.PubSub.SubscribeAsync("nz", msg => +{ + // do something with msg.DataBytes +}, cs.Token); +``` + +To unsubscribe, simply cancel the subscribe + +```csharp +cs.Cancel(); +``` + +### Implementation + +The peer talk [notification service](xref:PeerTalk.PubSub.NotificationService) +with a [floodsub router](xref:PeerTalk.PubSub.FloodRouter) is currently +used. In the future a [gossip router](https://github.com/richardschneider/peer-talk/issues/25) will be used. + +See also +- [PubSub interface for libp2p](https://github.com/libp2p/specs/tree/master/pubsub) +- [In the beginning was floodsub](https://github.com/libp2p/specs/tree/master/pubsub/gossipsub#in-the-beginning-was-floodsub) +- [And then there is gossipsub](https://github.com/libp2p/specs/tree/master/pubsub/gossipsub#the-gossipsub-protocol) +- [Proximity Aware Epidemic PubSub for libp2p](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/episub.md) diff --git a/doc/articles/pubsub/dupmsg.md b/doc/articles/pubsub/dupmsg.md new file mode 100644 index 00000000..d96aedbd --- /dev/null +++ b/doc/articles/pubsub/dupmsg.md @@ -0,0 +1,17 @@ +# Duplicate Message + +The message [sequence number](xref:Ipfs.IPublishedMessage.SequenceNumber) is a monotonically +increasing number that is unique among +[messages](xref:Ipfs.IPublishedMessage) originating from a peer. +No two messages from the same peer have the same sequence number. + +However, messages from different peers can have the same sequence number, +so this number alone cannot be used to uniquely identify a message. +A peer id is unique, so the `unique message ID` is the concatenation of the +message [author id](xref:Ipfs.IPublishedMessage.Sender) and [sequence number](xref:Ipfs.IPublishedMessage.SequenceNumber) +fields. + +Maintaining a list of all message IDs seen by the peer is not scalable. +A [timed cached](xref:PeerTalk.MessageTracker) is used to detect duplicate messages seen in the last +the 10 minutes. + diff --git a/doc/articles/pubsub/encrypt.md b/doc/articles/pubsub/encrypt.md new file mode 100644 index 00000000..86f0be28 --- /dev/null +++ b/doc/articles/pubsub/encrypt.md @@ -0,0 +1,3 @@ +# Encrypting a Message + +This is not yet implemented. diff --git a/doc/articles/pubsub/sign.md b/doc/articles/pubsub/sign.md new file mode 100644 index 00000000..2c003260 --- /dev/null +++ b/doc/articles/pubsub/sign.md @@ -0,0 +1,3 @@ +# Signing a Message + +This is not yet implemented. diff --git a/doc/articles/toc.yml b/doc/articles/toc.yml index 5674cd80..6c315612 100644 --- a/doc/articles/toc.yml +++ b/doc/articles/toc.yml @@ -7,7 +7,7 @@ href: intro-ipfs.md - name: Questions href: questions.md -- name: Core API +- name: Features href: core-api.md - name: File System href: filesystem.md @@ -22,6 +22,15 @@ href: fs/wrap.md - name: Format href: fs/format.md +- name: Message System + href: pubsub.md + items: + - name: Encrypting + href: pubsub/encrypt.md + - name: Signing + href: pubsub/sign.md + - name: Duplicate Message + href: pubsub/dupmsg.md - name: Repository href: repository.md items: diff --git a/src/CoreApi/PubSubApi.cs b/src/CoreApi/PubSubApi.cs index 1baa6c2e..25e906e5 100644 --- a/src/CoreApi/PubSubApi.cs +++ b/src/CoreApi/PubSubApi.cs @@ -17,34 +17,40 @@ public PubSubApi(IpfsEngine ipfs) this.ipfs = ipfs; } - public Task> PeersAsync(string topic = null, CancellationToken cancel = default(CancellationToken)) + public async Task> PeersAsync(string topic = null, CancellationToken cancel = default(CancellationToken)) { - throw new NotImplementedException(); + var pubsub = await ipfs.PubSubService.ConfigureAwait(false); + return await pubsub.PeersAsync(topic, cancel); } - public Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken)) + public async Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken)) { - throw new NotImplementedException(); + var pubsub = await ipfs.PubSubService.ConfigureAwait(false); + await pubsub.PublishAsync(topic, message, cancel); } - public Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default(CancellationToken)) + public async Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default(CancellationToken)) { - throw new NotImplementedException(); + var pubsub = await ipfs.PubSubService.ConfigureAwait(false); + await pubsub.PublishAsync(topic, message, cancel); } - public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken)) + public async Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken)) { - throw new NotImplementedException(); + var pubsub = await ipfs.PubSubService.ConfigureAwait(false); + await pubsub.PublishAsync(topic, message, cancel); } - public Task SubscribeAsync(string topic, Action handler, CancellationToken cancellationToken) + public async Task SubscribeAsync(string topic, Action handler, CancellationToken cancellationToken) { - throw new NotImplementedException(); + var pubsub = await ipfs.PubSubService.ConfigureAwait(false); + await pubsub.SubscribeAsync(topic, handler, cancellationToken); } - public Task> SubscribedTopicsAsync(CancellationToken cancel = default(CancellationToken)) + public async Task> SubscribedTopicsAsync(CancellationToken cancel = default(CancellationToken)) { - throw new NotImplementedException(); + var pubsub = await ipfs.PubSubService.ConfigureAwait(false); + return await pubsub.SubscribedTopicsAsync(cancel); } } } diff --git a/src/IpfsEngine.cs b/src/IpfsEngine.cs index ee08e60c..7fd5e2d2 100644 --- a/src/IpfsEngine.cs +++ b/src/IpfsEngine.cs @@ -188,6 +188,20 @@ void Init() log.Debug("Built DHT service"); return dht; }); + PubSubService = new AsyncLazy(async () => + { + log.Debug("Building PubSub service"); + var pubsub = new PeerTalk.PubSub.NotificationService + { + LocalPeer = await LocalPeer.ConfigureAwait(false) + }; + pubsub.Routers.Add(new PeerTalk.PubSub.FloodRouter + { + Swarm = await SwarmService.ConfigureAwait(false) + }); + log.Debug("Built PubSub service"); + return pubsub; + }); } /// @@ -356,6 +370,12 @@ public async Task StartAsync() stopTasks.Add(async () => await dht.StopAsync().ConfigureAwait(false)); await dht.StartAsync().ConfigureAwait(false); }, + async () => + { + var pubsub = await PubSubService.ConfigureAwait(false); + stopTasks.Add(async () => await pubsub.StopAsync().ConfigureAwait(false)); + await pubsub.StartAsync().ConfigureAwait(false); + }, }; log.Debug("waiting for services to start"); @@ -540,6 +560,11 @@ public void Stop() /// public AsyncLazy SwarmService { get; private set; } + /// + /// Manages publishng and subscribing to messages. + /// + public AsyncLazy PubSubService { get; private set; } + /// /// Exchange blocks with other peers. /// diff --git a/src/IpfsEngine.csproj b/src/IpfsEngine.csproj index 356ad047..d462edce 100644 --- a/src/IpfsEngine.csproj +++ b/src/IpfsEngine.csproj @@ -43,7 +43,7 @@ - + diff --git a/test/CoreApi/PubSubApiTest.cs b/test/CoreApi/PubSubApiTest.cs new file mode 100644 index 00000000..9b2134dd --- /dev/null +++ b/test/CoreApi/PubSubApiTest.cs @@ -0,0 +1,233 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Ipfs.Engine +{ + + [TestClass] + public class PubSubApiTest + { + + [TestMethod] + public void Api_Exists() + { + var ipfs = TestFixture.Ipfs; + Assert.IsNotNull(ipfs.PubSub); + } + + [TestMethod] + public void Peers_Unknown_Topic() + { + var ipfs = TestFixture.Ipfs; + var topic = "net-ipfs-http-client-test-unknown" + Guid.NewGuid().ToString(); + var peers = ipfs.PubSub.PeersAsync(topic).Result.ToArray(); + Assert.AreEqual(0, peers.Length); + } + + [TestMethod] + public async Task Subscribed_Topics() + { + var ipfs = TestFixture.Ipfs; + var topic = Guid.NewGuid().ToString(); + var cs = new CancellationTokenSource(); + await ipfs.StartAsync(); + try + { + await ipfs.PubSub.SubscribeAsync(topic, msg => { }, cs.Token); + var topics = ipfs.PubSub.SubscribedTopicsAsync().Result.ToArray(); + Assert.IsTrue(topics.Length > 0); + CollectionAssert.Contains(topics, topic); + } + finally + { + await ipfs.StopAsync(); + cs.Cancel(); + } + } + + volatile int messageCount = 0; + + [TestMethod] + public async Task Subscribe() + { + messageCount = 0; + var ipfs = TestFixture.Ipfs; + var topic = Guid.NewGuid().ToString(); + var cs = new CancellationTokenSource(); + await ipfs.StartAsync(); + try + { + await ipfs.PubSub.SubscribeAsync(topic, msg => + { + Interlocked.Increment(ref messageCount); + }, cs.Token); + await ipfs.PubSub.PublishAsync(topic, "hello world!"); + + await Task.Delay(100); + Assert.AreEqual(1, messageCount); + } + finally + { + await ipfs.StopAsync(); + cs.Cancel(); + } + } + + [TestMethod] + public async Task Subscribe_Mutiple_Messages() + { + messageCount = 0; + var messages = "hello world this is pubsub".Split(); + var ipfs = TestFixture.Ipfs; + var topic = Guid.NewGuid().ToString(); + var cs = new CancellationTokenSource(); + await ipfs.StartAsync(); + try + { + await ipfs.PubSub.SubscribeAsync(topic, msg => + { + Interlocked.Increment(ref messageCount); + }, cs.Token); + foreach (var msg in messages) + { + await ipfs.PubSub.PublishAsync(topic, msg); + } + + await Task.Delay(100); + Assert.AreEqual(messages.Length, messageCount); + } + finally + { + await ipfs.StopAsync(); + cs.Cancel(); + } + } + + [TestMethod] + public async Task Multiple_Subscribe_Mutiple_Messages() + { + messageCount = 0; + var messages = "hello world this is pubsub".Split(); + var ipfs = TestFixture.Ipfs; + var topic = Guid.NewGuid().ToString(); + var cs = new CancellationTokenSource(); + + Action processMessage = (msg) => + { + Interlocked.Increment(ref messageCount); + }; + await ipfs.StartAsync(); + try + { + await ipfs.PubSub.SubscribeAsync(topic, processMessage, cs.Token); + await ipfs.PubSub.SubscribeAsync(topic, processMessage, cs.Token); + foreach (var msg in messages) + { + await ipfs.PubSub.PublishAsync(topic, msg); + } + + await Task.Delay(100); + Assert.AreEqual(messages.Length * 2, messageCount); + } + finally + { + await ipfs.StopAsync(); + cs.Cancel(); + } + } + + volatile int messageCount1 = 0; + + [TestMethod] + public async Task Unsubscribe() + { + messageCount1 = 0; + var ipfs = TestFixture.Ipfs; + var topic = Guid.NewGuid().ToString(); + var cs = new CancellationTokenSource(); + await ipfs.StartAsync(); + try + { + await ipfs.PubSub.SubscribeAsync(topic, msg => + { + Interlocked.Increment(ref messageCount1); + }, cs.Token); + await ipfs.PubSub.PublishAsync(topic, "hello world!"); + await Task.Delay(100); + Assert.AreEqual(1, messageCount1); + + cs.Cancel(); + await ipfs.PubSub.PublishAsync(topic, "hello world!!!"); + await Task.Delay(100); + Assert.AreEqual(1, messageCount1); + } + finally + { + await ipfs.StopAsync(); + } + } + + [TestMethod] + public async Task Subscribe_BinaryMessage() + { + var messages = new List(); + var expected = new byte[] { 0, 1, 2, 4, (byte)'a', (byte)'b', 0xfe, 0xff }; + var ipfs = TestFixture.Ipfs; + var topic = Guid.NewGuid().ToString(); + var cs = new CancellationTokenSource(); + await ipfs.StartAsync(); + try + { + await ipfs.PubSub.SubscribeAsync(topic, msg => + { + messages.Add(msg); + }, cs.Token); + await ipfs.PubSub.PublishAsync(topic, expected); + + await Task.Delay(100); + Assert.AreEqual(1, messages.Count); + CollectionAssert.AreEqual(expected, messages[0].DataBytes); + } + finally + { + await ipfs.StopAsync(); + cs.Cancel(); + } + } + + [TestMethod] + public async Task Subscribe_StreamMessage() + { + var messages = new List(); + var expected = new byte[] { 0, 1, 2, 4, (byte)'a', (byte)'b', 0xfe, 0xff }; + var ipfs = TestFixture.Ipfs; + var topic = Guid.NewGuid().ToString(); + var cs = new CancellationTokenSource(); + await ipfs.StartAsync(); + try + { + await ipfs.PubSub.SubscribeAsync(topic, msg => + { + messages.Add(msg); + }, cs.Token); + var ms = new MemoryStream(expected, false); + await ipfs.PubSub.PublishAsync(topic, ms); + + await Task.Delay(100); + Assert.AreEqual(1, messages.Count); + CollectionAssert.AreEqual(expected, messages[0].DataBytes); + } + finally + { + cs.Cancel(); + await ipfs.StopAsync(); + } + } + } +}