diff --git a/Cargo.toml b/Cargo.toml index c16f172..8d9b242 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rk-core" -version = "0.1.6" +version = "0.1.7" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/pyproject.toml b/pyproject.toml index 230b4b3..3f44ea9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "rk-core" -version = "0.1.6" +version = "0.1.7" description = "Rakun Multi Agent System" authors = ["Ceylon AI "] diff --git a/src/server/server.rs b/src/server/server.rs index 15eae79..6ac1e2b 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -6,6 +6,7 @@ use tokio::sync::Mutex; use crate::server::application; use crate::transport::p2p::P2PTransporter; +use crate::transport::redis::RedisTransporter; use crate::types::EventProcessor; // pyO3 module @@ -63,7 +64,6 @@ impl Server { let msg_tx = self.msg_tx.clone(); let app_rx = self.app_rx.clone(); std::thread::spawn(move || { - // let mut app_rx = CHANNEL_PAIR.with(|cp| cp.borrow().rx.resubscribe()); tokio::runtime::Runtime::new().unwrap().block_on(async move { while let msg = app_rx.lock().await.recv().await.unwrap() { debug!( @@ -114,7 +114,7 @@ impl Server { Ok(()) } - pub fn publisher(&mut self, py: Python) -> MessageProcessor { + pub fn publisher(&mut self) -> MessageProcessor { MessageProcessor { msg_tx: Arc::new(Mutex::new(self.app_tx.clone())), } diff --git a/src/transport/p2p.rs b/src/transport/p2p.rs index 1e46831..f0addc5 100644 --- a/src/transport/p2p.rs +++ b/src/transport/p2p.rs @@ -70,7 +70,7 @@ impl Transporter for P2PTransporter { // Create a random PeerId let id_keys = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(id_keys.public()); - println!("Local peer id: {local_peer_id} {owner}"); + info!("Local peer id: {local_peer_id} {owner}"); // Set up an encrypted DNS-enabled TCP Transport over the yamux protocol. let tcp_transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true)) @@ -166,19 +166,19 @@ impl Transporter for P2PTransporter { SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Subscribed{peer_id,topic})) => { let status = format!("{peer_id}"); - println!("Subscribed to {owner} {topic} {status}"); + debug!("Subscribed to {owner} {topic} {status}"); let is_connected = self.connected_peers.contains(&peer_id); self.connected_peers.push(peer_id); if self.connected_peers.len() == 1 && !is_connected { - println!("{owner} START NOW"); + debug!("{owner} START NOW"); self.send(TransportStatus::Started).await; } }, SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Unsubscribed{peer_id,topic})) => { let status = format!("{peer_id}"); - println!("Subscribed to {owner} {topic} {status}"); + debug!("Subscribed to {owner} {topic} {status}"); self.connected_peers.remove(self.connected_peers.iter().position(|x| x == &peer_id).unwrap()); if self.connected_peers.len() == 0 { diff --git a/src/transport/redis.rs b/src/transport/redis.rs index 5c3dbd1..c95c796 100644 --- a/src/transport/redis.rs +++ b/src/transport/redis.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::StreamExt; -use log::{debug, error}; +use log::{debug, error, info}; use redis::{Commands, PubSubCommands}; use tokio::sync::{mpsc, Mutex}; use tokio::sync::mpsc::{Receiver, Sender}; @@ -56,7 +56,7 @@ impl Transporter for RedisTransporter { let peer_id = self.peer_id.clone(); let name = self.owner.clone(); - println!("Agent {} Stared at: {}", name, peer_id); + info!("Agent {} Stared at: {}", name, peer_id); let msg_tx = self.msg_tx.clone(); let t1 = tokio::spawn(async move { @@ -72,10 +72,10 @@ impl Transporter for RedisTransporter { let data = TransportMessage::from_bytes(payload.clone()); let data_log = TransportMessage::from_bytes(payload); if data.sender_id == name { continue; } - println!("Received message from Agent: {:?}-{}-{}", data_log.data, data_log.sender_id, name); + debug!("Received message from Agent: {:?}-{}-{}", data_log.data, data_log.sender_id, name); SendStatus(msg_tx.clone(), TransportStatus::Data(data)).await; } - println!("Listening for messages end"); + debug!("Listening for messages end"); }); let rx = self.rx.clone(); @@ -98,7 +98,7 @@ impl Transporter for RedisTransporter { }; let server_message = TransportMessage::using_bytes(message.clone(),owner_name.to_string(),owner_name.clone()); let _: () = conn.publish(&channel_name, &server_message).unwrap(); - println!("Publish message from Agent {}: {:?}", owner_name,message); + debug!("Publish message from Agent {}: {:?}", owner_name,message); } } } @@ -107,10 +107,10 @@ impl Transporter for RedisTransporter { tokio::select! { _ = t2 => { - println!("Listening for messages"); + debug!("Listening for t2"); } _ = t1 => { - println!("Listening for messages"); + debug!("Listening for t1"); } } Ok(())