Skip to content

Commit

Permalink
Feat/grpc streams (#31)
Browse files Browse the repository at this point in the history
* feat: streaming and pty setup

---------

Signed-off-by: Mauran <thomas.mauran@etu.umontpellier.fr>
  • Loading branch information
thomas-mauran authored Apr 30, 2024
1 parent b547afa commit 64a5996
Show file tree
Hide file tree
Showing 21 changed files with 319 additions and 144 deletions.
18 changes: 17 additions & 1 deletion proto/vmm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,24 @@ enum LogLevel {
ERROR = 3;
}

// TODO: Didn't managed to import it from the agent file
message ExecuteResponse {
enum Stage {
PENDING = 0;
BUILDING = 1;
RUNNING = 2;
DONE = 3;
FAILED = 4;
DEBUG = 5;
}

string stdout = 1;
string stderr = 2;
int32 exit_code = 3;
}

service VmmService {
rpc Run (RunVmmRequest) returns (RunVmmResponse);
rpc Run (RunVmmRequest) returns (stream ExecuteResponse) {};
}

message RunVmmRequest {
Expand Down
7 changes: 6 additions & 1 deletion src/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ serde = "1.0.197"
tonic = "0.9"
prost = "0.11"
shared_models = { path="../shared-models" }
tokio = { version= "1.37.0", features= ["full"]}
tokio-stream = "0.1.15"
actix-web-lab = "0.20"
async-stream = "0.3"
serde_json = "1.0"

[build-dependencies]
tonic-build = "0.9"
tonic-build = "0.9"
12 changes: 8 additions & 4 deletions src/api/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tonic::transport::Channel;
use tonic::{transport::Channel, Streaming};
use vmmorchestrator::vmm_service_client::VmmServiceClient;

pub mod vmmorchestrator {
Expand All @@ -18,9 +18,13 @@ impl VmmClient {
Ok(VmmClient { client })
}

pub async fn run_vmm(&mut self, request: vmmorchestrator::RunVmmRequest) {
pub async fn run_vmm(
&mut self,
request: vmmorchestrator::RunVmmRequest,
) -> Result<Streaming<vmmorchestrator::ExecuteResponse>, tonic::Status> {
let request = tonic::Request::new(request);
let response = self.client.run(request).await.unwrap();
println!("RESPONSE={:?}", response);
let response_stream = self.client.run(request).await?.into_inner();

Ok(response_stream)
}
}
2 changes: 1 addition & 1 deletion src/api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod client;
pub mod services;
pub mod service;
2 changes: 1 addition & 1 deletion src/api/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix_web::{App, HttpServer};
use api::services::{run, shutdown};
use api::service::{run, shutdown};

#[actix_web::main]
async fn main() -> std::io::Result<()> {
Expand Down
65 changes: 65 additions & 0 deletions src/api/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::client::{
vmmorchestrator::{ExecuteResponse, RunVmmRequest},
VmmClient,
};
use actix_web::{post, web, HttpResponse, Responder};
use actix_web_lab::sse;
use async_stream::stream;
use serde::Serialize;
use shared_models::CloudletDtoRequest;
use tokio_stream::StreamExt;
use tonic::Streaming;

#[post("/run")]
pub async fn run(req_body: web::Json<CloudletDtoRequest>) -> impl Responder {
let req = req_body.into_inner();

let mut client = VmmClient::new().await.unwrap();

let vmm_request = RunVmmRequest {
code: req.code,
env: req.env,
language: req.language as i32,
log_level: req.log_level as i32,
};

println!("Request: {:?}", vmm_request);

println!("Successfully connected to VMM service");

let mut response_stream: Streaming<ExecuteResponse> =
client.run_vmm(vmm_request).await.unwrap();
println!("Response stream: {:?}", response_stream);

let stream = stream! {
while let Some(Ok(exec_response)) = response_stream.next().await {
let json: ExecuteJsonResponse = exec_response.into();
yield sse::Event::Data(sse::Data::new_json(json).unwrap());
}
};

sse::Sse::from_infallible_stream(stream)
}

#[derive(Debug, Serialize)]
pub struct ExecuteJsonResponse {
pub stdout: String,
pub stderr: String,
pub exit_code: i32,
}

impl From<ExecuteResponse> for ExecuteJsonResponse {
fn from(value: ExecuteResponse) -> Self {
Self {
stdout: value.stdout,
stderr: value.stderr,
exit_code: value.exit_code,
}
}
}

#[post("/shutdown")]
pub async fn shutdown(req_body: String) -> impl Responder {
// TODO: Get the id from the body and shutdown the vm
HttpResponse::Ok().body(req_body)
}
33 changes: 0 additions & 33 deletions src/api/src/services.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ serde_yaml = "0.9.34"
schemars = "0.8.16"
serde_json = "1.0.115"
reqwest = "0.12.3"
shared_models = { path="../shared-models" }
shared_models = { path="../shared-models" }
4 changes: 4 additions & 0 deletions src/cli/config/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
language: rust
env_path: /home/thomas/Desktop/fork/cloudlet/src/cli/config/example.env
code_path: /home/thomas/Desktop/fork/cloudlet/src/cli/src/main.rs
log_level: debug
17 changes: 10 additions & 7 deletions src/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use clap::Parser;

use args::{CliArgs, Commands};
use shared_models::YamlClientConfigFile;

use services::CloudletClient;
use std::io::{self};
use utils::ConfigFileHandler;
use std::{fs, io, process::exit};

mod args;
mod services;
Expand All @@ -17,13 +15,18 @@ async fn main() -> io::Result<()> {

match args.command {
Commands::Run { config_path } => {
let yaml_config: YamlClientConfigFile = ConfigFileHandler::load_config(&config_path)
.expect("Error while loading the configuration file");
let body = CloudletClient::new_cloudlet_config(yaml_config);
let toml_file = match fs::read_to_string(config_path.clone()) {
Ok(c) => c,
Err(_) => {
eprintln!("Could not read file `{:?}`", config_path);
exit(1);
}
};
let body = CloudletClient::new_cloudlet_config(toml_file);
let response = CloudletClient::run(body).await;

match response {
Ok(_) => println!("Request successful"),
Ok(_) => println!("Request successful {:?}", response),
Err(e) => eprintln!("Error while making the request: {}", e),
}
}
Expand Down
52 changes: 38 additions & 14 deletions src/cli/src/services.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,63 @@
use crate::utils::ConfigFileHandler;
use reqwest::Client;
use shared_models::{CloudletDtoRequest, YamlClientConfigFile};
use std::error::Error;
use serde::Deserialize;
use shared_models::{CloudletDtoRequest, Language};
use std::{error::Error, path::PathBuf};

#[derive(Deserialize)]
struct TomlConfig {
#[serde(rename = "workload-name")]
_workload_name: String,
language: Language,
_action: String,
_server: ServerConfig,
build: BuildConfig,
}

#[derive(Deserialize)]
struct ServerConfig {
_address: String,
_port: u16,
}

#[derive(Deserialize)]
struct BuildConfig {
#[serde(rename = "source-code-path")]
source_code_path: PathBuf,
_release: bool,
}

pub struct CloudletClient {}

impl CloudletClient {
pub fn new_cloudlet_config(config: YamlClientConfigFile) -> CloudletDtoRequest {
let code: String = ConfigFileHandler::read_file(&config.code_path)
pub fn new_cloudlet_config(config: String) -> CloudletDtoRequest {
let config: TomlConfig =
toml::from_str(&config).expect("Error while parsing the config file");

let code: String = ConfigFileHandler::read_file(&config.build.source_code_path)
.expect("Error while reading the code file");
let env = ConfigFileHandler::read_file(&config.env_path)
.expect("Error while reading the environment file");
let env = "";

let language = config.language;
let log_level = config.log_level;
CloudletDtoRequest {
language,
env,
code,
log_level,
env: env.to_string(),
log_level: shared_models::LogLevel::INFO,
}
}

pub async fn run(request: CloudletDtoRequest) -> Result<(), Box<dyn Error>> {
let client = Client::new();
let json = serde_json::to_string(&request)?;
println!("REQUEST : {:?}", request);
let res = client
.post("http://127.0.0.1:3000/run")
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(json)
.send()
.await?;

match res.status().as_u16() {
200 => Ok(()),
_ => Err("Error while making the request".into()),
}
println!("Response: {:?}", res.text().await?);
Ok(())
}
}
10 changes: 0 additions & 10 deletions src/cli/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
use shared_models::YamlClientConfigFile;
use std::fs::File;
use std::io::{self, Read};
use std::path::PathBuf;

pub struct ConfigFileHandler {}

impl ConfigFileHandler {
pub fn load_config(config_path: &PathBuf) -> io::Result<YamlClientConfigFile> {
let mut file = File::open(config_path)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let config: YamlClientConfigFile = serde_yaml::from_str(&contents)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
Ok(config)
}

pub fn read_file(file_path: &PathBuf) -> io::Result<String> {
let mut file = File::open(file_path)?;
let mut contents = String::new();
Expand Down
6 changes: 5 additions & 1 deletion src/shared-models/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub enum LogLevel {
}

#[derive(Deserialize, Debug)]
pub struct YamlClientConfigFile {
pub struct TomlClientConfigFile {
pub language: Language,
pub env_path: PathBuf,
pub code_path: PathBuf,
Expand All @@ -35,3 +35,7 @@ pub struct CloudletDtoRequest {
pub code: String,
pub log_level: LogLevel,
}

#[derive(Serialize, Deserialize, Debug)]

pub struct AgentExecuteDtoRequest {}
1 change: 1 addition & 0 deletions src/vmm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ vm-device = "0.1.0"
vm-memory = { version = "0.14.1", features = ["backend-mmap"] }
vm-superio = "0.7.0"
vmm-sys-util = "0.12.1"
tokio-stream = "0.1.15"

[build-dependencies]
tonic-build = "0.9"
1 change: 1 addition & 0 deletions src/vmm/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../../proto/vmm.proto")?;
tonic_build::compile_protos("../../proto/agent.proto")?;
Ok(())
}
Loading

0 comments on commit 64a5996

Please sign in to comment.