Skip to content

Commit

Permalink
Release 0.1.7
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Jul 28, 2023
1 parent f118d79 commit e5673e2
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rk-core"
version = "0.1.6"
version = "0.1.7"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "rk-core"
version = "0.1.6"
version = "0.1.7"
description = "Rakun Multi Agent System"
authors = ["Ceylon AI <info@ceylon.ai>"]

Expand Down
4 changes: 2 additions & 2 deletions src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::sync::Mutex;

use crate::server::application;
use crate::transport::p2p::P2PTransporter;
use crate::transport::redis::RedisTransporter;
use crate::types::EventProcessor;

// pyO3 module
Expand Down Expand Up @@ -63,7 +64,6 @@ impl Server {
let msg_tx = self.msg_tx.clone();
let app_rx = self.app_rx.clone();
std::thread::spawn(move || {
// let mut app_rx = CHANNEL_PAIR.with(|cp| cp.borrow().rx.resubscribe());
tokio::runtime::Runtime::new().unwrap().block_on(async move {
while let msg = app_rx.lock().await.recv().await.unwrap() {
debug!(
Expand Down Expand Up @@ -114,7 +114,7 @@ impl Server {
Ok(())
}

pub fn publisher(&mut self, py: Python) -> MessageProcessor {
pub fn publisher(&mut self) -> MessageProcessor {
MessageProcessor {
msg_tx: Arc::new(Mutex::new(self.app_tx.clone())),
}
Expand Down
8 changes: 4 additions & 4 deletions src/transport/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Transporter for P2PTransporter {
// Create a random PeerId
let id_keys = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(id_keys.public());
println!("Local peer id: {local_peer_id} {owner}");
info!("Local peer id: {local_peer_id} {owner}");

// Set up an encrypted DNS-enabled TCP Transport over the yamux protocol.
let tcp_transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true))
Expand Down Expand Up @@ -166,19 +166,19 @@ impl Transporter for P2PTransporter {

SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Subscribed{peer_id,topic})) => {
let status = format!("{peer_id}");
println!("Subscribed to {owner} {topic} {status}");
debug!("Subscribed to {owner} {topic} {status}");

let is_connected = self.connected_peers.contains(&peer_id);
self.connected_peers.push(peer_id);
if self.connected_peers.len() == 1 && !is_connected {
println!("{owner} START NOW");
debug!("{owner} START NOW");
self.send(TransportStatus::Started).await;
}
},

SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Unsubscribed{peer_id,topic})) => {
let status = format!("{peer_id}");
println!("Subscribed to {owner} {topic} {status}");
debug!("Subscribed to {owner} {topic} {status}");

self.connected_peers.remove(self.connected_peers.iter().position(|x| x == &peer_id).unwrap());
if self.connected_peers.len() == 0 {
Expand Down
14 changes: 7 additions & 7 deletions src/transport/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::StreamExt;
use log::{debug, error};
use log::{debug, error, info};
use redis::{Commands, PubSubCommands};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -56,7 +56,7 @@ impl Transporter for RedisTransporter {
let peer_id = self.peer_id.clone();
let name = self.owner.clone();

println!("Agent {} Stared at: {}", name, peer_id);
info!("Agent {} Stared at: {}", name, peer_id);

let msg_tx = self.msg_tx.clone();
let t1 = tokio::spawn(async move {
Expand All @@ -72,10 +72,10 @@ impl Transporter for RedisTransporter {
let data = TransportMessage::from_bytes(payload.clone());
let data_log = TransportMessage::from_bytes(payload);
if data.sender_id == name { continue; }
println!("Received message from Agent: {:?}-{}-{}", data_log.data, data_log.sender_id, name);
debug!("Received message from Agent: {:?}-{}-{}", data_log.data, data_log.sender_id, name);
SendStatus(msg_tx.clone(), TransportStatus::Data(data)).await;
}
println!("Listening for messages end");
debug!("Listening for messages end");
});

let rx = self.rx.clone();
Expand All @@ -98,7 +98,7 @@ impl Transporter for RedisTransporter {
};
let server_message = TransportMessage::using_bytes(message.clone(),owner_name.to_string(),owner_name.clone());
let _: () = conn.publish(&channel_name, &server_message).unwrap();
println!("Publish message from Agent {}: {:?}", owner_name,message);
debug!("Publish message from Agent {}: {:?}", owner_name,message);
}
}
}
Expand All @@ -107,10 +107,10 @@ impl Transporter for RedisTransporter {

tokio::select! {
_ = t2 => {
println!("Listening for messages");
debug!("Listening for t2");
}
_ = t1 => {
println!("Listening for messages");
debug!("Listening for t1");
}
}
Ok(())
Expand Down

0 comments on commit e5673e2

Please sign in to comment.