Skip to content

Commit

Permalink
Change pattern to refresh orchestrator and controller periodic tasks …
Browse files Browse the repository at this point in the history
…so that the minimum refresh period is one second
  • Loading branch information
ccicconetti committed Dec 6, 2024
1 parent 48a3128 commit 9b1d5c7
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 16 deletions.
12 changes: 8 additions & 4 deletions edgeless_con/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ pub(crate) enum DomainRegisterRequest {
}

pub(crate) enum InternalRequest {
Poll(),
Refresh(
// Reply Channel
tokio::sync::oneshot::Sender<()>,
),
}

#[derive(Clone)]
Expand All @@ -73,10 +76,11 @@ impl Controller {

let refresh_task = Box::pin(async move {
let mut sender = internal_sender;
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
interval.tick().await;
let _ = sender.send(InternalRequest::Poll()).await;
let (reply_sender, reply_receiver) = tokio::sync::oneshot::channel::<()>();
let _ = sender.send(InternalRequest::Refresh(reply_sender)).await;
let _ = reply_receiver.await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
});

Expand Down
3 changes: 2 additions & 1 deletion edgeless_con/src/controller/controller_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ impl ControllerTask {
}
Some(req) = self.internal_receiver.next() => {
match req {
super::InternalRequest::Poll() => {
super::InternalRequest::Refresh(reply_sender) => {
self.check_domains().await;
let _ = reply_sender.send(());
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions edgeless_orc/src/node_register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ pub enum NodeRegisterRequest {
}

pub(crate) enum InternalRequest {
Poll(),
Refresh(
// Reply Channel
tokio::sync::oneshot::Sender<()>,
),
}

struct NodeRegisterEntry {
Expand All @@ -44,10 +47,11 @@ impl NodeRegister {

let refresh_task = Box::pin(async move {
let mut sender = internal_sender;
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
interval.tick().await;
let _ = sender.send(InternalRequest::Poll()).await;
let (reply_sender, reply_receiver) = tokio::sync::oneshot::channel::<()>();
let _ = sender.send(InternalRequest::Refresh(reply_sender)).await;
let _ = reply_receiver.await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
});

Expand All @@ -71,7 +75,7 @@ impl NodeRegister {
tokio::select! {
Some(req) = internal_receiver.next() => {
match req {
InternalRequest::Poll() => {
InternalRequest::Refresh(reply_sender) => {
// Find all nodes that are stale, i.e., which have not been
// refreshed by their own indicated deadline.
let mut stale_nodes = vec![];
Expand All @@ -88,6 +92,8 @@ impl NodeRegister {

let _ = orchestrator_sender.send(super::orchestrator::OrchestratorRequest::DelNode(stale_node)).await;
}

let _ = reply_sender.send(());
}
}
},
Expand Down
12 changes: 8 additions & 4 deletions edgeless_orc/src/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ pub enum OrchestratorRequest {
Vec<edgeless_api::node_registration::ResourceProviderSpecification>,
),
DelNode(uuid::Uuid),
Refresh(),
Refresh(
// Reply Channel
tokio::sync::oneshot::Sender<()>,
),
}

pub struct OrchestratorClient {
Expand Down Expand Up @@ -112,10 +115,11 @@ impl Orchestrator {
let refresh_sender = sender.clone();
let refresh_task = Box::pin(async move {
let mut refresh_sender = refresh_sender;
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
interval.tick().await;
let _ = refresh_sender.send(OrchestratorRequest::Refresh()).await;
let (reply_sender, reply_receiver) = tokio::sync::oneshot::channel::<()>();
let _ = refresh_sender.send(OrchestratorRequest::Refresh(reply_sender)).await;
let _ = reply_receiver.await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
});

Expand Down
5 changes: 4 additions & 1 deletion edgeless_orc/src/orchestrator/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,10 @@ async fn test_recreate_fun_after_disconnect() {
assert_eq!(Some(&1), num_events.get("patch-function"));

for _ in 0..5 {
let _ = orc_sender.send(OrchestratorRequest::Refresh()).await;
let (reply_sender, reply_receiver) = tokio::sync::oneshot::channel::<()>();
let _ = orc_sender.send(OrchestratorRequest::Refresh(reply_sender)).await;
let _ = reply_receiver.await;

let mut num_events = std::collections::HashMap::new();
loop {
if let Some((_node_id, event)) = wait_for_events_if_any(&mut nodes).await {
Expand Down
3 changes: 2 additions & 1 deletion edgeless_orc/src/orchestrator_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ impl OrchestratorTask {
self.update_domain().await;
self.refresh().await;
}
crate::orchestrator::OrchestratorRequest::Refresh() => {
crate::orchestrator::OrchestratorRequest::Refresh(reply_sender) => {
self.refresh().await;
let _ = reply_sender.send(());
}
}
}
Expand Down

0 comments on commit 9b1d5c7

Please sign in to comment.