From 1ba751dd9a919fc6704ea91eccd5b5559d2eff8f Mon Sep 17 00:00:00 2001 From: Claudio Cicconetti Date: Tue, 17 Dec 2024 16:07:31 +0100 Subject: [PATCH] API simplification: Remove unnecessary instance_id from SpawnFunctionRequest --- edgeless_api/proto/messages.proto | 2 - edgeless_api/src/function_instance.rs | 2 - .../src/grpc_impl/function_instance.rs | 9 - .../src/controller/controller_task.rs | 1 - edgeless_con/src/controller/test.rs | 3 - edgeless_node/src/agent/mod.rs | 277 ++++++++---------- .../base_runtime/function_instance_runner.rs | 2 +- edgeless_node/src/base_runtime/mod.rs | 8 +- edgeless_node/src/base_runtime/runtime.rs | 32 +- edgeless_node/src/wasm_runner/test/mod.rs | 17 +- edgeless_orc/src/orchestrator/test.rs | 1 - edgeless_orc/src/proxy_redis.rs | 1 - 12 files changed, 156 insertions(+), 199 deletions(-) diff --git a/edgeless_api/proto/messages.proto b/edgeless_api/proto/messages.proto index 63a369d7..6ef96ccc 100644 --- a/edgeless_api/proto/messages.proto +++ b/edgeless_api/proto/messages.proto @@ -51,8 +51,6 @@ message FunctionClassSpecification { // Message to request the creation a new function instance. message SpawnFunctionRequest { - // The function instance identifier. - InstanceId instance_id = 1; // The function class specification. FunctionClassSpecification code = 2; // The set of annotations associated with this function instance. diff --git a/edgeless_api/src/function_instance.rs b/edgeless_api/src/function_instance.rs index 2b725176..921e43bd 100644 --- a/edgeless_api/src/function_instance.rs +++ b/edgeless_api/src/function_instance.rs @@ -53,8 +53,6 @@ impl FunctionClassSpecification { #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] pub struct SpawnFunctionRequest { - #[serde(skip)] - pub instance_id: Option, // XXX pub code: FunctionClassSpecification, pub annotations: std::collections::HashMap, pub state_specification: StateSpecification, diff --git a/edgeless_api/src/grpc_impl/function_instance.rs b/edgeless_api/src/grpc_impl/function_instance.rs index fff488fe..23bcbe3e 100644 --- a/edgeless_api/src/grpc_impl/function_instance.rs +++ b/edgeless_api/src/grpc_impl/function_instance.rs @@ -23,10 +23,6 @@ impl FunctonInstanceConverters { api_request: &crate::grpc_impl::api::SpawnFunctionRequest, ) -> anyhow::Result { Ok(crate::function_instance::SpawnFunctionRequest { - instance_id: match api_request.instance_id.as_ref() { - Some(id) => Some(CommonConverters::parse_instance_id(id)?), - None => None, - }, code: Self::parse_function_class_specification(match api_request.code.as_ref() { Some(val) => val, None => { @@ -70,7 +66,6 @@ impl FunctonInstanceConverters { pub fn serialize_spawn_function_request(req: &crate::function_instance::SpawnFunctionRequest) -> crate::grpc_impl::api::SpawnFunctionRequest { crate::grpc_impl::api::SpawnFunctionRequest { - instance_id: req.instance_id.as_ref().map(CommonConverters::serialize_instance_id), code: Some(Self::serialize_function_class_specification(&req.code)), annotations: req.annotations.clone(), state_specification: Some(Self::serialize_state_specification(&req.state_specification)), @@ -310,10 +305,6 @@ mod tests { #[test] fn serialize_deserialize_spawn_function_request() { let messages = vec![SpawnFunctionRequest { - instance_id: Some(InstanceId { - node_id: uuid::Uuid::new_v4(), - function_id: uuid::Uuid::new_v4(), - }), code: FunctionClassSpecification { function_class_id: "my-func-id".to_string(), function_class_type: "WASM".to_string(), diff --git a/edgeless_con/src/controller/controller_task.rs b/edgeless_con/src/controller/controller_task.rs index 6806318e..c995540f 100644 --- a/edgeless_con/src/controller/controller_task.rs +++ b/edgeless_con/src/controller/controller_task.rs @@ -591,7 +591,6 @@ impl ControllerTask { .fn_client(domain) .ok_or(format!("No function client for domain: {}", domain))? .start(edgeless_api::function_instance::SpawnFunctionRequest { - instance_id: None, code: function.function_class_specification.clone(), annotations: function.annotations.clone(), state_specification: edgeless_api::function_instance::StateSpecification { diff --git a/edgeless_con/src/controller/test.rs b/edgeless_con/src/controller/test.rs index d9a1f53e..f142b01e 100644 --- a/edgeless_con/src/controller/test.rs +++ b/edgeless_con/src/controller/test.rs @@ -186,7 +186,6 @@ async fn single_function_start_stop() { assert!(new_func_id.is_nil()); if let MockFunctionInstanceEvent::StartFunction((id, spawn_req)) = mock_orc_receiver.try_next().unwrap().unwrap() { new_func_id = id; - assert!(spawn_req.instance_id.is_none()); assert_eq!(function_class_specification, spawn_req.code); assert!(spawn_req.annotations.is_empty()); // TODO check state specifications @@ -366,7 +365,6 @@ async fn function_link_loop_start_stop() { assert!(new_func1_id.is_nil()); if let MockFunctionInstanceEvent::StartFunction((id, spawn_req)) = mock_orc_receiver.try_next().unwrap().unwrap() { new_func1_id = id; - assert!(spawn_req.instance_id.is_none()); assert!(spawn_req.annotations.is_empty()); // TODO check state specifications } else { @@ -377,7 +375,6 @@ async fn function_link_loop_start_stop() { assert!(new_func2_id.is_nil()); if let MockFunctionInstanceEvent::StartFunction((id, spawn_req)) = mock_orc_receiver.try_next().unwrap().unwrap() { new_func2_id = id; - assert!(spawn_req.instance_id.is_none()); assert!(spawn_req.annotations.is_empty()); // TODO check state specifications } else { diff --git a/edgeless_node/src/agent/mod.rs b/edgeless_node/src/agent/mod.rs index bc57f593..926c4a5e 100644 --- a/edgeless_node/src/agent/mod.rs +++ b/edgeless_node/src/agent/mod.rs @@ -10,14 +10,17 @@ pub mod test; enum AgentRequest { // Function lifecycle management API. - SpawnFunction(edgeless_api::function_instance::SpawnFunctionRequest), + SpawnFunction( + edgeless_api::function_instance::SpawnFunctionRequest, + futures::channel::oneshot::Sender>, + ), StopFunction(edgeless_api::function_instance::InstanceId), PatchFunction(edgeless_api::common::PatchRequest), // Resource lifecycle management API. SpawnResource( edgeless_api::resource_configuration::ResourceInstanceSpecification, - futures::channel::oneshot::Sender>>, + futures::channel::oneshot::Sender>, ), StopResource( edgeless_api::function_instance::InstanceId, @@ -32,7 +35,6 @@ enum AgentRequest { pub struct Agent { sender: futures::channel::mpsc::UnboundedSender, - node_id: uuid::Uuid, } pub struct ResourceDesc { @@ -57,7 +59,7 @@ impl Agent { Self::main_task(node_id, receiver, runners, resources, data_plane_provider).await; }); - (Agent { sender, node_id }, main_task) + (Agent { sender }, main_task) } async fn main_task( @@ -89,73 +91,66 @@ impl Agent { // value: provider_id let mut resource_instances = std::collections::HashMap::::new(); - log::info!("Starting Edgeless Agent"); + log::info!("Starting EDGELESS node agent"); while let Some(req) = receiver.next().await { match req { - AgentRequest::SpawnFunction(spawn_req) => { - log::debug!("Agent Spawn {:?}", spawn_req); - - // Save function_class for further interaction. - // We can assume that the Optional is present. - if spawn_req.instance_id.is_none() { - log::error!("No instance_id provided for SpawnFunctionRequest!"); - continue; - } - function_instances.insert(spawn_req.instance_id.unwrap().function_id, spawn_req.code.function_class_type.clone()); + AgentRequest::SpawnFunction(spawn_req, responder) => { + log::debug!("Agent SpawnFunction {:?}", spawn_req.code.to_short_string()); // Get runner for function_class of spawn_req - match function_runtimes.get_mut(&spawn_req.code.function_class_type) { + let res = match function_runtimes.get_mut(&spawn_req.code.function_class_type) { Some(runner) => { - // Forward the start request to the correct runner - match runner.start(spawn_req).await { - Ok(_) => {} - Err(err) => { - log::error!("Unhandled Start Error: {}", err); - continue; - } + // Assign a new physical identifier to the function instance being created. + let instance_id = edgeless_api::function_instance::InstanceId::new(node_id); + + // Save function_class for further interaction. + function_instances.insert(instance_id.function_id, spawn_req.code.function_class_type.clone()); + + // Forward the start request to the matching runtime. + match runner.start(instance_id, spawn_req).await { + Ok(_) => edgeless_api::common::StartComponentResponse::InstanceId(instance_id), + Err(err) => edgeless_api::common::StartComponentResponse::ResponseError(edgeless_api::common::ResponseError { + summary: "Could not start function".to_string(), + detail: Some(format!("{}", err)), + }), } } - None => { - log::warn!("Could not find runner for {}", spawn_req.code.function_class_type); - continue; - } - } + None => edgeless_api::common::StartComponentResponse::ResponseError(edgeless_api::common::ResponseError { + summary: "Error when creating a function instance".to_string(), + detail: Some(format!("Could not find runner for {}", spawn_req.code.function_class_type)), + }), + }; + responder + .send(res) + .unwrap_or_else(|_| log::warn!("Agent SpawnFunction: responder send error")); } AgentRequest::StopFunction(stop_function_id) => { - log::debug!("Agent Stop {:?}", stop_function_id); + log::debug!("Agent StopFunction {:?}", stop_function_id); Self::stop_function(&mut function_runtimes, &mut function_instances, stop_function_id).await; } // PatchRequest contains function_id: ComponentId AgentRequest::PatchFunction(update) => { - log::debug!("Agent UpdatePeers {:?}", update); + log::debug!("Agent PatchFunction {:?}", update); // Get function class by looking it up in the instanceId->functionClass map - let function_class: String = match function_instances.get(&update.function_id) { - Some(v) => v.clone(), - None => { - log::error!("Could not find function_class for instanceId {}", update.function_id); - continue; - } - }; - - // Get runner for function_class - match function_runtimes.get_mut(&function_class) { - Some(runner) => { - // Forward the patch request to the correct runner - match runner.patch(update).await { - Ok(_) => {} - Err(err) => { + // and then orward the patch request to the correct runner. + match function_instances.get(&update.function_id) { + Some(function_class) => match function_runtimes.get_mut(function_class) { + Some(runner) => { + if let Err(err) = runner.patch(update).await { log::error!("Unhandled Patch Error: {}", err); } } - } + None => { + log::error!("Could not find runner for function class '{}' when patching", function_class); + } + }, None => { - log::error!("Could not find runner for {}", function_class); - continue; + log::error!("Could not find function class for instanceId '{}' patching", update.function_id); } - } + }; } AgentRequest::UpdatePeers(request) => { log::debug!("Agent UpdatePeers {:?}", request); @@ -170,74 +165,70 @@ impl Agent { }; } AgentRequest::SpawnResource(instance_specification, responder) => { - if let Some((provider_id, resource_desc)) = resource_providers + log::debug!("Agent SpawnResource {:?}", instance_specification); + + let res = if let Some((provider_id, resource_desc)) = resource_providers .iter_mut() .find(|(_provider_id, resource_desc)| resource_desc.class_type == instance_specification.class_type) { - let res = match resource_desc.client.start(instance_specification).await { - Ok(val) => val, - Err(err) => { - responder - .send(Err(anyhow::anyhow!("Internal Resource Error {}", err))) - .unwrap_or_else(|_| log::warn!("Responder Send Error")); - continue; - } - }; - if let edgeless_api::common::StartComponentResponse::InstanceId(id) = res { - log::info!( - "Started resource class_type {}, provider_id {}, node_id {}, fid {}", - resource_desc.class_type, - provider_id, - id.node_id, - id.function_id - ); - resource_instances.insert(id.function_id, provider_id.clone()); - responder - .send(Ok(edgeless_api::common::StartComponentResponse::InstanceId(id))) - .unwrap_or_else(|_| log::warn!("Responder Send Error")); - } else { - responder.send(Ok(res)).unwrap_or_else(|_| log::warn!("Responder Send Error")); + match resource_desc.client.start(instance_specification).await { + Ok(val) => match val { + edgeless_api::common::StartComponentResponse::InstanceId(id) => { + log::info!( + "Started resource class_type {}, provider_id {}, node_id {}, fid {}", + resource_desc.class_type, + provider_id, + id.node_id, + id.function_id + ); + resource_instances.insert(id.function_id, provider_id.clone()); + edgeless_api::common::StartComponentResponse::InstanceId(id) + } + edgeless_api::common::StartComponentResponse::ResponseError(err) => { + edgeless_api::common::StartComponentResponse::ResponseError(err) + } + }, + Err(err) => edgeless_api::common::StartComponentResponse::ResponseError(edgeless_api::common::ResponseError { + summary: "Error when creating a resource".to_string(), + detail: Some(format!("{}", err)), + }), } } else { - responder - .send(Ok(edgeless_api::common::StartComponentResponse::ResponseError( - edgeless_api::common::ResponseError { - summary: "Error when creating a resource".to_string(), - detail: Some(format!("Provider for class_type does not exist: {}", instance_specification.class_type)), - }, - ))) - .unwrap_or_else(|_| log::warn!("Responder Send Error")); - } + edgeless_api::common::StartComponentResponse::ResponseError(edgeless_api::common::ResponseError { + summary: "Error when creating a resource".to_string(), + detail: Some(format!("Provider for class_type does not exist: {}", instance_specification.class_type)), + }) + }; + responder + .send(res) + .unwrap_or_else(|_| log::warn!("Agent SpawnResource: responder send error")); } AgentRequest::StopResource(resource_id, responder) => { + log::debug!("Agent StopResource {:?}", resource_id); + responder .send(Self::stop_resource(&mut resource_providers, &mut resource_instances, resource_id).await) - .unwrap_or_else(|_| log::warn!("Responder Send Error")); + .unwrap_or_else(|_| log::warn!("Agent StopResource: responder send error")); } AgentRequest::PatchResource(update, responder) => { - if let Some(provider_id) = resource_instances.get(&update.function_id) { + log::debug!("Agent PatchResource {:?}", update); + + let res = if let Some(provider_id) = resource_instances.get(&update.function_id) { if let Some(resource_desc) = resource_providers.get_mut(provider_id) { log::info!("Patch resource provider_id {} fid {}", provider_id, update.function_id); - responder - .send(resource_desc.client.patch(update).await) - .unwrap_or_else(|_| log::warn!("Responder Send Error")); - continue; + resource_desc.client.patch(update).await } else { - responder - .send(Err(anyhow::anyhow!( - "Cannot patch a resource, provider not found with provider_id: {}", - provider_id - ))) - .unwrap_or_else(|_| log::warn!("Responder Send Error")); - continue; + Err(anyhow::anyhow!( + "Cannot patch a resource, provider not found with provider_id: {}", + provider_id + )) } - } + } else { + Err(anyhow::anyhow!("Cannot patch a resource, not found with fid: {}", update.function_id)) + }; responder - .send(Err(anyhow::anyhow!( - "Cannot patch a resource, not found with fid: {}", - update.function_id - ))) - .unwrap_or_else(|_| log::warn!("Responder Send Error")); + .send(res) + .unwrap_or_else(|_| log::warn!("Agent PatchResource: responder send error")); } AgentRequest::Reset() => { log::info!("Resetting the node to a clean state"); @@ -342,10 +333,7 @@ impl Agent { pub fn get_api_client(&mut self) -> Box { Box::new(AgentClient { - function_instance_client: Box::new(FunctionInstanceNodeClient { - sender: self.sender.clone(), - node_id: self.node_id, - }), + function_instance_client: Box::new(FunctionInstanceNodeClient { sender: self.sender.clone() }), node_management_client: Box::new(NodeManagementClient { sender: self.sender.clone() }), resource_configuration_client: Box::new(ResourceConfigurationClient { sender: self.sender.clone() }), }) @@ -355,7 +343,6 @@ impl Agent { #[derive(Clone)] pub struct FunctionInstanceNodeClient { sender: futures::channel::mpsc::UnboundedSender, - node_id: uuid::Uuid, } #[derive(Clone)] @@ -382,60 +369,45 @@ impl edgeless_api::function_instance::FunctionInstanceAPI anyhow::Result> { - let mut request = request; - let f_id = match request.instance_id { - Some(id) => id, - None => { - let new_id = edgeless_api::function_instance::InstanceId::new(self.node_id); - request.instance_id = Some(new_id); - new_id - } - }; - match self.sender.send(AgentRequest::SpawnFunction(request)).await { - Ok(_) => Ok(edgeless_api::common::StartComponentResponse::InstanceId(f_id)), - Err(err) => Err(anyhow::anyhow!( - "Agent channel error when creating a function instance: {}", - err.to_string() - )), - } + let (rsp_sender, rsp_receiver) = + futures::channel::oneshot::channel::>(); + let _ = self + .sender + .send(AgentRequest::SpawnFunction(request, rsp_sender)) + .await + .map_err(|err| anyhow::anyhow!("Agent channel error when creating a function instance: {}", err.to_string()))?; + rsp_receiver + .await + .map_err(|err| anyhow::anyhow!("Agent channel error when creating a function instance: {}", err.to_string())) } async fn stop(&mut self, id: edgeless_api::function_instance::InstanceId) -> anyhow::Result<()> { - match self.sender.send(AgentRequest::StopFunction(id)).await { - Ok(_) => Ok(()), - Err(err) => Err(anyhow::anyhow!( - "Agent channel error when stopping a function instance: {}", - err.to_string() - )), - } + self.sender + .send(AgentRequest::StopFunction(id)) + .await + .map_err(|err| anyhow::anyhow!("Agent channel error when stopping a function instance: {}", err.to_string())) } async fn patch(&mut self, update: edgeless_api::common::PatchRequest) -> anyhow::Result<()> { - match self.sender.send(AgentRequest::PatchFunction(update)).await { - Ok(_) => Ok(()), - Err(err) => Err(anyhow::anyhow!( - "Agent channel error when updating the links of a function instance: {}", - err.to_string() - )), - } + self.sender + .send(AgentRequest::PatchFunction(update)) + .await + .map_err(|err| anyhow::anyhow!("Agent channel error when patching a function instance: {}", err.to_string())) } } #[async_trait::async_trait] impl edgeless_api::node_management::NodeManagementAPI for NodeManagementClient { async fn update_peers(&mut self, request: edgeless_api::node_management::UpdatePeersRequest) -> anyhow::Result<()> { - match self.sender.send(AgentRequest::UpdatePeers(request)).await { - Ok(_) => Ok(()), - Err(err) => Err(anyhow::anyhow!( - "Agent channel error when updating the peers of a node: {}", - err.to_string() - )), - } + self.sender + .send(AgentRequest::UpdatePeers(request)) + .await + .map_err(|err| anyhow::anyhow!("Agent channel error when updating a node's peers: {}", err.to_string())) } async fn reset(&mut self) -> anyhow::Result<()> { - match self.sender.send(AgentRequest::Reset()).await { - Ok(_) => Ok(()), - Err(err) => Err(anyhow::anyhow!("Agent channel error when resetting a node: {}", err.to_string())), - } + self.sender + .send(AgentRequest::Reset()) + .await + .map_err(|err| anyhow::anyhow!("Agent channel error when resetting a node: {}", err.to_string())) } } @@ -445,9 +417,8 @@ impl edgeless_api::resource_configuration::ResourceConfigurationAPI anyhow::Result> { - let (rsp_sender, rsp_receiver) = futures::channel::oneshot::channel::< - anyhow::Result>, - >(); + let (rsp_sender, rsp_receiver) = + futures::channel::oneshot::channel::>(); let _ = self .sender .send(AgentRequest::SpawnResource(request, rsp_sender)) @@ -455,7 +426,7 @@ impl edgeless_api::resource_configuration::ResourceConfigurationAPI anyhow::Result<()> { let (rsp_sender, rsp_receiver) = futures::channel::oneshot::channel::>(); @@ -473,10 +444,10 @@ impl edgeless_api::resource_configuration::ResourceConfigurationAPI { impl FunctionInstanceRunner { pub async fn new( + instance_id: edgeless_api::function_instance::InstanceId, spawn_req: edgeless_api::function_instance::SpawnFunctionRequest, data_plane: edgeless_dataplane::handle::DataplaneHandle, runtime_api: futures::channel::mpsc::UnboundedSender, @@ -44,7 +45,6 @@ impl FunctionInstanceRunner, guest_api_host_register: std::sync::Arc>>, ) -> Self { - let instance_id = spawn_req.instance_id.unwrap(); let mut telemetry_handle = telemetry_handle; let mut state_handle = state_handle; diff --git a/edgeless_node/src/base_runtime/mod.rs b/edgeless_node/src/base_runtime/mod.rs index 3047030f..59ad5383 100644 --- a/edgeless_node/src/base_runtime/mod.rs +++ b/edgeless_node/src/base_runtime/mod.rs @@ -7,11 +7,15 @@ pub mod function_instance_runner; pub mod guest_api; pub mod runtime; -/// (Deprecated) Trait to be implemented by each runtime. +/// Trait to be implemented by each runtime. /// You won't need to implement this trait if you use the base_runtime that is generic over the `FunctionInstance` trait #[async_trait::async_trait] pub trait RuntimeAPI { - async fn start(&mut self, request: edgeless_api::function_instance::SpawnFunctionRequest) -> anyhow::Result<()>; + async fn start( + &mut self, + instance_id: edgeless_api::function_instance::InstanceId, + request: edgeless_api::function_instance::SpawnFunctionRequest, + ) -> anyhow::Result<()>; async fn stop(&mut self, instance_id: edgeless_api::function_instance::InstanceId) -> anyhow::Result<()>; async fn patch(&mut self, update: edgeless_api::common::PatchRequest) -> anyhow::Result<()>; } diff --git a/edgeless_node/src/base_runtime/runtime.rs b/edgeless_node/src/base_runtime/runtime.rs index 3fdbde43..12cd8604 100644 --- a/edgeless_node/src/base_runtime/runtime.rs +++ b/edgeless_node/src/base_runtime/runtime.rs @@ -41,7 +41,10 @@ pub struct RuntimeTask { } pub enum RuntimeRequest { - Start(edgeless_api::function_instance::SpawnFunctionRequest), + Start( + edgeless_api::function_instance::InstanceId, + edgeless_api::function_instance::SpawnFunctionRequest, + ), Stop(edgeless_api::function_instance::InstanceId), Patch(edgeless_api::common::PatchRequest), FunctionExit(edgeless_api::function_instance::InstanceId, Result<(), super::FunctionInstanceError>), @@ -93,8 +96,8 @@ impl RuntimeTask { - self.start_function(spawn_request).await; + RuntimeRequest::Start(instance_id, spawn_request) => { + self.start_function(instance_id, spawn_request).await; } RuntimeRequest::Stop(instance_id) => { self.stop_function(instance_id).await; @@ -109,17 +112,16 @@ impl RuntimeTask id, - None => { - return; - } - }; + async fn start_function( + &mut self, + instance_id: edgeless_api::function_instance::InstanceId, + spawn_request: edgeless_api::function_instance::SpawnFunctionRequest, + ) { + log::info!("Start Function {:?}", instance_id); let cloned_req = spawn_request.clone(); let data_plane = self.data_plane_provider.get_handle_for(instance_id).await; let instance = super::function_instance_runner::FunctionInstanceRunner::new( + instance_id, cloned_req, data_plane, self.slf_channel.clone(), @@ -166,8 +168,12 @@ impl RuntimeClient { #[async_trait::async_trait] impl super::RuntimeAPI for RuntimeClient { - async fn start(&mut self, request: edgeless_api::function_instance::SpawnFunctionRequest) -> anyhow::Result<()> { - match self.sender.send(RuntimeRequest::Start(request)).await { + async fn start( + &mut self, + instance_id: edgeless_api::function_instance::InstanceId, + request: edgeless_api::function_instance::SpawnFunctionRequest, + ) -> anyhow::Result<()> { + match self.sender.send(RuntimeRequest::Start(instance_id, request)).await { Ok(_) => Ok(()), Err(_) => Err(anyhow::anyhow!("Runner Channel Error")), } diff --git a/edgeless_node/src/wasm_runner/test/mod.rs b/edgeless_node/src/wasm_runner/test/mod.rs index 8025240f..b965c254 100644 --- a/edgeless_node/src/wasm_runner/test/mod.rs +++ b/edgeless_node/src/wasm_runner/test/mod.rs @@ -95,7 +95,6 @@ async fn basic_lifecycle() { tokio::spawn(async move { rt_task.run().await }); let spawn_req = edgeless_api::function_instance::SpawnFunctionRequest { - instance_id: Some(instance_id), code: edgeless_api::function_instance::FunctionClassSpecification { function_class_id: "EXAMPLE_1".to_string(), function_class_type: "RUST_WASM".to_string(), @@ -112,7 +111,7 @@ async fn basic_lifecycle() { assert!(telemetry_mock_receiver.try_recv().is_err()); - let _res = client.start(spawn_req).await; + let _res = client.start(instance_id, spawn_req).await; // wait for lifetime events created during spawn tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; @@ -235,7 +234,6 @@ async fn messaging_test_setup() -> ( tokio::spawn(async move { rt_task.run().await }); let spawn_req = edgeless_api::function_instance::SpawnFunctionRequest { - instance_id: Some(instance_id), code: edgeless_api::function_instance::FunctionClassSpecification { function_class_id: "EXAMPLE_1".to_string(), function_class_type: "RUST_WASM".to_string(), @@ -252,7 +250,7 @@ async fn messaging_test_setup() -> ( assert!(telemetry_mock_receiver.try_recv().is_err()); - let res = client.start(spawn_req).await; + let res = client.start(instance_id, spawn_req).await; assert!(res.is_ok()); let res = client @@ -514,7 +512,7 @@ async fn messaging_call_raw_input_err() { async fn state_management() { let node_id = uuid::Uuid::new_v4(); let instance_id = edgeless_api::function_instance::InstanceId::new(node_id); - let fid2 = edgeless_api::function_instance::InstanceId::new(node_id); + let instance_id_another = edgeless_api::function_instance::InstanceId::new(node_id); let output_mocks = std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())); let (state_mock_sender, mut state_mock_receiver) = futures::channel::mpsc::unbounded::<(uuid::Uuid, String)>(); @@ -545,8 +543,7 @@ async fn state_management() { tokio::spawn(async move { rt_task.run().await }); - let mut spawn_req = edgeless_api::function_instance::SpawnFunctionRequest { - instance_id: Some(instance_id), + let spawn_req = edgeless_api::function_instance::SpawnFunctionRequest { code: edgeless_api::function_instance::FunctionClassSpecification { function_class_id: "EXAMPLE_1".to_string(), function_class_type: "RUST_WASM".to_string(), @@ -563,7 +560,7 @@ async fn state_management() { assert!(telemetry_mock_receiver.try_recv().is_err()); - let res = client.start(spawn_req.clone()).await; + let res = client.start(instance_id, spawn_req.clone()).await; assert!(res.is_ok()); tokio::time::sleep(Duration::from_millis(50)).await; @@ -608,9 +605,7 @@ async fn state_management() { output_mocks.lock().await.insert(instance_id.function_id, "existing_state".to_string()); // TODO(raphaelhetzel) InstanceId reuse leads to problems that need to be fixed. - spawn_req.instance_id = Some(fid2); - - let res2 = client.start(spawn_req).await; + let res2 = client.start(instance_id_another, spawn_req).await; assert!(res2.is_ok()); tokio::time::sleep(Duration::from_millis(100)).await; diff --git a/edgeless_orc/src/orchestrator/test.rs b/edgeless_orc/src/orchestrator/test.rs index 083616d9..f6480f01 100644 --- a/edgeless_orc/src/orchestrator/test.rs +++ b/edgeless_orc/src/orchestrator/test.rs @@ -330,7 +330,6 @@ async fn clear_events(receivers: &mut std::collections::HashMap edgeless_api::function_instance::SpawnFunctionRequest { edgeless_api::function_instance::SpawnFunctionRequest { - instance_id: None, code: FunctionClassSpecification { function_class_id: class_id.to_string(), function_class_type: "RUST_WASM".to_string(), diff --git a/edgeless_orc/src/proxy_redis.rs b/edgeless_orc/src/proxy_redis.rs index 2c955c0d..7a0fab2f 100644 --- a/edgeless_orc/src/proxy_redis.rs +++ b/edgeless_orc/src/proxy_redis.rs @@ -732,7 +732,6 @@ mod test { logical_physical_ids.last().unwrap().0, crate::active_instance::ActiveInstance::Function( SpawnFunctionRequest { - instance_id: None, code: edgeless_api::function_instance::FunctionClassSpecification { function_class_id: "fun".to_string(), function_class_type: "class".to_string(),