Skip to content

Commit

Permalink
edgeless_con: modify WorkflowInstance API
Browse files Browse the repository at this point in the history
The list() method now returns the list of workflow identifiers, while a new method inspect() has been added to retrieve the workflow details.

edgeless_cli is updated accordingly
  • Loading branch information
ccicconetti committed Dec 6, 2024
1 parent a1478f2 commit 48a3128
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 123 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ API changes:
- Update service NodeRegistration.
- Move node health status and performance samples from NodeManagement to
NodeManagement.
- WorkflowInstance: return the list of workflow identifiers in list(); add new
method inspect() to retrieve the workflow details.

## [1.0.0] - 2024-11-12

Expand Down
14 changes: 11 additions & 3 deletions edgeless_api/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,17 @@ message WorkflowId {
string workflow_id = 1;
}

// List of workflow instances.
message WorkflowInstanceList {
repeated WorkflowInstanceStatus workflow_statuses = 1;
// List of workflow identifiers.
message WorkflowIdList {
repeated string identifiers = 1;
}

// Info about a workflow.
message WorkflowInstanceInfo {
// The workflow spawn request.
SpawnWorkflowRequest request = 1;
// The workflow status.
WorkflowInstanceStatus status = 2;
}

// Identifier of a domain.
Expand Down
13 changes: 9 additions & 4 deletions edgeless_api/proto/services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,15 @@ service WorkflowInstance {
// Output: none.
rpc Stop (WorkflowId) returns (google.protobuf.Empty);

// List the active workflows or shows the status of a given active workflow.
// Input: the identifier of the active workflow or a special value indicating all workflows.
// Output: the list of status of the active workflow instances.
rpc List (WorkflowId) returns (WorkflowInstanceList);
// List the known workflow identifiers.
// Input: none.
// Output: the list of workflow identifiers..
rpc List (google.protobuf.Empty) returns (WorkflowIdList);

// Inspect a given workflow.
// Input: the identifier of the workflow to inspect.
// Output: information about the workflow identifiers.
rpc Inspect(WorkflowId) returns (WorkflowInstanceInfo);

// List the domain capabilities.
// Input: the identifier of the domain or an empty value to query all.
Expand Down
107 changes: 51 additions & 56 deletions edgeless_api/src/grpc_impl/workflow_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,6 @@ impl WorkflowInstanceConverters {
}
}

pub fn parse_workflow_instance_list(
api_instance: &crate::grpc_impl::api::WorkflowInstanceList,
) -> anyhow::Result<Vec<crate::workflow_instance::WorkflowInstance>> {
let ret: Vec<crate::workflow_instance::WorkflowInstance> = api_instance
.workflow_statuses
.iter()
.map(|x| WorkflowInstanceConverters::parse_workflow_instance(x).unwrap())
.collect();
Ok(ret)
}

pub fn parse_domain_capabilities_list(
api_instance: &crate::grpc_impl::api::DomainCapabilitiesList,
) -> anyhow::Result<std::collections::HashMap<String, crate::domain_registration::DomainCapabilities>> {
Expand Down Expand Up @@ -207,12 +196,6 @@ impl WorkflowInstanceConverters {
}
}

pub fn serialize_workflow_instance_list(instances: &[crate::workflow_instance::WorkflowInstance]) -> crate::grpc_impl::api::WorkflowInstanceList {
crate::grpc_impl::api::WorkflowInstanceList {
workflow_statuses: instances.iter().map(Self::serialize_workflow_instance).collect(),
}
}

pub fn serialize_domain_capabilities_list(
domains: &std::collections::HashMap<String, crate::domain_registration::DomainCapabilities>,
) -> crate::grpc_impl::api::DomainCapabilitiesList {
Expand Down Expand Up @@ -288,15 +271,43 @@ impl crate::workflow_instance::WorkflowInstanceAPI for WorkflowInstanceAPIClient
Err(err) => Err(anyhow::anyhow!("Communication error while stopping a workflow: {}", err.to_string())),
}
}
async fn list(&mut self, id: crate::workflow_instance::WorkflowId) -> anyhow::Result<Vec<crate::workflow_instance::WorkflowInstance>> {
async fn list(&mut self) -> anyhow::Result<Vec<crate::workflow_instance::WorkflowId>> {
let ret = self.client.list(tonic::Request::new(())).await;
match ret {
Ok(ret) => {
return Ok(ret
.into_inner()
.identifiers
.iter()
.map(|val| crate::workflow_instance::WorkflowId {
workflow_id: uuid::Uuid::parse_str(val).unwrap_or_default(),
})
.collect());
}
Err(err) => Err(anyhow::anyhow!("Communication error while listing workflows: {}", err.to_string())),
}
}
async fn inspect(&mut self, id: crate::workflow_instance::WorkflowId) -> anyhow::Result<crate::workflow_instance::WorkflowInfo> {
let ret = self
.client
.list(tonic::Request::new(
.inspect(tonic::Request::new(
crate::grpc_impl::workflow_instance::WorkflowInstanceConverters::serialize_workflow_id(&id),
))
.await;
match ret {
Ok(ret) => return crate::grpc_impl::workflow_instance::WorkflowInstanceConverters::parse_workflow_instance_list(&ret.into_inner()),
Ok(ret) => {
let ret = ret.into_inner();
let request = match &ret.request {
Some(request) => crate::grpc_impl::workflow_instance::WorkflowInstanceConverters::parse_workflow_spawn_request(request)?,
None => anyhow::bail!("Workflow request not present"),
};
let status = match &ret.status {
Some(status) => crate::grpc_impl::workflow_instance::WorkflowInstanceConverters::parse_workflow_instance(status)?,
None => anyhow::bail!("Workflow status not present"),
};

return Ok(crate::workflow_instance::WorkflowInfo { request, status });
}
Err(err) => Err(anyhow::anyhow!("Communication error while listing workflows: {}", err.to_string())),
}
}
Expand Down Expand Up @@ -364,22 +375,34 @@ impl crate::grpc_impl::api::workflow_instance_server::WorkflowInstance for Workf
}
}

async fn list(
async fn list(&self, _request: tonic::Request<()>) -> Result<tonic::Response<crate::grpc_impl::api::WorkflowIdList>, tonic::Status> {
let ret = self.root_api.lock().await.list().await;
match ret {
Ok(identifiers) => Ok(tonic::Response::new(crate::grpc_impl::api::WorkflowIdList {
identifiers: identifiers.iter().map(|x| x.to_string()).collect(),
})),
Err(err) => Err(tonic::Status::internal(format!("Internal error when listing workflows: {}", err))),
}
}

async fn inspect(
&self,
request_id: tonic::Request<crate::grpc_impl::api::WorkflowId>,
) -> Result<tonic::Response<crate::grpc_impl::api::WorkflowInstanceList>, tonic::Status> {
) -> Result<tonic::Response<crate::grpc_impl::api::WorkflowInstanceInfo>, tonic::Status> {
let req = match crate::grpc_impl::workflow_instance::WorkflowInstanceConverters::parse_workflow_id(&request_id.into_inner()) {
Ok(val) => val,
Err(err) => return Err(tonic::Status::internal(format!("Internal error when listing workflows: {}", err))),
Err(err) => return Err(tonic::Status::internal(format!("Internal error when inspecting a workflow: {}", err))),
};
let ret = self.root_api.lock().await.list(req).await;
let ret = self.root_api.lock().await.inspect(req).await;
match ret {
Ok(instances) => Ok(tonic::Response::new(
crate::grpc_impl::workflow_instance::WorkflowInstanceConverters::serialize_workflow_instance_list(&instances),
)),
Err(err) => Err(tonic::Status::internal(format!("Internal error when listing workflows: {}", err))),
Ok(info) => Ok(tonic::Response::new(crate::grpc_impl::api::WorkflowInstanceInfo {
request: Some(crate::grpc_impl::workflow_instance::WorkflowInstanceConverters::serialize_workflow_spawn_request(&info.request)),
status: Some(crate::grpc_impl::workflow_instance::WorkflowInstanceConverters::serialize_workflow_instance(&info.status)),
})),
Err(err) => Err(tonic::Status::internal(format!("Internal error when inspecting a workflow: {}", err))),
}
}

async fn domains(
&self,
domain_id: tonic::Request<crate::grpc_impl::api::DomainId>,
Expand Down Expand Up @@ -568,32 +591,4 @@ mod tests {
}
}
}

#[test]
fn serialize_deserialize_workflow_instance_list() {
let messages = vec![vec![WorkflowInstance {
workflow_id: WorkflowId {
workflow_id: uuid::Uuid::new_v4(),
},
domain_mapping: vec![
WorkflowFunctionMapping {
name: "fun1".to_string(),
function_id: uuid::Uuid::new_v4(),
domain_id: "domain1".to_string(),
},
WorkflowFunctionMapping {
name: "fun2".to_string(),
function_id: uuid::Uuid::new_v4(),
domain_id: "domain2".to_string(),
},
],
}]];

for msg in messages {
match WorkflowInstanceConverters::parse_workflow_instance_list(&WorkflowInstanceConverters::serialize_workflow_instance_list(&msg)) {
Ok(val) => assert_eq!(msg, val),
Err(err) => panic!("{}", err),
}
}
}
}
9 changes: 8 additions & 1 deletion edgeless_api/src/workflow_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ pub struct SpawnWorkflowRequest {
pub annotations: std::collections::HashMap<String, String>,
}

