From 7475c69e87dfa042e2a90449ca0952cb9e5df586 Mon Sep 17 00:00:00 2001 From: Claudio Cicconetti Date: Tue, 26 Nov 2024 17:05:35 +0100 Subject: [PATCH] API: add orchestration domain registration structures --- edgeless_api/proto/messages.proto | 52 ++++- edgeless_api/proto/services.proto | 12 +- edgeless_api/src/domain_registration.rs | 87 ++++++++ .../src/grpc_impl/domain_registration.rs | 193 ++++++++++++++++++ edgeless_api/src/grpc_impl/mod.rs | 20 +- edgeless_api/src/lib.rs | 1 + 6 files changed, 345 insertions(+), 20 deletions(-) create mode 100644 edgeless_api/src/domain_registration.rs create mode 100644 edgeless_api/src/grpc_impl/domain_registration.rs diff --git a/edgeless_api/proto/messages.proto b/edgeless_api/proto/messages.proto index 39f091d..1a74fc1 100644 --- a/edgeless_api/proto/messages.proto +++ b/edgeless_api/proto/messages.proto @@ -82,13 +82,13 @@ message StartComponentResponse { optional InstanceIdVariant instance_id = 2; } -// Possible message types of FunctionInstance::UpdateNode(). +// Possible message types of NodeRegistrationAPI::UpdateNode(). enum UpdateNodeRequestType { REGISTER = 0; DEREGISTER = 1; } -// Request message of FunctionInstance::UpdateNode(). +// Request message of NodeRegistrationAPI::UpdateNode(). message UpdateNodeRequest { // Request type: registration or deregistration. Always present. UpdateNodeRequestType request_type = 1; @@ -135,7 +135,7 @@ message NodeCapabilities { uint32 mem_size_gpu = 82; } -// Response message of FunctionInstance::UpdateNode(). +// Response message of NodeRegistrationAPI::UpdateNode(). message UpdateNodeResponse { // If present it means that the request has been rejected. // In this case, other fields may not be present or contain meaningful data. @@ -329,6 +329,52 @@ message WorkflowInstanceList { repeated WorkflowInstanceStatus workflow_statuses = 1; } +// Request message of DomainRegistrationAPI::UpdateDomain(). +message UpdateDomainRequest { + // Domain name. + string domain_id = 10; + // URL of the orchestrator server. + string orchestrator_url = 20; + // Domain capabilities. + DomainCapabilities capabilities = 30; + // Deadline for refreshing the domain request, in seconds since Unix epoch. + // After this time the orchestration domain can be considered to be offline. + uint64 refresh_deadline = 40; +} + +// Domain capabilities exposed from the orchestrator to the controller. +message DomainCapabilities { + // Total number of (actual or virtual) CPUs. + uint32 num_cpus = 10; + // Total number of physical cores. + uint32 num_cores = 11; + // Total size of memory available, in MiB. + uint32 mem_size = 12; + // Superset of all the labels advertised by the nodes in the domain. + repeated string labels = 20; + // Number of nodes with a Trusted Execution Environment. + uint32 num_tee = 30; + // Number of nodes with a Trusted Platform Module. + uint32 num_tpm = 40; + // Superset of all the run-times supported by the nodes in the domain. + repeated string runtimes = 50; + // Total disk space, in MiB. + uint32 disk_tot_space = 60; + // Total number of (actual or virtual) GPUs. + uint32 num_gpus = 70; + // Total GPU memory available, in MiB. + uint32 mem_size_gpu = 71; +} + +// Response message of DomainRegistrationAPI::UpdateDomain(). +message UpdateDomainResponse { + // If present it means that the request has been rejected. + // In this case, other fields may not be present or contain meaningful data. + optional ResponseError response_error = 1; +} + +// message Domain + // Event types. enum EventType { // Function invocation for which a return value is expected. diff --git a/edgeless_api/proto/services.proto b/edgeless_api/proto/services.proto index c6b029f..13c7ccd 100644 --- a/edgeless_api/proto/services.proto +++ b/edgeless_api/proto/services.proto @@ -42,7 +42,7 @@ service NodeManagement { rpc KeepAlive (google.protobuf.Empty) returns (KeepAliveResponse); } -// API that allows nodes to register themselves with the orchestrator +// API that allows nodes to register themselves with the orchestrator. service NodeRegistration { // Register a new node on an orchestrator or deregister an existing node. // Input: registration: identifier of the new node and agent/invocation URLs; @@ -51,6 +51,16 @@ service NodeRegistration { rpc UpdateNode (UpdateNodeRequest) returns (UpdateNodeResponse); } +// API that allows the orchestrators to register with the controller. +service DomainRegistration { + // Register a new orchestration domain on a controller. + // Input: registration data, including a deadline by which the orchestration + // domain can be considered offline; to disconnect a domain, use a value in + // the past for this deadline + // Output: UpdateDomainResponse. + rpc UpdateDomain (UpdateDomainRequest) returns (UpdateDomainResponse); +} + // API to manage the lifecycle of workflow instances (s04). service WorkflowInstance { // Start a new workflow. diff --git a/edgeless_api/src/domain_registration.rs b/edgeless_api/src/domain_registration.rs new file mode 100644 index 0000000..5ea9bff --- /dev/null +++ b/edgeless_api/src/domain_registration.rs @@ -0,0 +1,87 @@ +// SPDX-FileCopyrightText: © 2024 Claudio Cicconetti +// SPDX-License-Identifier: MIT + +#[derive(Debug, Clone, PartialEq, Default)] +pub struct DomainCapabilities { + // Total number of (actual or virtual) CPUs. + pub num_cpus: u32, + // Total number of physical cores. + pub num_cores: u32, + // Total size of memory available, in MiB. + pub mem_size: u32, + // Superset of all the labels advertised by the nodes in the domain. + pub labels: std::collections::HashSet, + // Number of nodes with a Trusted Execution Environment. + pub num_tee: u32, + // Number of nodes with a Trusted Platform Module. + pub num_tpm: u32, + // Superset of all the run-times supported by the nodes in the domain. + pub runtimes: std::collections::HashSet, + // Total disk space, in MiB. + pub disk_tot_space: u32, + // Total number of (actual or virtual) GPUs. + pub num_gpus: u32, + // Total GPU memory available, in MiB. + pub mem_size_gpu: u32, +} + +impl std::fmt::Display for DomainCapabilities { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "{} CPUs ({} cores) with {} MiB, labels [{}], num TEE {}, num TPM {}, runtimes [{}], disk space {} MiB, {} GPUs with {} MiB", + self.num_cpus, + self.num_cores, + self.mem_size, + self.labels.iter().map(|x| x.to_string()).collect::>().join(","), + self.num_tee, + self.num_tpm, + self.runtimes.iter().map(|x| x.to_string()).collect::>().join(","), + self.disk_tot_space, + self.num_gpus, + self.mem_size_gpu, + ) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct UpdateDomainRequest { + // Domain name. + pub domain_id: String, + // URL of the orchestrator server. + pub orchestrator_url: String, + // Domain capabilities. + pub capabilities: DomainCapabilities, + // Deadline for refreshing the domain request. + // After this time the orchestration domain can be considered to be offline. + pub refresh_deadline: std::time::SystemTime, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum UpdateDomainResponse { + ResponseError(crate::common::ResponseError), + Accepted, +} + +#[async_trait::async_trait] +pub trait DomainRegistrationAPI: DomainRegistrationAPIClone + Sync + Send { + async fn update_domain(&mut self, request: UpdateDomainRequest) -> anyhow::Result; +} + +// https://stackoverflow.com/a/30353928 +pub trait DomainRegistrationAPIClone { + fn clone_box(&self) -> Box; +} +impl DomainRegistrationAPIClone for T +where + T: 'static + DomainRegistrationAPI + Clone, +{ + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() + } +} diff --git a/edgeless_api/src/grpc_impl/domain_registration.rs b/edgeless_api/src/grpc_impl/domain_registration.rs new file mode 100644 index 0000000..825fee0 --- /dev/null +++ b/edgeless_api/src/grpc_impl/domain_registration.rs @@ -0,0 +1,193 @@ +// SPDX-FileCopyrightText: © 2024 Technical University of Munich, Chair of Connected Mobility +// SPDX-FileCopyrightText: © 2024 Claudio Cicconetti +// SPDX-FileCopyrightText: © 2024 Siemens AG +// SPDX-License-Identifier: MIT + +#[derive(Clone)] +pub struct DomainRegistrationClient { + client: crate::grpc_impl::api::domain_registration_client::DomainRegistrationClient, +} + +pub struct DomainRegistrationAPIService { + pub domain_registration_api: tokio::sync::Mutex>, +} + +impl DomainRegistrationClient { + pub async fn new(server_addr: &str, retry_interval: Option) -> anyhow::Result { + loop { + match crate::grpc_impl::api::domain_registration_client::DomainRegistrationClient::connect(server_addr.to_string()).await { + Ok(client) => { + let client = client.max_decoding_message_size(usize::MAX); + return Ok(Self { client }); + } + Err(err) => match retry_interval { + Some(val) => tokio::time::sleep(tokio::time::Duration::from_secs(val)).await, + None => { + return Err(anyhow::anyhow!("Error when connecting to {}: {}", server_addr, err)); + } + }, + } + } + } +} + +#[async_trait::async_trait] +impl crate::domain_registration::DomainRegistrationAPI for DomainRegistrationClient { + async fn update_domain( + &mut self, + request: crate::domain_registration::UpdateDomainRequest, + ) -> anyhow::Result { + match self + .client + .update_domain(tonic::Request::new(serialize_update_domain_request(&request))) + .await + { + Ok(res) => parse_update_domain_response(&res.into_inner()), + Err(err) => Err(anyhow::anyhow!("Communication error while updating a domain: {}", err.to_string())), + } + } +} + +#[async_trait::async_trait] +impl crate::grpc_impl::api::domain_registration_server::DomainRegistration for DomainRegistrationAPIService { + async fn update_domain( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let parsed_request = match parse_update_domain_request(&request.into_inner()) { + Ok(parsed_request) => parsed_request, + Err(err) => { + log::error!("Parse UpdateDomainRequest Failed: {}", err); + return Err(tonic::Status::invalid_argument(format!( + "Error when parsing an UpdateDomainRequest message: {}", + err + ))); + } + }; + match self.domain_registration_api.lock().await.update_domain(parsed_request).await { + Ok(res) => Ok(tonic::Response::new(serialize_update_domain_response(&res))), + Err(err) => Err(tonic::Status::internal(format!("Error when updating a node: {}", err))), + } + } +} + +fn parse_domain_capabilities(api_instance: &crate::grpc_impl::api::DomainCapabilities) -> crate::domain_registration::DomainCapabilities { + crate::domain_registration::DomainCapabilities { + num_cpus: api_instance.num_cpus, + num_cores: api_instance.num_cores, + mem_size: api_instance.mem_size, + labels: std::collections::HashSet::from_iter(api_instance.labels.iter().cloned()), + num_tee: api_instance.num_tee, + num_tpm: api_instance.num_tpm, + runtimes: std::collections::HashSet::from_iter(api_instance.runtimes.iter().cloned()), + disk_tot_space: api_instance.disk_tot_space, + num_gpus: api_instance.num_gpus, + mem_size_gpu: api_instance.mem_size_gpu, + } +} + +fn serialize_domain_capabilities(req: &crate::domain_registration::DomainCapabilities) -> crate::grpc_impl::api::DomainCapabilities { + crate::grpc_impl::api::DomainCapabilities { + num_cpus: req.num_cpus, + num_cores: req.num_cores, + mem_size: req.mem_size, + labels: req.labels.iter().cloned().collect::>(), + num_tee: req.num_tee, + num_tpm: req.num_tpm, + runtimes: req.runtimes.iter().cloned().collect::>(), + disk_tot_space: req.disk_tot_space, + num_gpus: req.num_gpus, + mem_size_gpu: req.mem_size_gpu, + } +} + +fn parse_update_domain_request( + api_instance: &crate::grpc_impl::api::UpdateDomainRequest, +) -> anyhow::Result { + let capabilities = match &api_instance.capabilities { + Some(capabilities) => parse_domain_capabilities(capabilities), + None => crate::domain_registration::DomainCapabilities::default(), + }; + Ok(crate::domain_registration::UpdateDomainRequest { + domain_id: api_instance.domain_id.clone(), + orchestrator_url: api_instance.orchestrator_url.clone(), + capabilities, + refresh_deadline: std::time::UNIX_EPOCH + std::time::Duration::from_secs(api_instance.refresh_deadline), + }) +} + +fn serialize_update_domain_response(req: &crate::domain_registration::UpdateDomainResponse) -> crate::grpc_impl::api::UpdateDomainResponse { + match req { + crate::domain_registration::UpdateDomainResponse::ResponseError(err) => crate::grpc_impl::api::UpdateDomainResponse { + response_error: Some(crate::grpc_impl::api::ResponseError { + summary: err.summary.clone(), + detail: err.detail.clone(), + }), + }, + crate::domain_registration::UpdateDomainResponse::Accepted => crate::grpc_impl::api::UpdateDomainResponse { response_error: None }, + } +} + +fn parse_update_domain_response( + api_instance: &crate::grpc_impl::api::UpdateDomainResponse, +) -> anyhow::Result { + match api_instance.response_error.as_ref() { + Some(err) => Ok(crate::domain_registration::UpdateDomainResponse::ResponseError( + crate::common::ResponseError { + summary: err.summary.clone(), + detail: err.detail.clone(), + }, + )), + None => Ok(crate::domain_registration::UpdateDomainResponse::Accepted), + } +} + +fn serialize_update_domain_request(req: &crate::domain_registration::UpdateDomainRequest) -> crate::grpc_impl::api::UpdateDomainRequest { + crate::grpc_impl::api::UpdateDomainRequest { + domain_id: req.domain_id.clone(), + orchestrator_url: req.orchestrator_url.clone(), + capabilities: Some(serialize_domain_capabilities(&req.capabilities)), + refresh_deadline: req.refresh_deadline.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs(), + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::domain_registration::DomainCapabilities; + use crate::domain_registration::UpdateDomainRequest; + use crate::domain_registration::UpdateDomainResponse; + + #[test] + fn serialize_deserialize_update_domain_request() { + let messages = vec![UpdateDomainRequest { + domain_id: "my-domain".to_string(), + orchestrator_url: "http://127.0.0.1:10000".to_string(), + capabilities: DomainCapabilities::default(), + refresh_deadline: std::time::UNIX_EPOCH + std::time::Duration::from_secs(313714800), + }]; + for msg in messages { + match parse_update_domain_request(&serialize_update_domain_request(&msg)) { + Ok(val) => assert_eq!(msg, val), + Err(err) => panic!("{}", err), + } + } + } + + #[test] + fn serialize_deserialize_update_domain_response() { + let messages = vec![ + UpdateDomainResponse::ResponseError(crate::common::ResponseError { + summary: "error summary".to_string(), + detail: Some("error details".to_string()), + }), + UpdateDomainResponse::Accepted, + ]; + for msg in messages { + match parse_update_domain_response(&serialize_update_domain_response(&msg)) { + Ok(val) => assert_eq!(msg, val), + Err(err) => panic!("{}", err), + } + } + } +} diff --git a/edgeless_api/src/grpc_impl/mod.rs b/edgeless_api/src/grpc_impl/mod.rs index aabdb5f..26b76dc 100644 --- a/edgeless_api/src/grpc_impl/mod.rs +++ b/edgeless_api/src/grpc_impl/mod.rs @@ -2,34 +2,22 @@ // SPDX-FileCopyrightText: © 2023 Claudio Cicconetti // SPDX-FileCopyrightText: © 2023 Siemens AG // SPDX-License-Identifier: MIT -pub mod agent; +pub mod agent; pub mod api { tonic::include_proto!("edgeless_api"); } - pub mod common; - pub mod container_function; - pub mod container_runtime; - pub mod controller; - +pub mod domain_registration; pub mod function_instance; - pub mod guest_api_function; - pub mod guest_api_host; - pub mod invocation; - +pub mod node_management; +pub mod node_registration; pub mod orc; - pub mod resource_configuration; - pub mod workflow_instance; - -pub mod node_management; - -pub mod node_registration; diff --git a/edgeless_api/src/lib.rs b/edgeless_api/src/lib.rs index 8023c2e..e4ca73f 100644 --- a/edgeless_api/src/lib.rs +++ b/edgeless_api/src/lib.rs @@ -6,6 +6,7 @@ pub mod api; pub mod coap_impl; pub mod common; +pub mod domain_registration; pub mod function_instance; #[cfg(feature = "grpc_impl")] pub mod grpc_impl;