Skip to content

Commit

Permalink
Fix message passing
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Jun 12, 2024
1 parent 240dc1a commit 84a5919
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 42 deletions.
10 changes: 5 additions & 5 deletions bindings/ceylon/src/agent/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ pub struct AgentCore {
_workspace_id: Option<String>,
_processor: Arc<Mutex<Arc<dyn Processor>>>,
_on_message: Arc<Mutex<Arc<dyn MessageHandler>>>,
rx_0: Arc<Mutex<tokio::sync::mpsc::Receiver<Vec<u8>>>>,
tx_0: tokio::sync::mpsc::Sender<Vec<u8>>,
rx_0: Arc<Mutex<tokio::sync::mpsc::Receiver<Message>>>,
tx_0: tokio::sync::mpsc::Sender<Message>,
}

impl AgentCore {
pub fn new(name: String, is_leader: bool, on_message: Arc<dyn MessageHandler>, processor: Arc<dyn Processor>) -> Self {
let (tx_0, rx_0) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
let (tx_0, rx_0) = tokio::sync::mpsc::channel::<Message>(100);
let id = uuid::Uuid::new_v4().to_string();
Self {
_name: name,
Expand Down Expand Up @@ -67,14 +67,14 @@ impl AgentCore {
}

pub async fn broadcast(&self, message: Vec<u8>) {
self.tx_0.send(message).await.unwrap();
self.tx_0.send(Message::data(self._name.clone(), self._id.clone(), message)).await.unwrap();
}
}

impl AgentCore {
pub(crate) async fn start(&self, topic: String, url: String, inputs: HashMap<String, String>) {
let agent_name = self._name.clone();
let (tx_0, rx_0) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
let (tx_0, rx_0) = tokio::sync::mpsc::channel::<Message>(100);
let (mut node_0, mut rx_o_0) = create_node(agent_name.clone(), true, rx_0);
let on_message = self._on_message.clone();

Expand Down
2 changes: 1 addition & 1 deletion bindings/ceylon/src/agent/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Workspace {

pub async fn run(&self, inputs: HashMap<String, String>) {
debug!("Workspace {} running", self.id);
let rt = Runtime::new().unwrap();
let mut rt = Runtime::new().unwrap();
let mut tasks = vec![];
let _inputs = inputs.clone();
for agent in self._agents.iter() {
Expand Down
3 changes: 2 additions & 1 deletion bindings/ceylon/src/ceylon.udl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ dictionary Message{
bytes data;
string message;
u64 time;
string from;
string originator;
string originator_id;
MessageType type;
};

Expand Down
2 changes: 1 addition & 1 deletion bindings/ceylon/tests/ceylon_agent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def __init__(self, name, is_leader):
async def on_message(self, agent_id, message):
if message.type == MessageType.MESSAGE:
dt = bytes(message.data)
print(dt.decode("utf-8"))
print(self.id(), self.name(), dt.decode("utf-8"), message.originator_id, message.originator)
# message = json.loads(str(message, "utf-8"))
# if message and message.get("type") == "Message":
# print(self.name(), message["from"])
Expand Down
72 changes: 38 additions & 34 deletions libs/sangedama/src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,31 @@ pub struct Message {
pub data: Vec<u8>,
pub message: String,
pub time: u64,
pub from: String,
pub originator: String,
pub originator_id: String,
pub r#type: MessageType,
}

impl Message {
fn new(from: String, message: String, data: Vec<u8>, message_type: MessageType) -> Self {
fn new(originator: String, originator_id: String, message: String, data: Vec<u8>, message_type: MessageType) -> Self {
Self {
data,
time: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
from,
originator,
originator_id,
r#type: message_type,
message,
}
}
fn event(from: String, event: EventType) -> Self {
Self::new(from, event.as_str().to_string(), vec![], MessageType::Event)
fn event(originator: String, event: EventType) -> Self {
Self::new(originator, "SELF".to_string(), event.as_str().to_string(), vec![], MessageType::Event)
}

fn data(from: String, data: Vec<u8>) -> Self {
Self::new(from, "".to_string(), data, MessageType::Message)
pub fn data(from: String, originator_id: String, data: Vec<u8>) -> Self {
Self::new(from, originator_id, "DATA-MESSAGE".to_string(), data, MessageType::Message)
}

fn to_json(&self) -> String {
Expand Down Expand Up @@ -112,8 +114,9 @@ pub struct Node {
is_leader: bool,
subscribed_topics: Vec<String>,

in_rx: mpsc::Receiver<Vec<u8>>,
in_rx: mpsc::Receiver<Message>,
out_tx: mpsc::Sender<Message>,
id: String,
}

impl Node {
Expand All @@ -138,7 +141,7 @@ impl Node {
}
}

pub fn broadcast(&mut self, message: Vec<u8>) -> Result<Vec<MessageId>, PublishError> {
pub fn broadcast(&mut self, message: Message) -> Result<Vec<MessageId>, PublishError> {
let mut message_ids = vec![];
for topic in self.subscribed_topics.clone() {
let topic = gossipsub::IdentTopic::new(topic);
Expand All @@ -147,7 +150,7 @@ impl Node {
.swarm
.behaviour_mut()
.gossipsub
.publish(topic, message.clone())
.publish(topic, message.to_json().as_bytes())
{
Ok(id) => {
message_ids.push(id);
Expand All @@ -169,7 +172,7 @@ impl Node {
select! {
message = self.in_rx.recv() => match message {
Some(message) => {
debug!("{:?} Received To Broadcast: {:?}", self.name, String::from_utf8_lossy(&message));
debug!("{:?} Received To Broadcast", self.name);
match self.broadcast(message){
Ok(message_ids) => {
debug!("{:?} Broadcasted message: {:?}", self.name, message_ids);
Expand All @@ -188,16 +191,16 @@ impl Node {
event = self.swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
debug!("{:?} Listening on {:?}", self.name, address);
self.pass_message_to_node(Message::event( self.swarm.local_peer_id().to_string(),EventType::OnListen,)).await
self.pass_message_to_node(Message::event(self.swarm.local_peer_id().to_string(),EventType::OnListen,)).await

},
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
debug!("{:?} Connected to {:?}", self.name, peer_id);
self.pass_message_to_node(Message::event( self.swarm.local_peer_id().to_string(),EventType::OnConnectionEstablished,)).await
self.pass_message_to_node(Message::event(self.swarm.local_peer_id().to_string(),EventType::OnConnectionEstablished,)).await
},
SwarmEvent::ConnectionClosed { peer_id, .. } => {
debug!("{:?} Disconnected from {:?}", self.name, peer_id);
self.pass_message_to_node(Message::event( self.swarm.local_peer_id().to_string(),EventType::OnConnectionClosed ,)).await
self.pass_message_to_node(Message::event(self.swarm.local_peer_id().to_string(),EventType::OnConnectionClosed ,)).await
},

SwarmEvent::Behaviour(Event::Gossipsub(event)) => {
Expand All @@ -206,8 +209,7 @@ impl Node {
match event {
gossipsub::Event::Message { propagation_source, message_id, message } => {
debug!("{:?} Received message '{:?}' from {:?} on {:?}", self.name, String::from_utf8_lossy(&message.data), propagation_source, message_id);

let msg = Message::data(self.name.clone(), message.data.clone());
let msg = serde_json::from_slice(message.data.as_slice()).unwrap();
self.pass_message_to_node(msg).await
},

Expand Down Expand Up @@ -254,7 +256,7 @@ impl Node {
pub fn create_node(
name: String,
is_leader: bool,
in_rx: mpsc::Receiver<Vec<u8>>,
in_rx: mpsc::Receiver<Message>,
) -> (Node, mpsc::Receiver<Message>) {
let swarm = SwarmBuilder::with_new_identity()
.with_tokio()
Expand Down Expand Up @@ -303,6 +305,7 @@ pub fn create_node(
(
Node {
name,
id: swarm.local_peer_id().to_string(),
swarm,
is_leader,
subscribed_topics: Vec::new(),
Expand All @@ -319,7 +322,7 @@ mod tests {
use log::{debug, info, trace, warn};
use serde_json::json;

use crate::node::node::create_node;
use crate::node::node::{create_node, Message};

#[test]
fn test_ping() {
Expand All @@ -329,11 +332,14 @@ mod tests {

let url = format!("/ip4/0.0.0.0/tcp/{}", port_id);

let (tx_0, mut rx_0) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
let (tx_1, mut rx_1) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
let (tx_0, mut rx_0) = tokio::sync::mpsc::channel::<Message>(100);
let (tx_1, mut rx_1) = tokio::sync::mpsc::channel::<Message>(100);

let (mut node_0, mut rx_o_0) = create_node("node_0".to_string(), true, rx_0);
let (mut node_1, mut rx_o_1) = create_node("node_1".to_string(), false, rx_1);

let node_0_id = node_0.id.clone();
let node_1_id = node_1.id.clone();

let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand All @@ -343,14 +349,13 @@ mod tests {
runtime.spawn(async move {
while let Some(message_data) = rx_o_0.recv().await {
debug!("Node_0 Received: {:?}", message_data);
tx_0.send(
json!({
"data": format!("Hi from Node_1: {}", message_data.message).as_str(),
let msg = Message::data("node_0".to_string(),node_0_id.clone(), json!({
"data": format!("Hi from Node_0: {}", message_data.message).as_str(),
})
.to_string()
.as_bytes()
.to_vec(),
)
.to_string()
.as_bytes()
.to_vec());
tx_0.send(msg)
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
Expand All @@ -359,15 +364,14 @@ mod tests {

runtime.spawn(async move {
while let Some(message_data) = rx_o_1.recv().await {
debug!("Node_0 Received: {:?}", message_data);
tx_1.send(
json!({
debug!("Node_1 Received: {:?}", message_data);
let msg = Message::data("node_1".to_string(), node_1_id.clone(), json!({
"data": format!("Hi from Node_1: {}", message_data.message).as_str(),
})
.to_string()
.as_bytes()
.to_vec(),
)
.to_string()
.as_bytes()
.to_vec());
tx_1.send(msg)
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
Expand Down

0 comments on commit 84a5919

Please sign in to comment.