diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index 18c2cf0..daa40d6 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -8,7 +8,7 @@ name = "agent" path = "src/lib.rs" [dependencies] -clap = { version = "4.5.4", features = ["derive"] } +clap = { version = "4.5.4", features = ["derive", "env"] } prost = "0.12.4" rand = "0.8.5" serde = { version = "1.0.197", features = ["derive"] } diff --git a/src/agent/examples/config.toml b/src/agent/examples/config.toml index 5efd82c..41eaaea 100644 --- a/src/agent/examples/config.toml +++ b/src/agent/examples/config.toml @@ -7,5 +7,5 @@ address = "localhost" port = 50051 [build] -source-code-path = "CHANGE/cloudlet/src/agent/examples/main.rs" +source-code-path = "/opt/repositories/virt-do/cloudlet/src/agent/examples/main.rs" release = true diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index 5d3062b..13e3fff 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -34,3 +34,16 @@ impl std::fmt::Display for Language { } } } + +impl TryFrom<&str> for Language { + type Error = String; + + fn try_from(value: &str) -> Result { + match value { + "rust" => Ok(Language::Rust), + #[cfg(feature = "debug-agent")] + "debug" => Ok(Language::Debug), + _ => Err(format!("Invalid language: {}", value)), + } + } +} diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 5cbd3b4..c34f3b4 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -8,7 +8,6 @@ use std::{fs::create_dir_all, process::Command}; #[serde(rename_all = "kebab-case")] struct RustAgentBuildConfig { release: bool, - source_code_path: String, } #[derive(Deserialize)] @@ -66,8 +65,6 @@ impl From for RustAgent { impl Agent for RustAgent { fn prepare(&self) -> AgentResult { - let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap(); - let function_dir = format!( "/tmp/{}", Alphanumeric.sample_string(&mut rand::thread_rng(), 16) @@ -77,8 +74,11 @@ impl Agent for RustAgent { create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); - std::fs::write(format!("{}/src/main.rs", &function_dir), code) - .expect("Unable to write main.rs file"); + std::fs::write( + format!("{}/src/main.rs", &function_dir), + &self.workload_config.code, + ) + .expect("Unable to write main.rs file"); let cargo_toml = format!( r#" diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index b9673c9..5945d48 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -1,32 +1,29 @@ use agent::{ - agent::workload_runner_server::WorkloadRunnerServer, - workload::{config::Config, runner::Runner, service::WorkloadRunnerService}, + agent::workload_runner_server::WorkloadRunnerServer, workload::service::WorkloadRunnerService, }; use clap::Parser; -use std::{net::ToSocketAddrs, path::PathBuf}; +use std::net::ToSocketAddrs; use tonic::transport::Server; #[derive(Debug, Parser)] struct Args { - #[clap(short, long, default_value = "/etc/cloudlet/agent/config.toml")] - config: PathBuf, + #[clap(long, env, default_value = "localhost")] + grpc_server_address: String, + #[clap(long, env, default_value = "50051")] + grpc_server_port: u16, } #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); - let config = Config::from_file(&args.config).unwrap(); - - let bind_address = format!("{}:{}", config.server.address, config.server.port) + let bind_address = format!("{}:{}", args.grpc_server_address, args.grpc_server_port) .to_socket_addrs() .unwrap() .next() .unwrap(); - let runner = Runner::new(config); - - let server = WorkloadRunnerService::new(runner); + let server = WorkloadRunnerService; Server::builder() .add_service(WorkloadRunnerServer::new(server)) diff --git a/src/agent/src/workload/config.rs b/src/agent/src/workload/config.rs index b21f71c..40261b4 100644 --- a/src/agent/src/workload/config.rs +++ b/src/agent/src/workload/config.rs @@ -1,14 +1,11 @@ -use crate::{agents::Language, AgentError, AgentResult}; +use crate::{ + agent::{execute_request, ExecuteRequest}, + agents::Language, + AgentError, AgentResult, +}; use serde::Deserialize; use std::path::PathBuf; -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "kebab-case")] -pub struct Server { - pub address: String, - pub port: u16, -} - /// Generic agent configuration. #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "kebab-case")] @@ -19,10 +16,9 @@ pub struct Config { pub language: Language, /// Action to perform. pub action: Action, - /// Server configuration. - pub server: Server, + /// Code + pub code: String, /// Rest of the configuration as a string. - #[serde(skip)] pub config_string: String, } @@ -38,6 +34,17 @@ impl Config { Ok(config) } + + pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Self { + Self { + workload_name: execute_request.workload_name.clone(), + // TODO: Fix this unwrap + language: Language::try_from(execute_request.language.clone().as_str()).unwrap(), + action: execute_request.action().into(), + config_string: execute_request.config_str, + code: execute_request.code, + } + } } #[derive(Debug, Clone, Deserialize)] @@ -47,3 +54,13 @@ pub enum Action { Run, PrepareAndRun, } + +impl From for Action { + fn from(value: execute_request::Action) -> Self { + match value { + execute_request::Action::Prepare => Action::Prepare, + execute_request::Action::Run => Action::Run, + execute_request::Action::PrepareAndRun => Action::PrepareAndRun, + } + } +} diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index c921ccd..47f4ba2 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -1,4 +1,5 @@ use crate::{ + agent::ExecuteRequest, agents::{rust, Agent, AgentOutput, Language}, workload::config::Action, AgentResult, @@ -27,6 +28,11 @@ impl Runner { Runner { config, agent } } + pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Self { + let config = Config::new_from_execute_request(execute_request); + Self::new(config) + } + pub fn run(&self) -> AgentResult { let result = match self.config.action { Action::Prepare => self.agent.prepare()?, diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index dca0e13..90ae1ff 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -1,37 +1,24 @@ use super::runner::Runner; -use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest}; +use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest}; use agent::workload_runner_server::WorkloadRunner; -use std::{process, sync::Arc}; -use tokio::sync::Mutex; +use std::process; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response}; type Result = std::result::Result, tonic::Status>; -pub struct WorkloadRunnerService { - runner: Arc>, -} - -impl WorkloadRunnerService { - pub fn new(runner: Runner) -> Self { - WorkloadRunnerService { - runner: Arc::new(Mutex::new(runner)), - } - } -} +pub struct WorkloadRunnerService; #[tonic::async_trait] impl WorkloadRunner for WorkloadRunnerService { type ExecuteStream = ReceiverStream>; - async fn execute(&self, _: Request) -> Result { + async fn execute(&self, req: Request) -> Result { let (tx, rx) = tokio::sync::mpsc::channel(4); - // We assume there's only one request at a time - let runner = self - .runner - .try_lock() - .map_err(|e| tonic::Status::unavailable(format!("Runner is busy: {:?}", e)))?; + let execute_request = req.into_inner(); + + let runner = Runner::new_from_execute_request(execute_request); let res = runner .run() @@ -39,6 +26,7 @@ impl WorkloadRunner for WorkloadRunnerService { let _ = tx .send(Ok(ExecuteResponse { + stage: Stage::Done as i32, stdout: res.stdout, stderr: res.stderr, exit_code: res.exit_code,