Skip to content

Commit

Permalink
feat(agent): modify rust agent to run from gRPC config
Browse files Browse the repository at this point in the history
Signed-off-by: Mauran <thomas.mauran@etu.umontpellier.fr>
  • Loading branch information
mmoreiradj authored and thomas-mauran committed May 1, 2024
1 parent 06f10be commit 2413a19
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 49 deletions.
2 changes: 1 addition & 1 deletion src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ name = "agent"
path = "src/lib.rs"

[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
clap = { version = "4.5.4", features = ["derive", "env"] }
prost = "0.12.4"
rand = "0.8.5"
serde = { version = "1.0.197", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion src/agent/examples/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ address = "localhost"
port = 50051

[build]
source-code-path = "CHANGE/cloudlet/src/agent/examples/main.rs"
source-code-path = "/opt/repositories/virt-do/cloudlet/src/agent/examples/main.rs"
release = true
13 changes: 13 additions & 0 deletions src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,16 @@ impl std::fmt::Display for Language {
}
}
}

impl TryFrom<&str> for Language {
type Error = String;

fn try_from(value: &str) -> Result<Self, Self::Error> {
match value {
"rust" => Ok(Language::Rust),
#[cfg(feature = "debug-agent")]
"debug" => Ok(Language::Debug),
_ => Err(format!("Invalid language: {}", value)),
}
}
}
10 changes: 5 additions & 5 deletions src/agent/src/agents/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{fs::create_dir_all, process::Command};
#[serde(rename_all = "kebab-case")]
struct RustAgentBuildConfig {
release: bool,
source_code_path: String,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -66,8 +65,6 @@ impl From<workload::config::Config> for RustAgent {

impl Agent for RustAgent {
fn prepare(&self) -> AgentResult<AgentOutput> {
let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap();

let function_dir = format!(
"/tmp/{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
Expand All @@ -77,8 +74,11 @@ impl Agent for RustAgent {

create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory");

std::fs::write(format!("{}/src/main.rs", &function_dir), code)
.expect("Unable to write main.rs file");
std::fs::write(
format!("{}/src/main.rs", &function_dir),
&self.workload_config.code,
)
.expect("Unable to write main.rs file");

let cargo_toml = format!(
r#"
Expand Down
19 changes: 8 additions & 11 deletions src/agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
use agent::{
agent::workload_runner_server::WorkloadRunnerServer,
workload::{config::Config, runner::Runner, service::WorkloadRunnerService},
agent::workload_runner_server::WorkloadRunnerServer, workload::service::WorkloadRunnerService,
};
use clap::Parser;
use std::{net::ToSocketAddrs, path::PathBuf};
use std::net::ToSocketAddrs;
use tonic::transport::Server;

#[derive(Debug, Parser)]
struct Args {
#[clap(short, long, default_value = "/etc/cloudlet/agent/config.toml")]
config: PathBuf,
#[clap(long, env, default_value = "localhost")]
grpc_server_address: String,
#[clap(long, env, default_value = "50051")]
grpc_server_port: u16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

let config = Config::from_file(&args.config).unwrap();

let bind_address = format!("{}:{}", config.server.address, config.server.port)
let bind_address = format!("{}:{}", args.grpc_server_address, args.grpc_server_port)
.to_socket_addrs()
.unwrap()
.next()
.unwrap();

let runner = Runner::new(config);

let server = WorkloadRunnerService::new(runner);
let server = WorkloadRunnerService;

Server::builder()
.add_service(WorkloadRunnerServer::new(server))
Expand Down
39 changes: 28 additions & 11 deletions src/agent/src/workload/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use crate::{agents::Language, AgentError, AgentResult};
use crate::{
agent::{execute_request, ExecuteRequest},
agents::Language,
AgentError, AgentResult,
};
use serde::Deserialize;
use std::path::PathBuf;

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Server {
pub address: String,
pub port: u16,
}

/// Generic agent configuration.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -19,10 +16,9 @@ pub struct Config {
pub language: Language,
/// Action to perform.
pub action: Action,
/// Server configuration.
pub server: Server,
/// Code
pub code: String,
/// Rest of the configuration as a string.
#[serde(skip)]
pub config_string: String,
}

Expand All @@ -38,6 +34,17 @@ impl Config {

Ok(config)
}

pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Self {
Self {
workload_name: execute_request.workload_name.clone(),
// TODO: Fix this unwrap
language: Language::try_from(execute_request.language.clone().as_str()).unwrap(),
action: execute_request.action().into(),
config_string: execute_request.config_str,
code: execute_request.code,
}
}
}

#[derive(Debug, Clone, Deserialize)]
Expand All @@ -47,3 +54,13 @@ pub enum Action {
Run,
PrepareAndRun,
}

impl From<execute_request::Action> for Action {
fn from(value: execute_request::Action) -> Self {
match value {
execute_request::Action::Prepare => Action::Prepare,
execute_request::Action::Run => Action::Run,
execute_request::Action::PrepareAndRun => Action::PrepareAndRun,
}
}
}
6 changes: 6 additions & 0 deletions src/agent/src/workload/runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
agent::ExecuteRequest,
agents::{rust, Agent, AgentOutput, Language},
workload::config::Action,
AgentResult,
Expand Down Expand Up @@ -27,6 +28,11 @@ impl Runner {
Runner { config, agent }
}

pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Self {
let config = Config::new_from_execute_request(execute_request);
Self::new(config)
}

pub fn run(&self) -> AgentResult<AgentOutput> {
let result = match self.config.action {
Action::Prepare => self.agent.prepare()?,
Expand Down
28 changes: 8 additions & 20 deletions src/agent/src/workload/service.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,32 @@
use super::runner::Runner;
use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest};
use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest};
use agent::workload_runner_server::WorkloadRunner;
use std::{process, sync::Arc};
use tokio::sync::Mutex;
use std::process;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};

type Result<T> = std::result::Result<Response<T>, tonic::Status>;

pub struct WorkloadRunnerService {
runner: Arc<Mutex<Runner>>,
}

impl WorkloadRunnerService {
pub fn new(runner: Runner) -> Self {
WorkloadRunnerService {
runner: Arc::new(Mutex::new(runner)),
}
}
}
pub struct WorkloadRunnerService;

#[tonic::async_trait]
impl WorkloadRunner for WorkloadRunnerService {
type ExecuteStream = ReceiverStream<std::result::Result<ExecuteResponse, tonic::Status>>;

async fn execute(&self, _: Request<ExecuteRequest>) -> Result<Self::ExecuteStream> {
async fn execute(&self, req: Request<ExecuteRequest>) -> Result<Self::ExecuteStream> {
let (tx, rx) = tokio::sync::mpsc::channel(4);

// We assume there's only one request at a time
let runner = self
.runner
.try_lock()
.map_err(|e| tonic::Status::unavailable(format!("Runner is busy: {:?}", e)))?;
let execute_request = req.into_inner();

let runner = Runner::new_from_execute_request(execute_request);

let res = runner
.run()
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let _ = tx
.send(Ok(ExecuteResponse {
stage: Stage::Done as i32,
stdout: res.stdout,
stderr: res.stderr,
exit_code: res.exit_code,
Expand Down

0 comments on commit 2413a19

Please sign in to comment.