From 6abc057dfc2e7108ace73fc3b1529f720d286889 Mon Sep 17 00:00:00 2001 From: Julien Loudet Date: Mon, 27 Nov 2023 14:53:03 +0100 Subject: [PATCH] refacto(commons): RuntimeId internally uses a ZenohId - ZenohId is required to identify on a Zenoh network, - Tests were updated to reflect this change: we now use the Vars to transfer the `CARGO_MANIFEST_DIR` to the flows. Signed-off-by: Julien Loudet --- Cargo.toml | 1 + zenoh-flow-commons/Cargo.toml | 1 + zenoh-flow-commons/src/identifiers.rs | 33 +++++++++--- zenoh-flow-commons/src/utils.rs | 34 +++--------- zenoh-flow-commons/src/vars.rs | 6 +-- zenoh-flow-descriptors/src/flattened/tests.rs | 53 ++++++++++--------- ...ata-flow-recursion-duplicate-composite.yml | 4 +- .../tests/descriptors/data-flow.yml | 8 +-- zenoh-flow-records/src/tests.rs | 15 +++--- zenoh-flow-runtime/src/runtime.rs | 4 +- zenoh-flow-standalone-runtime/src/main.rs | 4 +- 11 files changed, 84 insertions(+), 79 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bdcb2103..b333366a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ url = { version = "2.2", features = ["serde"] } uuid = { version = "1.1", features = ["serde", "v4"] } zenoh = { version = "0.7.2-rc", features = ["shared-memory"] } zenoh-collections = { version = "0.7.2-rc" } +zenoh-protocol = { version = "0.7.2-rc" } zenoh-core = { version = "0.7.2-rc" } zenoh-ext = { version = "0.7.2-rc" } zenoh-flow-commons = { path = "./zenoh-flow-commons" } diff --git a/zenoh-flow-commons/Cargo.toml b/zenoh-flow-commons/Cargo.toml index b87fa53b..31833a8f 100644 --- a/zenoh-flow-commons/Cargo.toml +++ b/zenoh-flow-commons/Cargo.toml @@ -33,3 +33,4 @@ serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } zenoh-keyexpr = { workspace = true } +zenoh-protocol = { workspace = true } diff --git a/zenoh-flow-commons/src/identifiers.rs b/zenoh-flow-commons/src/identifiers.rs index 43adfa69..8ec76b0f 100644 --- a/zenoh-flow-commons/src/identifiers.rs +++ b/zenoh-flow-commons/src/identifiers.rs @@ -13,12 +13,14 @@ // use crate::deserialize::deserialize_id; -use std::fmt::Display; use std::ops::Deref; use std::sync::Arc; +use std::{fmt::Display, str::FromStr}; +use anyhow::anyhow; use serde::{Deserialize, Serialize}; use uuid::Uuid; +use zenoh_protocol::core::ZenohId; /// A `NodeId` identifies a Node in a data flow. #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone, Hash)] @@ -80,8 +82,15 @@ impl From<&str> for PortId { } } -#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)] -pub struct RuntimeId(Arc); +#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize, Default)] +#[repr(transparent)] +pub struct RuntimeId(ZenohId); + +impl RuntimeId { + pub fn rand() -> Self { + Self(ZenohId::rand()) + } +} impl Display for RuntimeId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -90,16 +99,26 @@ impl Display for RuntimeId { } impl Deref for RuntimeId { - type Target = Uuid; + type Target = ZenohId; fn deref(&self) -> &Self::Target { &self.0 } } -impl From for RuntimeId { - fn from(value: Uuid) -> Self { - Self(Arc::new(value)) +impl From for RuntimeId { + fn from(value: ZenohId) -> Self { + Self(value) + } +} + +impl FromStr for RuntimeId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + Ok(ZenohId::from_str(s) + .map_err(|e| anyhow!("Failed to parse < {} > as a valid ZenohId:\n{:?}", s, e))? + .into()) } } diff --git a/zenoh-flow-commons/src/utils.rs b/zenoh-flow-commons/src/utils.rs index d2086cee..3ec3aeab 100644 --- a/zenoh-flow-commons/src/utils.rs +++ b/zenoh-flow-commons/src/utils.rs @@ -52,32 +52,13 @@ pub fn try_load_from_file(path: impl AsRef, vars: Vars) -> Result<(N, where N: for<'a> Deserialize<'a>, { - let mut path_buf = PathBuf::new(); - - #[cfg(test)] - { - // When running the test on the CI we cannot know the path of the clone of Zenoh-Flow. By - // using relative paths (w.r.t. the manifest dir) in the tests and, only in tests, prepend - // the paths with this environment variable we obtain a correct absolute path. - path_buf.push(env!("CARGO_MANIFEST_DIR")); - path_buf.push( - path.as_ref() - .to_string_lossy() - .strip_prefix('/') - .expect("Failed to remove leading '/'"), - ); - } - - #[cfg(not(test))] - path_buf.push(path.as_ref()); - - let path = std::fs::canonicalize(&path_buf).context(format!( + let path_buf = std::fs::canonicalize(path.as_ref()).context(format!( "Failed to canonicalize path (did you put an absolute path?):\n{}", - path_buf.display() + path.as_ref().to_string_lossy() ))?; let mut buf = String::default(); - std::fs::File::open(path.clone()) + std::fs::File::open(&path_buf) .context(format!("Failed to open file:\n{}", path_buf.display()))? .read_to_string(&mut buf) .context(format!( @@ -85,8 +66,9 @@ where path_buf.display() ))?; - let merged_vars = vars - .merge_overwrite(deserializer::(&path)?(&buf).context("Failed to deserialize Vars")?); + let merged_vars = vars.merge_overwrite( + deserializer::(&path_buf)?(&buf).context("Failed to deserialize Vars")?, + ); let expanded_buf = ramhorns::Template::new(buf.as_str()) .context(format!( @@ -96,8 +78,8 @@ where .render(&*merged_vars); Ok(( - (deserializer::(&path))?(&expanded_buf) - .context(format!("Failed to deserialize {}", &path.display()))?, + (deserializer::(&path_buf))?(&expanded_buf) + .context(format!("Failed to deserialize {}", &path_buf.display()))?, merged_vars, )) } diff --git a/zenoh-flow-commons/src/vars.rs b/zenoh-flow-commons/src/vars.rs index f3a1b314..9a6d3c30 100644 --- a/zenoh-flow-commons/src/vars.rs +++ b/zenoh-flow-commons/src/vars.rs @@ -53,13 +53,13 @@ impl IMergeOverwrite for Vars { } } -impl From<[(&str, &str); N]> for Vars { - fn from(value: [(&str, &str); N]) -> Self { +impl, U: AsRef, const N: usize> From<[(T, U); N]> for Vars { + fn from(value: [(T, U); N]) -> Self { Self { vars: Rc::new( value .into_iter() - .map(|(k, v)| (k.into(), v.into())) + .map(|(k, v)| (k.as_ref().into(), v.as_ref().into())) .collect::, Rc>>(), ), } diff --git a/zenoh-flow-descriptors/src/flattened/tests.rs b/zenoh-flow-descriptors/src/flattened/tests.rs index 11a11dcb..00876c18 100644 --- a/zenoh-flow-descriptors/src/flattened/tests.rs +++ b/zenoh-flow-descriptors/src/flattened/tests.rs @@ -23,7 +23,6 @@ use crate::{ }; use serde_json::json; use url::Url; -use uuid::Uuid; use zenoh_flow_commons::{NodeId, RuntimeId, Vars}; const BASE_DIR: &str = "./tests/descriptors"; @@ -40,11 +39,24 @@ const SCHEME: &str = "file://"; // See the comments around the "expected" structures for more information. #[test] fn test_flatten_descriptor() { - // env variable CARGO_MANIFEST_DIR puts us in zenoh-flow/zenoh-flow-descriptors - let uri = format!("file://{}/data-flow.yml", BASE_DIR); + let base_dir = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), BASE_DIR); + let runtime_1 = RuntimeId::rand(); + let runtime_2 = RuntimeId::rand(); + let runtime_composite = RuntimeId::rand(); - let (descriptor, vars) = try_load_descriptor::(&uri, Vars::default()) - .expect("Failed to load DataFlowDescriptor"); + let (descriptor, vars) = try_load_descriptor::( + &format!("file://{}/data-flow.yml", base_dir), + Vars::from([ + ("BASE_DIR", base_dir.as_str()), + ("RUNTIME_1", format!("{}", runtime_1).as_str()), + ("RUNTIME_2", format!("{}", runtime_2).as_str()), + ( + "RUNTIME_COMPOSITE", + format!("{}", runtime_composite).as_str(), + ), + ]), + ) + .expect("Failed to load DataFlowDescriptor"); let flatten = FlattenedDataFlowDescriptor::try_flatten(descriptor, vars).unwrap(); @@ -300,22 +312,10 @@ fn test_flatten_descriptor() { assert_eq!(expected_links.len(), flatten.links.len()); let expected_mapping: HashMap> = HashMap::from([ + (runtime_1, HashSet::from(["source-1".into()])), + (runtime_2, HashSet::from(["sink-2".into()])), ( - Uuid::parse_str("10628aa2-66ca-4fda-8d5c-d7de63764bcc") - .unwrap() - .into(), - HashSet::from(["source-1".into()]), - ), - ( - Uuid::parse_str("5f7a170d-cfaf-4f7a-971e-6c3e63c50e1e") - .unwrap() - .into(), - HashSet::from(["sink-2".into()]), - ), - ( - Uuid::parse_str("e051658a-0cd6-4cef-8b08-0d0f17d3cc5d") - .unwrap() - .into(), + runtime_composite, HashSet::from([ "source-composite".into(), "sink-composite".into(), @@ -332,11 +332,12 @@ fn test_flatten_descriptor() { #[test] fn test_detect_recursion() { - let path = format!("{}{}/data-flow-recursion.yml", SCHEME, BASE_DIR,); + let base_dir = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), BASE_DIR); + let path = format!("{}{}/data-flow-recursion.yml", SCHEME, base_dir); let (descriptor, vars) = try_load_descriptor::( &path, - Vars::from([("BASE_DIR", BASE_DIR), ("SCHEME", SCHEME)]), + Vars::from([("BASE_DIR", base_dir.as_str()), ("SCHEME", SCHEME)]), ) .expect("Failed to parse descriptor"); assert!(FlattenedDataFlowDescriptor::try_flatten(descriptor, vars).is_err()); @@ -344,16 +345,18 @@ fn test_detect_recursion() { #[test] fn test_duplicate_composite_at_same_level_not_detected_as_recursion() { + let base_dir = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), BASE_DIR); let path = format!( - "{}{}/data-flow-recursion-duplicate-composite.yml", - SCHEME, BASE_DIR, + "{}/{}/data-flow-recursion-duplicate-composite.yml", + SCHEME, base_dir, ); let (descriptor, vars) = try_load_descriptor::( &path, - Vars::from([("BASE_DIR", BASE_DIR), ("SCHEME", SCHEME)]), + Vars::from([("BASE_DIR", base_dir.as_str()), ("SCHEME", SCHEME)]), ) .expect("Failed to parse descriptor"); + assert!(FlattenedDataFlowDescriptor::try_flatten(descriptor, vars).is_ok()); } diff --git a/zenoh-flow-descriptors/tests/descriptors/data-flow-recursion-duplicate-composite.yml b/zenoh-flow-descriptors/tests/descriptors/data-flow-recursion-duplicate-composite.yml index 4de8decd..ff1b3e8b 100644 --- a/zenoh-flow-descriptors/tests/descriptors/data-flow-recursion-duplicate-composite.yml +++ b/zenoh-flow-descriptors/tests/descriptors/data-flow-recursion-duplicate-composite.yml @@ -1,8 +1,8 @@ name: test-recursion-ok vars: - SCHEME: file:// - BASE_DIR: ./src/tests/descriptors + SCHEME: + BASE_DIR: sources: - id: source-composite diff --git a/zenoh-flow-descriptors/tests/descriptors/data-flow.yml b/zenoh-flow-descriptors/tests/descriptors/data-flow.yml index 354f5f2e..109a11b2 100644 --- a/zenoh-flow-descriptors/tests/descriptors/data-flow.yml +++ b/zenoh-flow-descriptors/tests/descriptors/data-flow.yml @@ -2,10 +2,10 @@ name: test vars: SCHEME: file:// - BASE_DIR: ./tests/descriptors - RUNTIME_1: 10628aa2-66ca-4fda-8d5c-d7de63764bcc - RUNTIME_2: 5f7a170d-cfaf-4f7a-971e-6c3e63c50e1e - RUNTIME_COMPOSITE: e051658a-0cd6-4cef-8b08-0d0f17d3cc5d + BASE_DIR: + RUNTIME_1: + RUNTIME_2: + RUNTIME_COMPOSITE: configuration: foo: "global-outer" diff --git a/zenoh-flow-records/src/tests.rs b/zenoh-flow-records/src/tests.rs index 82313eff..f548941c 100644 --- a/zenoh-flow-records/src/tests.rs +++ b/zenoh-flow-records/src/tests.rs @@ -16,7 +16,6 @@ use crate::{ dataflow::{RECEIVER_SUFFIX, SENDER_SUFFIX}, DataFlowRecord, ReceiverRecord, SenderRecord, }; -use uuid::Uuid; use zenoh_flow_commons::{NodeId, RuntimeId, Vars}; use zenoh_flow_descriptors::{ DataFlowDescriptor, FlattenedDataFlowDescriptor, InputDescriptor, LinkDescriptor, @@ -74,7 +73,7 @@ links: ) .unwrap(); - let default_runtime: RuntimeId = Uuid::new_v4().into(); + let default_runtime = RuntimeId::rand(); let record = DataFlowRecord::try_new(flat_desc, &default_runtime).unwrap(); assert!(record.receivers.is_empty()); @@ -84,7 +83,7 @@ links: #[test] fn test_success_same_runtime() { - let runtime: RuntimeId = Uuid::new_v4().into(); + let runtime = RuntimeId::rand(); let flow = format!( r#" name: base test flow @@ -142,7 +141,7 @@ mapping: ) .unwrap(); - let record = DataFlowRecord::try_new(flat_desc, &Uuid::new_v4().into()).unwrap(); + let record = DataFlowRecord::try_new(flat_desc, &RuntimeId::rand()).unwrap(); assert!(record.receivers.is_empty()); assert!(record.senders.is_empty()); @@ -151,9 +150,9 @@ mapping: #[test] fn test_success_different_runtime() { - let runtime_thing: RuntimeId = Uuid::new_v4().into(); - let runtime_edge: RuntimeId = Uuid::new_v4().into(); - let default_runtime: RuntimeId = Uuid::new_v4().into(); + let runtime_thing = RuntimeId::rand(); + let runtime_edge = RuntimeId::rand(); + let default_runtime = RuntimeId::rand(); let desc = format!( r#" @@ -364,7 +363,7 @@ links: ) .unwrap(); - let default_runtime: RuntimeId = Uuid::new_v4().into(); + let default_runtime = RuntimeId::rand(); let record = DataFlowRecord::try_new(flat_desc, &default_runtime).unwrap(); let _string = serde_yaml::to_string(&record).expect("Failed to serialize to yaml"); diff --git a/zenoh-flow-runtime/src/runtime.rs b/zenoh-flow-runtime/src/runtime.rs index 8413afad..25e1e141 100644 --- a/zenoh-flow-runtime/src/runtime.rs +++ b/zenoh-flow-runtime/src/runtime.rs @@ -25,7 +25,6 @@ use crate::{ }; use anyhow::{bail, Context as errContext}; use uhlc::HLC; -use uuid::Uuid; #[cfg(feature = "zenoh")] use zenoh::Session; #[cfg(feature = "shared-memory")] @@ -58,13 +57,14 @@ impl Runtime { /// TODO@J-Loudet pub fn new( + id: RuntimeId, loader: Loader, hlc: Arc, #[cfg(feature = "zenoh")] session: Arc, #[cfg(feature = "shared-memory")] shared_memory: SharedMemoryConfiguration, ) -> Self { Self { - runtime_id: Uuid::new_v4().into(), + runtime_id: id, #[cfg(feature = "zenoh")] session, hlc, diff --git a/zenoh-flow-standalone-runtime/src/main.rs b/zenoh-flow-standalone-runtime/src/main.rs index 1808f310..512bae73 100644 --- a/zenoh-flow-standalone-runtime/src/main.rs +++ b/zenoh-flow-standalone-runtime/src/main.rs @@ -16,7 +16,7 @@ use anyhow::Context; use async_std::io::ReadExt; use clap::Parser; use std::{path::PathBuf, sync::Arc}; -use zenoh_flow_commons::Vars; +use zenoh_flow_commons::{RuntimeId, Vars}; use zenoh_flow_descriptors::{DataFlowDescriptor, FlattenedDataFlowDescriptor}; use zenoh_flow_records::DataFlowRecord; use zenoh_flow_runtime::{ @@ -76,7 +76,7 @@ async fn main() { let hlc = Arc::new(uhlc::HLC::default()); let session = zenoh::open(zenoh::peer()).res().await.unwrap().into_arc(); - let mut runtime = Runtime::new(loader, hlc, session); + let mut runtime = Runtime::new(RuntimeId::rand(), loader, hlc, session); let record = DataFlowRecord::try_new(flattened_flow, runtime.id()) .context("Failed to create a Record from the flattened data flow descriptor")