Skip to content

Commit

Permalink
chore: merge from main
Browse files Browse the repository at this point in the history
  • Loading branch information
remi-espie committed May 2, 2024
1 parent c37f10f commit fe0e4d2
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 67 deletions.
114 changes: 59 additions & 55 deletions src/agent/src/agents/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ impl Agent for RustAgent {
child_processes: &'a Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>> {
Box::pin(async {
let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap();

let function_dir = format!(
"/tmp/{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
Expand All @@ -93,60 +91,61 @@ impl Agent for RustAgent {

create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory");

std::fs::write(
format!("{}/src/main.rs", &function_dir),
&self.workload_config.code,
)
.expect("Unable to write main.rs file");
std::fs::write(
format!("{}/src/main.rs", &function_dir),
&self.workload_config.code,
)
.expect("Unable to write main.rs file");

let cargo_toml = format!(
r#"
let cargo_toml = format!(
r#"
[package]
name = "{}"
version = "0.1.0"
edition = "2018"
"#,
self.workload_config.workload_name
);
self.workload_config.workload_name
);

std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml)
.expect("Unable to write Cargo.toml file");
std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml)
.expect("Unable to write Cargo.toml file");

let result = self.build(&function_dir, child_processes).await?;

if result.exit_code != 0 {
println!("Build failed: {:?}", result);
return Err(AgentError::BuildFailed(AgentOutput {
exit_code: result.exit_code,
stdout: result.stdout,
stderr: result.stderr,
}));
}
if result.exit_code != 0 {
println!("Build failed: {:?}", result);
return Err(AgentError::BuildFailed(AgentOutput {
exit_code: result.exit_code,
stdout: result.stdout,
stderr: result.stderr,
}));
}

// Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3)
let binary_path = match self.rust_config.build.release {
true => format!(
"{}/target/release/{}",
&function_dir, self.workload_config.workload_name
),
false => format!(
"{}/target/debug/{}",
&function_dir, self.workload_config.workload_name
),
};

std::fs::copy(
binary_path,
format!("/tmp/{}", self.workload_config.workload_name),
)
.expect("Unable to copy binary");

std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory");

// Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3)
let binary_path = match self.rust_config.build.release {
true => format!(
"{}/target/release/{}",
&function_dir, self.workload_config.workload_name
),
false => format!(
"{}/target/debug/{}",
&function_dir, self.workload_config.workload_name
),
};

std::fs::copy(
binary_path,
format!("/tmp/{}", self.workload_config.workload_name),
)
.expect("Unable to copy binary");

std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory");

Ok(AgentOutput {
exit_code: result.exit_code,
stdout: "Build successful".to_string(),
stderr: "".to_string(),
Ok(AgentOutput {
exit_code: result.exit_code,
stdout: "Build successful".to_string(),
stderr: "".to_string(),
})
})
}

Expand All @@ -159,17 +158,22 @@ impl Agent for RustAgent {
.spawn()
.expect("Failed to run function");

let agent_output = AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(),
};
child_processes.lock().await.insert(child.id());

let output = child.wait_with_output().expect("Failed to wait on child");

if !output.status.success() {
println!("Run failed: {:?}", agent_output);
return Err(AgentError::BuildFailed(agent_output));
}
let agent_output = AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(),
};

Ok(agent_output)
if !output.status.success() {
println!("Run failed: {:?}", agent_output);
return Err(AgentError::BuildFailed(agent_output));
}

Ok(agent_output)
})
}
}
9 changes: 1 addition & 8 deletions src/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@ use agent::{
agent::workload_runner_server::WorkloadRunnerServer, workload::service::WorkloadRunnerService,
};
use clap::Parser;
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::sync::Arc;
use std::{net::ToSocketAddrs, path::PathBuf};
use tokio::sync::Mutex;
use std::net::ToSocketAddrs;
use tonic::transport::Server;

static CHILD_PROCESSES: Lazy<Arc<Mutex<HashSet<u32>>>> =
Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));

#[derive(Debug, Parser)]
struct Args {
#[clap(long, env, default_value = "0.0.0.0")]
Expand Down
7 changes: 5 additions & 2 deletions src/agent/src/workload/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ impl Runner {
}
}

pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result<Self, AgentError> {
pub fn new_from_execute_request(
execute_request: ExecuteRequest,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> Result<Self, AgentError> {
let config = Config::new_from_execute_request(execute_request)?;
Ok(Self::new(config))
Ok(Self::new(config, child_processes))
}

pub async fn run(&self) -> AgentResult<AgentOutput> {
Expand Down
8 changes: 6 additions & 2 deletions src/agent/src/workload/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::runner::Runner;
use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest};
use agent::workload_runner_server::WorkloadRunner;
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::{process, sync::Arc};
use tokio::sync::Mutex;
Expand All @@ -9,6 +10,9 @@ use tonic::{Request, Response};

type Result<T> = std::result::Result<Response<T>, tonic::Status>;

static CHILD_PROCESSES: Lazy<Arc<Mutex<HashSet<u32>>>> =
Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));

pub struct WorkloadRunnerService;

#[tonic::async_trait]
Expand All @@ -20,7 +24,7 @@ impl WorkloadRunner for WorkloadRunnerService {

let execute_request = req.into_inner();

let runner = Runner::new_from_execute_request(execute_request)
let runner = Runner::new_from_execute_request(execute_request, CHILD_PROCESSES.clone())
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let res = runner
Expand All @@ -45,7 +49,7 @@ impl WorkloadRunner for WorkloadRunnerService {
}

async fn signal(&self, _: Request<SignalRequest>) -> Result<()> {
let child_processes = self.child_processes.lock().await;
let child_processes = CHILD_PROCESSES.lock().await;

for &child_id in child_processes.iter() {
nix::sys::signal::kill(
Expand Down

0 comments on commit fe0e4d2

Please sign in to comment.