Skip to content

Commit

Permalink
multiconnectin works
Browse files Browse the repository at this point in the history
  • Loading branch information
inkrement committed Dec 31, 2013
1 parent c5b4662 commit 402d61d
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 32 deletions.
8 changes: 4 additions & 4 deletions lib/Messenger/src/peers/connections/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ abstract class Connection{

///completer for connection
///TODO: use another generic type
Completer<String> connection_completer;
Completer<String> listen_completer;
Completer<int> connection_completer;
Completer<int> listen_completer;

Connection([Logger logger=null]){
if (logger == null) this.log = new Logger("Connection");
else this.log = logger;

//init
listen_completer = new Completer<String>();
connection_completer = new Completer<String>();
listen_completer = new Completer<int>();
connection_completer = new Completer<int>();
newMessageController = new StreamController<NewMessageEvent>();
}

Expand Down
27 changes: 20 additions & 7 deletions lib/Messenger/src/peers/connections/jswebrtcconnection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ class JsWebRtcConnection extends Connection{
/* set channel events */
_dc.onmessage = (MessageEvent event)=>newMessageController.add(new NewMessageEvent(new Message.fromString(event.data)));

_dc.onopen = (_)=>changeReadyState(new ReadyState.fromDataChannel(_dc.readyState));
_dc.onopen = (_){
changeReadyState(new ReadyState.fromDataChannel(_dc.readyState));
listen_completer.complete(sc.id);
};
_dc.onclose = (_)=>changeReadyState(new ReadyState.fromDataChannel(_dc.readyState));
_dc.onerror = (x)=>log.shout("rtc error callback: " + x.toString());

Expand Down Expand Up @@ -64,17 +67,15 @@ class JsWebRtcConnection extends Connection{
*/
case MessageType.PEER_ID:
log.fine("PEER_ID received: connection established");
listen_completer.complete(sc.id.toString());
//listen_completer.complete(sc.id);

sc.send(new Message(this.hashCode.toString(), MessageType.AKN_PEER_ID));
changeReadyState(ReadyState.CONNECTED);

break;
case MessageType.AKN_PEER_ID:
log.fine("AKN_PEER_ID received: connection established");
connection_completer.complete(sc.id.toString());

changeReadyState(ReadyState.CONNECTED);
break;
case MessageType.WEBRTC_OFFER:
log.fine("received sdp offer");
Expand Down Expand Up @@ -127,7 +128,7 @@ class JsWebRtcConnection extends Connection{
/**
* listen for incoming connections
*/
Future listen(SignalingChannel sc){
Future<int> listen(SignalingChannel sc){
log.finest("start listening");

this.sc = sc;
Expand Down Expand Up @@ -163,7 +164,7 @@ class JsWebRtcConnection extends Connection{
/**
* connect to WebrtcPeer
*/
Future connect(SignalingChannel sc){
Future<int> connect(SignalingChannel sc){
log.finest("try to connect");

//listen for incoming connection
Expand All @@ -175,7 +176,12 @@ class JsWebRtcConnection extends Connection{
_dc = _rtcPeerConnection.createDataChannel("sendDataChannel", js.map(dataChannelOptions));
log.fine('created new data channel');

_dc.onopen = (_)=>changeReadyState(new ReadyState.fromDataChannel(_dc.readyState));
_dc.onopen = (_){
changeReadyState(new ReadyState.fromDataChannel(_dc.readyState));
connection_completer.complete(sc.id);

changeReadyState(ReadyState.CONNECTED);
};
_dc.onclose = (_)=>changeReadyState(_dc.readyState);

_dc.onmessage = (MessageEvent event)=>newMessageController.add(new NewMessageEvent(new Message.fromString(event.data)));
Expand Down Expand Up @@ -204,6 +210,13 @@ class JsWebRtcConnection extends Connection{

send(Message msg){
//serialize
if(_dc == null){
log.warning("could not send message. No DataChannel open!");
return;
}

//TODO: not ready, pipe message.
log.info("send message to : ${sc.id.toString()}");
_dc.send(Message.serialize(msg));
}

Expand Down
20 changes: 11 additions & 9 deletions lib/Messenger/src/peers/jswebrtcpeer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ class JsWebRtcPeer extends Peer{

Stream<NewConnectionEvent> listen(SignalingChannel sc){
JsWebRtcConnection c = new JsWebRtcConnection(log);
Future<String> f = c.listen(sc);
Future<int> f = c.listen(sc);

//add to list of connections. index is identity of other peer
//TODO: test if identity is unique
f.then((String hash){
_connections[hash] = c;
f.then((int id){
_connections[id] = c;
log.info("new connection added! (now: ${connections.length.toString()})");
newConnectionController.add(new NewConnectionEvent(c));

//redirect messages
Expand All @@ -28,10 +29,11 @@ class JsWebRtcPeer extends Peer{

Stream<NewConnectionEvent> connect(SignalingChannel sc){
JsWebRtcConnection c = new JsWebRtcConnection(log);
Future<String> f = c.connect(sc);
Future<int> f = c.connect(sc);

f.then((String hash) {
_connections[hash] = c;
f.then((int id) {
_connections[id] = c;
log.info("new connection added! (now: ${connections.length.toString()})");
newConnectionController.add(new NewConnectionEvent(c));

//redirect messages
Expand Down Expand Up @@ -70,13 +72,13 @@ class JsWebRtcPeer extends Peer{
*
* @ TODO: check if datachannel open. else throw exception
*/
send(String name, Message msg){
send(int id, Message msg){
log.info("send message!");

if(!_connections.containsKey(name))
if(!_connections.containsKey(id))
throw new StateError("list of connections does not contain peer ${name}");

_connections[name].send(msg.toString());
_connections[id].send(msg);
}

}
4 changes: 2 additions & 2 deletions lib/Messenger/src/peers/messagepassingpeer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import './peer.dart';
import '../message.dart';
import '../events/newmessageevent.dart';
*/

/*
class MessagePassingPeer extends Peer{
connect(MessagePassingPeer p){
//TODO: connect?!
Expand All @@ -17,4 +17,4 @@ class MessagePassingPeer extends Peer{
}
close(){}
}
}*/
14 changes: 7 additions & 7 deletions lib/Messenger/src/peers/peer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ abstract class Peer{
///new message event stream
StreamController<NewMessageEvent> newMessageController;

Map<String, Connection> _connections;
Map<int, Connection> _connections;

static List<Peer> peers = new List<Peer>();

Expand Down Expand Up @@ -55,7 +55,7 @@ abstract class Peer{
//init
newMessageController = new StreamController<NewMessageEvent>.broadcast();
newConnectionController = new StreamController<NewConnectionEvent>.broadcast();
_connections = new Map<String, Connection>();
_connections = new Map<int, Connection>();

listen_completer = new Completer<String>();
connection_completer = new Completer<String>();
Expand All @@ -69,7 +69,7 @@ abstract class Peer{
/**
* connections getter
*/
Map<String, Connection> get connections => _connections;
Map<int, Connection> get connections => _connections;

/**
* get identifer of this object
Expand Down Expand Up @@ -116,18 +116,18 @@ abstract class Peer{
* @param String receiverId receiver of message
* @param Message msg is content of message
*/
send(String receiverId, Message msg);
send(int receiverId, Message msg);

/**
* send string to other peer
*/
sendString(String receiverId, String msg) => send(receiverId, new Message(msg));
sendString(int receiverId, String msg) => send(receiverId, new Message(msg));

/**
* send message to multiple peers
*/
broadcast(List<String> receiverIds, Message msg){
receiverIds.forEach((String id){
broadcast(Iterable<int> receiverIds, Message msg){
receiverIds.forEach((int id){
this.send(id, msg);
});
}
Expand Down
156 changes: 154 additions & 2 deletions tests/jsrtcpeer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ void main() {

//set callbacks
alice.newConnectionController.stream.listen(
expectAsync1((_)=>expect(alice.connections.keys.first, bob_sc.id.toString()))
expectAsync1((_)=>expect(alice.connections.keys.first, bob_sc.id))
);

bob.newConnectionController.stream.listen(
expectAsync1((_)=>expect(bob.connections.keys.first, alice_sc.id.toString()))
expectAsync1((_)=>expect(bob.connections.keys.first, alice_sc.id))
);

//connect peer
Expand All @@ -141,6 +141,158 @@ void main() {
});


group('multi connections', (){

test('three connections at the same time', (){
JsWebRtcPeer alice = new JsWebRtcPeer("alice_m3", Level.OFF);
JsWebRtcPeer bob = new JsWebRtcPeer("bob_m3", Level.OFF);
JsWebRtcPeer clark = new JsWebRtcPeer("clark_m3", Level.OFF);

//setup signaling channels
MessagePassing alice_bob_sc = new MessagePassing();
MessagePassing bob_alice_sc = new MessagePassing();

MessagePassing alice_clark_sc = new MessagePassing();
MessagePassing clark_alice_sc = new MessagePassing();

MessagePassing clark_bob_sc = new MessagePassing();
MessagePassing bob_clark_sc = new MessagePassing();

alice_bob_sc.connect(bob_alice_sc.identityMap());
bob_alice_sc.connect(alice_bob_sc.identityMap());

alice_clark_sc.connect(clark_alice_sc.identityMap());
clark_alice_sc.connect(alice_clark_sc.identityMap());

clark_bob_sc.connect(bob_clark_sc.identityMap());
bob_clark_sc.connect(clark_bob_sc.identityMap());


//set callbacks

int alice_c = 1;
int bob_c = 1;
int clark_c = 1;

alice.newConnectionController.stream.listen(
expectAsync1((_)=>expect(alice.connections.length, alice_c++), count:2)
);

bob.newConnectionController.stream.listen(
expectAsync1((_)=>expect(bob.connections.length, bob_c++), count:2)
);

clark.newConnectionController.stream.listen(
expectAsync1((_)=>expect(clark.connections.length, clark_c++), count:2)
);


//connect clark/bob bob/clark
bob.listen(clark_bob_sc);
clark.connect(bob_clark_sc);

//connect alice/bob bob/alice
alice.listen(bob_alice_sc);
bob.connect(alice_bob_sc);

//connect alice/clark clark/alice
alice.listen(clark_alice_sc);
clark.connect(alice_clark_sc);

});



test('3 clients send messages', (){
JsWebRtcPeer alice = new JsWebRtcPeer("alice_s3", Level.ALL);
JsWebRtcPeer bob = new JsWebRtcPeer("bob_s3", Level.ALL);
JsWebRtcPeer clark = new JsWebRtcPeer("clark_s3", Level.ALL);

//setup signaling channels
MessagePassing alice_bob_sc = new MessagePassing();
MessagePassing bob_alice_sc = new MessagePassing();

MessagePassing alice_clark_sc = new MessagePassing();
MessagePassing clark_alice_sc = new MessagePassing();

MessagePassing clark_bob_sc = new MessagePassing();
MessagePassing bob_clark_sc = new MessagePassing();

alice_bob_sc.connect(bob_alice_sc.identityMap());
bob_alice_sc.connect(alice_bob_sc.identityMap());

alice_clark_sc.connect(clark_alice_sc.identityMap());
clark_alice_sc.connect(alice_clark_sc.identityMap());

clark_bob_sc.connect(bob_clark_sc.identityMap());
bob_clark_sc.connect(clark_bob_sc.identityMap());


//set callbacks

String s = "some random string";
Message tm = new Message(s);

//each sould receive two messages
alice.newMessageController.stream.listen(
expectAsync1((NewMessageEvent e){
expect(e.data.msg, s);
}, count: 2)
);

bob.newMessageController.stream.listen(
expectAsync1((NewMessageEvent e){
expect(e.data.msg, s);
}, count: 2)
);

clark.newMessageController.stream.listen(
expectAsync1((NewMessageEvent e){
expect(e.data.msg, s);
}, count: 2)
);


bob.newConnectionController.stream.listen((_){
if(bob.connections.length > 1){
logMessage("connected to more than 1 client. looks quite good");

bob.multicast(tm);
}
});

clark.newConnectionController.stream.listen((_){
if(clark.connections.length > 1){
logMessage("connected to more than 1 client. looks quite good");

clark.multicast(tm);
}
});

alice.newConnectionController.stream.listen((_){
if(alice.connections.length > 1){
logMessage("connected to more than 1 client. looks quite good");

alice.multicast(tm);
}
});

//connect clark/bob bob/clark
bob.listen(clark_bob_sc);
clark.connect(bob_clark_sc);

//connect alice/bob bob/alice
alice.listen(bob_alice_sc);
bob.connect(alice_bob_sc);

//connect alice/clark clark/alice
alice.listen(clark_alice_sc);
clark.connect(alice_clark_sc);

});

});

/**
* test DataChannel's readyState opens
Expand Down
2 changes: 1 addition & 1 deletion tests/unittests.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ void main() {
/// setup html environment
// override configuration to set custom timeout
final HtmlEnhancedConfiguration sc = new HtmlEnhancedConfiguration(false);
sc.timeout = new Duration(seconds: 5);
sc.timeout = new Duration(seconds: 10);
unittestConfiguration = sc;

/*
Expand Down

0 comments on commit 402d61d

Please sign in to comment.