Skip to content

Commit

Permalink
Orchestrator proxy: add methods to fetch the function instance spawn …
Browse files Browse the repository at this point in the history
…requests and resource configurations, respectively
  • Loading branch information
ccicconetti committed Dec 14, 2024
1 parent a453d27 commit 4525126
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 40 deletions.
2 changes: 1 addition & 1 deletion edgeless_api/src/resource_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// SPDX-License-Identifier: MIT
use crate::common::PatchRequest;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(Debug, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
pub struct ResourceInstanceSpecification {
pub class_type: String,
#[serde(skip)]
Expand Down
10 changes: 10 additions & 0 deletions edgeless_orc/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ pub trait Proxy: Sync + Send {
/// Fetch the performance samples.
fn fetch_performance_samples(&mut self) -> std::collections::HashMap<String, std::collections::HashMap<String, Vec<(f64, f64)>>>;

/// Fetch the spawn requests of active function instances.
fn fetch_function_instance_requests(
&mut self,
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, edgeless_api::function_instance::SpawnFunctionRequest>;

/// Fetch the configurations of active resource instances.
fn fetch_resource_instance_configurations(
&mut self,
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, edgeless_api::resource_configuration::ResourceInstanceSpecification>;

/// Fetch the mapping between active function instances and nodes.
fn fetch_function_instances_to_nodes(
&mut self,
Expand Down
11 changes: 11 additions & 0 deletions edgeless_orc/src/proxy_none.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ impl super::proxy::Proxy for ProxyNone {
fn fetch_performance_samples(&mut self) -> std::collections::HashMap<String, std::collections::HashMap<String, Vec<(f64, f64)>>> {
std::collections::HashMap::new()
}
fn fetch_function_instance_requests(
&mut self,
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, edgeless_api::function_instance::SpawnFunctionRequest> {
std::collections::HashMap::new()
}
fn fetch_resource_instance_configurations(
&mut self,
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, edgeless_api::resource_configuration::ResourceInstanceSpecification>
{
std::collections::HashMap::new()
}
fn fetch_function_instances_to_nodes(
&mut self,
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<edgeless_api::function_instance::NodeId>> {
Expand Down
121 changes: 82 additions & 39 deletions edgeless_orc/src/proxy_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl ProxyRedis {
format!("{}.{}", duration.as_secs(), duration.subsec_millis())
}

fn fetch_instances(&mut self) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, ActiveInstanceClone> {
fn fetch_instances(&mut self) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, crate::active_instance::ActiveInstance> {
let mut instance_ids = vec![];
for instance_key in self.connection.keys::<&str, Vec<String>>("instance:*").unwrap_or(vec![]) {
let tokens: Vec<&str> = instance_key.split(':').collect();
Expand All @@ -198,7 +198,31 @@ impl ProxyRedis {
for instance_id in instance_ids {
if let Ok(val) = self.connection.get::<String, String>(format!("instance:{}", instance_id)) {
if let Ok(val) = serde_json::from_str::<ActiveInstanceClone>(&val) {
instances.insert(instance_id, val);
instances.insert(
instance_id,
match val {
ActiveInstanceClone::Function(spawn_req, instance_ids_str) => {
let mut instance_ids = vec![];
for instance_id_str in instance_ids_str {
if let Ok(instance_id) = string_to_instance_id(&instance_id_str) {
instance_ids.push(instance_id);
}
}
if !instance_ids.is_empty() {
crate::active_instance::ActiveInstance::Function(spawn_req, instance_ids)
} else {
continue;
}
}
ActiveInstanceClone::Resource(spawn_req, instance_id_str) => {
if let Ok(instance_id) = string_to_instance_id(&instance_id_str) {
crate::active_instance::ActiveInstance::Resource(spawn_req, instance_id)
} else {
continue;
}
}
},
);
}
}
}
Expand Down Expand Up @@ -504,20 +528,38 @@ impl super::proxy::Proxy for ProxyRedis {
samples
}

fn fetch_function_instance_requests(
&mut self,
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, edgeless_api::function_instance::SpawnFunctionRequest> {
let mut instances = std::collections::HashMap::new();
for (logical_id, instance) in self.fetch_instances() {
if let crate::active_instance::ActiveInstance::Function(spawn_function_req, _instance_ids) = instance {
instances.insert(logical_id, spawn_function_req);
}
}
instances
}

fn fetch_resource_instance_configurations(
&mut self,
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, edgeless_api::resource_configuration::ResourceInstanceSpecification>
{
let mut instances = std::collections::HashMap::new();
for (logical_id, instance) in self.fetch_instances() {
if let crate::active_instance::ActiveInstance::Resource(resource_specification, _instance_id) = instance {
instances.insert(logical_id, resource_specification);
}
}
instances
}

fn fetch_function_instances_to_nodes(
&mut self,
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<edgeless_api::function_instance::NodeId>> {
let mut instances = std::collections::HashMap::new();
for (logical_id, instance) in self.fetch_instances() {
if let ActiveInstanceClone::Function(_, instance_ids) = instance {
instances.insert(
logical_id,
instance_ids
.iter()
.filter_map(|x| string_to_instance_id(x).ok())
.map(|x| x.node_id)
.collect(),
);
if let crate::active_instance::ActiveInstance::Function(_, instance_ids) = instance {
instances.insert(logical_id, instance_ids.iter().map(|x| x.node_id).collect());
}
}
instances
Expand All @@ -529,20 +571,11 @@ impl super::proxy::Proxy for ProxyRedis {
let mut instances = std::collections::HashMap::new();
for (logical_id, instance) in self.fetch_instances() {
match instance {
ActiveInstanceClone::Function(_, instance_ids) => {
instances.insert(
logical_id,
instance_ids
.iter()
.filter_map(|x| string_to_instance_id(x).ok())
.map(|x| x.function_id)
.collect(),
);
crate::active_instance::ActiveInstance::Function(_, instance_ids) => {
instances.insert(logical_id, instance_ids.iter().map(|x| x.function_id).collect());
}
ActiveInstanceClone::Resource(_, instance_id) => {
if let Ok(instance_id) = string_to_instance_id(&instance_id) {
instances.insert(logical_id, vec![instance_id.function_id]);
}
crate::active_instance::ActiveInstance::Resource(_, instance_id) => {
instances.insert(logical_id, vec![instance_id.function_id]);
}
}
}
Expand All @@ -554,10 +587,8 @@ impl super::proxy::Proxy for ProxyRedis {
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, edgeless_api::function_instance::NodeId> {
let mut instances = std::collections::HashMap::new();
for (logical_id, instance) in self.fetch_instances() {
if let ActiveInstanceClone::Resource(_, instance_id) = instance {
if let Ok(instance_id) = string_to_instance_id(&instance_id) {
instances.insert(logical_id, instance_id.node_id);
}
if let crate::active_instance::ActiveInstance::Resource(_, instance_id) = instance {
instances.insert(logical_id, instance_id.node_id);
}
}
instances
Expand All @@ -567,20 +598,16 @@ impl super::proxy::Proxy for ProxyRedis {
let mut nodes_mapping = std::collections::HashMap::new();
for (logical_id, instance) in self.fetch_instances() {
match instance {
ActiveInstanceClone::Function(_, instance_ids) => {
crate::active_instance::ActiveInstance::Function(_, instance_ids) => {
for instance_id in instance_ids {
if let Ok(instance_id) = string_to_instance_id(&instance_id) {
let res = nodes_mapping.entry(instance_id.node_id).or_insert(vec![]);
res.push(crate::proxy::Instance::Function(logical_id));
}
}
}
ActiveInstanceClone::Resource(_, instance_id) => {
if let Ok(instance_id) = string_to_instance_id(&instance_id) {
let res = nodes_mapping.entry(instance_id.node_id).or_insert(vec![]);
res.push(crate::proxy::Instance::Resource(logical_id));
res.push(crate::proxy::Instance::Function(logical_id));
}
}
crate::active_instance::ActiveInstance::Resource(_, instance_id) => {
let res = nodes_mapping.entry(instance_id.node_id).or_insert(vec![]);
res.push(crate::proxy::Instance::Resource(logical_id));
}
}
}
nodes_mapping
Expand All @@ -591,7 +618,7 @@ impl super::proxy::Proxy for ProxyRedis {
mod test {
use edgeless_api::function_instance::SpawnFunctionRequest;

use crate::{deploy_intent::DeployIntent, proxy::Proxy};
use crate::{active_instance, deploy_intent::DeployIntent, proxy::Proxy};

use super::*;

Expand Down Expand Up @@ -668,6 +695,22 @@ mod test {

redis_proxy.update_active_instances(&active_instances);

let mut function_requests_expected = std::collections::HashMap::new();
for (lid, instance) in &active_instances {
if let active_instance::ActiveInstance::Function(req, _) = instance {
function_requests_expected.insert(*lid, req.clone());
}
}
assert_eq!(function_requests_expected, redis_proxy.fetch_function_instance_requests());

let mut resource_configurations_expected = std::collections::HashMap::new();
for (lid, instance) in &active_instances {
if let active_instance::ActiveInstance::Resource(spec, _) = instance {
resource_configurations_expected.insert(*lid, spec.clone());
}
}
assert_eq!(resource_configurations_expected, redis_proxy.fetch_resource_instance_configurations());

let function_instances = redis_proxy.fetch_function_instances_to_nodes();
assert_eq!(function_instances.len(), 10);
for (_instance, nodes) in function_instances {
Expand Down

0 comments on commit 4525126

Please sign in to comment.