Skip to content

Commit

Permalink
refacto(commons): RuntimeId internally uses a ZenohId
Browse files Browse the repository at this point in the history
- ZenohId is required to identify on a Zenoh network,
- Tests were updated to reflect this change: we now use the Vars to transfer the
  `CARGO_MANIFEST_DIR` to the flows.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
J-Loudet committed Nov 27, 2023
1 parent db42fed commit 6abc057
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 79 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ url = { version = "2.2", features = ["serde"] }
uuid = { version = "1.1", features = ["serde", "v4"] }
zenoh = { version = "0.7.2-rc", features = ["shared-memory"] }
zenoh-collections = { version = "0.7.2-rc" }
zenoh-protocol = { version = "0.7.2-rc" }
zenoh-core = { version = "0.7.2-rc" }
zenoh-ext = { version = "0.7.2-rc" }
zenoh-flow-commons = { path = "./zenoh-flow-commons" }
Expand Down
1 change: 1 addition & 0 deletions zenoh-flow-commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
zenoh-keyexpr = { workspace = true }
zenoh-protocol = { workspace = true }
33 changes: 26 additions & 7 deletions zenoh-flow-commons/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
//

use crate::deserialize::deserialize_id;
use std::fmt::Display;
use std::ops::Deref;
use std::sync::Arc;
use std::{fmt::Display, str::FromStr};

use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use zenoh_protocol::core::ZenohId;

/// A `NodeId` identifies a Node in a data flow.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone, Hash)]
Expand Down Expand Up @@ -80,8 +82,15 @@ impl From<&str> for PortId {
}
}

#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)]
pub struct RuntimeId(Arc<Uuid>);
#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize, Default)]
#[repr(transparent)]
pub struct RuntimeId(ZenohId);

impl RuntimeId {
pub fn rand() -> Self {
Self(ZenohId::rand())
}
}

impl Display for RuntimeId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -90,16 +99,26 @@ impl Display for RuntimeId {
}

