From 64a5996f91ec2f9472dc0ac740d1d8a1a97bebc9 Mon Sep 17 00:00:00 2001 From: Thomas Mauran <78204354+thomas-mauran@users.noreply.github.com> Date: Tue, 30 Apr 2024 08:40:57 +0200 Subject: [PATCH] Feat/grpc streams (#31) * feat: streaming and pty setup --------- Signed-off-by: Mauran --- proto/vmm.proto | 18 +++- src/api/Cargo.toml | 7 +- src/api/src/client.rs | 12 ++- src/api/src/lib.rs | 2 +- src/api/src/main.rs | 2 +- src/api/src/service.rs | 65 ++++++++++++ src/api/src/services.rs | 33 ------- src/cli/Cargo.toml | 2 +- src/cli/config/config.yaml | 4 + src/cli/src/main.rs | 17 ++-- src/cli/src/services.rs | 52 +++++++--- src/cli/src/utils.rs | 10 -- src/shared-models/src/lib.rs | 6 +- src/vmm/Cargo.toml | 1 + src/vmm/build.rs | 1 + src/vmm/src/core/cpu/mpspec.rs | 110 ++++++++++----------- src/vmm/src/core/vmm.rs | 1 - src/vmm/src/grpc/client.rs | 40 ++++++++ src/vmm/src/{service.rs => grpc/server.rs} | 73 +++++++++++--- src/vmm/src/lib.rs | 5 +- src/vmm/src/main.rs | 2 +- 21 files changed, 319 insertions(+), 144 deletions(-) create mode 100644 src/api/src/service.rs delete mode 100644 src/api/src/services.rs create mode 100644 src/cli/config/config.yaml create mode 100644 src/vmm/src/grpc/client.rs rename src/vmm/src/{service.rs => grpc/server.rs} (52%) diff --git a/proto/vmm.proto b/proto/vmm.proto index 1debb55..4b394ab 100644 --- a/proto/vmm.proto +++ b/proto/vmm.proto @@ -15,8 +15,24 @@ enum LogLevel { ERROR = 3; } +// TODO: Didn't managed to import it from the agent file +message ExecuteResponse { + enum Stage { + PENDING = 0; + BUILDING = 1; + RUNNING = 2; + DONE = 3; + FAILED = 4; + DEBUG = 5; + } + + string stdout = 1; + string stderr = 2; + int32 exit_code = 3; +} + service VmmService { - rpc Run (RunVmmRequest) returns (RunVmmResponse); + rpc Run (RunVmmRequest) returns (stream ExecuteResponse) {}; } message RunVmmRequest { diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 5a6f3d0..8576126 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -11,6 +11,11 @@ serde = "1.0.197" tonic = "0.9" prost = "0.11" shared_models = { path="../shared-models" } +tokio = { version= "1.37.0", features= ["full"]} +tokio-stream = "0.1.15" +actix-web-lab = "0.20" +async-stream = "0.3" +serde_json = "1.0" [build-dependencies] -tonic-build = "0.9" \ No newline at end of file +tonic-build = "0.9" diff --git a/src/api/src/client.rs b/src/api/src/client.rs index 1abc188..28b772e 100644 --- a/src/api/src/client.rs +++ b/src/api/src/client.rs @@ -1,4 +1,4 @@ -use tonic::transport::Channel; +use tonic::{transport::Channel, Streaming}; use vmmorchestrator::vmm_service_client::VmmServiceClient; pub mod vmmorchestrator { @@ -18,9 +18,13 @@ impl VmmClient { Ok(VmmClient { client }) } - pub async fn run_vmm(&mut self, request: vmmorchestrator::RunVmmRequest) { + pub async fn run_vmm( + &mut self, + request: vmmorchestrator::RunVmmRequest, + ) -> Result, tonic::Status> { let request = tonic::Request::new(request); - let response = self.client.run(request).await.unwrap(); - println!("RESPONSE={:?}", response); + let response_stream = self.client.run(request).await?.into_inner(); + + Ok(response_stream) } } diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs index 24f4aba..431311a 100644 --- a/src/api/src/lib.rs +++ b/src/api/src/lib.rs @@ -1,2 +1,2 @@ pub mod client; -pub mod services; +pub mod service; diff --git a/src/api/src/main.rs b/src/api/src/main.rs index 2de501b..5583801 100644 --- a/src/api/src/main.rs +++ b/src/api/src/main.rs @@ -1,5 +1,5 @@ use actix_web::{App, HttpServer}; -use api::services::{run, shutdown}; +use api::service::{run, shutdown}; #[actix_web::main] async fn main() -> std::io::Result<()> { diff --git a/src/api/src/service.rs b/src/api/src/service.rs new file mode 100644 index 0000000..a312bf8 --- /dev/null +++ b/src/api/src/service.rs @@ -0,0 +1,65 @@ +use crate::client::{ + vmmorchestrator::{ExecuteResponse, RunVmmRequest}, + VmmClient, +}; +use actix_web::{post, web, HttpResponse, Responder}; +use actix_web_lab::sse; +use async_stream::stream; +use serde::Serialize; +use shared_models::CloudletDtoRequest; +use tokio_stream::StreamExt; +use tonic::Streaming; + +#[post("/run")] +pub async fn run(req_body: web::Json) -> impl Responder { + let req = req_body.into_inner(); + + let mut client = VmmClient::new().await.unwrap(); + + let vmm_request = RunVmmRequest { + code: req.code, + env: req.env, + language: req.language as i32, + log_level: req.log_level as i32, + }; + + println!("Request: {:?}", vmm_request); + + println!("Successfully connected to VMM service"); + + let mut response_stream: Streaming = + client.run_vmm(vmm_request).await.unwrap(); + println!("Response stream: {:?}", response_stream); + + let stream = stream! { + while let Some(Ok(exec_response)) = response_stream.next().await { + let json: ExecuteJsonResponse = exec_response.into(); + yield sse::Event::Data(sse::Data::new_json(json).unwrap()); + } + }; + + sse::Sse::from_infallible_stream(stream) +} + +#[derive(Debug, Serialize)] +pub struct ExecuteJsonResponse { + pub stdout: String, + pub stderr: String, + pub exit_code: i32, +} + +impl From for ExecuteJsonResponse { + fn from(value: ExecuteResponse) -> Self { + Self { + stdout: value.stdout, + stderr: value.stderr, + exit_code: value.exit_code, + } + } +} + +#[post("/shutdown")] +pub async fn shutdown(req_body: String) -> impl Responder { + // TODO: Get the id from the body and shutdown the vm + HttpResponse::Ok().body(req_body) +} diff --git a/src/api/src/services.rs b/src/api/src/services.rs deleted file mode 100644 index 1648667..0000000 --- a/src/api/src/services.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::client::vmmorchestrator::RunVmmRequest; -use crate::client::VmmClient; -use actix_web::{post, web, HttpResponse, Responder}; -use shared_models::CloudletDtoRequest; - -#[post("/run")] -pub async fn run(req_body: web::Json) -> impl Responder { - let req = req_body.into_inner(); - let grpc_client = VmmClient::new().await; - - let vmm_request = RunVmmRequest { - code: req.code, - env: req.env, - language: req.language as i32, - log_level: req.log_level as i32, - }; - - match grpc_client { - Ok(mut client) => { - println!("Successfully connected to VMM service"); - client.run_vmm(vmm_request).await; - HttpResponse::Ok().body("Successfully ran VMM") - } - Err(e) => HttpResponse::InternalServerError() - .body("Failed to connect to VMM service with error: ".to_string() + &e.to_string()), - } -} - -#[post("/shutdown")] -pub async fn shutdown(req_body: String) -> impl Responder { - // TODO: Get the id from the body and shutdown the vm - HttpResponse::Ok().body(req_body) -} diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml index ef09404..957c55c 100644 --- a/src/cli/Cargo.toml +++ b/src/cli/Cargo.toml @@ -14,4 +14,4 @@ serde_yaml = "0.9.34" schemars = "0.8.16" serde_json = "1.0.115" reqwest = "0.12.3" -shared_models = { path="../shared-models" } \ No newline at end of file +shared_models = { path="../shared-models" } diff --git a/src/cli/config/config.yaml b/src/cli/config/config.yaml new file mode 100644 index 0000000..d5a19be --- /dev/null +++ b/src/cli/config/config.yaml @@ -0,0 +1,4 @@ +language: rust +env_path: /home/thomas/Desktop/fork/cloudlet/src/cli/config/example.env +code_path: /home/thomas/Desktop/fork/cloudlet/src/cli/src/main.rs +log_level: debug diff --git a/src/cli/src/main.rs b/src/cli/src/main.rs index 2fa2760..0e1b0e3 100644 --- a/src/cli/src/main.rs +++ b/src/cli/src/main.rs @@ -1,11 +1,9 @@ use clap::Parser; use args::{CliArgs, Commands}; -use shared_models::YamlClientConfigFile; use services::CloudletClient; -use std::io::{self}; -use utils::ConfigFileHandler; +use std::{fs, io, process::exit}; mod args; mod services; @@ -17,13 +15,18 @@ async fn main() -> io::Result<()> { match args.command { Commands::Run { config_path } => { - let yaml_config: YamlClientConfigFile = ConfigFileHandler::load_config(&config_path) - .expect("Error while loading the configuration file"); - let body = CloudletClient::new_cloudlet_config(yaml_config); + let toml_file = match fs::read_to_string(config_path.clone()) { + Ok(c) => c, + Err(_) => { + eprintln!("Could not read file `{:?}`", config_path); + exit(1); + } + }; + let body = CloudletClient::new_cloudlet_config(toml_file); let response = CloudletClient::run(body).await; match response { - Ok(_) => println!("Request successful"), + Ok(_) => println!("Request successful {:?}", response), Err(e) => eprintln!("Error while making the request: {}", e), } } diff --git a/src/cli/src/services.rs b/src/cli/src/services.rs index 84835d2..b2987e0 100644 --- a/src/cli/src/services.rs +++ b/src/cli/src/services.rs @@ -1,29 +1,55 @@ use crate::utils::ConfigFileHandler; use reqwest::Client; -use shared_models::{CloudletDtoRequest, YamlClientConfigFile}; -use std::error::Error; +use serde::Deserialize; +use shared_models::{CloudletDtoRequest, Language}; +use std::{error::Error, path::PathBuf}; + +#[derive(Deserialize)] +struct TomlConfig { + #[serde(rename = "workload-name")] + _workload_name: String, + language: Language, + _action: String, + _server: ServerConfig, + build: BuildConfig, +} + +#[derive(Deserialize)] +struct ServerConfig { + _address: String, + _port: u16, +} + +#[derive(Deserialize)] +struct BuildConfig { + #[serde(rename = "source-code-path")] + source_code_path: PathBuf, + _release: bool, +} + pub struct CloudletClient {} impl CloudletClient { - pub fn new_cloudlet_config(config: YamlClientConfigFile) -> CloudletDtoRequest { - let code: String = ConfigFileHandler::read_file(&config.code_path) + pub fn new_cloudlet_config(config: String) -> CloudletDtoRequest { + let config: TomlConfig = + toml::from_str(&config).expect("Error while parsing the config file"); + + let code: String = ConfigFileHandler::read_file(&config.build.source_code_path) .expect("Error while reading the code file"); - let env = ConfigFileHandler::read_file(&config.env_path) - .expect("Error while reading the environment file"); + let env = ""; + let language = config.language; - let log_level = config.log_level; CloudletDtoRequest { language, - env, code, - log_level, + env: env.to_string(), + log_level: shared_models::LogLevel::INFO, } } pub async fn run(request: CloudletDtoRequest) -> Result<(), Box> { let client = Client::new(); let json = serde_json::to_string(&request)?; - println!("REQUEST : {:?}", request); let res = client .post("http://127.0.0.1:3000/run") .header(reqwest::header::CONTENT_TYPE, "application/json") @@ -31,9 +57,7 @@ impl CloudletClient { .send() .await?; - match res.status().as_u16() { - 200 => Ok(()), - _ => Err("Error while making the request".into()), - } + println!("Response: {:?}", res.text().await?); + Ok(()) } } diff --git a/src/cli/src/utils.rs b/src/cli/src/utils.rs index 0d80519..cb663ad 100644 --- a/src/cli/src/utils.rs +++ b/src/cli/src/utils.rs @@ -1,4 +1,3 @@ -use shared_models::YamlClientConfigFile; use std::fs::File; use std::io::{self, Read}; use std::path::PathBuf; @@ -6,15 +5,6 @@ use std::path::PathBuf; pub struct ConfigFileHandler {} impl ConfigFileHandler { - pub fn load_config(config_path: &PathBuf) -> io::Result { - let mut file = File::open(config_path)?; - let mut contents = String::new(); - file.read_to_string(&mut contents)?; - let config: YamlClientConfigFile = serde_yaml::from_str(&contents) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(config) - } - pub fn read_file(file_path: &PathBuf) -> io::Result { let mut file = File::open(file_path)?; let mut contents = String::new(); diff --git a/src/shared-models/src/lib.rs b/src/shared-models/src/lib.rs index cfa7582..e32fe1b 100644 --- a/src/shared-models/src/lib.rs +++ b/src/shared-models/src/lib.rs @@ -21,7 +21,7 @@ pub enum LogLevel { } #[derive(Deserialize, Debug)] -pub struct YamlClientConfigFile { +pub struct TomlClientConfigFile { pub language: Language, pub env_path: PathBuf, pub code_path: PathBuf, @@ -35,3 +35,7 @@ pub struct CloudletDtoRequest { pub code: String, pub log_level: LogLevel, } + +#[derive(Serialize, Deserialize, Debug)] + +pub struct AgentExecuteDtoRequest {} diff --git a/src/vmm/Cargo.toml b/src/vmm/Cargo.toml index b549196..6f5aa30 100644 --- a/src/vmm/Cargo.toml +++ b/src/vmm/Cargo.toml @@ -34,6 +34,7 @@ vm-device = "0.1.0" vm-memory = { version = "0.14.1", features = ["backend-mmap"] } vm-superio = "0.7.0" vmm-sys-util = "0.12.1" +tokio-stream = "0.1.15" [build-dependencies] tonic-build = "0.9" diff --git a/src/vmm/build.rs b/src/vmm/build.rs index c53d705..b3cd0d2 100644 --- a/src/vmm/build.rs +++ b/src/vmm/build.rs @@ -1,4 +1,5 @@ fn main() -> Result<(), Box> { tonic_build::compile_protos("../../proto/vmm.proto")?; + tonic_build::compile_protos("../../proto/agent.proto")?; Ok(()) } diff --git a/src/vmm/src/core/cpu/mpspec.rs b/src/vmm/src/core/cpu/mpspec.rs index ac25be0..b7c1fa1 100644 --- a/src/vmm/src/core/cpu/mpspec.rs +++ b/src/vmm/src/core/cpu/mpspec.rs @@ -69,7 +69,7 @@ fn bindgen_test_layout_mpf_intel() { concat!("Alignment of ", stringify!(mpf_intel)) ); assert_eq!( - unsafe { &(*(0 as *const mpf_intel)).signature as *const _ as usize }, + unsafe { &(*std::ptr::null::()).signature as *const _ as usize }, 0usize, concat!( "Alignment of field: ", @@ -79,7 +79,7 @@ fn bindgen_test_layout_mpf_intel() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpf_intel)).physptr as *const _ as usize }, + unsafe { &(*std::ptr::null::()).physptr as *const _ as usize }, 4usize, concat!( "Alignment of field: ", @@ -89,7 +89,7 @@ fn bindgen_test_layout_mpf_intel() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpf_intel)).length as *const _ as usize }, + unsafe { &(*std::ptr::null::()).length as *const _ as usize }, 8usize, concat!( "Alignment of field: ", @@ -99,7 +99,7 @@ fn bindgen_test_layout_mpf_intel() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpf_intel)).specification as *const _ as usize }, + unsafe { &(*std::ptr::null::()).specification as *const _ as usize }, 9usize, concat!( "Alignment of field: ", @@ -109,7 +109,7 @@ fn bindgen_test_layout_mpf_intel() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpf_intel)).checksum as *const _ as usize }, + unsafe { &(*std::ptr::null::()).checksum as *const _ as usize }, 10usize, concat!( "Alignment of field: ", @@ -119,7 +119,7 @@ fn bindgen_test_layout_mpf_intel() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpf_intel)).feature1 as *const _ as usize }, + unsafe { &(*std::ptr::null::()).feature1 as *const _ as usize }, 11usize, concat!( "Alignment of field: ", @@ -129,7 +129,7 @@ fn bindgen_test_layout_mpf_intel() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpf_intel)).feature2 as *const _ as usize }, + unsafe { &(*std::ptr::null::()).feature2 as *const _ as usize }, 12usize, concat!( "Alignment of field: ", @@ -139,7 +139,7 @@ fn bindgen_test_layout_mpf_intel() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpf_intel)).feature3 as *const _ as usize }, + unsafe { &(*std::ptr::null::()).feature3 as *const _ as usize }, 13usize, concat!( "Alignment of field: ", @@ -149,7 +149,7 @@ fn bindgen_test_layout_mpf_intel() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpf_intel)).feature4 as *const _ as usize }, + unsafe { &(*std::ptr::null::()).feature4 as *const _ as usize }, 14usize, concat!( "Alignment of field: ", @@ -159,7 +159,7 @@ fn bindgen_test_layout_mpf_intel() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpf_intel)).feature5 as *const _ as usize }, + unsafe { &(*std::ptr::null::()).feature5 as *const _ as usize }, 15usize, concat!( "Alignment of field: ", @@ -202,7 +202,7 @@ fn bindgen_test_layout_mpc_table() { concat!("Alignment of ", stringify!(mpc_table)) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).signature as *const _ as usize }, + unsafe { &(*std::ptr::null::()).signature as *const _ as usize }, 0usize, concat!( "Alignment of field: ", @@ -212,7 +212,7 @@ fn bindgen_test_layout_mpc_table() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).length as *const _ as usize }, + unsafe { &(*std::ptr::null::()).length as *const _ as usize }, 4usize, concat!( "Alignment of field: ", @@ -222,7 +222,7 @@ fn bindgen_test_layout_mpc_table() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).spec as *const _ as usize }, + unsafe { &(*std::ptr::null::()).spec as *const _ as usize }, 6usize, concat!( "Alignment of field: ", @@ -232,7 +232,7 @@ fn bindgen_test_layout_mpc_table() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).checksum as *const _ as usize }, + unsafe { &(*std::ptr::null::()).checksum as *const _ as usize }, 7usize, concat!( "Alignment of field: ", @@ -242,7 +242,7 @@ fn bindgen_test_layout_mpc_table() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).oem as *const _ as usize }, + unsafe { &(*std::ptr::null::()).oem as *const _ as usize }, 8usize, concat!( "Alignment of field: ", @@ -252,7 +252,7 @@ fn bindgen_test_layout_mpc_table() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).productid as *const _ as usize }, + unsafe { &(*std::ptr::null::()).productid as *const _ as usize }, 16usize, concat!( "Alignment of field: ", @@ -262,7 +262,7 @@ fn bindgen_test_layout_mpc_table() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).oemptr as *const _ as usize }, + unsafe { &(*std::ptr::null::()).oemptr as *const _ as usize }, 28usize, concat!( "Alignment of field: ", @@ -272,7 +272,7 @@ fn bindgen_test_layout_mpc_table() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).oemsize as *const _ as usize }, + unsafe { &(*std::ptr::null::()).oemsize as *const _ as usize }, 32usize, concat!( "Alignment of field: ", @@ -282,7 +282,7 @@ fn bindgen_test_layout_mpc_table() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).oemcount as *const _ as usize }, + unsafe { &(*std::ptr::null::()).oemcount as *const _ as usize }, 34usize, concat!( "Alignment of field: ", @@ -292,7 +292,7 @@ fn bindgen_test_layout_mpc_table() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).lapic as *const _ as usize }, + unsafe { &(*std::ptr::null::()).lapic as *const _ as usize }, 36usize, concat!( "Alignment of field: ", @@ -302,7 +302,7 @@ fn bindgen_test_layout_mpc_table() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_table)).reserved as *const _ as usize }, + unsafe { &(*std::ptr::null::()).reserved as *const _ as usize }, 40usize, concat!( "Alignment of field: ", @@ -341,7 +341,7 @@ fn bindgen_test_layout_mpc_cpu() { concat!("Alignment of ", stringify!(mpc_cpu)) ); assert_eq!( - unsafe { &(*(0 as *const mpc_cpu)).type_ as *const _ as usize }, + unsafe { &(*std::ptr::null::()).type_ as *const _ as usize }, 0usize, concat!( "Alignment of field: ", @@ -351,7 +351,7 @@ fn bindgen_test_layout_mpc_cpu() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_cpu)).apicid as *const _ as usize }, + unsafe { &(*std::ptr::null::()).apicid as *const _ as usize }, 1usize, concat!( "Alignment of field: ", @@ -361,7 +361,7 @@ fn bindgen_test_layout_mpc_cpu() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_cpu)).apicver as *const _ as usize }, + unsafe { &(*std::ptr::null::()).apicver as *const _ as usize }, 2usize, concat!( "Alignment of field: ", @@ -371,7 +371,7 @@ fn bindgen_test_layout_mpc_cpu() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_cpu)).cpuflag as *const _ as usize }, + unsafe { &(*std::ptr::null::()).cpuflag as *const _ as usize }, 3usize, concat!( "Alignment of field: ", @@ -381,7 +381,7 @@ fn bindgen_test_layout_mpc_cpu() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_cpu)).cpufeature as *const _ as usize }, + unsafe { &(*std::ptr::null::()).cpufeature as *const _ as usize }, 4usize, concat!( "Alignment of field: ", @@ -391,7 +391,7 @@ fn bindgen_test_layout_mpc_cpu() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_cpu)).featureflag as *const _ as usize }, + unsafe { &(*std::ptr::null::()).featureflag as *const _ as usize }, 8usize, concat!( "Alignment of field: ", @@ -401,7 +401,7 @@ fn bindgen_test_layout_mpc_cpu() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_cpu)).reserved as *const _ as usize }, + unsafe { &(*std::ptr::null::()).reserved as *const _ as usize }, 12usize, concat!( "Alignment of field: ", @@ -436,7 +436,7 @@ fn bindgen_test_layout_mpc_bus() { concat!("Alignment of ", stringify!(mpc_bus)) ); assert_eq!( - unsafe { &(*(0 as *const mpc_bus)).type_ as *const _ as usize }, + unsafe { &(*std::ptr::null::()).type_ as *const _ as usize }, 0usize, concat!( "Alignment of field: ", @@ -446,7 +446,7 @@ fn bindgen_test_layout_mpc_bus() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_bus)).busid as *const _ as usize }, + unsafe { &(*std::ptr::null::()).busid as *const _ as usize }, 1usize, concat!( "Alignment of field: ", @@ -456,7 +456,7 @@ fn bindgen_test_layout_mpc_bus() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_bus)).bustype as *const _ as usize }, + unsafe { &(*std::ptr::null::()).bustype as *const _ as usize }, 2usize, concat!( "Alignment of field: ", @@ -493,7 +493,7 @@ fn bindgen_test_layout_mpc_ioapic() { concat!("Alignment of ", stringify!(mpc_ioapic)) ); assert_eq!( - unsafe { &(*(0 as *const mpc_ioapic)).type_ as *const _ as usize }, + unsafe { &(*std::ptr::null::()).type_ as *const _ as usize }, 0usize, concat!( "Alignment of field: ", @@ -503,7 +503,7 @@ fn bindgen_test_layout_mpc_ioapic() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_ioapic)).apicid as *const _ as usize }, + unsafe { &(*std::ptr::null::()).apicid as *const _ as usize }, 1usize, concat!( "Alignment of field: ", @@ -513,7 +513,7 @@ fn bindgen_test_layout_mpc_ioapic() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_ioapic)).apicver as *const _ as usize }, + unsafe { &(*std::ptr::null::()).apicver as *const _ as usize }, 2usize, concat!( "Alignment of field: ", @@ -523,7 +523,7 @@ fn bindgen_test_layout_mpc_ioapic() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_ioapic)).flags as *const _ as usize }, + unsafe { &(*std::ptr::null::()).flags as *const _ as usize }, 3usize, concat!( "Alignment of field: ", @@ -533,7 +533,7 @@ fn bindgen_test_layout_mpc_ioapic() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_ioapic)).apicaddr as *const _ as usize }, + unsafe { &(*std::ptr::null::()).apicaddr as *const _ as usize }, 4usize, concat!( "Alignment of field: ", @@ -572,7 +572,7 @@ fn bindgen_test_layout_mpc_intsrc() { concat!("Alignment of ", stringify!(mpc_intsrc)) ); assert_eq!( - unsafe { &(*(0 as *const mpc_intsrc)).type_ as *const _ as usize }, + unsafe { &(*std::ptr::null::()).type_ as *const _ as usize }, 0usize, concat!( "Alignment of field: ", @@ -582,7 +582,7 @@ fn bindgen_test_layout_mpc_intsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_intsrc)).irqtype as *const _ as usize }, + unsafe { &(*std::ptr::null::()).irqtype as *const _ as usize }, 1usize, concat!( "Alignment of field: ", @@ -592,7 +592,7 @@ fn bindgen_test_layout_mpc_intsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_intsrc)).irqflag as *const _ as usize }, + unsafe { &(*std::ptr::null::()).irqflag as *const _ as usize }, 2usize, concat!( "Alignment of field: ", @@ -602,7 +602,7 @@ fn bindgen_test_layout_mpc_intsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_intsrc)).srcbus as *const _ as usize }, + unsafe { &(*std::ptr::null::()).srcbus as *const _ as usize }, 4usize, concat!( "Alignment of field: ", @@ -612,7 +612,7 @@ fn bindgen_test_layout_mpc_intsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_intsrc)).srcbusirq as *const _ as usize }, + unsafe { &(*std::ptr::null::()).srcbusirq as *const _ as usize }, 5usize, concat!( "Alignment of field: ", @@ -622,7 +622,7 @@ fn bindgen_test_layout_mpc_intsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_intsrc)).dstapic as *const _ as usize }, + unsafe { &(*std::ptr::null::()).dstapic as *const _ as usize }, 6usize, concat!( "Alignment of field: ", @@ -632,7 +632,7 @@ fn bindgen_test_layout_mpc_intsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_intsrc)).dstirq as *const _ as usize }, + unsafe { &(*std::ptr::null::()).dstirq as *const _ as usize }, 7usize, concat!( "Alignment of field: ", @@ -676,7 +676,7 @@ fn bindgen_test_layout_mpc_lintsrc() { concat!("Alignment of ", stringify!(mpc_lintsrc)) ); assert_eq!( - unsafe { &(*(0 as *const mpc_lintsrc)).type_ as *const _ as usize }, + unsafe { &(*std::ptr::null::()).type_ as *const _ as usize }, 0usize, concat!( "Alignment of field: ", @@ -686,7 +686,7 @@ fn bindgen_test_layout_mpc_lintsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_lintsrc)).irqtype as *const _ as usize }, + unsafe { &(*std::ptr::null::()).irqtype as *const _ as usize }, 1usize, concat!( "Alignment of field: ", @@ -696,7 +696,7 @@ fn bindgen_test_layout_mpc_lintsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_lintsrc)).irqflag as *const _ as usize }, + unsafe { &(*std::ptr::null::()).irqflag as *const _ as usize }, 2usize, concat!( "Alignment of field: ", @@ -706,7 +706,7 @@ fn bindgen_test_layout_mpc_lintsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_lintsrc)).srcbusid as *const _ as usize }, + unsafe { &(*std::ptr::null::()).srcbusid as *const _ as usize }, 4usize, concat!( "Alignment of field: ", @@ -716,7 +716,7 @@ fn bindgen_test_layout_mpc_lintsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_lintsrc)).srcbusirq as *const _ as usize }, + unsafe { &(*std::ptr::null::()).srcbusirq as *const _ as usize }, 5usize, concat!( "Alignment of field: ", @@ -726,7 +726,7 @@ fn bindgen_test_layout_mpc_lintsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_lintsrc)).destapic as *const _ as usize }, + unsafe { &(*std::ptr::null::()).destapic as *const _ as usize }, 6usize, concat!( "Alignment of field: ", @@ -736,7 +736,7 @@ fn bindgen_test_layout_mpc_lintsrc() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_lintsrc)).destapiclint as *const _ as usize }, + unsafe { &(*std::ptr::null::()).destapiclint as *const _ as usize }, 7usize, concat!( "Alignment of field: ", @@ -773,7 +773,7 @@ fn bindgen_test_layout_mpc_oemtable() { concat!("Alignment of ", stringify!(mpc_oemtable)) ); assert_eq!( - unsafe { &(*(0 as *const mpc_oemtable)).signature as *const _ as usize }, + unsafe { &(*std::ptr::null::()).signature as *const _ as usize }, 0usize, concat!( "Alignment of field: ", @@ -783,7 +783,7 @@ fn bindgen_test_layout_mpc_oemtable() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_oemtable)).length as *const _ as usize }, + unsafe { &(*std::ptr::null::()).length as *const _ as usize }, 4usize, concat!( "Alignment of field: ", @@ -793,7 +793,7 @@ fn bindgen_test_layout_mpc_oemtable() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_oemtable)).rev as *const _ as usize }, + unsafe { &(*std::ptr::null::()).rev as *const _ as usize }, 6usize, concat!( "Alignment of field: ", @@ -803,7 +803,7 @@ fn bindgen_test_layout_mpc_oemtable() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_oemtable)).checksum as *const _ as usize }, + unsafe { &(*std::ptr::null::()).checksum as *const _ as usize }, 7usize, concat!( "Alignment of field: ", @@ -813,7 +813,7 @@ fn bindgen_test_layout_mpc_oemtable() { ) ); assert_eq!( - unsafe { &(*(0 as *const mpc_oemtable)).mpc as *const _ as usize }, + unsafe { &(*std::ptr::null::()).mpc as *const _ as usize }, 8usize, concat!( "Alignment of field: ", diff --git a/src/vmm/src/core/vmm.rs b/src/vmm/src/core/vmm.rs index 1b9ea2d..6b5097a 100644 --- a/src/vmm/src/core/vmm.rs +++ b/src/vmm/src/core/vmm.rs @@ -324,7 +324,6 @@ impl VMM { } } } - /// Configure the VMM: /// * `num_vcpus` Number of virtual CPUs /// * `mem_size_mb` Memory size (in MB) diff --git a/src/vmm/src/grpc/client.rs b/src/vmm/src/grpc/client.rs new file mode 100644 index 0000000..c9a93cc --- /dev/null +++ b/src/vmm/src/grpc/client.rs @@ -0,0 +1,40 @@ +use self::agent::{workload_runner_client::WorkloadRunnerClient, ExecuteRequest}; +use log::error; +use std::{net::Ipv4Addr, time::Duration}; +use tonic::{transport::Channel, Streaming}; + +pub mod agent { + tonic::include_proto!("cloudlet.agent"); +} + +pub struct WorkloadClient { + client: WorkloadRunnerClient, +} + +impl WorkloadClient { + pub async fn new(ip: Ipv4Addr, port: u16) -> Result { + let delay = Duration::from_secs(2); // Setting initial delay to 2 seconds + loop { + match WorkloadRunnerClient::connect(format!("http://[{}]:{}", ip, port)).await { + Ok(client) => { + return Ok(WorkloadClient { client }); + } + Err(err) => { + error!("Failed to connect to Agent service: {}", err); + error!("Retrying in {:?}...", delay); + tokio::time::sleep(delay).await; + } + } + } + } + + pub async fn execute( + &mut self, + request: ExecuteRequest, + ) -> Result, tonic::Status> { + let request = tonic::Request::new(request); + let response_stream = self.client.execute(request).await?.into_inner(); + + Ok(response_stream) + } +} diff --git a/src/vmm/src/service.rs b/src/vmm/src/grpc/server.rs similarity index 52% rename from src/vmm/src/service.rs rename to src/vmm/src/grpc/server.rs index bd1dfac..1a8fb2c 100644 --- a/src/vmm/src/service.rs +++ b/src/vmm/src/grpc/server.rs @@ -1,21 +1,28 @@ -use self::vmmorchestrator::{ - vmm_service_server::VmmService as VmmServiceTrait, RunVmmRequest, RunVmmResponse, -}; -use crate::core::vmm::VMM; +use self::vmmorchestrator::{vmm_service_server::VmmService as VmmServiceTrait, RunVmmRequest}; +use crate::grpc::client::agent::ExecuteRequest; use crate::VmmErrors; +use crate::{core::vmm::VMM, grpc::client::WorkloadClient}; +use std::time::Duration; use std::{ convert::From, net::Ipv4Addr, path::{Path, PathBuf}, process::{Command, Stdio}, }; +use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use tracing::{error, info}; +type Result = std::result::Result, tonic::Status>; + pub mod vmmorchestrator { tonic::include_proto!("vmmorchestrator"); } +pub mod agent { + tonic::include_proto!("cloudlet.agent"); +} + // Implement the From trait for VmmErrors into Status impl From for Status { fn from(error: VmmErrors) -> Self { @@ -33,13 +40,14 @@ pub struct VmmService; #[tonic::async_trait] impl VmmServiceTrait for VmmService { - async fn run( - &self, - _request: Request, - ) -> Result, Status> { - let response = vmmorchestrator::RunVmmResponse {}; + type RunStream = + ReceiverStream>; + + async fn run(&self, _request: Request) -> Result { + let (tx, rx) = tokio::sync::mpsc::channel(4); const HOST_IP: Ipv4Addr = Ipv4Addr::new(172, 29, 0, 1); + const VM_IP: Ipv4Addr = Ipv4Addr::new(172, 29, 0, 2); const HOST_NETMASK: Ipv4Addr = Ipv4Addr::new(255, 255, 0, 0); // Check if the kernel is on the system, else build it @@ -74,9 +82,50 @@ impl VmmServiceTrait for VmmService { // Configure the VMM parameters might need to be calculated rather than hardcoded vmm.configure(1, 512, kernel_path, &Some(initramfs_path)) .map_err(VmmErrors::VmmConfigure)?; - // Run the VMM - vmm.run().map_err(VmmErrors::VmmRun)?; - Ok(Response::new(response)) + // Run the VMM in a separate task + tokio::spawn(async move { + info!("Running VMM"); + if let Err(err) = vmm.run().map_err(VmmErrors::VmmRun) { + error!("Error running VMM: {:?}", err); + } + }); + + let grpc_client = tokio::spawn(async move { + // Wait 2 seconds + tokio::time::sleep(Duration::from_secs(2)).await; + println!("Connecting to Agent service"); + + WorkloadClient::new(VM_IP, 50051).await + }) + .await + .unwrap(); + + // Send the grpc request to start the agent + let execute_request = ExecuteRequest {}; + + match grpc_client { + Ok(mut client) => { + info!("Successfully connected to Agent service"); + + // Start the execution + let mut response_stream = client.execute(execute_request).await?; + + // Process each message as it arrives + while let Some(response) = response_stream.message().await? { + let vmm_response = vmmorchestrator::ExecuteResponse { + stdout: response.stdout, + stderr: response.stderr, + exit_code: response.exit_code, + }; + tx.send(Ok(vmm_response)).await.unwrap(); + } + } + Err(e) => { + error!("ERROR {:?}", e); + } + } + + Ok(Response::new(ReceiverStream::new(rx))) } } diff --git a/src/vmm/src/lib.rs b/src/vmm/src/lib.rs index f5df684..c4b00ad 100644 --- a/src/vmm/src/lib.rs +++ b/src/vmm/src/lib.rs @@ -1,5 +1,8 @@ pub mod core; -pub mod service; +pub mod grpc { + pub mod client; + pub mod server; +} #[derive(Debug)] pub enum VmmErrors { diff --git a/src/vmm/src/main.rs b/src/vmm/src/main.rs index 3662646..9947bf0 100644 --- a/src/vmm/src/main.rs +++ b/src/vmm/src/main.rs @@ -4,7 +4,7 @@ use tonic::transport::Server; use tracing::info; use vmm::{ core::vmm::VMM, - service::{vmmorchestrator, VmmService}, + grpc::server::{vmmorchestrator, VmmService}, VmmErrors, }; mod args;