Skip to content

Commit

Permalink
edgeless_con: fix issue with relocation of workflows upon cluster forma…
Browse files Browse the repository at this point in the history
…tion changes
  • Loading branch information
ccicconetti committed Dec 8, 2024
1 parent 9b1d5c7 commit 21f1647
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 46 deletions.
40 changes: 25 additions & 15 deletions edgeless_con/src/controller/controller_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ impl ControllerTask {
pub async fn run(&mut self) {
loop {
tokio::select! {
biased;
Some(req) = self.domain_registration_receiver.next() => {
match req {
super::DomainRegisterRequest::Update(update_domain_request, reply_sender) => {
let reply = self.update_domain(&update_domain_request).await;
match reply_sender.send(reply) {
Ok(_) => {}
Err(err) => {
log::error!("Unhandled: {:?}", err);
}
}
}
}
}
Some(req) = self.workflow_instance_receiver.next() => {
match req {
super::ControllerRequest::Start(spawn_workflow_request, reply_sender) => {
Expand Down Expand Up @@ -98,19 +112,6 @@ impl ControllerTask {
}
}
},
Some(req) = self.domain_registration_receiver.next() => {
match req {
super::DomainRegisterRequest::Update(update_domain_request, reply_sender) => {
let reply = self.update_domain(&update_domain_request).await;
match reply_sender.send(reply) {
Ok(_) => {}
Err(err) => {
log::error!("Unhandled: {:?}", err);
}
}
}
}
}
Some(req) = self.internal_receiver.next() => {
match req {
super::InternalRequest::Refresh(reply_sender) => {
Expand Down Expand Up @@ -160,6 +161,15 @@ impl ControllerTask {
workflow_id: uuid::Uuid::new_v4(),
};

self.relocate_workflow(&wf_id, spawn_workflow_request, target_domain).await
}

async fn relocate_workflow(
&mut self,
wf_id: &edgeless_api::workflow_instance::WorkflowId,
spawn_workflow_request: edgeless_api::workflow_instance::SpawnWorkflowRequest,
target_domain: &str,
) -> anyhow::Result<edgeless_api::workflow_instance::SpawnWorkflowResponse, edgeless_api::workflow_instance::SpawnWorkflowRequest> {
self.active_workflows.insert(
wf_id.clone(),
super::deployment_state::ActiveWorkflow {
Expand Down Expand Up @@ -512,7 +522,7 @@ impl ControllerTask {
for (wf_id, new_domain) in workflows_to_fix {
assert!(!new_domain.is_empty());
if let Some(workflow_request) = self.stop_workflow(&wf_id).await {
match self.start_workflow(workflow_request).await {
match self.relocate_workflow(&wf_id, workflow_request, &new_domain).await {
Ok(response) => {
if let edgeless_api::workflow_instance::SpawnWorkflowResponse::WorkflowInstance(_) = response {
log::info!("orphan workflow '{}' relocated to domain '{}'", wf_id, new_domain);
Expand Down Expand Up @@ -546,7 +556,7 @@ impl ControllerTask {
// orphan list.
for (new_domain, wf_id, workflow_request) in workflow_requests_fixable {
assert!(!new_domain.is_empty());
match self.start_workflow(workflow_request).await {
match self.relocate_workflow(&wf_id, workflow_request, &new_domain).await {
Ok(response) => {
if let edgeless_api::workflow_instance::SpawnWorkflowResponse::WorkflowInstance(_) = response {
log::info!("orphan workflow assigned to domain '{}'", new_domain);
Expand Down
110 changes: 79 additions & 31 deletions edgeless_systemtests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ mod system_tests {
use edgeless_api::workflow_instance::WorkflowInstanceAPI;
use edgeless_orc::proxy::Proxy;

async fn setup(
num_domains: u32,
num_nodes_per_domain: u32,
redis_url: Option<&str>,
) -> (Vec<futures::future::AbortHandle>, Box<(dyn WorkflowInstanceAPI)>) {
/// Abort handles for every task spawned by `setup()`, keyed so that
/// individual nodes or whole orchestration domains can be torn down
/// selectively during a test.
struct AbortHandles {
    // Per-node tasks, keyed by the node's UUID.
    abort_handles_nodes: std::collections::HashMap<uuid::Uuid, futures::future::AbortHandle>,
    // Per-domain orchestrator tasks, keyed by domain identifier (e.g. "domain-0").
    abort_handles_orchestrators: std::collections::HashMap<String, futures::future::AbortHandle>,
    // The single controller task.
    abort_handle_controller: futures::future::AbortHandle,
}

async fn setup(num_domains: u32, num_nodes_per_domain: u32, redis_url: Option<&str>) -> (AbortHandles, Box<(dyn WorkflowInstanceAPI)>) {
assert!(num_domains > 0);
assert!(num_nodes_per_domain > 0);

let mut handles = vec![];

let address = "127.0.0.1";
let mut port = 7001;
let controller_url = format!("http://{}:{}", address, port);
Expand All @@ -33,6 +33,8 @@ mod system_tests {

let domain_register_url = format!("http://{}:{}", address, next_port());

let mut abort_handles_orchestrators = std::collections::HashMap::new();
let mut abort_handles_nodes = std::collections::HashMap::new();
for domain_i in 0..num_domains {
let orchestrator_url = format!("http://{}:{}", address, next_port());
let node_register_url = format!("http://{}:{}", address, next_port());
Expand All @@ -42,7 +44,7 @@ mod system_tests {
general: edgeless_orc::EdgelessOrcGeneralSettings {
domain_register_url: domain_register_url.clone(),
subscription_refresh_interval_sec: 1,
domain_id: domain_id.to_string(),
domain_id: domain_id.clone(),
orchestrator_url: orchestrator_url.to_string(),
orchestrator_url_announced: "".to_string(),
node_register_url: node_register_url.clone(),
Expand All @@ -65,17 +67,18 @@ mod system_tests {
},
}));
tokio::spawn(task);
handles.push(handle);
abort_handles_orchestrators.insert(domain_id, handle);

// The first node in each domain is also assigned a file-log resource.
for node_i in 0..num_nodes_per_domain {
let file_log_provider = match node_i {
0 => Some("file-log-1".to_string()),
_ => None,
};
let node_id = uuid::Uuid::new_v4();
let (task, handle) = futures::future::abortable(edgeless_node::edgeless_node_main(edgeless_node::EdgelessNodeSettings {
general: edgeless_node::EdgelessNodeGeneralSettings {
node_id: uuid::Uuid::new_v4(),
node_id: node_id.clone(),
agent_url: format!("http://{}:{}", address, next_port()),
agent_url_announced: "".to_string(),
invocation_url: format!("http://{}:{}", address, next_port()),
Expand Down Expand Up @@ -106,28 +109,45 @@ mod system_tests {
user_node_capabilities: None,
}));
tokio::spawn(task);
handles.push(handle);
abort_handles_nodes.insert(node_id, handle);
}
}

let (task, handle) = futures::future::abortable(edgeless_con::edgeless_con_main(edgeless_con::EdgelessConSettings {
let (task, abort_handle_controller) = futures::future::abortable(edgeless_con::edgeless_con_main(edgeless_con::EdgelessConSettings {
controller_url: controller_url.clone(),
domain_register_url: domain_register_url.clone(),
}));
tokio::spawn(task);
handles.push(handle);

let mut con_client = edgeless_api::grpc_impl::outer::controller::ControllerAPIClient::new(controller_url.as_str()).await;

tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

(handles, con_client.workflow_instance_api())
(
AbortHandles {
abort_handles_nodes,
abort_handles_orchestrators,
abort_handle_controller,
},
con_client.workflow_instance_api(),
)
}

/// Return the identifiers of all active workflows; an API error yields an
/// empty list.
async fn wf_list(client: &mut Box<(dyn WorkflowInstanceAPI)>) -> Vec<edgeless_api::workflow_instance::WorkflowId> {
    match client.list().await {
        Ok(workflow_ids) => workflow_ids,
        Err(_) => Vec::new(),
    }
}

/// Collect the set of domain identifiers currently hosting at least one
/// function of any active workflow.
///
/// A workflow returned by `list()` may disappear before the subsequent
/// `inspect()` call (e.g. while it is being relocated after a cluster
/// formation change); such workflows are silently skipped instead of
/// panicking, since this helper is used inside polling loops where that
/// race is expected.
async fn domains_used(client: &mut Box<(dyn WorkflowInstanceAPI)>) -> std::collections::HashSet<String> {
    let mut ret = std::collections::HashSet::new();
    for wf_id in wf_list(client).await {
        // Skip workflows that vanished between list() and inspect().
        if let Ok(wf_info) = client.inspect(wf_id).await {
            for mapping in wf_info.status.domain_mapping {
                ret.insert(mapping.domain_id);
            }
        }
    }
    ret
}

async fn nodes_in_domain(domain_id: &str, client: &mut Box<(dyn WorkflowInstanceAPI)>) -> u32 {
let res = client.domains(String::default()).await.unwrap_or_default();
if res.is_empty() {
Expand All @@ -140,6 +160,14 @@ mod system_tests {
}
}

/// Count the nodes across all orchestration domains, assuming domain
/// identifiers follow the "domain-N" naming scheme used by `setup()`.
async fn nodes_in_cluster(num_domains: u32, client: &mut Box<(dyn WorkflowInstanceAPI)>) -> u32 {
    let mut total = 0;
    for domain_i in 0..num_domains {
        let domain_id = format!("domain-{}", domain_i);
        total += nodes_in_domain(domain_id.as_str(), client).await;
    }
    total
}

fn fixture_spec() -> edgeless_api::function_instance::FunctionClassSpecification {
edgeless_api::function_instance::FunctionClassSpecification {
function_class_id: "system_test".to_string(),
Expand All @@ -150,13 +178,24 @@ mod system_tests {
}
}

fn terminate(handles: Vec<futures::future::AbortHandle>) -> anyhow::Result<()> {
for handle in handles {
/// Abort every task spawned by `setup()`: first the nodes, then the
/// orchestrators, and finally the controller.
fn terminate(handles: AbortHandles) -> anyhow::Result<()> {
    handles.abort_handles_nodes.values().for_each(|handle| handle.abort());
    handles.abort_handles_orchestrators.values().for_each(|handle| handle.abort());
    handles.abort_handle_controller.abort();
    Ok(())
}

/// Kill the orchestrator of the given domain, simulating the loss of an
/// entire orchestration domain. Panics if the domain is unknown.
fn tear_down_domain(domain_id: &str, handles: &mut AbortHandles) {
    handles
        .abort_handles_orchestrators
        .remove(domain_id)
        .unwrap_or_else(|| panic!("domain {} not found", domain_id))
        .abort();
}

#[tokio::test]
#[serial_test::serial]
async fn system_test_single_domain_single_node() -> anyhow::Result<()> {
Expand Down Expand Up @@ -372,28 +411,24 @@ mod system_tests {
// let _ = env_logger::try_init();

// Create the EDGELESS system.
let (handles, mut client) = setup(3, 1, None).await;
let (mut handles, mut client) = setup(3, 1, None).await;

assert!(wf_list(&mut client).await.is_empty());

// Wait for all the nodes to be visible.
let mut num_nodes_founds = 0;
for _ in 0..100 {
num_nodes_founds = 0;
for domain_id in 0..3 {
num_nodes_founds += nodes_in_domain(format!("domain-{}", domain_id).as_str(), &mut client).await;
}
if num_nodes_founds == 3 {
if nodes_in_cluster(3, &mut client).await == 3 {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
assert_eq!(3, num_nodes_founds);
assert_eq!(3, nodes_in_cluster(3, &mut client).await);

// Create 100 workflows
let mut workflow_ids = vec![];
let mut domains = std::collections::HashSet::new();
for _ in 0..100 {
for wf_i in 0..100 {
let err_str = format!("wf#{}, nodes in cluster {}", wf_i, nodes_in_cluster(3, &mut client).await);
let res = client
.start(edgeless_api::workflow_instance::SpawnWorkflowRequest {
workflow_functions: vec![edgeless_api::workflow_instance::WorkflowFunction {
Expand All @@ -409,7 +444,7 @@ mod system_tests {
workflow_ids.push(match res {
Ok(response) => match &response {
edgeless_api::workflow_instance::SpawnWorkflowResponse::ResponseError(err) => {
panic!("workflow rejected: {}", err)
panic!("workflow rejected [{}]: {}", err_str, err)
}
edgeless_api::workflow_instance::SpawnWorkflowResponse::WorkflowInstance(val) => {
assert_eq!(1, val.domain_mapping.len());
Expand All @@ -418,15 +453,28 @@ mod system_tests {
val.workflow_id.clone()
}
},
Err(err) => panic!("could not start the workflow: {}", err),
Err(err) => panic!("could not start the workflow [{}]: {}", err_str, err),
});
}
assert_eq!(100, wf_list(&mut client).await.len());

assert_eq!(
std::collections::HashSet::from([String::from("domain-0"), String::from("domain-1"), String::from("domain-2"),]),
domains
);
let mut all_domains = std::collections::HashSet::new();
for i in 0..3 {
all_domains.insert(format!("domain-{}", i));
}
assert_eq!(all_domains, domains);
assert_eq!(all_domains, domains_used(&mut client).await);

// Tear down one orchestration domain.
tear_down_domain("domain-1", &mut handles);
all_domains.remove("domain-1");
for _ in 0..100 {
if domains_used(&mut client).await == all_domains {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
assert_eq!(all_domains, domains_used(&mut client).await);

// Stop the workflows
for workflow_id in workflow_ids {
Expand Down

0 comments on commit 21f1647

Please sign in to comment.