Skip to content

Commit

Permalink
Agent: sigterm children (#36)
Browse files Browse the repository at this point in the history
* feat: child processes are now gracefully sigterm-ed on kill signal reception

Signed-off-by: ESPIE <remi.espie@etu.umontpellier.fr>

* chore: merge from main

Signed-off-by: ESPIE <remi.espie@etu.umontpellier.fr>

* fix: changed `std::process::Command` to `tokio::process::Command` to be non blocking

Signed-off-by: ESPIE <remi.espie@etu.umontpellier.fr>

* chore: cargo fmt

Signed-off-by: ESPIE <remi.espie@etu.umontpellier.fr>

* refactor: use async_trait instead of Pin<Box<Future<>>>

Signed-off-by: Mateo Fernandez <mateo.fernandez@etu.umontpellier.fr>

---------

Signed-off-by: ESPIE <remi.espie@etu.umontpellier.fr>
Signed-off-by: Mateo Fernandez <mateo.fernandez@etu.umontpellier.fr>
Co-authored-by: Mateo Fernandez <mateo.fernandez@etu.umontpellier.fr>
  • Loading branch information
remi-espie and mfernd authored May 3, 2024
1 parent 54a0fc3 commit 1f46bc7
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 29 deletions.
3 changes: 3 additions & 0 deletions src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ name = "agent"
path = "src/lib.rs"

[dependencies]
async-trait = "0.1.80"
clap = { version = "4.5.4", features = ["derive", "env"] }
nix = { version = "0.28.0", features = ["signal"] }
once_cell = "1.19.0"
prost = "0.12.4"
rand = "0.8.5"
serde = { version = "1.0.197", features = ["derive"] }
Expand Down
9 changes: 7 additions & 2 deletions src/agent/src/agents/debug.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use super::AgentOutput;
use crate::agents::Agent;
use crate::{workload, AgentResult};
use async_trait::async_trait;
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::Mutex;

pub struct DebugAgent {
workload_config: workload::config::Config,
Expand All @@ -14,8 +18,9 @@ impl From<workload::config::Config> for DebugAgent {
}
}

#[async_trait]
impl Agent for DebugAgent {
fn prepare(&self) -> AgentResult<AgentOutput> {
async fn prepare(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

println!("Function directory: {}", dir);
Expand All @@ -39,7 +44,7 @@ impl Agent for DebugAgent {
})
}

fn run(&self) -> AgentResult<AgentOutput> {
async fn run(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

let content = std::fs::read_to_string(format!("{}/debug.txt", &dir))
Expand Down
9 changes: 7 additions & 2 deletions src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use crate::{AgentError, AgentResult};
use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;

#[cfg(feature = "debug-agent")]
pub mod debug;
Expand All @@ -12,9 +16,10 @@ pub struct AgentOutput {
pub stderr: String,
}

#[async_trait]
pub trait Agent {
fn prepare(&self) -> AgentResult<AgentOutput>;
fn run(&self) -> AgentResult<AgentOutput>;
async fn prepare(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
async fn run(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
}

#[derive(Debug, Clone, Deserialize)]
Expand Down
57 changes: 45 additions & 12 deletions src/agent/src/agents/rust.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use super::{Agent, AgentOutput};
use crate::{workload, AgentError, AgentResult};
use async_trait::async_trait;
use rand::distributions::{Alphanumeric, DistString};
use serde::Deserialize;
use std::{fs::create_dir_all, process::Command};
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::sync::Arc;
use tokio::process::Command;
use tokio::sync::Mutex;

#[derive(Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -21,27 +26,45 @@ pub struct RustAgent {
}

impl RustAgent {
fn build(&self, function_dir: &String) -> AgentResult<AgentOutput> {
async fn build(
&self,
function_dir: &str,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> AgentResult<AgentOutput> {
if self.rust_config.build.release {
let output = Command::new("cargo")
let child = Command::new("cargo")
.arg("build")
.arg("--release")
.current_dir(function_dir)
.output()
.spawn()
.expect("Failed to build function");

{
child_processes.lock().await.insert(child.id().unwrap());
}

let output = child
.wait_with_output()
.await
.expect("Failed to wait on child");

Ok(AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(),
})
} else {
let output = Command::new("cargo")
let child = Command::new("cargo")
.arg("build")
.current_dir(function_dir)
.output()
.spawn()
.expect("Failed to build function");

let output = child
.wait_with_output()
.await
.expect("Failed to wait on child");

Ok(AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
Expand All @@ -63,8 +86,9 @@ impl From<workload::config::Config> for RustAgent {
}
}

#[async_trait]
impl Agent for RustAgent {
fn prepare(&self) -> AgentResult<AgentOutput> {
async fn prepare(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let function_dir = format!(
"/tmp/{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
Expand All @@ -85,15 +109,15 @@ impl Agent for RustAgent {
[package]
name = "{}"
version = "0.1.0"
edition = "2018"
edition = "2021"
"#,
self.workload_config.workload_name
);

std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml)
.expect("Unable to write Cargo.toml file");

let result = self.build(&function_dir)?;
let result = self.build(&function_dir, child_processes).await?;

if result.exit_code != 0 {
println!("Build failed: {:?}", result);
Expand Down Expand Up @@ -131,11 +155,20 @@ impl Agent for RustAgent {
})
}

fn run(&self) -> AgentResult<AgentOutput> {
let output = Command::new(format!("/tmp/{}", self.workload_config.workload_name))
.output()
async fn run(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name))
.spawn()
.expect("Failed to run function");

{
child_processes.lock().await.insert(child.id().unwrap());
}

let output = child
.wait_with_output()
.await
.expect("Failed to wait on child");

let agent_output = AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
Expand Down
39 changes: 28 additions & 11 deletions src/agent/src/workload/runner.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,63 @@
use super::config::Config;
use crate::{
agent::ExecuteRequest,
agents::{rust, Agent, AgentOutput, Language},
workload::config::Action,
AgentError, AgentResult,
};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;

#[cfg(feature = "debug-agent")]
use crate::agents::debug;

use super::config::Config;

/// Runner for a workload.
/// Will execute the workload based on the inner agent (language).
pub struct Runner {
config: Config,
agent: Box<dyn Agent + Sync + Send>,
child_processes: Arc<Mutex<HashSet<u32>>>,
}

impl Runner {
pub fn new(config: Config) -> Self {
pub fn new(config: Config, child_processes: Arc<Mutex<HashSet<u32>>>) -> Self {
let agent: Box<dyn Agent + Sync + Send> = match config.language {
Language::Rust => Box::new(rust::RustAgent::from(config.clone())),
#[cfg(feature = "debug-agent")]
Language::Debug => Box::new(debug::DebugAgent::from(config.clone())),
};

Runner { config, agent }
Runner {
config,
agent,
child_processes,
}
}

pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result<Self, AgentError> {
pub fn new_from_execute_request(
execute_request: ExecuteRequest,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> Result<Self, AgentError> {
let config = Config::new_from_execute_request(execute_request)?;
Ok(Self::new(config))
Ok(Self::new(config, child_processes))
}

pub fn run(&self) -> AgentResult<AgentOutput> {
pub async fn run(&self) -> AgentResult<AgentOutput> {
let result = match self.config.action {
Action::Prepare => self.agent.prepare()?,
Action::Run => self.agent.run()?,
Action::Prepare => {
self.agent
.prepare(Arc::clone(&self.child_processes))
.await?
}
Action::Run => self.agent.run(Arc::clone(&self.child_processes)).await?,
Action::PrepareAndRun => {
let res = self.agent.prepare()?;
let res = self
.agent
.prepare(Arc::clone(&self.child_processes))
.await?;
println!("Prepare result {:?}", res);
self.agent.run()?
self.agent.run(Arc::clone(&self.child_processes)).await?
}
};

Expand Down
26 changes: 24 additions & 2 deletions src/agent/src/workload/service.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use super::runner::Runner;
use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest};
use agent::workload_runner_server::WorkloadRunner;
use std::process;
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::{process, sync::Arc};
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};

type Result<T> = std::result::Result<Response<T>, tonic::Status>;

static CHILD_PROCESSES: Lazy<Arc<Mutex<HashSet<u32>>>> =
Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));

pub struct WorkloadRunnerService;

#[tonic::async_trait]
Expand All @@ -18,11 +24,12 @@ impl WorkloadRunner for WorkloadRunnerService {

let execute_request = req.into_inner();

let runner = Runner::new_from_execute_request(execute_request)
let runner = Runner::new_from_execute_request(execute_request, CHILD_PROCESSES.clone())
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let res = runner
.run()
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let _ = tx
Expand All @@ -42,6 +49,21 @@ impl WorkloadRunner for WorkloadRunnerService {
}

async fn signal(&self, _: Request<SignalRequest>) -> Result<()> {
let child_processes = CHILD_PROCESSES.lock().await;

for &child_id in child_processes.iter() {
match nix::sys::signal::kill(
nix::unistd::Pid::from_raw(child_id as i32),
nix::sys::signal::Signal::SIGTERM,
) {
Ok(_) => println!("Sent SIGTERM to child process {}", child_id),
Err(e) => println!(
"Failed to send SIGTERM to child process {}: {}",
child_id, e
),
}
}

process::exit(0);
}
}

0 comments on commit 1f46bc7

Please sign in to comment.