From 7453194476ecbd89b20f960850707d064dc7e559 Mon Sep 17 00:00:00 2001 From: ESPIE Date: Wed, 1 May 2024 12:59:48 +0200 Subject: [PATCH] feat: child processes are now gracefully sigterm-ed on kill signal reception --- src/agent/Cargo.toml | 2 + src/agent/src/agents/debug.rs | 75 ++++++++------ src/agent/src/agents/mod.rs | 15 ++- src/agent/src/agents/rust.rs | 162 +++++++++++++++++------------- src/agent/src/main.rs | 11 +- src/agent/src/workload/runner.rs | 22 ++-- src/agent/src/workload/service.rs | 16 ++- 7 files changed, 193 insertions(+), 110 deletions(-) diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index 18c2cf0..9bfe93e 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -16,6 +16,8 @@ tokio = { version = "1.37.0", features = ["full"] } tokio-stream = "0.1.15" toml = "0.8.12" tonic = "0.11" +nix = { version = "0.28.0", features = ["signal"] } +once_cell = "1.19.0" [build-dependencies] tonic-build = "0.11" diff --git a/src/agent/src/agents/debug.rs b/src/agent/src/agents/debug.rs index ceae270..3161d1c 100644 --- a/src/agent/src/agents/debug.rs +++ b/src/agent/src/agents/debug.rs @@ -1,8 +1,13 @@ use super::AgentOutput; use crate::agents::Agent; use crate::{workload, AgentResult}; +use std::collections::HashSet; use std::fs::create_dir_all; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; use std::time::SystemTime; +use tokio::sync::Mutex; pub struct DebugAgent { workload_config: workload::config::Config, @@ -15,42 +20,52 @@ impl From for DebugAgent { } impl Agent for DebugAgent { - fn prepare(&self) -> AgentResult { - let dir = format!("/tmp/{}", self.workload_config.workload_name); - - println!("Function directory: {}", dir); - - create_dir_all(&dir).expect("Unable to create directory"); - - std::fs::write( - format!("{}/debug.txt", &dir), - format!( - "Debug agent for {} - written at {:?}", - self.workload_config.workload_name, - SystemTime::now(), - ), - ) - .expect("Unable to write debug.txt file"); - - Ok(AgentOutput { - exit_code: 0, - stdout: "Build successfully!".into(), - stderr: String::default(), + fn prepare( + &self, + _: &Arc>>, + ) -> Pin> + Send + '_>> { + Box::pin(async { + let dir = format!("/tmp/{}", self.workload_config.workload_name); + + println!("Function directory: {}", dir); + + create_dir_all(&dir).expect("Unable to create directory"); + + std::fs::write( + format!("{}/debug.txt", &dir), + format!( + "Debug agent for {} - written at {:?}", + self.workload_config.workload_name, + SystemTime::now(), + ), + ) + .expect("Unable to write debug.txt file"); + + Ok(AgentOutput { + exit_code: 0, + stdout: "Build successfully!".into(), + stderr: String::default(), + }) }) } - fn run(&self) -> AgentResult { - let dir = format!("/tmp/{}", self.workload_config.workload_name); + fn run( + &self, + _: &Arc>>, + ) -> Pin> + Send + '_>> { + Box::pin(async { + let dir = format!("/tmp/{}", self.workload_config.workload_name); - let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) - .expect("Unable to read debug.txt file"); + let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) + .expect("Unable to read debug.txt file"); - std::fs::remove_dir_all(dir).expect("Unable to remove directory"); + std::fs::remove_dir_all(dir).expect("Unable to remove directory"); - Ok(AgentOutput { - exit_code: 0, - stdout: content, - stderr: String::default(), + Ok(AgentOutput { + exit_code: 0, + stdout: content, + stderr: String::default(), + }) }) } } diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index 5d3062b..6c1fb83 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -1,5 +1,10 @@ use crate::AgentResult; use serde::Deserialize; +use std::collections::HashSet; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use tokio::sync::Mutex; #[cfg(feature = "debug-agent")] pub mod debug; @@ -13,8 +18,14 @@ pub struct AgentOutput { } pub trait Agent { - fn prepare(&self) -> AgentResult; - fn run(&self) -> AgentResult; + fn prepare<'a>( + &'a self, + child_processes: &'a Arc>>, + ) -> Pin> + Send + '_>>; + fn run<'a>( + &'a self, + child_processes: &'a Arc>>, + ) -> Pin> + Send + '_>>; } #[derive(Debug, Clone, Deserialize)] diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 5cbd3b4..9521b38 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -2,7 +2,12 @@ use super::{Agent, AgentOutput}; use crate::{workload, AgentError, AgentResult}; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; +use std::collections::HashSet; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; use std::{fs::create_dir_all, process::Command}; +use tokio::sync::Mutex; #[derive(Deserialize)] #[serde(rename_all = "kebab-case")] @@ -22,15 +27,23 @@ pub struct RustAgent { } impl RustAgent { - fn build(&self, function_dir: &String) -> AgentResult { + async fn build( + &self, + function_dir: &String, + child_processes: &Arc>>, + ) -> AgentResult { if self.rust_config.build.release { - let output = Command::new("cargo") + let child = Command::new("cargo") .arg("build") .arg("--release") .current_dir(function_dir) - .output() + .spawn() .expect("Failed to build function"); + child_processes.lock().await.insert(child.id()); + + let output = child.wait_with_output().expect("Failed to wait on child"); + Ok(AgentOutput { exit_code: output.status.code().unwrap(), stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), @@ -65,88 +78,101 @@ impl From for RustAgent { } impl Agent for RustAgent { - fn prepare(&self) -> AgentResult { - let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap(); + fn prepare<'a>( + &'a self, + child_processes: &'a Arc>>, + ) -> Pin> + Send + '_>> { + Box::pin(async { + let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap(); - let function_dir = format!( - "/tmp/{}", - Alphanumeric.sample_string(&mut rand::thread_rng(), 16) - ); + let function_dir = format!( + "/tmp/{}", + Alphanumeric.sample_string(&mut rand::thread_rng(), 16) + ); - println!("Function directory: {}", function_dir); + println!("Function directory: {}", function_dir); - create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); + create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); - std::fs::write(format!("{}/src/main.rs", &function_dir), code) - .expect("Unable to write main.rs file"); + std::fs::write(format!("{}/src/main.rs", &function_dir), code) + .expect("Unable to write main.rs file"); - let cargo_toml = format!( - r#" + let cargo_toml = format!( + r#" [package] name = "{}" version = "0.1.0" edition = "2018" "#, - self.workload_config.workload_name - ); + self.workload_config.workload_name + ); + + std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) + .expect("Unable to write Cargo.toml file"); + + let result = self.build(&function_dir, child_processes).await?; + + if result.exit_code != 0 { + println!("Build failed: {:?}", result); + return Err(AgentError::BuildFailed(AgentOutput { + exit_code: result.exit_code, + stdout: result.stdout, + stderr: result.stderr, + })); + } + + // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) + let binary_path = match self.rust_config.build.release { + true => format!( + "{}/target/release/{}", + &function_dir, self.workload_config.workload_name + ), + false => format!( + "{}/target/debug/{}", + &function_dir, self.workload_config.workload_name + ), + }; + + std::fs::copy( + binary_path, + format!("/tmp/{}", self.workload_config.workload_name), + ) + .expect("Unable to copy binary"); + + std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); - std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) - .expect("Unable to write Cargo.toml file"); - - let result = self.build(&function_dir)?; - - if result.exit_code != 0 { - println!("Build failed: {:?}", result); - return Err(AgentError::BuildFailed(AgentOutput { + Ok(AgentOutput { exit_code: result.exit_code, - stdout: result.stdout, - stderr: result.stderr, - })); - } - - // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) - let binary_path = match self.rust_config.build.release { - true => format!( - "{}/target/release/{}", - &function_dir, self.workload_config.workload_name - ), - false => format!( - "{}/target/debug/{}", - &function_dir, self.workload_config.workload_name - ), - }; - - std::fs::copy( - binary_path, - format!("/tmp/{}", self.workload_config.workload_name), - ) - .expect("Unable to copy binary"); - - std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); - - Ok(AgentOutput { - exit_code: result.exit_code, - stdout: "Build successful".to_string(), - stderr: "".to_string(), + stdout: "Build successful".to_string(), + stderr: "".to_string(), + }) }) } - fn run(&self) -> AgentResult { - let output = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) - .output() - .expect("Failed to run function"); + fn run<'a>( + &'a self, + child_processes: &'a Arc>>, + ) -> Pin> + Send + '_>> { + Box::pin(async { + let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) + .spawn() + .expect("Failed to run function"); - let agent_output = AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }; + child_processes.lock().await.insert(child.id()); + let output = child.wait_with_output().expect("Failed to wait on child"); - if !output.status.success() { - println!("Run failed: {:?}", agent_output); - return Err(AgentError::BuildFailed(agent_output)); - } + let agent_output = AgentOutput { + exit_code: output.status.code().unwrap(), + stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), + stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), + }; - Ok(agent_output) + if !output.status.success() { + println!("Run failed: {:?}", agent_output); + return Err(AgentError::BuildFailed(agent_output)); + } + + Ok(agent_output) + }) } } diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index b9673c9..c645920 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -3,9 +3,16 @@ use agent::{ workload::{config::Config, runner::Runner, service::WorkloadRunnerService}, }; use clap::Parser; +use once_cell::sync::Lazy; +use std::collections::HashSet; +use std::sync::Arc; use std::{net::ToSocketAddrs, path::PathBuf}; +use tokio::sync::Mutex; use tonic::transport::Server; +static CHILD_PROCESSES: Lazy>>> = + Lazy::new(|| Arc::new(Mutex::new(HashSet::new()))); + #[derive(Debug, Parser)] struct Args { #[clap(short, long, default_value = "/etc/cloudlet/agent/config.toml")] @@ -24,9 +31,9 @@ async fn main() -> Result<(), Box> { .next() .unwrap(); - let runner = Runner::new(config); + let runner = Runner::new(config, CHILD_PROCESSES.clone()); - let server = WorkloadRunnerService::new(runner); + let server = WorkloadRunnerService::new(runner, CHILD_PROCESSES.clone()); Server::builder() .add_service(WorkloadRunnerServer::new(server)) diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index c921ccd..f0c44bc 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -3,6 +3,9 @@ use crate::{ workload::config::Action, AgentResult, }; +use std::collections::HashSet; +use std::sync::Arc; +use tokio::sync::Mutex; #[cfg(feature = "debug-agent")] use crate::agents::debug; @@ -14,27 +17,32 @@ use super::config::Config; pub struct Runner { config: Config, agent: Box, + child_processes: Arc>>, } impl Runner { - pub fn new(config: Config) -> Self { + pub fn new(config: Config, child_processes: Arc>>) -> Self { let agent: Box = match config.language { Language::Rust => Box::new(rust::RustAgent::from(config.clone())), #[cfg(feature = "debug-agent")] Language::Debug => Box::new(debug::DebugAgent::from(config.clone())), }; - Runner { config, agent } + Runner { + config, + agent, + child_processes, + } } - pub fn run(&self) -> AgentResult { + pub async fn run(&self) -> AgentResult { let result = match self.config.action { - Action::Prepare => self.agent.prepare()?, - Action::Run => self.agent.run()?, + Action::Prepare => self.agent.prepare(&self.child_processes).await?, + Action::Run => self.agent.run(&self.child_processes).await?, Action::PrepareAndRun => { - let res = self.agent.prepare()?; + let res = self.agent.prepare(&self.child_processes).await?; println!("Prepare result {:?}", res); - self.agent.run()? + self.agent.run(&self.child_processes).await? } }; diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index dca0e13..3036dfc 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -1,6 +1,7 @@ use super::runner::Runner; use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest}; use agent::workload_runner_server::WorkloadRunner; +use std::collections::HashSet; use std::{process, sync::Arc}; use tokio::sync::Mutex; use tokio_stream::wrappers::ReceiverStream; @@ -10,12 +11,14 @@ type Result = std::result::Result, tonic::Status>; pub struct WorkloadRunnerService { runner: Arc>, + child_processes: Arc>>, } impl WorkloadRunnerService { - pub fn new(runner: Runner) -> Self { + pub fn new(runner: Runner, child_processes: Arc>>) -> Self { WorkloadRunnerService { runner: Arc::new(Mutex::new(runner)), + child_processes, } } } @@ -35,6 +38,7 @@ impl WorkloadRunner for WorkloadRunnerService { let res = runner .run() + .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let _ = tx @@ -53,6 +57,16 @@ impl WorkloadRunner for WorkloadRunnerService { } async fn signal(&self, _: Request) -> Result<()> { + let child_processes = self.child_processes.lock().await; + + for &child_id in child_processes.iter() { + nix::sys::signal::kill( + nix::unistd::Pid::from_raw(child_id as i32), + nix::sys::signal::Signal::SIGTERM, + ) + .unwrap(); + } + process::exit(0); } }