Skip to content

Commit

Permalink
feat: send code though gRPC to the agent (virt-do#37)
Browse files Browse the repository at this point in the history
* feat(agent/proto): add agent configuration in execute request

Signed-off-by: Martin Moreira de Jesus <martin.moreira-de-jesus@protonmail.com>

* feat: send code through cli and api to vm

Signed-off-by: Mauran <thomas.mauran@etu.umontpellier.fr>

---------

Signed-off-by: Martin Moreira de Jesus <martin.moreira-de-jesus@protonmail.com>
Signed-off-by: Mauran <thomas.mauran@etu.umontpellier.fr>
Co-authored-by: Martin Moreira de Jesus <martin.moreira-de-jesus@protonmail.com>
  • Loading branch information
2 people authored and sylvain-pierrot committed May 2, 2024
1 parent a2046f9 commit 71b9fc5
Show file tree
Hide file tree
Showing 16 changed files with 149 additions and 93 deletions.
17 changes: 15 additions & 2 deletions proto/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@ syntax = "proto3";
package cloudlet.agent;
import "google/protobuf/empty.proto";

message ExecuteRequest {}
message ExecuteRequest {
enum Action {
RUN = 0;
PREPARE = 1;
PREPARE_AND_RUN = 2;
}

string workload_name = 1;
string language = 2;
Action action = 3;
string code = 4;
string config_str = 5;
}

message ExecuteResponse {
enum Stage {
Expand All @@ -16,7 +28,8 @@ message ExecuteResponse {

string stdout = 1;
string stderr = 2;
int32 exit_code = 3;
Stage stage = 3;
int32 exit_code = 4;
}

message SignalRequest {
Expand Down
7 changes: 3 additions & 4 deletions proto/vmm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ service VmmService {
}

message RunVmmRequest {

Language language = 1;
string code = 2;
string env = 3;
string workload_name = 1;
Language language = 2;
string code = 3;
LogLevel log_level = 4;

}
Expand Down
2 changes: 1 addition & 1 deletion src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ name = "agent"
path = "src/lib.rs"

[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
clap = { version = "4.5.4", features = ["derive", "env"] }
prost = "0.12.4"
rand = "0.8.5"
serde = { version = "1.0.197", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion src/agent/examples/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ address = "localhost"
port = 50051

[build]
source-code-path = "CHANGE/cloudlet/src/agent/examples/main.rs"
source-code-path = "CHANGEME/src/agent/examples/main.rs"
release = true
18 changes: 17 additions & 1 deletion src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::AgentResult;
use crate::{AgentError, AgentResult};
use serde::Deserialize;

#[cfg(feature = "debug-agent")]
Expand Down Expand Up @@ -34,3 +34,19 @@ impl std::fmt::Display for Language {
}
}
}

impl TryFrom<&str> for Language {
type Error = AgentError;

fn try_from(value: &str) -> Result<Self, AgentError> {
match value {
"rust" => Ok(Language::Rust),
#[cfg(feature = "debug-agent")]
"debug" => Ok(Language::Debug),
_ => Err(AgentError::InvalidLanguage(format!(
"Invalid language: {}",
value
))),
}
}
}
10 changes: 5 additions & 5 deletions src/agent/src/agents/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{fs::create_dir_all, process::Command};
#[serde(rename_all = "kebab-case")]
struct RustAgentBuildConfig {
release: bool,
source_code_path: String,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -66,8 +65,6 @@ impl From<workload::config::Config> for RustAgent {

impl Agent for RustAgent {
fn prepare(&self) -> AgentResult<AgentOutput> {
let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap();

let function_dir = format!(
"/tmp/{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
Expand All @@ -77,8 +74,11 @@ impl Agent for RustAgent {

create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory");

std::fs::write(format!("{}/src/main.rs", &function_dir), code)
.expect("Unable to write main.rs file");
std::fs::write(
format!("{}/src/main.rs", &function_dir),
&self.workload_config.code,
)
.expect("Unable to write main.rs file");

let cargo_toml = format!(
r#"
Expand Down
2 changes: 2 additions & 0 deletions src/agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod workload;
pub enum AgentError {
OpenConfigFileError(std::io::Error),
ParseConfigError(toml::de::Error),
InvalidLanguage(String),
BuildFailed(AgentOutput),
}

Expand All @@ -17,6 +18,7 @@ impl fmt::Display for AgentError {
AgentError::OpenConfigFileError(e) => write!(f, "Failed to open config file: {}", e),
AgentError::ParseConfigError(e) => write!(f, "Failed to parse config file: {}", e),
AgentError::BuildFailed(output) => write!(f, "Build failed: {:?}", output),
AgentError::InvalidLanguage(e) => write!(f, "Invalid language: {}", e),
}
}
}
Expand Down
19 changes: 8 additions & 11 deletions src/agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
use agent::{
agent::workload_runner_server::WorkloadRunnerServer,
workload::{config::Config, runner::Runner, service::WorkloadRunnerService},
agent::workload_runner_server::WorkloadRunnerServer, workload::service::WorkloadRunnerService,
};
use clap::Parser;
use std::{net::ToSocketAddrs, path::PathBuf};
use std::net::ToSocketAddrs;
use tonic::transport::Server;

#[derive(Debug, Parser)]
struct Args {
#[clap(short, long, default_value = "/etc/cloudlet/agent/config.toml")]
config: PathBuf,
#[clap(long, env, default_value = "0.0.0.0")]
grpc_server_address: String,
#[clap(long, env, default_value = "50051")]
grpc_server_port: u16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

let config = Config::from_file(&args.config).unwrap();

let bind_address = format!("{}:{}", config.server.address, config.server.port)
let bind_address = format!("{}:{}", args.grpc_server_address, args.grpc_server_port)
.to_socket_addrs()
.unwrap()
.next()
.unwrap();

let runner = Runner::new(config);

let server = WorkloadRunnerService::new(runner);
let server = WorkloadRunnerService;

Server::builder()
.add_service(WorkloadRunnerServer::new(server))
Expand Down
38 changes: 27 additions & 11 deletions src/agent/src/workload/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use crate::{agents::Language, AgentError, AgentResult};
use crate::{
agent::{execute_request, ExecuteRequest},
agents::Language,
AgentError, AgentResult,
};
use serde::Deserialize;
use std::path::PathBuf;

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Server {
pub address: String,
pub port: u16,
}

/// Generic agent configuration.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -19,10 +16,9 @@ pub struct Config {
pub language: Language,
/// Action to perform.
pub action: Action,
/// Server configuration.
pub server: Server,
/// Code
pub code: String,
/// Rest of the configuration as a string.
#[serde(skip)]
pub config_string: String,
}

Expand All @@ -38,6 +34,16 @@ impl Config {

Ok(config)
}

pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result<Self, AgentError> {
Ok(Self {
workload_name: execute_request.workload_name.clone(),
language: Language::try_from(execute_request.language.clone().as_str())?,
action: execute_request.action().into(),
config_string: execute_request.config_str,
code: execute_request.code,
})
}
}

#[derive(Debug, Clone, Deserialize)]
Expand All @@ -47,3 +53,13 @@ pub enum Action {
Run,
PrepareAndRun,
}

impl From<execute_request::Action> for Action {
fn from(value: execute_request::Action) -> Self {
match value {
execute_request::Action::Prepare => Action::Prepare,
execute_request::Action::Run => Action::Run,
execute_request::Action::PrepareAndRun => Action::PrepareAndRun,
}
}
}
8 changes: 7 additions & 1 deletion src/agent/src/workload/runner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
agent::ExecuteRequest,
agents::{rust, Agent, AgentOutput, Language},
workload::config::Action,
AgentResult,
AgentError, AgentResult,
};

#[cfg(feature = "debug-agent")]
Expand All @@ -27,6 +28,11 @@ impl Runner {
Runner { config, agent }
}

pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result<Self, AgentError> {
let config = Config::new_from_execute_request(execute_request)?;
Ok(Self::new(config))
}

pub fn run(&self) -> AgentResult<AgentOutput> {
let result = match self.config.action {
Action::Prepare => self.agent.prepare()?,
Expand Down
29 changes: 9 additions & 20 deletions src/agent/src/workload/service.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,33 @@
use super::runner::Runner;
use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest};
use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest};
use agent::workload_runner_server::WorkloadRunner;
use std::{process, sync::Arc};
use tokio::sync::Mutex;
use std::process;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};

type Result<T> = std::result::Result<Response<T>, tonic::Status>;

pub struct WorkloadRunnerService {
runner: Arc<Mutex<Runner>>,
}

impl WorkloadRunnerService {
pub fn new(runner: Runner) -> Self {
WorkloadRunnerService {
runner: Arc::new(Mutex::new(runner)),
}
}
}
pub struct WorkloadRunnerService;

#[tonic::async_trait]
impl WorkloadRunner for WorkloadRunnerService {
type ExecuteStream = ReceiverStream<std::result::Result<ExecuteResponse, tonic::Status>>;

async fn execute(&self, _: Request<ExecuteRequest>) -> Result<Self::ExecuteStream> {
async fn execute(&self, req: Request<ExecuteRequest>) -> Result<Self::ExecuteStream> {
let (tx, rx) = tokio::sync::mpsc::channel(4);

// We assume there's only one request at a time
let runner = self
.runner
.try_lock()
.map_err(|e| tonic::Status::unavailable(format!("Runner is busy: {:?}", e)))?;
let execute_request = req.into_inner();

let runner = Runner::new_from_execute_request(execute_request)
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let res = runner
.run()
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let _ = tx
.send(Ok(ExecuteResponse {
stage: Stage::Done as i32,
stdout: res.stdout,
stderr: res.stderr,
exit_code: res.exit_code,
Expand Down
12 changes: 9 additions & 3 deletions src/api/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use actix_web::{post, web, HttpResponse, Responder};
use actix_web_lab::sse;
use async_stream::stream;
use serde::Serialize;
use shared_models::CloudletDtoRequest;
use shared_models::{CloudletDtoRequest, Language};
use tokio_stream::StreamExt;
use tonic::Streaming;

Expand All @@ -16,10 +16,16 @@ pub async fn run(req_body: web::Json<CloudletDtoRequest>) -> impl Responder {

let mut client = VmmClient::new().await.unwrap();

println!("Request: {:?}", req);

let vmm_request = RunVmmRequest {
workload_name: req.workload_name,
code: req.code,
env: req.env,
language: req.language as i32,
language: match req.language {
Language::PYTHON => 0,
Language::NODE => 1,
Language::RUST => 2,
},
log_level: req.log_level as i32,
};

Expand Down
Loading

0 comments on commit 71b9fc5

Please sign in to comment.