Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send code though gRPC to the agent #37

Merged
merged 13 commits into from
May 1, 2024
17 changes: 15 additions & 2 deletions proto/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@ syntax = "proto3";
package cloudlet.agent;
import "google/protobuf/empty.proto";

message ExecuteRequest {}
message ExecuteRequest {
enum Action {
RUN = 0;
PREPARE = 1;
PREPARE_AND_RUN = 2;
}

string workload_name = 1;
string language = 2;
Action action = 3;
string code = 4;
string config_str = 5;
}

message ExecuteResponse {
enum Stage {
Expand All @@ -16,7 +28,8 @@ message ExecuteResponse {

string stdout = 1;
string stderr = 2;
int32 exit_code = 3;
Stage stage = 3;
int32 exit_code = 4;
}

message SignalRequest {
Expand Down
7 changes: 3 additions & 4 deletions proto/vmm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ service VmmService {
}

message RunVmmRequest {

Language language = 1;
string code = 2;
string env = 3;
string workload_name = 1;
Language language = 2;
string code = 3;
LogLevel log_level = 4;

}
Expand Down
2 changes: 1 addition & 1 deletion src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ name = "agent"
path = "src/lib.rs"

[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
clap = { version = "4.5.4", features = ["derive", "env"] }
prost = "0.12.4"
rand = "0.8.5"
serde = { version = "1.0.197", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion src/agent/examples/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ address = "localhost"
port = 50051

[build]
source-code-path = "CHANGE/cloudlet/src/agent/examples/main.rs"
source-code-path = "CHANGEME/src/agent/examples/main.rs"
release = true
18 changes: 17 additions & 1 deletion src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::AgentResult;
use crate::{AgentError, AgentResult};
use serde::Deserialize;

#[cfg(feature = "debug-agent")]
Expand Down Expand Up @@ -34,3 +34,19 @@ impl std::fmt::Display for Language {
}
}
}

impl TryFrom<&str> for Language {
type Error = AgentError;

fn try_from(value: &str) -> Result<Self, AgentError> {
match value {
"rust" => Ok(Language::Rust),
#[cfg(feature = "debug-agent")]
"debug" => Ok(Language::Debug),
_ => Err(AgentError::InvalidLanguage(format!(
"Invalid language: {}",
value
))),
}
}
}
10 changes: 5 additions & 5 deletions src/agent/src/agents/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{fs::create_dir_all, process::Command};
#[serde(rename_all = "kebab-case")]
struct RustAgentBuildConfig {
release: bool,
source_code_path: String,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -66,8 +65,6 @@ impl From<workload::config::Config> for RustAgent {

impl Agent for RustAgent {
fn prepare(&self) -> AgentResult<AgentOutput> {
let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap();

let function_dir = format!(
"/tmp/{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
Expand All @@ -77,8 +74,11 @@ impl Agent for RustAgent {

create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory");

std::fs::write(format!("{}/src/main.rs", &function_dir), code)
.expect("Unable to write main.rs file");
std::fs::write(
format!("{}/src/main.rs", &function_dir),
&self.workload_config.code,
)
.expect("Unable to write main.rs file");

let cargo_toml = format!(
r#"
Expand Down
2 changes: 2 additions & 0 deletions src/agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod workload;
pub enum AgentError {
OpenConfigFileError(std::io::Error),
ParseConfigError(toml::de::Error),
InvalidLanguage(String),
BuildFailed(AgentOutput),
}

Expand All @@ -17,6 +18,7 @@ impl fmt::Display for AgentError {
AgentError::OpenConfigFileError(e) => write!(f, "Failed to open config file: {}", e),
AgentError::ParseConfigError(e) => write!(f, "Failed to parse config file: {}", e),
AgentError::BuildFailed(output) => write!(f, "Build failed: {:?}", output),
AgentError::InvalidLanguage(e) => write!(f, "Invalid language: {}", e),
}
}
}
Expand Down
19 changes: 8 additions & 11 deletions src/agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
use agent::{
agent::workload_runner_server::WorkloadRunnerServer,
workload::{config::Config, runner::Runner, service::WorkloadRunnerService},
agent::workload_runner_server::WorkloadRunnerServer, workload::service::WorkloadRunnerService,
};
use clap::Parser;
use std::{net::ToSocketAddrs, path::PathBuf};
use std::net::ToSocketAddrs;
use tonic::transport::Server;

#[derive(Debug, Parser)]
struct Args {
#[clap(short, long, default_value = "/etc/cloudlet/agent/config.toml")]
config: PathBuf,
#[clap(long, env, default_value = "0.0.0.0")]
grpc_server_address: String,
#[clap(long, env, default_value = "50051")]
grpc_server_port: u16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

let config = Config::from_file(&args.config).unwrap();

let bind_address = format!("{}:{}", config.server.address, config.server.port)
let bind_address = format!("{}:{}", args.grpc_server_address, args.grpc_server_port)
.to_socket_addrs()
.unwrap()
.next()
.unwrap();

let runner = Runner::new(config);

let server = WorkloadRunnerService::new(runner);
let server = WorkloadRunnerService;

Server::builder()
.add_service(WorkloadRunnerServer::new(server))
Expand Down
38 changes: 27 additions & 11 deletions src/agent/src/workload/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use crate::{agents::Language, AgentError, AgentResult};
use crate::{
agent::{execute_request, ExecuteRequest},
agents::Language,
AgentError, AgentResult,
};
use serde::Deserialize;
use std::path::PathBuf;

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Server {
pub address: String,
pub port: u16,
}

/// Generic agent configuration.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -19,10 +16,9 @@ pub struct Config {
pub language: Language,
/// Action to perform.
pub action: Action,
/// Server configuration.
pub server: Server,
/// Code
pub code: String,
/// Rest of the configuration as a string.
#[serde(skip)]
pub config_string: String,
}

Expand All @@ -38,6 +34,16 @@ impl Config {

Ok(config)
}

pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result<Self, AgentError> {
Ok(Self {
workload_name: execute_request.workload_name.clone(),
language: Language::try_from(execute_request.language.clone().as_str())?,
action: execute_request.action().into(),
config_string: execute_request.config_str,
code: execute_request.code,
})
}
}

#[derive(Debug, Clone, Deserialize)]
Expand All @@ -47,3 +53,13 @@ pub enum Action {
Run,
PrepareAndRun,
}

impl From<execute_request::Action> for Action {
fn from(value: execute_request::Action) -> Self {
match value {
execute_request::Action::Prepare => Action::Prepare,
execute_request::Action::Run => Action::Run,
execute_request::Action::PrepareAndRun => Action::PrepareAndRun,
}
}
}
8 changes: 7 additions & 1 deletion src/agent/src/workload/runner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
agent::ExecuteRequest,
agents::{rust, Agent, AgentOutput, Language},
workload::config::Action,
AgentResult,
AgentError, AgentResult,
};

#[cfg(feature = "debug-agent")]
Expand All @@ -27,6 +28,11 @@ impl Runner {
Runner { config, agent }
}

pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result<Self, AgentError> {
let config = Config::new_from_execute_request(execute_request)?;
Ok(Self::new(config))
}

pub fn run(&self) -> AgentResult<AgentOutput> {
let result = match self.config.action {
Action::Prepare => self.agent.prepare()?,
Expand Down
29 changes: 9 additions & 20 deletions src/agent/src/workload/service.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,33 @@
use super::runner::Runner;
use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest};
use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest};
use agent::workload_runner_server::WorkloadRunner;
use std::{process, sync::Arc};
use tokio::sync::Mutex;
use std::process;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};

type Result<T> = std::result::Result<Response<T>, tonic::Status>;

pub struct WorkloadRunnerService {
runner: Arc<Mutex<Runner>>,
}

impl WorkloadRunnerService {
pub fn new(runner: Runner) -> Self {
WorkloadRunnerService {
runner: Arc::new(Mutex::new(runner)),
}
}
}
pub struct WorkloadRunnerService;

#[tonic::async_trait]
impl WorkloadRunner for WorkloadRunnerService {
type ExecuteStream = ReceiverStream<std::result::Result<ExecuteResponse, tonic::Status>>;

async fn execute(&self, _: Request<ExecuteRequest>) -> Result<Self::ExecuteStream> {
async fn execute(&self, req: Request<ExecuteRequest>) -> Result<Self::ExecuteStream> {
let (tx, rx) = tokio::sync::mpsc::channel(4);

// We assume there's only one request at a time
let runner = self
.runner
.try_lock()
.map_err(|e| tonic::Status::unavailable(format!("Runner is busy: {:?}", e)))?;
let execute_request = req.into_inner();

let runner = Runner::new_from_execute_request(execute_request)
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let res = runner
.run()
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let _ = tx
.send(Ok(ExecuteResponse {
stage: Stage::Done as i32,
stdout: res.stdout,
stderr: res.stderr,
exit_code: res.exit_code,
Expand Down
12 changes: 9 additions & 3 deletions src/api/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use actix_web::{post, web, HttpResponse, Responder};
use actix_web_lab::sse;
use async_stream::stream;
use serde::Serialize;
use shared_models::CloudletDtoRequest;
use shared_models::{CloudletDtoRequest, Language};
use tokio_stream::StreamExt;
use tonic::Streaming;

Expand All @@ -16,10 +16,16 @@ pub async fn run(req_body: web::Json<CloudletDtoRequest>) -> impl Responder {

let mut client = VmmClient::new().await.unwrap();

println!("Request: {:?}", req);

let vmm_request = RunVmmRequest {
workload_name: req.workload_name,
code: req.code,
env: req.env,
language: req.language as i32,
language: match req.language {
Language::PYTHON => 0,
Language::NODE => 1,
Language::RUST => 2,
},
log_level: req.log_level as i32,
};

Expand Down
Loading