impl Deref for RuntimeId {
type Target = Uuid;
type Target = ZenohId;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl From<Uuid> for RuntimeId {
fn from(value: Uuid) -> Self {
Self(Arc::new(value))
impl From<ZenohId> for RuntimeId {
fn from(value: ZenohId) -> Self {
Self(value)
}
}

impl FromStr for RuntimeId {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(ZenohId::from_str(s)
.map_err(|e| anyhow!("Failed to parse < {} > as a valid ZenohId:\n{:?}", s, e))?
.into())
}
}

Expand Down
34 changes: 8 additions & 26 deletions zenoh-flow-commons/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,41 +52,23 @@ pub fn try_load_from_file<N>(path: impl AsRef<OsStr>, vars: Vars) -> Result<(N,
where
N: for<'a> Deserialize<'a>,
{
let mut path_buf = PathBuf::new();

#[cfg(test)]
{
// When running the test on the CI we cannot know the path of the clone of Zenoh-Flow. By
// using relative paths (w.r.t. the manifest dir) in the tests and, only in tests, prepend
// the paths with this environment variable we obtain a correct absolute path.
path_buf.push(env!("CARGO_MANIFEST_DIR"));
path_buf.push(
path.as_ref()
.to_string_lossy()
.strip_prefix('/')
.expect("Failed to remove leading '/'"),
);
}

#[cfg(not(test))]
path_buf.push(path.as_ref());

let path = std::fs::canonicalize(&path_buf).context(format!(
let path_buf = std::fs::canonicalize(path.as_ref()).context(format!(
"Failed to canonicalize path (did you put an absolute path?):\n{}",
path_buf.display()
path.as_ref().to_string_lossy()
))?;

let mut buf = String::default();
std::fs::File::open(path.clone())
std::fs::File::open(&path_buf)
.context(format!("Failed to open file:\n{}", path_buf.display()))?
.read_to_string(&mut buf)
.context(format!(
"Failed to read the content of file:\n{}",
path_buf.display()
))?;

let merged_vars = vars
.merge_overwrite(deserializer::<Vars>(&path)?(&buf).context("Failed to deserialize Vars")?);
let merged_vars = vars.merge_overwrite(
deserializer::<Vars>(&path_buf)?(&buf).context("Failed to deserialize Vars")?,
);

let expanded_buf = ramhorns::Template::new(buf.as_str())
.context(format!(
Expand All @@ -96,8 +78,8 @@ where
.render(&*merged_vars);

Ok((
(deserializer::<N>(&path))?(&expanded_buf)
.context(format!("Failed to deserialize {}", &path.display()))?,
(deserializer::<N>(&path_buf))?(&expanded_buf)
.context(format!("Failed to deserialize {}", &path_buf.display()))?,
merged_vars,
))
}
6 changes: 3 additions & 3 deletions zenoh-flow-commons/src/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ impl IMergeOverwrite for Vars {
}
}

impl<const N: usize> From<[(&str, &str); N]> for Vars {
fn from(value: [(&str, &str); N]) -> Self {
impl<T: AsRef<str>, U: AsRef<str>, const N: usize> From<[(T, U); N]> for Vars {
fn from(value: [(T, U); N]) -> Self {
Self {
vars: Rc::new(
value
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.map(|(k, v)| (k.as_ref().into(), v.as_ref().into()))
.collect::<HashMap<Rc<str>, Rc<str>>>(),
),
}
Expand Down
53 changes: 28 additions & 25 deletions zenoh-flow-descriptors/src/flattened/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{
};
use serde_json::json;
use url::Url;
use uuid::Uuid;
use zenoh_flow_commons::{NodeId, RuntimeId, Vars};

const BASE_DIR: &str = "./tests/descriptors";
Expand All @@ -40,11 +39,24 @@ const SCHEME: &str = "file://";
// See the comments around the "expected" structures for more information.
#[test]
fn test_flatten_descriptor() {
// env variable CARGO_MANIFEST_DIR puts us in zenoh-flow/zenoh-flow-descriptors
let uri = format!("file://{}/data-flow.yml", BASE_DIR);
let base_dir = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), BASE_DIR);
let runtime_1 = RuntimeId::rand();
let runtime_2 = RuntimeId::rand();
let runtime_composite = RuntimeId::rand();

let (descriptor, vars) = try_load_descriptor::<DataFlowDescriptor>(&uri, Vars::default())
.expect("Failed to load DataFlowDescriptor");
let (descriptor, vars) = try_load_descriptor::<DataFlowDescriptor>(
&format!("file://{}/data-flow.yml", base_dir),
Vars::from([
("BASE_DIR", base_dir.as_str()),
("RUNTIME_1", format!("{}", runtime_1).as_str()),
("RUNTIME_2", format!("{}", runtime_2).as_str()),
(
"RUNTIME_COMPOSITE",
format!("{}", runtime_composite).as_str(),
),
]),
)
.expect("Failed to load DataFlowDescriptor");

let flatten = FlattenedDataFlowDescriptor::try_flatten(descriptor, vars).unwrap();

Expand Down Expand Up @@ -300,22 +312,10 @@ fn test_flatten_descriptor() {
assert_eq!(expected_links.len(), flatten.links.len());

let expected_mapping: HashMap<RuntimeId, HashSet<NodeId>> = HashMap::from([
(runtime_1, HashSet::from(["source-1".into()])),
(runtime_2, HashSet::from(["sink-2".into()])),
(
Uuid::parse_str("10628aa2-66ca-4fda-8d5c-d7de63764bcc")
.unwrap()
.into(),
HashSet::from(["source-1".into()]),
),
(
Uuid::parse_str("5f7a170d-cfaf-4f7a-971e-6c3e63c50e1e")
.unwrap()
.into(),
HashSet::from(["sink-2".into()]),
),
(
Uuid::parse_str("e051658a-0cd6-4cef-8b08-0d0f17d3cc5d")
.unwrap()
.into(),
runtime_composite,
HashSet::from([
"source-composite".into(),
"sink-composite".into(),
Expand All @@ -332,28 +332,31 @@ fn test_flatten_descriptor() {

#[test]
fn test_detect_recursion() {
let path = format!("{}{}/data-flow-recursion.yml", SCHEME, BASE_DIR,);
let base_dir = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), BASE_DIR);
let path = format!("{}{}/data-flow-recursion.yml", SCHEME, base_dir);

let (descriptor, vars) = try_load_descriptor::<DataFlowDescriptor>(
&path,
Vars::from([("BASE_DIR", BASE_DIR), ("SCHEME", SCHEME)]),
Vars::from([("BASE_DIR", base_dir.as_str()), ("SCHEME", SCHEME)]),
)
.expect("Failed to parse descriptor");
assert!(FlattenedDataFlowDescriptor::try_flatten(descriptor, vars).is_err());
}

#[test]
fn test_duplicate_composite_at_same_level_not_detected_as_recursion() {
let base_dir = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), BASE_DIR);
let path = format!(
"{}{}/data-flow-recursion-duplicate-composite.yml",
SCHEME, BASE_DIR,
"{}/{}/data-flow-recursion-duplicate-composite.yml",
SCHEME, base_dir,
);

let (descriptor, vars) = try_load_descriptor::<DataFlowDescriptor>(
&path,
Vars::from([("BASE_DIR", BASE_DIR), ("SCHEME", SCHEME)]),
Vars::from([("BASE_DIR", base_dir.as_str()), ("SCHEME", SCHEME)]),
)
.expect("Failed to parse descriptor");

assert!(FlattenedDataFlowDescriptor::try_flatten(descriptor, vars).is_ok());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
name: test-recursion-ok

vars:
SCHEME: file://
BASE_DIR: ./src/tests/descriptors
SCHEME:
BASE_DIR:

sources:
- id: source-composite
Expand Down
8 changes: 4 additions & 4 deletions zenoh-flow-descriptors/tests/descriptors/data-flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ name: test

vars:
SCHEME: file://
BASE_DIR: ./tests/descriptors
RUNTIME_1: 10628aa2-66ca-4fda-8d5c-d7de63764bcc
RUNTIME_2: 5f7a170d-cfaf-4f7a-971e-6c3e63c50e1e
RUNTIME_COMPOSITE: e051658a-0cd6-4cef-8b08-0d0f17d3cc5d
BASE_DIR:
RUNTIME_1:
RUNTIME_2:
RUNTIME_COMPOSITE:

configuration:
foo: "global-outer"
Expand Down
15 changes: 7 additions & 8 deletions zenoh-flow-records/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::{
dataflow::{RECEIVER_SUFFIX, SENDER_SUFFIX},
DataFlowRecord, ReceiverRecord, SenderRecord,
};
use uuid::Uuid;
use zenoh_flow_commons::{NodeId, RuntimeId, Vars};
use zenoh_flow_descriptors::{
DataFlowDescriptor, FlattenedDataFlowDescriptor, InputDescriptor, LinkDescriptor,
Expand Down Expand Up @@ -74,7 +73,7 @@ links:
)
.unwrap();

let default_runtime: RuntimeId = Uuid::new_v4().into();
let default_runtime = RuntimeId::rand();
let record = DataFlowRecord::try_new(flat_desc, &default_runtime).unwrap();

assert!(record.receivers.is_empty());
Expand All @@ -84,7 +83,7 @@ links:

#[test]
fn test_success_same_runtime() {
let runtime: RuntimeId = Uuid::new_v4().into();
let runtime = RuntimeId::rand();
let flow = format!(
r#"
name: base test flow
Expand Down Expand Up @@ -142,7 +141,7 @@ mapping:
)
.unwrap();

let record = DataFlowRecord::try_new(flat_desc, &Uuid::new_v4().into()).unwrap();
let record = DataFlowRecord::try_new(flat_desc, &RuntimeId::rand()).unwrap();

assert!(record.receivers.is_empty());
assert!(record.senders.is_empty());
Expand All @@ -151,9 +150,9 @@ mapping:

#[test]
fn test_success_different_runtime() {
let runtime_thing: RuntimeId = Uuid::new_v4().into();
let runtime_edge: RuntimeId = Uuid::new_v4().into();
let default_runtime: RuntimeId = Uuid::new_v4().into();
let runtime_thing = RuntimeId::rand();
let runtime_edge = RuntimeId::rand();
let default_runtime = RuntimeId::rand();

let desc = format!(
r#"
Expand Down Expand Up @@ -364,7 +363,7 @@ links:
)
.unwrap();

let default_runtime: RuntimeId = Uuid::new_v4().into();
let default_runtime = RuntimeId::rand();
let record = DataFlowRecord::try_new(flat_desc, &default_runtime).unwrap();

let _string = serde_yaml::to_string(&record).expect("Failed to serialize to yaml");
Expand Down
4 changes: 2 additions & 2 deletions zenoh-flow-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::{
};
use anyhow::{bail, Context as errContext};
use uhlc::HLC;
use uuid::Uuid;
#[cfg(feature = "zenoh")]
use zenoh::Session;
#[cfg(feature = "shared-memory")]
Expand Down Expand Up @@ -58,13 +57,14 @@ impl Runtime {

/// TODO@J-Loudet
pub fn new(
id: RuntimeId,
loader: Loader,
hlc: Arc<HLC>,
#[cfg(feature = "zenoh")] session: Arc<Session>,
#[cfg(feature = "shared-memory")] shared_memory: SharedMemoryConfiguration,
) -> Self {
Self {
runtime_id: Uuid::new_v4().into(),
runtime_id: id,
#[cfg(feature = "zenoh")]
session,
hlc,
Expand Down
4 changes: 2 additions & 2 deletions zenoh-flow-standalone-runtime/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use anyhow::Context;
use async_std::io::ReadExt;
use clap::Parser;
use std::{path::PathBuf, sync::Arc};
use zenoh_flow_commons::Vars;
use zenoh_flow_commons::{RuntimeId, Vars};
use zenoh_flow_descriptors::{DataFlowDescriptor, FlattenedDataFlowDescriptor};
use zenoh_flow_records::DataFlowRecord;
use zenoh_flow_runtime::{
Expand Down Expand Up @@ -76,7 +76,7 @@ async fn main() {
let hlc = Arc::new(uhlc::HLC::default());

let session = zenoh::open(zenoh::peer()).res().await.unwrap().into_arc();
let mut runtime = Runtime::new(loader, hlc, session);
let mut runtime = Runtime::new(RuntimeId::rand(), loader, hlc, session);

let record = DataFlowRecord::try_new(flattened_flow, runtime.id())
.context("Failed to create a Record from the flattened data flow descriptor")
Expand Down

0 comments on commit 6abc057

Please sign in to comment.