Skip to content

Commit

Permalink
feat: child processes are now gracefully sigterm-ed on kill signal re…
Browse files Browse the repository at this point in the history
…ception

Signed-off-by: ESPIE <remi.espie@etu.umontpellier.fr>
  • Loading branch information
remi-espie committed May 1, 2024
1 parent 64a5996 commit b47f929
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 110 deletions.
2 changes: 2 additions & 0 deletions src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ tokio = { version = "1.37.0", features = ["full"] }
tokio-stream = "0.1.15"
toml = "0.8.12"
tonic = "0.11"
nix = { version = "0.28.0", features = ["signal"] }
once_cell = "1.19.0"

[build-dependencies]
tonic-build = "0.11"
Expand Down
75 changes: 45 additions & 30 deletions src/agent/src/agents/debug.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use super::AgentOutput;
use crate::agents::Agent;
use crate::{workload, AgentResult};
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::Mutex;

pub struct DebugAgent {
workload_config: workload::config::Config,
Expand All @@ -15,42 +20,52 @@ impl From<workload::config::Config> for DebugAgent {
}

impl Agent for DebugAgent {
fn prepare(&self) -> AgentResult<AgentOutput> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

println!("Function directory: {}", dir);

create_dir_all(&dir).expect("Unable to create directory");

std::fs::write(
format!("{}/debug.txt", &dir),
format!(
"Debug agent for {} - written at {:?}",
self.workload_config.workload_name,
SystemTime::now(),
),
)
.expect("Unable to write debug.txt file");

Ok(AgentOutput {
exit_code: 0,
stdout: "Build successfully!".into(),
stderr: String::default(),
fn prepare(
&self,
_: &Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>> {
Box::pin(async {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

println!("Function directory: {}", dir);

create_dir_all(&dir).expect("Unable to create directory");

std::fs::write(
format!("{}/debug.txt", &dir),
format!(
"Debug agent for {} - written at {:?}",
self.workload_config.workload_name,
SystemTime::now(),
),
)
.expect("Unable to write debug.txt file");

Ok(AgentOutput {
exit_code: 0,
stdout: "Build successfully!".into(),
stderr: String::default(),
})
})
}

fn run(&self) -> AgentResult<AgentOutput> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);
fn run(
&self,
_: &Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>> {
Box::pin(async {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

let content = std::fs::read_to_string(format!("{}/debug.txt", &dir))
.expect("Unable to read debug.txt file");
let content = std::fs::read_to_string(format!("{}/debug.txt", &dir))
.expect("Unable to read debug.txt file");

std::fs::remove_dir_all(dir).expect("Unable to remove directory");
std::fs::remove_dir_all(dir).expect("Unable to remove directory");

Ok(AgentOutput {
exit_code: 0,
stdout: content,
stderr: String::default(),
Ok(AgentOutput {
exit_code: 0,
stdout: content,
stderr: String::default(),
})
})
}
}
15 changes: 13 additions & 2 deletions src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use crate::AgentResult;
use serde::Deserialize;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;

#[cfg(feature = "debug-agent")]
pub mod debug;
Expand All @@ -13,8 +18,14 @@ pub struct AgentOutput {
}

pub trait Agent {
fn prepare(&self) -> AgentResult<AgentOutput>;
fn run(&self) -> AgentResult<AgentOutput>;
fn prepare<'a>(
&'a self,
child_processes: &'a Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>>;
fn run<'a>(
&'a self,
child_processes: &'a Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>>;
}

#[derive(Debug, Clone, Deserialize)]
Expand Down
162 changes: 94 additions & 68 deletions src/agent/src/agents/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ use super::{Agent, AgentOutput};
use crate::{workload, AgentError, AgentResult};
use rand::distributions::{Alphanumeric, DistString};
use serde::Deserialize;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::{fs::create_dir_all, process::Command};
use tokio::sync::Mutex;

#[derive(Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -22,15 +27,23 @@ pub struct RustAgent {
}

impl RustAgent {
fn build(&self, function_dir: &String) -> AgentResult<AgentOutput> {
async fn build(
&self,
function_dir: &String,
child_processes: &Arc<Mutex<HashSet<u32>>>,
) -> AgentResult<AgentOutput> {
if self.rust_config.build.release {
let output = Command::new("cargo")
let child = Command::new("cargo")
.arg("build")
.arg("--release")
.current_dir(function_dir)
.output()
.spawn()
.expect("Failed to build function");

child_processes.lock().await.insert(child.id());

let output = child.wait_with_output().expect("Failed to wait on child");

Ok(AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
Expand Down Expand Up @@ -65,88 +78,101 @@ impl From<workload::config::Config> for RustAgent {
}

impl Agent for RustAgent {
fn prepare(&self) -> AgentResult<AgentOutput> {
let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap();
fn prepare<'a>(
&'a self,
child_processes: &'a Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>> {
Box::pin(async {
let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap();

let function_dir = format!(
"/tmp/{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
);
let function_dir = format!(
"/tmp/{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
);

println!("Function directory: {}", function_dir);
println!("Function directory: {}", function_dir);

create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory");
create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory");

std::fs::write(format!("{}/src/main.rs", &function_dir), code)
.expect("Unable to write main.rs file");
std::fs::write(format!("{}/src/main.rs", &function_dir), code)
.expect("Unable to write main.rs file");

let cargo_toml = format!(
r#"
let cargo_toml = format!(
r#"
[package]
name = "{}"
version = "0.1.0"
edition = "2018"
"#,
self.workload_config.workload_name
);
self.workload_config.workload_name
);

std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml)
.expect("Unable to write Cargo.toml file");

let result = self.build(&function_dir, child_processes).await?;

if result.exit_code != 0 {
println!("Build failed: {:?}", result);
return Err(AgentError::BuildFailed(AgentOutput {
exit_code: result.exit_code,
stdout: result.stdout,
stderr: result.stderr,
}));
}

// Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3)
let binary_path = match self.rust_config.build.release {
true => format!(
"{}/target/release/{}",
&function_dir, self.workload_config.workload_name
),
false => format!(
"{}/target/debug/{}",
&function_dir, self.workload_config.workload_name
),
};

std::fs::copy(
binary_path,
format!("/tmp/{}", self.workload_config.workload_name),
)
.expect("Unable to copy binary");

std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory");

std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml)
.expect("Unable to write Cargo.toml file");

let result = self.build(&function_dir)?;

if result.exit_code != 0 {
println!("Build failed: {:?}", result);
return Err(AgentError::BuildFailed(AgentOutput {
Ok(AgentOutput {
exit_code: result.exit_code,
stdout: result.stdout,
stderr: result.stderr,
}));
}

// Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3)
let binary_path = match self.rust_config.build.release {
true => format!(
"{}/target/release/{}",
&function_dir, self.workload_config.workload_name
),
false => format!(
"{}/target/debug/{}",
&function_dir, self.workload_config.workload_name
),
};

std::fs::copy(
binary_path,
format!("/tmp/{}", self.workload_config.workload_name),
)
.expect("Unable to copy binary");

std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory");

Ok(AgentOutput {
exit_code: result.exit_code,
stdout: "Build successful".to_string(),
stderr: "".to_string(),
stdout: "Build successful".to_string(),
stderr: "".to_string(),
})
})
}

fn run(&self) -> AgentResult<AgentOutput> {
let output = Command::new(format!("/tmp/{}", self.workload_config.workload_name))
.output()
.expect("Failed to run function");
fn run<'a>(
&'a self,
child_processes: &'a Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>> {
Box::pin(async {
let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name))
.spawn()
.expect("Failed to run function");

let agent_output = AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(),
};
child_processes.lock().await.insert(child.id());
let output = child.wait_with_output().expect("Failed to wait on child");

if !output.status.success() {
println!("Run failed: {:?}", agent_output);
return Err(AgentError::BuildFailed(agent_output));
}
let agent_output = AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(),
};

Ok(agent_output)
if !output.status.success() {
println!("Run failed: {:?}", agent_output);
return Err(AgentError::BuildFailed(agent_output));
}

Ok(agent_output)
})
}
}
11 changes: 9 additions & 2 deletions src/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@ use agent::{
workload::{config::Config, runner::Runner, service::WorkloadRunnerService},
};
use clap::Parser;
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::sync::Arc;
use std::{net::ToSocketAddrs, path::PathBuf};
use tokio::sync::Mutex;
use tonic::transport::Server;

static CHILD_PROCESSES: Lazy<Arc<Mutex<HashSet<u32>>>> =
Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));

#[derive(Debug, Parser)]
struct Args {
#[clap(short, long, default_value = "/etc/cloudlet/agent/config.toml")]
Expand All @@ -24,9 +31,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.next()
.unwrap();

let runner = Runner::new(config);
let runner = Runner::new(config, CHILD_PROCESSES.clone());

let server = WorkloadRunnerService::new(runner);
let server = WorkloadRunnerService::new(runner, CHILD_PROCESSES.clone());

Server::builder()
.add_service(WorkloadRunnerServer::new(server))
Expand Down
Loading

0 comments on commit b47f929

Please sign in to comment.