From 71b9fc538d13c3adc15baa70831218bd1dea00c2 Mon Sep 17 00:00:00 2001 From: Thomas Mauran <78204354+thomas-mauran@users.noreply.github.com> Date: Wed, 1 May 2024 16:33:21 +0200 Subject: [PATCH] feat: send code though gRPC to the agent (#37) * feat(agent/proto): add agent configuration in execute request Signed-off-by: Martin Moreira de Jesus * feat: send code through cli and api to vm Signed-off-by: Mauran --------- Signed-off-by: Martin Moreira de Jesus Signed-off-by: Mauran Co-authored-by: Martin Moreira de Jesus --- proto/agent.proto | 17 ++++++++++++-- proto/vmm.proto | 7 +++--- src/agent/Cargo.toml | 2 +- src/agent/examples/config.toml | 2 +- src/agent/src/agents/mod.rs | 18 ++++++++++++++- src/agent/src/agents/rust.rs | 10 ++++---- src/agent/src/lib.rs | 2 ++ src/agent/src/main.rs | 19 +++++++--------- src/agent/src/workload/config.rs | 38 ++++++++++++++++++++++--------- src/agent/src/workload/runner.rs | 8 ++++++- src/agent/src/workload/service.rs | 29 ++++++++--------------- src/api/src/service.rs | 12 +++++++--- src/cli/src/services.rs | 32 +++++++++----------------- src/shared-models/src/lib.rs | 20 ++++++++++++++-- src/vmm/src/grpc/server.rs | 18 ++++++++++++--- tools/rootfs/mkrootfs.sh | 8 +------ 16 files changed, 149 insertions(+), 93 deletions(-) diff --git a/proto/agent.proto b/proto/agent.proto index 5b6f521..84b3798 100644 --- a/proto/agent.proto +++ b/proto/agent.proto @@ -2,7 +2,19 @@ syntax = "proto3"; package cloudlet.agent; import "google/protobuf/empty.proto"; -message ExecuteRequest {} +message ExecuteRequest { + enum Action { + RUN = 0; + PREPARE = 1; + PREPARE_AND_RUN = 2; + } + + string workload_name = 1; + string language = 2; + Action action = 3; + string code = 4; + string config_str = 5; +} message ExecuteResponse { enum Stage { @@ -16,7 +28,8 @@ message ExecuteResponse { string stdout = 1; string stderr = 2; - int32 exit_code = 3; + Stage stage = 3; + int32 exit_code = 4; } message SignalRequest { diff --git a/proto/vmm.proto b/proto/vmm.proto index 4b394ab..e954584 100644 --- a/proto/vmm.proto +++ b/proto/vmm.proto @@ -36,10 +36,9 @@ service VmmService { } message RunVmmRequest { - - Language language = 1; - string code = 2; - string env = 3; + string workload_name = 1; + Language language = 2; + string code = 3; LogLevel log_level = 4; } diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index 18c2cf0..daa40d6 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -8,7 +8,7 @@ name = "agent" path = "src/lib.rs" [dependencies] -clap = { version = "4.5.4", features = ["derive"] } +clap = { version = "4.5.4", features = ["derive", "env"] } prost = "0.12.4" rand = "0.8.5" serde = { version = "1.0.197", features = ["derive"] } diff --git a/src/agent/examples/config.toml b/src/agent/examples/config.toml index 5efd82c..013193c 100644 --- a/src/agent/examples/config.toml +++ b/src/agent/examples/config.toml @@ -7,5 +7,5 @@ address = "localhost" port = 50051 [build] -source-code-path = "CHANGE/cloudlet/src/agent/examples/main.rs" +source-code-path = "CHANGEME/src/agent/examples/main.rs" release = true diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index 5d3062b..f70d43b 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -1,4 +1,4 @@ -use crate::AgentResult; +use crate::{AgentError, AgentResult}; use serde::Deserialize; #[cfg(feature = "debug-agent")] @@ -34,3 +34,19 @@ impl std::fmt::Display for Language { } } } + +impl TryFrom<&str> for Language { + type Error = AgentError; + + fn try_from(value: &str) -> Result { + match value { + "rust" => Ok(Language::Rust), + #[cfg(feature = "debug-agent")] + "debug" => Ok(Language::Debug), + _ => Err(AgentError::InvalidLanguage(format!( + "Invalid language: {}", + value + ))), + } + } +} diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 5cbd3b4..c34f3b4 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -8,7 +8,6 @@ use std::{fs::create_dir_all, process::Command}; #[serde(rename_all = "kebab-case")] struct RustAgentBuildConfig { release: bool, - source_code_path: String, } #[derive(Deserialize)] @@ -66,8 +65,6 @@ impl From for RustAgent { impl Agent for RustAgent { fn prepare(&self) -> AgentResult { - let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap(); - let function_dir = format!( "/tmp/{}", Alphanumeric.sample_string(&mut rand::thread_rng(), 16) @@ -77,8 +74,11 @@ impl Agent for RustAgent { create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); - std::fs::write(format!("{}/src/main.rs", &function_dir), code) - .expect("Unable to write main.rs file"); + std::fs::write( + format!("{}/src/main.rs", &function_dir), + &self.workload_config.code, + ) + .expect("Unable to write main.rs file"); let cargo_toml = format!( r#" diff --git a/src/agent/src/lib.rs b/src/agent/src/lib.rs index 9332611..0c43daf 100644 --- a/src/agent/src/lib.rs +++ b/src/agent/src/lib.rs @@ -8,6 +8,7 @@ pub mod workload; pub enum AgentError { OpenConfigFileError(std::io::Error), ParseConfigError(toml::de::Error), + InvalidLanguage(String), BuildFailed(AgentOutput), } @@ -17,6 +18,7 @@ impl fmt::Display for AgentError { AgentError::OpenConfigFileError(e) => write!(f, "Failed to open config file: {}", e), AgentError::ParseConfigError(e) => write!(f, "Failed to parse config file: {}", e), AgentError::BuildFailed(output) => write!(f, "Build failed: {:?}", output), + AgentError::InvalidLanguage(e) => write!(f, "Invalid language: {}", e), } } } diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index b9673c9..93437f2 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -1,32 +1,29 @@ use agent::{ - agent::workload_runner_server::WorkloadRunnerServer, - workload::{config::Config, runner::Runner, service::WorkloadRunnerService}, + agent::workload_runner_server::WorkloadRunnerServer, workload::service::WorkloadRunnerService, }; use clap::Parser; -use std::{net::ToSocketAddrs, path::PathBuf}; +use std::net::ToSocketAddrs; use tonic::transport::Server; #[derive(Debug, Parser)] struct Args { - #[clap(short, long, default_value = "/etc/cloudlet/agent/config.toml")] - config: PathBuf, + #[clap(long, env, default_value = "0.0.0.0")] + grpc_server_address: String, + #[clap(long, env, default_value = "50051")] + grpc_server_port: u16, } #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); - let config = Config::from_file(&args.config).unwrap(); - - let bind_address = format!("{}:{}", config.server.address, config.server.port) + let bind_address = format!("{}:{}", args.grpc_server_address, args.grpc_server_port) .to_socket_addrs() .unwrap() .next() .unwrap(); - let runner = Runner::new(config); - - let server = WorkloadRunnerService::new(runner); + let server = WorkloadRunnerService; Server::builder() .add_service(WorkloadRunnerServer::new(server)) diff --git a/src/agent/src/workload/config.rs b/src/agent/src/workload/config.rs index b21f71c..dd3c660 100644 --- a/src/agent/src/workload/config.rs +++ b/src/agent/src/workload/config.rs @@ -1,14 +1,11 @@ -use crate::{agents::Language, AgentError, AgentResult}; +use crate::{ + agent::{execute_request, ExecuteRequest}, + agents::Language, + AgentError, AgentResult, +}; use serde::Deserialize; use std::path::PathBuf; -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "kebab-case")] -pub struct Server { - pub address: String, - pub port: u16, -} - /// Generic agent configuration. #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "kebab-case")] @@ -19,10 +16,9 @@ pub struct Config { pub language: Language, /// Action to perform. pub action: Action, - /// Server configuration. - pub server: Server, + /// Code + pub code: String, /// Rest of the configuration as a string. - #[serde(skip)] pub config_string: String, } @@ -38,6 +34,16 @@ impl Config { Ok(config) } + + pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result { + Ok(Self { + workload_name: execute_request.workload_name.clone(), + language: Language::try_from(execute_request.language.clone().as_str())?, + action: execute_request.action().into(), + config_string: execute_request.config_str, + code: execute_request.code, + }) + } } #[derive(Debug, Clone, Deserialize)] @@ -47,3 +53,13 @@ pub enum Action { Run, PrepareAndRun, } + +impl From for Action { + fn from(value: execute_request::Action) -> Self { + match value { + execute_request::Action::Prepare => Action::Prepare, + execute_request::Action::Run => Action::Run, + execute_request::Action::PrepareAndRun => Action::PrepareAndRun, + } + } +} diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index c921ccd..ae4e175 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -1,7 +1,8 @@ use crate::{ + agent::ExecuteRequest, agents::{rust, Agent, AgentOutput, Language}, workload::config::Action, - AgentResult, + AgentError, AgentResult, }; #[cfg(feature = "debug-agent")] @@ -27,6 +28,11 @@ impl Runner { Runner { config, agent } } + pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result { + let config = Config::new_from_execute_request(execute_request)?; + Ok(Self::new(config)) + } + pub fn run(&self) -> AgentResult { let result = match self.config.action { Action::Prepare => self.agent.prepare()?, diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index dca0e13..3b05386 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -1,37 +1,25 @@ use super::runner::Runner; -use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest}; +use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest}; use agent::workload_runner_server::WorkloadRunner; -use std::{process, sync::Arc}; -use tokio::sync::Mutex; +use std::process; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response}; type Result = std::result::Result, tonic::Status>; -pub struct WorkloadRunnerService { - runner: Arc>, -} - -impl WorkloadRunnerService { - pub fn new(runner: Runner) -> Self { - WorkloadRunnerService { - runner: Arc::new(Mutex::new(runner)), - } - } -} +pub struct WorkloadRunnerService; #[tonic::async_trait] impl WorkloadRunner for WorkloadRunnerService { type ExecuteStream = ReceiverStream>; - async fn execute(&self, _: Request) -> Result { + async fn execute(&self, req: Request) -> Result { let (tx, rx) = tokio::sync::mpsc::channel(4); - // We assume there's only one request at a time - let runner = self - .runner - .try_lock() - .map_err(|e| tonic::Status::unavailable(format!("Runner is busy: {:?}", e)))?; + let execute_request = req.into_inner(); + + let runner = Runner::new_from_execute_request(execute_request) + .map_err(|e| tonic::Status::internal(e.to_string()))?; let res = runner .run() @@ -39,6 +27,7 @@ impl WorkloadRunner for WorkloadRunnerService { let _ = tx .send(Ok(ExecuteResponse { + stage: Stage::Done as i32, stdout: res.stdout, stderr: res.stderr, exit_code: res.exit_code, diff --git a/src/api/src/service.rs b/src/api/src/service.rs index a312bf8..c35bc95 100644 --- a/src/api/src/service.rs +++ b/src/api/src/service.rs @@ -6,7 +6,7 @@ use actix_web::{post, web, HttpResponse, Responder}; use actix_web_lab::sse; use async_stream::stream; use serde::Serialize; -use shared_models::CloudletDtoRequest; +use shared_models::{CloudletDtoRequest, Language}; use tokio_stream::StreamExt; use tonic::Streaming; @@ -16,10 +16,16 @@ pub async fn run(req_body: web::Json) -> impl Responder { let mut client = VmmClient::new().await.unwrap(); + println!("Request: {:?}", req); + let vmm_request = RunVmmRequest { + workload_name: req.workload_name, code: req.code, - env: req.env, - language: req.language as i32, + language: match req.language { + Language::PYTHON => 0, + Language::NODE => 1, + Language::RUST => 2, + }, log_level: req.log_level as i32, }; diff --git a/src/cli/src/services.rs b/src/cli/src/services.rs index b2987e0..2ec38ba 100644 --- a/src/cli/src/services.rs +++ b/src/cli/src/services.rs @@ -1,32 +1,19 @@ use crate::utils::ConfigFileHandler; use reqwest::Client; use serde::Deserialize; -use shared_models::{CloudletDtoRequest, Language}; -use std::{error::Error, path::PathBuf}; +use shared_models::{BuildConfig, CloudletDtoRequest, Language, ServerConfig}; +use std::error::Error; -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] struct TomlConfig { #[serde(rename = "workload-name")] - _workload_name: String, + workload_name: String, language: Language, - _action: String, - _server: ServerConfig, + action: String, + server: ServerConfig, build: BuildConfig, } -#[derive(Deserialize)] -struct ServerConfig { - _address: String, - _port: u16, -} - -#[derive(Deserialize)] -struct BuildConfig { - #[serde(rename = "source-code-path")] - source_code_path: PathBuf, - _release: bool, -} - pub struct CloudletClient {} impl CloudletClient { @@ -34,16 +21,19 @@ impl CloudletClient { let config: TomlConfig = toml::from_str(&config).expect("Error while parsing the config file"); + let workload_name = config.workload_name; let code: String = ConfigFileHandler::read_file(&config.build.source_code_path) .expect("Error while reading the code file"); - let env = ""; let language = config.language; CloudletDtoRequest { + workload_name, language, code, - env: env.to_string(), log_level: shared_models::LogLevel::INFO, + server: config.server, + build: config.build, + action: config.action, } } diff --git a/src/shared-models/src/lib.rs b/src/shared-models/src/lib.rs index e32fe1b..2b0b673 100644 --- a/src/shared-models/src/lib.rs +++ b/src/shared-models/src/lib.rs @@ -22,18 +22,34 @@ pub enum LogLevel { #[derive(Deserialize, Debug)] pub struct TomlClientConfigFile { + pub worklaod_name: String, pub language: Language, - pub env_path: PathBuf, pub code_path: PathBuf, pub log_level: LogLevel, } #[derive(Serialize, Deserialize, Debug)] pub struct CloudletDtoRequest { + pub workload_name: String, pub language: Language, - pub env: String, pub code: String, pub log_level: LogLevel, + pub action: String, + pub server: ServerConfig, + pub build: BuildConfig, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ServerConfig { + pub address: String, + pub port: u16, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct BuildConfig { + #[serde(rename = "source-code-path")] + pub source_code_path: PathBuf, + pub release: bool, } #[derive(Serialize, Deserialize, Debug)] diff --git a/src/vmm/src/grpc/server.rs b/src/vmm/src/grpc/server.rs index 24c01b6..facac57 100644 --- a/src/vmm/src/grpc/server.rs +++ b/src/vmm/src/grpc/server.rs @@ -43,7 +43,7 @@ impl VmmServiceTrait for VmmService { type RunStream = ReceiverStream>; - async fn run(&self, _request: Request) -> Result { + async fn run(&self, request: Request) -> Result { let (tx, rx) = tokio::sync::mpsc::channel(4); const HOST_IP: Ipv4Addr = Ipv4Addr::new(172, 29, 0, 1); @@ -102,14 +102,26 @@ impl VmmServiceTrait for VmmService { .unwrap(); // Send the grpc request to start the agent - let execute_request = ExecuteRequest {}; + let vmm_request = request.into_inner(); + let agent_request = ExecuteRequest { + workload_name: vmm_request.workload_name, + language: match vmm_request.language { + 0 => "python".to_string(), + 1 => "node".to_string(), + 2 => "rust".to_string(), + _ => unreachable!("Invalid language"), + }, + action: 2, // Prepare and run + code: vmm_request.code, + config_str: "[build]\nrelease = true".to_string(), + }; match grpc_client { Ok(mut client) => { info!("Successfully connected to Agent service"); // Start the execution - let mut response_stream = client.execute(execute_request).await?; + let mut response_stream = client.execute(agent_request).await?; // Process each message as it arrives while let Some(response) = response_stream.message().await? { diff --git a/tools/rootfs/mkrootfs.sh b/tools/rootfs/mkrootfs.sh index 61799ae..4058dae 100755 --- a/tools/rootfs/mkrootfs.sh +++ b/tools/rootfs/mkrootfs.sh @@ -27,13 +27,7 @@ mount -t sysfs sysfs /sys ip link set up dev lo -slattach -L /dev/ttyS1& - -while ! ifconfig sl0 &> /dev/null; do - sleep 1 -done - -ifconfig sl0 172.30.0.11 netmask 255.255.0.0 up +ifconfig eth0 172.29.0.2 netmask 255.255.0.0 up /agent