Skip to content

Commit

Permalink
API simplification: Remove unnecessary instance_id from SpawnFunction…
Browse files Browse the repository at this point in the history
…Request
  • Loading branch information
ccicconetti committed Dec 17, 2024
1 parent b953bdb commit 1ba751d
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 199 deletions.
2 changes: 0 additions & 2 deletions edgeless_api/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ message FunctionClassSpecification {

// Message to request the creation a new function instance.
message SpawnFunctionRequest {
// The function instance identifier.
InstanceId instance_id = 1;
// The function class specification.
FunctionClassSpecification code = 2;
// The set of annotations associated with this function instance.
Expand Down
2 changes: 0 additions & 2 deletions edgeless_api/src/function_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ impl FunctionClassSpecification {

#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct SpawnFunctionRequest {
#[serde(skip)]
pub instance_id: Option<InstanceId>, // XXX
pub code: FunctionClassSpecification,
pub annotations: std::collections::HashMap<String, String>,
pub state_specification: StateSpecification,
Expand Down
9 changes: 0 additions & 9 deletions edgeless_api/src/grpc_impl/function_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ impl FunctonInstanceConverters {
api_request: &crate::grpc_impl::api::SpawnFunctionRequest,
) -> anyhow::Result<crate::function_instance::SpawnFunctionRequest> {
Ok(crate::function_instance::SpawnFunctionRequest {
instance_id: match api_request.instance_id.as_ref() {
Some(id) => Some(CommonConverters::parse_instance_id(id)?),
None => None,
},
code: Self::parse_function_class_specification(match api_request.code.as_ref() {
Some(val) => val,
None => {
Expand Down Expand Up @@ -70,7 +66,6 @@ impl FunctonInstanceConverters {

pub fn serialize_spawn_function_request(req: &crate::function_instance::SpawnFunctionRequest) -> crate::grpc_impl::api::SpawnFunctionRequest {
crate::grpc_impl::api::SpawnFunctionRequest {
instance_id: req.instance_id.as_ref().map(CommonConverters::serialize_instance_id),
code: Some(Self::serialize_function_class_specification(&req.code)),
annotations: req.annotations.clone(),
state_specification: Some(Self::serialize_state_specification(&req.state_specification)),
Expand Down Expand Up @@ -310,10 +305,6 @@ mod tests {
#[test]
fn serialize_deserialize_spawn_function_request() {
let messages = vec![SpawnFunctionRequest {
instance_id: Some(InstanceId {
node_id: uuid::Uuid::new_v4(),
function_id: uuid::Uuid::new_v4(),
}),
code: FunctionClassSpecification {
function_class_id: "my-func-id".to_string(),
function_class_type: "WASM".to_string(),
Expand Down
1 change: 0 additions & 1 deletion edgeless_con/src/controller/controller_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,6 @@ impl ControllerTask {
.fn_client(domain)
.ok_or(format!("No function client for domain: {}", domain))?
.start(edgeless_api::function_instance::SpawnFunctionRequest {
instance_id: None,
code: function.function_class_specification.clone(),
annotations: function.annotations.clone(),
state_specification: edgeless_api::function_instance::StateSpecification {
Expand Down
3 changes: 0 additions & 3 deletions edgeless_con/src/controller/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ async fn single_function_start_stop() {
assert!(new_func_id.is_nil());
if let MockFunctionInstanceEvent::StartFunction((id, spawn_req)) = mock_orc_receiver.try_next().unwrap().unwrap() {
new_func_id = id;
assert!(spawn_req.instance_id.is_none());
assert_eq!(function_class_specification, spawn_req.code);
assert!(spawn_req.annotations.is_empty());
// TODO check state specifications
Expand Down Expand Up @@ -366,7 +365,6 @@ async fn function_link_loop_start_stop() {
assert!(new_func1_id.is_nil());
if let MockFunctionInstanceEvent::StartFunction((id, spawn_req)) = mock_orc_receiver.try_next().unwrap().unwrap() {
new_func1_id = id;
assert!(spawn_req.instance_id.is_none());
assert!(spawn_req.annotations.is_empty());
// TODO check state specifications
} else {
Expand All @@ -377,7 +375,6 @@ async fn function_link_loop_start_stop() {
assert!(new_func2_id.is_nil());
if let MockFunctionInstanceEvent::StartFunction((id, spawn_req)) = mock_orc_receiver.try_next().unwrap().unwrap() {
new_func2_id = id;
assert!(spawn_req.instance_id.is_none());
assert!(spawn_req.annotations.is_empty());
// TODO check state specifications
} else {
Expand Down
277 changes: 124 additions & 153 deletions edgeless_node/src/agent/mod.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion edgeless_node/src/base_runtime/function_instance_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ struct FunctionInstanceTask<FunctionInstanceType: FunctionInstance> {

impl<FunctionInstanceType: FunctionInstance> FunctionInstanceRunner<FunctionInstanceType> {
pub async fn new(
instance_id: edgeless_api::function_instance::InstanceId,
spawn_req: edgeless_api::function_instance::SpawnFunctionRequest,
data_plane: edgeless_dataplane::handle::DataplaneHandle,
runtime_api: futures::channel::mpsc::UnboundedSender<super::runtime::RuntimeRequest>,
state_handle: Box<dyn crate::state_management::StateHandleAPI>,
telemetry_handle: Box<dyn edgeless_telemetry::telemetry_events::TelemetryHandleAPI>,
guest_api_host_register: std::sync::Arc<tokio::sync::Mutex<Box<dyn super::runtime::GuestAPIHostRegister + Send>>>,
) -> Self {
let instance_id = spawn_req.instance_id.unwrap();
let mut telemetry_handle = telemetry_handle;
let mut state_handle = state_handle;

Expand Down
8 changes: 6 additions & 2 deletions edgeless_node/src/base_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ pub mod function_instance_runner;
pub mod guest_api;
pub mod runtime;

/// (Deprecated) Trait to be implemented by each runtime.
/// Trait to be implemented by each runtime.
/// You won't need to implement this trait if you use the base_runtime that is generic over the `FunctionInstance` trait
#[async_trait::async_trait]
pub trait RuntimeAPI {
async fn start(&mut self, request: edgeless_api::function_instance::SpawnFunctionRequest) -> anyhow::Result<()>;
async fn start(
&mut self,
instance_id: edgeless_api::function_instance::InstanceId,
request: edgeless_api::function_instance::SpawnFunctionRequest,
) -> anyhow::Result<()>;
async fn stop(&mut self, instance_id: edgeless_api::function_instance::InstanceId) -> anyhow::Result<()>;
async fn patch(&mut self, update: edgeless_api::common::PatchRequest) -> anyhow::Result<()>;
}
Expand Down
32 changes: 19 additions & 13 deletions edgeless_node/src/base_runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ pub struct RuntimeTask<FunctionInstanceType: super::FunctionInstance> {
}

pub enum RuntimeRequest {
Start(edgeless_api::function_instance::SpawnFunctionRequest),
Start(
edgeless_api::function_instance::InstanceId,
edgeless_api::function_instance::SpawnFunctionRequest,
),
Stop(edgeless_api::function_instance::InstanceId),
Patch(edgeless_api::common::PatchRequest),
FunctionExit(edgeless_api::function_instance::InstanceId, Result<(), super::FunctionInstanceError>),
Expand Down Expand Up @@ -93,8 +96,8 @@ impl<FunctionInstanceType: super::FunctionInstance> RuntimeTask<FunctionInstance
log::info!("Starting Edgeless Runner");
while let Some(req) = self.receiver.next().await {
match req {
RuntimeRequest::Start(spawn_request) => {
self.start_function(spawn_request).await;
RuntimeRequest::Start(instance_id, spawn_request) => {
self.start_function(instance_id, spawn_request).await;
}
RuntimeRequest::Stop(instance_id) => {
self.stop_function(instance_id).await;
Expand All @@ -109,17 +112,16 @@ impl<FunctionInstanceType: super::FunctionInstance> RuntimeTask<FunctionInstance
}
}

async fn start_function(&mut self, spawn_request: edgeless_api::function_instance::SpawnFunctionRequest) {
log::info!("Start Function {:?}", spawn_request.instance_id);
let instance_id = match spawn_request.instance_id {
Some(id) => id,
None => {
return;
}
};
async fn start_function(
&mut self,
instance_id: edgeless_api::function_instance::InstanceId,
spawn_request: edgeless_api::function_instance::SpawnFunctionRequest,
) {
log::info!("Start Function {:?}", instance_id);
let cloned_req = spawn_request.clone();
let data_plane = self.data_plane_provider.get_handle_for(instance_id).await;
let instance = super::function_instance_runner::FunctionInstanceRunner::new(
instance_id,
cloned_req,
data_plane,
self.slf_channel.clone(),
Expand Down Expand Up @@ -166,8 +168,12 @@ impl RuntimeClient {

#[async_trait::async_trait]
impl super::RuntimeAPI for RuntimeClient {
async fn start(&mut self, request: edgeless_api::function_instance::SpawnFunctionRequest) -> anyhow::Result<()> {
match self.sender.send(RuntimeRequest::Start(request)).await {
async fn start(
&mut self,
instance_id: edgeless_api::function_instance::InstanceId,
request: edgeless_api::function_instance::SpawnFunctionRequest,
) -> anyhow::Result<()> {
match self.sender.send(RuntimeRequest::Start(instance_id, request)).await {
Ok(_) => Ok(()),
Err(_) => Err(anyhow::anyhow!("Runner Channel Error")),
}
Expand Down
17 changes: 6 additions & 11 deletions edgeless_node/src/wasm_runner/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ async fn basic_lifecycle() {
tokio::spawn(async move { rt_task.run().await });

let spawn_req = edgeless_api::function_instance::SpawnFunctionRequest {
instance_id: Some(instance_id),
code: edgeless_api::function_instance::FunctionClassSpecification {
function_class_id: "EXAMPLE_1".to_string(),
function_class_type: "RUST_WASM".to_string(),
Expand All @@ -112,7 +111,7 @@ async fn basic_lifecycle() {

assert!(telemetry_mock_receiver.try_recv().is_err());

let _res = client.start(spawn_req).await;
let _res = client.start(instance_id, spawn_req).await;

// wait for lifetime events created during spawn
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
Expand Down Expand Up @@ -235,7 +234,6 @@ async fn messaging_test_setup() -> (
tokio::spawn(async move { rt_task.run().await });

let spawn_req = edgeless_api::function_instance::SpawnFunctionRequest {
instance_id: Some(instance_id),
code: edgeless_api::function_instance::FunctionClassSpecification {
function_class_id: "EXAMPLE_1".to_string(),
function_class_type: "RUST_WASM".to_string(),
Expand All @@ -252,7 +250,7 @@ async fn messaging_test_setup() -> (

assert!(telemetry_mock_receiver.try_recv().is_err());

let res = client.start(spawn_req).await;
let res = client.start(instance_id, spawn_req).await;
assert!(res.is_ok());

let res = client
Expand Down Expand Up @@ -514,7 +512,7 @@ async fn messaging_call_raw_input_err() {
async fn state_management() {
let node_id = uuid::Uuid::new_v4();
let instance_id = edgeless_api::function_instance::InstanceId::new(node_id);
let fid2 = edgeless_api::function_instance::InstanceId::new(node_id);
let instance_id_another = edgeless_api::function_instance::InstanceId::new(node_id);

let output_mocks = std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new()));
let (state_mock_sender, mut state_mock_receiver) = futures::channel::mpsc::unbounded::<(uuid::Uuid, String)>();
Expand Down Expand Up @@ -545,8 +543,7 @@ async fn state_management() {

tokio::spawn(async move { rt_task.run().await });

let mut spawn_req = edgeless_api::function_instance::SpawnFunctionRequest {
instance_id: Some(instance_id),
let spawn_req = edgeless_api::function_instance::SpawnFunctionRequest {
code: edgeless_api::function_instance::FunctionClassSpecification {
function_class_id: "EXAMPLE_1".to_string(),
function_class_type: "RUST_WASM".to_string(),
Expand All @@ -563,7 +560,7 @@ async fn state_management() {

assert!(telemetry_mock_receiver.try_recv().is_err());

let res = client.start(spawn_req.clone()).await;
let res = client.start(instance_id, spawn_req.clone()).await;
assert!(res.is_ok());

tokio::time::sleep(Duration::from_millis(50)).await;
Expand Down Expand Up @@ -608,9 +605,7 @@ async fn state_management() {
output_mocks.lock().await.insert(instance_id.function_id, "existing_state".to_string());

// TODO(raphaelhetzel) InstanceId reuse leads to problems that need to be fixed.
spawn_req.instance_id = Some(fid2);

let res2 = client.start(spawn_req).await;
let res2 = client.start(instance_id_another, spawn_req).await;
assert!(res2.is_ok());

tokio::time::sleep(Duration::from_millis(100)).await;
Expand Down
1 change: 0 additions & 1 deletion edgeless_orc/src/orchestrator/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ async fn clear_events(receivers: &mut std::collections::HashMap<uuid::Uuid, futu

fn make_spawn_function_request(class_id: &str) -> edgeless_api::function_instance::SpawnFunctionRequest {
edgeless_api::function_instance::SpawnFunctionRequest {
instance_id: None,
code: FunctionClassSpecification {
function_class_id: class_id.to_string(),
function_class_type: "RUST_WASM".to_string(),
Expand Down
1 change: 0 additions & 1 deletion edgeless_orc/src/proxy_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,6 @@ mod test {
logical_physical_ids.last().unwrap().0,
crate::active_instance::ActiveInstance::Function(
SpawnFunctionRequest {
instance_id: None,
code: edgeless_api::function_instance::FunctionClassSpecification {
function_class_id: "fun".to_string(),
function_class_type: "class".to_string(),
Expand Down

0 comments on commit 1ba751d

Please sign in to comment.