diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index daa40d6..3f531e1 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -8,7 +8,10 @@ name = "agent" path = "src/lib.rs" [dependencies] +async-trait = "0.1.80" clap = { version = "4.5.4", features = ["derive", "env"] } +nix = { version = "0.28.0", features = ["signal"] } +once_cell = "1.19.0" prost = "0.12.4" rand = "0.8.5" serde = { version = "1.0.197", features = ["derive"] } diff --git a/src/agent/src/agents/debug.rs b/src/agent/src/agents/debug.rs index ceae270..dbbf241 100644 --- a/src/agent/src/agents/debug.rs +++ b/src/agent/src/agents/debug.rs @@ -1,8 +1,12 @@ use super::AgentOutput; use crate::agents::Agent; use crate::{workload, AgentResult}; +use async_trait::async_trait; +use std::collections::HashSet; use std::fs::create_dir_all; +use std::sync::Arc; use std::time::SystemTime; +use tokio::sync::Mutex; pub struct DebugAgent { workload_config: workload::config::Config, @@ -14,8 +18,9 @@ impl From for DebugAgent { } } +#[async_trait] impl Agent for DebugAgent { - fn prepare(&self) -> AgentResult { + async fn prepare(&self, _: Arc>>) -> AgentResult { let dir = format!("/tmp/{}", self.workload_config.workload_name); println!("Function directory: {}", dir); @@ -39,7 +44,7 @@ impl Agent for DebugAgent { }) } - fn run(&self) -> AgentResult { + async fn run(&self, _: Arc>>) -> AgentResult { let dir = format!("/tmp/{}", self.workload_config.workload_name); let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index f70d43b..9d4032c 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -1,5 +1,9 @@ use crate::{AgentError, AgentResult}; +use async_trait::async_trait; use serde::Deserialize; +use std::collections::HashSet; +use std::sync::Arc; +use tokio::sync::Mutex; #[cfg(feature = "debug-agent")] pub mod debug; @@ -12,9 +16,10 @@ pub struct AgentOutput { pub stderr: String, } +#[async_trait] pub trait Agent { - fn prepare(&self) -> AgentResult; - fn run(&self) -> AgentResult; + async fn prepare(&self, child_processes: Arc>>) -> AgentResult; + async fn run(&self, child_processes: Arc>>) -> AgentResult; } #[derive(Debug, Clone, Deserialize)] diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index c34f3b4..207276e 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -1,8 +1,13 @@ use super::{Agent, AgentOutput}; use crate::{workload, AgentError, AgentResult}; +use async_trait::async_trait; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; -use std::{fs::create_dir_all, process::Command}; +use std::collections::HashSet; +use std::fs::create_dir_all; +use std::sync::Arc; +use tokio::process::Command; +use tokio::sync::Mutex; #[derive(Deserialize)] #[serde(rename_all = "kebab-case")] @@ -21,27 +26,45 @@ pub struct RustAgent { } impl RustAgent { - fn build(&self, function_dir: &String) -> AgentResult { + async fn build( + &self, + function_dir: &str, + child_processes: Arc>>, + ) -> AgentResult { if self.rust_config.build.release { - let output = Command::new("cargo") + let child = Command::new("cargo") .arg("build") .arg("--release") .current_dir(function_dir) - .output() + .spawn() .expect("Failed to build function"); + { + child_processes.lock().await.insert(child.id().unwrap()); + } + + let output = child + .wait_with_output() + .await + .expect("Failed to wait on child"); + Ok(AgentOutput { exit_code: output.status.code().unwrap(), stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), }) } else { - let output = Command::new("cargo") + let child = Command::new("cargo") .arg("build") .current_dir(function_dir) - .output() + .spawn() .expect("Failed to build function"); + let output = child + .wait_with_output() + .await + .expect("Failed to wait on child"); + Ok(AgentOutput { exit_code: output.status.code().unwrap(), stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), @@ -63,8 +86,9 @@ impl From for RustAgent { } } +#[async_trait] impl Agent for RustAgent { - fn prepare(&self) -> AgentResult { + async fn prepare(&self, child_processes: Arc>>) -> AgentResult { let function_dir = format!( "/tmp/{}", Alphanumeric.sample_string(&mut rand::thread_rng(), 16) @@ -85,7 +109,7 @@ impl Agent for RustAgent { [package] name = "{}" version = "0.1.0" - edition = "2018" + edition = "2021" "#, self.workload_config.workload_name ); @@ -93,7 +117,7 @@ impl Agent for RustAgent { std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) .expect("Unable to write Cargo.toml file"); - let result = self.build(&function_dir)?; + let result = self.build(&function_dir, child_processes).await?; if result.exit_code != 0 { println!("Build failed: {:?}", result); @@ -131,11 +155,20 @@ impl Agent for RustAgent { }) } - fn run(&self) -> AgentResult { - let output = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) - .output() + async fn run(&self, child_processes: Arc>>) -> AgentResult { + let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) + .spawn() .expect("Failed to run function"); + { + child_processes.lock().await.insert(child.id().unwrap()); + } + + let output = child + .wait_with_output() + .await + .expect("Failed to wait on child"); + let agent_output = AgentOutput { exit_code: output.status.code().unwrap(), stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index ae4e175..1cbeb61 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -1,46 +1,63 @@ +use super::config::Config; use crate::{ agent::ExecuteRequest, agents::{rust, Agent, AgentOutput, Language}, workload::config::Action, AgentError, AgentResult, }; +use std::collections::HashSet; +use std::sync::Arc; +use tokio::sync::Mutex; #[cfg(feature = "debug-agent")] use crate::agents::debug; -use super::config::Config; - /// Runner for a workload. /// Will execute the workload based on the inner agent (language). pub struct Runner { config: Config, agent: Box, + child_processes: Arc>>, } impl Runner { - pub fn new(config: Config) -> Self { + pub fn new(config: Config, child_processes: Arc>>) -> Self { let agent: Box = match config.language { Language::Rust => Box::new(rust::RustAgent::from(config.clone())), #[cfg(feature = "debug-agent")] Language::Debug => Box::new(debug::DebugAgent::from(config.clone())), }; - Runner { config, agent } + Runner { + config, + agent, + child_processes, + } } - pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result { + pub fn new_from_execute_request( + execute_request: ExecuteRequest, + child_processes: Arc>>, + ) -> Result { let config = Config::new_from_execute_request(execute_request)?; - Ok(Self::new(config)) + Ok(Self::new(config, child_processes)) } - pub fn run(&self) -> AgentResult { + pub async fn run(&self) -> AgentResult { let result = match self.config.action { - Action::Prepare => self.agent.prepare()?, - Action::Run => self.agent.run()?, + Action::Prepare => { + self.agent + .prepare(Arc::clone(&self.child_processes)) + .await? + } + Action::Run => self.agent.run(Arc::clone(&self.child_processes)).await?, Action::PrepareAndRun => { - let res = self.agent.prepare()?; + let res = self + .agent + .prepare(Arc::clone(&self.child_processes)) + .await?; println!("Prepare result {:?}", res); - self.agent.run()? + self.agent.run(Arc::clone(&self.child_processes)).await? } }; diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index 3b05386..996d256 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -1,12 +1,18 @@ use super::runner::Runner; use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest}; use agent::workload_runner_server::WorkloadRunner; -use std::process; +use once_cell::sync::Lazy; +use std::collections::HashSet; +use std::{process, sync::Arc}; +use tokio::sync::Mutex; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response}; type Result = std::result::Result, tonic::Status>; +static CHILD_PROCESSES: Lazy>>> = + Lazy::new(|| Arc::new(Mutex::new(HashSet::new()))); + pub struct WorkloadRunnerService; #[tonic::async_trait] @@ -18,11 +24,12 @@ impl WorkloadRunner for WorkloadRunnerService { let execute_request = req.into_inner(); - let runner = Runner::new_from_execute_request(execute_request) + let runner = Runner::new_from_execute_request(execute_request, CHILD_PROCESSES.clone()) .map_err(|e| tonic::Status::internal(e.to_string()))?; let res = runner .run() + .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let _ = tx @@ -42,6 +49,21 @@ impl WorkloadRunner for WorkloadRunnerService { } async fn signal(&self, _: Request) -> Result<()> { + let child_processes = CHILD_PROCESSES.lock().await; + + for &child_id in child_processes.iter() { + match nix::sys::signal::kill( + nix::unistd::Pid::from_raw(child_id as i32), + nix::sys::signal::Signal::SIGTERM, + ) { + Ok(_) => println!("Sent SIGTERM to child process {}", child_id), + Err(e) => println!( + "Failed to send SIGTERM to child process {}: {}", + child_id, e + ), + } + } + process::exit(0); } }