Skip to content

Commit

Permalink
split out terminal client from dht client (#240)
Browse files Browse the repository at this point in the history
  • Loading branch information
zupzup authored Nov 27, 2024
1 parent 1c0fe9c commit 02df6f8
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 193 deletions.
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub struct Config {
pub data_dir: String,
#[arg(default_value_t = String::from("ws://localhost:8800"), long, env = "SURREAL_DB_CONNECTION")]
pub surreal_db_connection: String,
#[arg(default_value_t = false, long, env = "TERMINAL_CLIENT")]
pub terminal_client: bool,
}

impl Config {
Expand Down
197 changes: 4 additions & 193 deletions src/dht/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use libp2p::PeerId;
use log::{error, info};
use std::collections::HashSet;
use std::fs;
use std::io::BufRead;
use std::sync::Arc;
use tokio::sync::broadcast;

Expand Down Expand Up @@ -52,35 +51,8 @@ impl Client {
mut network_events: Receiver<Event>,
mut shutdown_dht_client_receiver: broadcast::Receiver<bool>,
) {
// We need to use blocking stdin, because tokio's async stdin isn't meant for interactive
// use-cases and will block forever on finishing the program
let (stdin_tx, mut stdin_rx) = tokio::sync::mpsc::channel(100);
std::thread::spawn(move || {
let stdin = std::io::stdin();
for line in stdin.lock().lines() {
match line {
Ok(line) => {
let line = line.trim().to_string();
if !line.is_empty() {
if let Err(e) = stdin_tx.blocking_send(line) {
error!("Error handling stdin: {e}");
}
}
}
Err(e) => {
error!("Error reading line from stdin: {e}");
}
}
}
});

loop {
tokio::select! {
line = stdin_rx.recv() => {
if let Some(next_line) = line {
self.handle_input_line(next_line).await
}
},
event = network_events.next() => self.handle_event(event.expect("Swarm stream to be infinite.")).await,
_ = shutdown_dht_client_receiver.recv() => {
info!("Shutting down dht client...");
Expand Down Expand Up @@ -510,21 +482,21 @@ impl Client {
.expect("Command receiver not to be dropped.");
}

async fn send_message(&mut self, msg: Vec<u8>, topic: String) {
pub async fn send_message(&mut self, msg: Vec<u8>, topic: String) {
self.sender
.send(Command::SendMessage { msg, topic })
.await
.expect("Command receiver not to be dropped.");
}

async fn put_record(&mut self, key: String, value: String) {
pub async fn put_record(&mut self, key: String, value: String) {
self.sender
.send(Command::PutRecord { key, value })
.await
.expect("Command receiver not to be dropped.");
}

async fn get_record(&mut self, key: String) -> Record {
pub async fn get_record(&mut self, key: String) -> Record {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::GetRecord { key, sender })
Expand All @@ -542,7 +514,7 @@ impl Client {
receiver.await.expect("Sender not to be dropped.");
}

async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
pub async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::GetProviders { file_name, sender })
Expand Down Expand Up @@ -651,165 +623,4 @@ impl Client {
}
}
}

//Need for testing from console.
async fn handle_input_line(&mut self, line: String) {
let mut args = line.split(' ');
match args.next() {
Some("PUT") => {
let name: String = {
match args.next() {
Some(name) => String::from(name),
None => {
error!("Expected name.");
return;
}
}
};
self.put(&name).await;
}

Some("GET_BILL") => {
let name: String = {
match args.next() {
Some(name) => String::from(name),
None => {
error!("Expected bill name.");
return;
}
}
};
self.get_bill(name).await;
}

Some("GET_BILL_ATTACHMENT") => {
let name: String = {
match args.next() {
Some(name) => String::from(name),
None => {
error!("Expected bill name.");
return;
}
}
};
let file_name: String = {
match args.next() {
Some(file_name) => String::from(file_name),
None => {
error!("Expected file name.");
return;
}
}
};
if let Err(e) = self.get_bill_attachment(name, file_name).await {
error!("Get Bill Attachment failed: {e}");
}
}

Some("GET_KEY") => {
let name: String = {
match args.next() {
Some(name) => String::from(name),
None => {
error!("Expected bill name.");
return;
}
}
};
self.get_key(name).await;
}

Some("PUT_RECORD") => {
let key = {
match args.next() {
Some(key) => String::from(key),
None => {
error!("Expected key");
return;
}
}
};
let value = {
match args.next() {
Some(value) => String::from(value),
None => {
error!("Expected value");
return;
}
}
};

self.put_record(key, value).await;
}

Some("SEND_MESSAGE") => {
let topic = {
match args.next() {
Some(key) => String::from(key),
None => {
error!("Expected topic");
return;
}
}
};
let msg = {
match args.next() {
Some(value) => String::from(value),
None => {
error!("Expected msg");
return;
}
}
};

self.send_message(msg.into_bytes(), topic).await;
}

Some("SUBSCRIBE") => {
let topic = {
match args.next() {
Some(key) => String::from(key),
None => {
error!("Expected topic");
return;
}
}
};

self.subscribe_to_topic(topic).await;
}

Some("GET_RECORD") => {
let key = {
match args.next() {
Some(key) => String::from(key),
None => {
error!("Expected key");
return;
}
}
};
self.get_record(key).await;
}

Some("GET_PROVIDERS") => {
let key = {
match args.next() {
Some(key) => String::from(key),
None => {
error!("Expected key");
return;
}
}
};
self.get_providers(key).await;
}

_ => {
error!(
"expected GET_BILL, GET_KEY, GET_BILL_ATTACHMENT, PUT, SEND_MESSAGE, SUBSCRIBE, GET_RECORD, PUT_RECORD or GET_PROVIDERS."
);
}
}
}
}
9 changes: 9 additions & 0 deletions src/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod event_loop;

use crate::persistence::bill::BillStoreApi;
use crate::persistence::identity::IdentityStoreApi;
use crate::util;
use anyhow::Result;
pub use client::Client;
use libp2p::identity::Keypair;
Expand All @@ -49,9 +50,17 @@ pub async fn dht_main(
spawn(network_event_loop.run(shutdown_receiver));

let network_client_to_return = network_client.clone();
let network_client_for_terminal_client = network_client.clone();

spawn(network_client.run(network_events, shutdown_sender.subscribe()));

if conf.terminal_client {
spawn(util::terminal::run_terminal_client(
shutdown_sender.subscribe(),
network_client_for_terminal_client,
));
}

Ok(Dht {
client: network_client_to_return,
shutdown_sender,
Expand Down
1 change: 1 addition & 0 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod file;
pub mod numbers_to_words;
pub mod rsa;
pub mod terminal;
use crate::{
constants::USEDNET, service::bill_service::BitcreditBill, service::identity_service::Identity,
};
Expand Down
Loading

0 comments on commit 02df6f8

Please sign in to comment.