Skip to content

Commit

Permalink
feature/subscribe (#62)
Browse files Browse the repository at this point in the history
* readded suubscription code

* added tes

* changed the way channels are created

* closes channel on stop
  • Loading branch information
decanus authored Aug 6, 2019
1 parent 6291da6 commit 48a12fa
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
21 changes: 17 additions & 4 deletions mvds_batch_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package main

import (
"testing"
"time"

"github.com/stretchr/testify/suite"
"github.com/vacp2p/mvds/node"
"github.com/vacp2p/mvds/peers"
"github.com/vacp2p/mvds/state"
"github.com/vacp2p/mvds/store"
"github.com/vacp2p/mvds/transport"
"testing"
"time"
)

func TestMVDSBatchSuite(t *testing.T) {
Expand Down Expand Up @@ -67,7 +68,10 @@ func (s *MVDSBatchSuite) TearDownTest() {
}

func (s *MVDSBatchSuite) TestSendClient1ToClient2() {
messageID, err := s.client1.AppendMessage(s.groupID, []byte("message 1"))
subscription := s.client2.Subscribe()
content := []byte("message 1")

messageID, err := s.client1.AppendMessage(s.groupID, content)
s.Require().NoError(err)

// Check message is in store
Expand All @@ -80,10 +84,16 @@ func (s *MVDSBatchSuite) TestSendClient1ToClient2() {
message1Receiver, err := s.ds2.Get(messageID)
return err == nil && message1Receiver != nil
}, 1*time.Second, 10*time.Millisecond)

message := <- subscription
s.Equal(message.Body, content)
}

func (s *MVDSBatchSuite) TestSendClient2ToClient1() {
messageID, err := s.client2.AppendMessage(s.groupID, []byte("message 1"))
subscription := s.client1.Subscribe()
content := []byte("message 1")

messageID, err := s.client2.AppendMessage(s.groupID, content)
s.Require().NoError(err)

// Check message is in store
Expand All @@ -96,6 +106,9 @@ func (s *MVDSBatchSuite) TestSendClient2ToClient1() {
message1Receiver, err := s.ds1.Get(messageID)
return err == nil && message1Receiver != nil
}, 1*time.Second, 10*time.Millisecond)

message := <- subscription
s.Equal(message.Body, content)
}

func (s *MVDSBatchSuite) TestAcks() {
Expand Down
19 changes: 19 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Node struct {

epoch int64
mode Mode

subscription chan protobuf.Message
}

// NewNode returns a new node.
Expand Down Expand Up @@ -115,9 +117,21 @@ func (n *Node) Start(duration time.Duration) {
// Stop message reading and epoch processing
func (n *Node) Stop() {
log.Print("Stopping node")
close(n.subscription)
n.cancel()
}

// Subscribe subscribes to incoming messages.
func (n *Node) Subscribe() chan protobuf.Message {
n.subscription = make(chan protobuf.Message)
return n.subscription
}

// Unsubscribe closes the listening channels
func (n *Node) Unsubscribe() {
close(n.subscription)
}

// AppendMessage sends a message to a given group.
func (n *Node) AppendMessage(groupID state.GroupID, data []byte) (state.MessageID, error) {
m := protobuf.Message{
Expand Down Expand Up @@ -359,6 +373,7 @@ func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
if err != nil {
return err
}

for _, peer := range peers {
if peer == sender {
continue
Expand All @@ -367,6 +382,10 @@ func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
n.insertSyncState(&groupID, id, peer, state.OFFER)
}

if n.subscription != nil {
n.subscription <- msg
}

return nil
}

Expand Down

0 comments on commit 48a12fa

Please sign in to comment.