#[derive(Clone, Debug, PartialEq, serde::Serialize)]
pub struct WorkflowInfo {
pub request: SpawnWorkflowRequest,
pub status: WorkflowInstance,
}

#[derive(Clone, Debug, PartialEq, serde::Serialize)]
pub enum SpawnWorkflowResponse {
ResponseError(crate::common::ResponseError),
Expand All @@ -83,7 +89,8 @@ pub enum SpawnWorkflowResponse {
pub trait WorkflowInstanceAPI: WorkflowInstanceAPIClone + Send + Sync {
async fn start(&mut self, request: SpawnWorkflowRequest) -> anyhow::Result<SpawnWorkflowResponse>;
async fn stop(&mut self, id: WorkflowId) -> anyhow::Result<()>;
async fn list(&mut self, id: WorkflowId) -> anyhow::Result<Vec<WorkflowInstance>>;
async fn list(&mut self) -> anyhow::Result<Vec<WorkflowId>>;
async fn inspect(&mut self, id: WorkflowId) -> anyhow::Result<WorkflowInfo>;
async fn domains(
&mut self,
domain_id: String,
Expand Down
57 changes: 48 additions & 9 deletions edgeless_cli/src/edgeless_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum WorkflowCommands {
Start { spec_file: String },
Stop { id: String },
List {},
Inspect { id: String },
}

#[derive(Debug, clap::Subcommand)]
Expand Down Expand Up @@ -199,26 +200,64 @@ async fn main() -> anyhow::Result<()> {
}
}
WorkflowCommands::Stop { id } => {
let parsed_id = uuid::Uuid::parse_str(&id)?;
match wf_client
.stop(edgeless_api::workflow_instance::WorkflowId { workflow_id: parsed_id })
.stop(edgeless_api::workflow_instance::WorkflowId {
workflow_id: uuid::Uuid::parse_str(&id)?,
})
.await
{
Ok(_) => println!("Workflow Stopped"),
Err(err) => println!("{}", err),
}
}
WorkflowCommands::List {} => match wf_client.list(edgeless_api::workflow_instance::WorkflowId::none()).await {
Ok(instances) => {
for instance in instances.iter() {
println!("workflow: {}", instance.workflow_id);
for function in instance.domain_mapping.iter() {
println!("\t{:?}", function);
}
WorkflowCommands::List {} => match wf_client.list().await {
Ok(identifiers) => {
for wf_id in identifiers {
println!("{}", wf_id);
}
}
Err(err) => println!("{}", err),
},
WorkflowCommands::Inspect { id } => {
match wf_client
.inspect(edgeless_api::workflow_instance::WorkflowId {
workflow_id: uuid::Uuid::parse_str(&id)?,
})
.await
{
Ok(info) => {
assert_eq!(id, info.status.workflow_id.to_string());
for fun in info.request.workflow_functions {
println!("* function {}", fun.name);
println!("{}", fun.function_class_specification.to_short_string());
for (out, next) in fun.output_mapping {
println!("OUT {} -> {}", out, next);
}
for (name, annotation) in fun.annotations {
println!("F_ANN {} -> {}", name, annotation);
}
}
for res in info.request.workflow_resources {
println!("* resource {}", res.name);
println!("{}", res.class_type);
for (out, next) in res.output_mapping {
println!("OUT {} -> {}", out, next);
}
for (name, annotation) in res.configurations {
println!("CONF{} -> {}", name, annotation);
}
}
println!("* mapping");
for (name, annotation) in info.request.annotations {
println!("W_ANN {} -> {}", name, annotation);
}
for mapping in info.status.domain_mapping {
println!("MAP {} -> {} [logical ID {}]", mapping.name, mapping.domain_id, mapping.function_id);
}
}
Err(err) => println!("{}", err),
}
}
}
}
Commands::Function { function_command } => match function_command {
Expand Down
6 changes: 5 additions & 1 deletion edgeless_con/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ pub(crate) enum ControllerRequest {
),
Stop(edgeless_api::workflow_instance::WorkflowId),
List(
// Reply Channel
tokio::sync::oneshot::Sender<anyhow::Result<Vec<edgeless_api::workflow_instance::WorkflowId>>>,
),
Inspect(
edgeless_api::workflow_instance::WorkflowId,
// Reply Channel
tokio::sync::oneshot::Sender<anyhow::Result<Vec<edgeless_api::workflow_instance::WorkflowInstance>>>,
tokio::sync::oneshot::Sender<anyhow::Result<edgeless_api::workflow_instance::WorkflowInfo>>,
),
Domains(
String,
Expand Down
20 changes: 13 additions & 7 deletions edgeless_con/src/controller/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,19 @@ impl edgeless_api::workflow_instance::WorkflowInstanceAPI for ControllerWorkflow
Err(err) => Err(anyhow::anyhow!("Controller Channel Error: {}", err)),
}
}
async fn list(
&mut self,
id: edgeless_api::workflow_instance::WorkflowId,
) -> anyhow::Result<Vec<edgeless_api::workflow_instance::WorkflowInstance>> {
let (reply_sender, reply_receiver) =
tokio::sync::oneshot::channel::<anyhow::Result<Vec<edgeless_api::workflow_instance::WorkflowInstance>>>();
if let Err(err) = self.sender.send(super::ControllerRequest::List(id.clone(), reply_sender)).await {
async fn list(&mut self) -> anyhow::Result<Vec<edgeless_api::workflow_instance::WorkflowId>> {
let (reply_sender, reply_receiver) = tokio::sync::oneshot::channel::<anyhow::Result<Vec<edgeless_api::workflow_instance::WorkflowId>>>();
if let Err(err) = self.sender.send(super::ControllerRequest::List(reply_sender)).await {
anyhow::bail!("Controller Channel Error: {}", err);
}
match reply_receiver.await {
Ok(ret) => ret,
Err(err) => Err(anyhow::anyhow!("Controller Channel Error: {}", err)),
}
}
async fn inspect(&mut self, id: edgeless_api::workflow_instance::WorkflowId) -> anyhow::Result<edgeless_api::workflow_instance::WorkflowInfo> {
let (reply_sender, reply_receiver) = tokio::sync::oneshot::channel::<anyhow::Result<edgeless_api::workflow_instance::WorkflowInfo>>();
if let Err(err) = self.sender.send(super::ControllerRequest::Inspect(id.clone(), reply_sender)).await {
anyhow::bail!("Controller Channel Error: {}", err);
}
match reply_receiver.await {
Expand Down
Loading

0 comments on commit 48a3128

Please sign in to comment.