Skip to content

Commit

Permalink
API: add orchestration domain registration structures
Browse files Browse the repository at this point in the history
  • Loading branch information
ccicconetti committed Nov 26, 2024
1 parent 11beb71 commit 7475c69
Show file tree
Hide file tree
Showing 6 changed files with 345 additions and 20 deletions.
52 changes: 49 additions & 3 deletions edgeless_api/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ message StartComponentResponse {
optional InstanceIdVariant instance_id = 2;
}

// Possible message types of FunctionInstance::UpdateNode().
// Possible message types of NodeRegistrationAPI::UpdateNode().
enum UpdateNodeRequestType {
REGISTER = 0;
DEREGISTER = 1;
}

// Request message of FunctionInstance::UpdateNode().
// Request message of NodeRegistrationAPI::UpdateNode().
message UpdateNodeRequest {
// Request type: registration or deregistration. Always present.
UpdateNodeRequestType request_type = 1;
Expand Down Expand Up @@ -135,7 +135,7 @@ message NodeCapabilities {
uint32 mem_size_gpu = 82;
}

// Response message of FunctionInstance::UpdateNode().
// Response message of NodeRegistrationAPI::UpdateNode().
message UpdateNodeResponse {
// If present it means that the request has been rejected.
// In this case, other fields may not be present or contain meaningful data.
Expand Down Expand Up @@ -329,6 +329,52 @@ message WorkflowInstanceList {
repeated WorkflowInstanceStatus workflow_statuses = 1;
}

// Request message of DomainRegistrationAPI::UpdateDomain().
message UpdateDomainRequest {
// Domain name.
string domain_id = 10;
// URL of the orchestrator server.
string orchestrator_url = 20;
// Domain capabilities.
DomainCapabilities capabilities = 30;
// Deadline for refreshing the domain request, in seconds since Unix epoch.
// After this time the orchestration domain can be considered to be offline.
uint64 refresh_deadline = 40;
}

// Domain capabilities exposed from the orchestrator to the controller.
message DomainCapabilities {
// Total number of (actual or virtual) CPUs.
uint32 num_cpus = 10;
// Total number of physical cores.
uint32 num_cores = 11;
// Total size of memory available, in MiB.
uint32 mem_size = 12;
// Superset of all the labels advertised by the nodes in the domain.
repeated string labels = 20;
// Number of nodes with a Trusted Execution Environment.
uint32 num_tee = 30;
// Number of nodes with a Trusted Platform Module.
uint32 num_tpm = 40;
// Superset of all the run-times supported by the nodes in the domain.
repeated string runtimes = 50;
// Total disk space, in MiB.
uint32 disk_tot_space = 60;
// Total number of (actual or virtual) GPUs.
uint32 num_gpus = 70;
// Total GPU memory available, in MiB.
uint32 mem_size_gpu = 71;
}

// Response message of DomainRegistrationAPI::UpdateDomain().
message UpdateDomainResponse {
// If present it means that the request has been rejected.
// In this case, other fields may not be present or contain meaningful data.
optional ResponseError response_error = 1;
}

// message Domain

// Event types.
enum EventType {
// Function invocation for which a return value is expected.
Expand Down
12 changes: 11 additions & 1 deletion edgeless_api/proto/services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ service NodeManagement {
rpc KeepAlive (google.protobuf.Empty) returns (KeepAliveResponse);
}

// API that allows nodes to register themselves with the orchestrator
// API that allows nodes to register themselves with the orchestrator.
service NodeRegistration {
// Register a new node on an orchestrator or deregister an existing node.
// Input: registration: identifier of the new node and agent/invocation URLs;
Expand All @@ -51,6 +51,16 @@ service NodeRegistration {
rpc UpdateNode (UpdateNodeRequest) returns (UpdateNodeResponse);
}

// API that allows the orchestrators to register with the controller.
service DomainRegistration {
// Register a new orchestration domain on a controller.
// Input: registration data, including a deadline by which the orchestration
// domain can be considered offline; to disconnect a domain, use a value in
// the past for this deadline
// Output: UpdateDomainResponse.
rpc UpdateDomain (UpdateDomainRequest) returns (UpdateDomainResponse);
}

// API to manage the lifecycle of workflow instances (s04).
service WorkflowInstance {
// Start a new workflow.
Expand Down
87 changes: 87 additions & 0 deletions edgeless_api/src/domain_registration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// SPDX-FileCopyrightText: © 2024 Claudio Cicconetti <c.cicconetti@iit.cnr.it>
// SPDX-License-Identifier: MIT

#[derive(Debug, Clone, PartialEq, Default)]
pub struct DomainCapabilities {
// Total number of (actual or virtual) CPUs.
pub num_cpus: u32,
// Total number of physical cores.
pub num_cores: u32,
// Total size of memory available, in MiB.
pub mem_size: u32,
// Superset of all the labels advertised by the nodes in the domain.
pub labels: std::collections::HashSet<String>,
// Number of nodes with a Trusted Execution Environment.
pub num_tee: u32,
// Number of nodes with a Trusted Platform Module.
pub num_tpm: u32,
// Superset of all the run-times supported by the nodes in the domain.
pub runtimes: std::collections::HashSet<String>,
// Total disk space, in MiB.
pub disk_tot_space: u32,
// Total number of (actual or virtual) GPUs.
pub num_gpus: u32,
// Total GPU memory available, in MiB.
pub mem_size_gpu: u32,
}

impl std::fmt::Display for DomainCapabilities {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{} CPUs ({} cores) with {} MiB, labels [{}], num TEE {}, num TPM {}, runtimes [{}], disk space {} MiB, {} GPUs with {} MiB",
self.num_cpus,
self.num_cores,
self.mem_size,
self.labels.iter().map(|x| x.to_string()).collect::<Vec<String>>().join(","),
self.num_tee,
self.num_tpm,
self.runtimes.iter().map(|x| x.to_string()).collect::<Vec<String>>().join(","),
self.disk_tot_space,
self.num_gpus,
self.mem_size_gpu,
)
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct UpdateDomainRequest {
// Domain name.
pub domain_id: String,
// URL of the orchestrator server.
pub orchestrator_url: String,
// Domain capabilities.
pub capabilities: DomainCapabilities,
// Deadline for refreshing the domain request.
// After this time the orchestration domain can be considered to be offline.
pub refresh_deadline: std::time::SystemTime,
}

#[derive(Debug, Clone, PartialEq)]
pub enum UpdateDomainResponse {
ResponseError(crate::common::ResponseError),
Accepted,
}

#[async_trait::async_trait]
pub trait DomainRegistrationAPI: DomainRegistrationAPIClone + Sync + Send {
async fn update_domain(&mut self, request: UpdateDomainRequest) -> anyhow::Result<UpdateDomainResponse>;
}

// https://stackoverflow.com/a/30353928
pub trait DomainRegistrationAPIClone {
fn clone_box(&self) -> Box<dyn DomainRegistrationAPI>;
}
impl<T> DomainRegistrationAPIClone for T
where
T: 'static + DomainRegistrationAPI + Clone,
{
fn clone_box(&self) -> Box<dyn DomainRegistrationAPI> {
Box::new(self.clone())
}
}
impl Clone for Box<dyn DomainRegistrationAPI> {
fn clone(&self) -> Box<dyn DomainRegistrationAPI> {
self.clone_box()
}
}
193 changes: 193 additions & 0 deletions edgeless_api/src/grpc_impl/domain_registration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// SPDX-FileCopyrightText: © 2024 Technical University of Munich, Chair of Connected Mobility
// SPDX-FileCopyrightText: © 2024 Claudio Cicconetti <c.cicconetti@iit.cnr.it>
// SPDX-FileCopyrightText: © 2024 Siemens AG
// SPDX-License-Identifier: MIT

#[derive(Clone)]
pub struct DomainRegistrationClient {
client: crate::grpc_impl::api::domain_registration_client::DomainRegistrationClient<tonic::transport::Channel>,
}

pub struct DomainRegistrationAPIService {
pub domain_registration_api: tokio::sync::Mutex<Box<dyn crate::domain_registration::DomainRegistrationAPI>>,
}

impl DomainRegistrationClient {
pub async fn new(server_addr: &str, retry_interval: Option<u64>) -> anyhow::Result<Self> {
loop {
match crate::grpc_impl::api::domain_registration_client::DomainRegistrationClient::connect(server_addr.to_string()).await {
Ok(client) => {
let client = client.max_decoding_message_size(usize::MAX);
return Ok(Self { client });
}
Err(err) => match retry_interval {
Some(val) => tokio::time::sleep(tokio::time::Duration::from_secs(val)).await,
None => {
return Err(anyhow::anyhow!("Error when connecting to {}: {}", server_addr, err));
}
},
}
}
}
}

#[async_trait::async_trait]
impl crate::domain_registration::DomainRegistrationAPI for DomainRegistrationClient {
async fn update_domain(
&mut self,
request: crate::domain_registration::UpdateDomainRequest,
) -> anyhow::Result<crate::domain_registration::UpdateDomainResponse> {
match self
.client
.update_domain(tonic::Request::new(serialize_update_domain_request(&request)))
.await
{
Ok(res) => parse_update_domain_response(&res.into_inner()),
Err(err) => Err(anyhow::anyhow!("Communication error while updating a domain: {}", err.to_string())),
}
}
}

#[async_trait::async_trait]
impl crate::grpc_impl::api::domain_registration_server::DomainRegistration for DomainRegistrationAPIService {
async fn update_domain(
&self,
request: tonic::Request<crate::grpc_impl::api::UpdateDomainRequest>,
) -> Result<tonic::Response<crate::grpc_impl::api::UpdateDomainResponse>, tonic::Status> {
let parsed_request = match parse_update_domain_request(&request.into_inner()) {
Ok(parsed_request) => parsed_request,
Err(err) => {
log::error!("Parse UpdateDomainRequest Failed: {}", err);
return Err(tonic::Status::invalid_argument(format!(
"Error when parsing an UpdateDomainRequest message: {}",
err
)));
}
};
match self.domain_registration_api.lock().await.update_domain(parsed_request).await {
Ok(res) => Ok(tonic::Response::new(serialize_update_domain_response(&res))),
Err(err) => Err(tonic::Status::internal(format!("Error when updating a node: {}", err))),
}
}
}

fn parse_domain_capabilities(api_instance: &crate::grpc_impl::api::DomainCapabilities) -> crate::domain_registration::DomainCapabilities {
crate::domain_registration::DomainCapabilities {
num_cpus: api_instance.num_cpus,
num_cores: api_instance.num_cores,
mem_size: api_instance.mem_size,
labels: std::collections::HashSet::from_iter(api_instance.labels.iter().cloned()),
num_tee: api_instance.num_tee,
num_tpm: api_instance.num_tpm,
runtimes: std::collections::HashSet::from_iter(api_instance.runtimes.iter().cloned()),
disk_tot_space: api_instance.disk_tot_space,
num_gpus: api_instance.num_gpus,
mem_size_gpu: api_instance.mem_size_gpu,
}
}

fn serialize_domain_capabilities(req: &crate::domain_registration::DomainCapabilities) -> crate::grpc_impl::api::DomainCapabilities {
crate::grpc_impl::api::DomainCapabilities {
num_cpus: req.num_cpus,
num_cores: req.num_cores,
mem_size: req.mem_size,
labels: req.labels.iter().cloned().collect::<Vec<String>>(),
num_tee: req.num_tee,
num_tpm: req.num_tpm,
runtimes: req.runtimes.iter().cloned().collect::<Vec<String>>(),
disk_tot_space: req.disk_tot_space,
num_gpus: req.num_gpus,
mem_size_gpu: req.mem_size_gpu,
}
}

fn parse_update_domain_request(
api_instance: &crate::grpc_impl::api::UpdateDomainRequest,
) -> anyhow::Result<crate::domain_registration::UpdateDomainRequest> {
let capabilities = match &api_instance.capabilities {
Some(capabilities) => parse_domain_capabilities(capabilities),
None => crate::domain_registration::DomainCapabilities::default(),
};
Ok(crate::domain_registration::UpdateDomainRequest {
domain_id: api_instance.domain_id.clone(),
orchestrator_url: api_instance.orchestrator_url.clone(),
capabilities,
refresh_deadline: std::time::UNIX_EPOCH + std::time::Duration::from_secs(api_instance.refresh_deadline),
})
}

fn serialize_update_domain_response(req: &crate::domain_registration::UpdateDomainResponse) -> crate::grpc_impl::api::UpdateDomainResponse {
match req {
crate::domain_registration::UpdateDomainResponse::ResponseError(err) => crate::grpc_impl::api::UpdateDomainResponse {
response_error: Some(crate::grpc_impl::api::ResponseError {
summary: err.summary.clone(),
detail: err.detail.clone(),
}),
},
crate::domain_registration::UpdateDomainResponse::Accepted => crate::grpc_impl::api::UpdateDomainResponse { response_error: None },
}
}

fn parse_update_domain_response(
api_instance: &crate::grpc_impl::api::UpdateDomainResponse,
) -> anyhow::Result<crate::domain_registration::UpdateDomainResponse> {
match api_instance.response_error.as_ref() {
Some(err) => Ok(crate::domain_registration::UpdateDomainResponse::ResponseError(
crate::common::ResponseError {
summary: err.summary.clone(),
detail: err.detail.clone(),
},
)),
None => Ok(crate::domain_registration::UpdateDomainResponse::Accepted),
}
}

fn serialize_update_domain_request(req: &crate::domain_registration::UpdateDomainRequest) -> crate::grpc_impl::api::UpdateDomainRequest {
crate::grpc_impl::api::UpdateDomainRequest {
domain_id: req.domain_id.clone(),
orchestrator_url: req.orchestrator_url.clone(),
capabilities: Some(serialize_domain_capabilities(&req.capabilities)),
refresh_deadline: req.refresh_deadline.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs(),
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::domain_registration::DomainCapabilities;
use crate::domain_registration::UpdateDomainRequest;
use crate::domain_registration::UpdateDomainResponse;

#[test]
fn serialize_deserialize_update_domain_request() {
let messages = vec![UpdateDomainRequest {
domain_id: "my-domain".to_string(),
orchestrator_url: "http://127.0.0.1:10000".to_string(),
capabilities: DomainCapabilities::default(),
refresh_deadline: std::time::UNIX_EPOCH + std::time::Duration::from_secs(313714800),
}];
for msg in messages {
match parse_update_domain_request(&serialize_update_domain_request(&msg)) {
Ok(val) => assert_eq!(msg, val),
Err(err) => panic!("{}", err),
}
}
}

#[test]
fn serialize_deserialize_update_domain_response() {
let messages = vec![
UpdateDomainResponse::ResponseError(crate::common::ResponseError {
summary: "error summary".to_string(),
detail: Some("error details".to_string()),
}),
UpdateDomainResponse::Accepted,
];
for msg in messages {
match parse_update_domain_response(&serialize_update_domain_response(&msg)) {
Ok(val) => assert_eq!(msg, val),
Err(err) => panic!("{}", err),
}
}
}
}
Loading

0 comments on commit 7475c69

Please sign in to comment.