Skip to content

Commit

Permalink
refacto(commons): RuntimeId uses a uuid
Browse files Browse the repository at this point in the history
In a distributed system, we need to be able to uniquely identify a runtime.
Relying on strings was not a viable option, hence the move to Uuid.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
J-Loudet committed Nov 1, 2023
1 parent 5215cc3 commit 04902ab
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 54 deletions.
25 changes: 9 additions & 16 deletions zenoh-flow-commons/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,33 +80,26 @@ impl From<&str> for PortId {
}
}

/// A `PortId` identifies an `Input` or an `Output` of a Node.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)]
pub struct RuntimeId(Arc<str>);

impl Deref for RuntimeId {
type Target = Arc<str>;

fn deref(&self) -> &Self::Target {
&self.0
}
}
pub struct RuntimeId(Arc<Uuid>);

impl Display for RuntimeId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", &self.0)
}
}

impl From<String> for RuntimeId {
fn from(value: String) -> Self {
Self(value.into())
impl Deref for RuntimeId {
type Target = Uuid;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl From<&str> for RuntimeId {
fn from(value: &str) -> Self {
Self(value.into())
impl From<Uuid> for RuntimeId {
fn from(value: Uuid) -> Self {
Self(Arc::new(value))
}
}

Expand Down
5 changes: 4 additions & 1 deletion zenoh-flow-descriptors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#

[package]
authors = { workspace = true }
categories = { workspace = true }
Expand All @@ -30,6 +31,8 @@ serde_json = { workspace = true }
serde_yaml = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
zenoh-flow-commons = { workspace = true }
zenoh-keyexpr = { workspace = true }

[dev-dependencies]
uuid = { workspace = true }
4 changes: 2 additions & 2 deletions zenoh-flow-descriptors/src/dataflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use zenoh_flow_commons::{Configuration, NodeId, RuntimeId};
/// input : in-operator
///
/// mapping:
/// Source-0: zenoh-flow-plugin-0
/// Source-0: 10628aa2-66ca-4fda-8d5c-d7de63764bcc
/// "#;
///
/// let data_flow_yaml = serde_yaml::from_str::<DataFlowDescriptor>(yaml).unwrap();
Expand Down Expand Up @@ -131,7 +131,7 @@ use zenoh_flow_commons::{Configuration, NodeId, RuntimeId};
/// ],
///
/// "mapping": {
/// "Source-0": "zenoh-flow-plugin-0"
/// "Source-0": "10628aa2-66ca-4fda-8d5c-d7de63764bcc"
/// }
/// }
/// "#;
Expand Down
39 changes: 29 additions & 10 deletions zenoh-flow-descriptors/src/flattened/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use crate::{
OutputDescriptor,
};
use serde_json::json;
use std::collections::HashMap;
use zenoh_flow_commons::Vars;
use std::{collections::HashMap, str::FromStr};
use uuid::Uuid;
use zenoh_flow_commons::{RuntimeId, Vars};

const BASE_DIR: &str = "./tests/descriptors";
const SCHEME: &str = "file://";
Expand Down Expand Up @@ -298,27 +299,45 @@ fn test_flatten_descriptor() {
});
assert_eq!(expected_links.len(), flatten.links.len());

let runtime_1: RuntimeId = Uuid::from_str("10628aa2-66ca-4fda-8d5c-d7de63764bcc")
.unwrap()
.into();
let runtime_2: RuntimeId = Uuid::from_str("5f7a170d-cfaf-4f7a-971e-6c3e63c50e1e")
.unwrap()
.into();
let runtime_source_composite: RuntimeId =
Uuid::from_str("e051658a-0cd6-4cef-8b08-0d0f17d3cc5d")
.unwrap()
.into();
let runtime_operator_composite: RuntimeId =
Uuid::from_str("6a16a7ba-ec2d-4bb8-b303-7683e5477900")
.unwrap()
.into();
let runtime_sink_composite: RuntimeId = Uuid::from_str("25270228-9cb1-4e6e-988c-00769622359f")
.unwrap()
.into();

let expected_mapping = HashMap::from([
("source-1".into(), "runtime-1".into()),
("sink-2".into(), "runtime-2".into()),
("source-composite".into(), "runtime-source".into()),
("source-1".into(), runtime_1),
("sink-2".into(), runtime_2),
("source-composite".into(), runtime_source_composite),
(
"operator-composite/sub-operator-1".into(),
"runtime-composite".into(),
runtime_operator_composite.clone(),
),
(
"operator-composite/sub-operator-composite/sub-sub-operator-1".into(),
"runtime-composite".into(),
runtime_operator_composite.clone(),
),
(
"operator-composite/sub-operator-composite/sub-sub-operator-2".into(),
"runtime-composite".into(),
runtime_operator_composite.clone(),
),
(
"operator-composite/sub-operator-2".into(),
"runtime-composite".into(),
runtime_operator_composite,
),
("sink-composite".into(), "runtime-sink".into()),
("sink-composite".into(), runtime_sink_composite),
]);

assert_eq!(expected_mapping, flatten.mapping);
Expand Down
10 changes: 5 additions & 5 deletions zenoh-flow-descriptors/tests/descriptors/data-flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ links:


mapping:
source-1: runtime-1
sink-2: runtime-2
source-composite: runtime-source
operator-composite: runtime-composite
sink-composite: runtime-sink
source-1: 10628aa2-66ca-4fda-8d5c-d7de63764bcc
sink-2: 5f7a170d-cfaf-4f7a-971e-6c3e63c50e1e
source-composite: e051658a-0cd6-4cef-8b08-0d0f17d3cc5d
operator-composite: 6a16a7ba-ec2d-4bb8-b303-7683e5477900
sink-composite: 25270228-9cb1-4e6e-988c-00769622359f
47 changes: 27 additions & 20 deletions zenoh-flow-records/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

use std::collections::HashMap;

use zenoh_flow_commons::{NodeId, Vars};
use uuid::Uuid;
use zenoh_flow_commons::{NodeId, RuntimeId, Vars};
use zenoh_flow_descriptors::{
DataFlowDescriptor, FlattenedDataFlowDescriptor, InputDescriptor, LinkDescriptor,
OutputDescriptor,
Expand Down Expand Up @@ -76,13 +77,14 @@ fn test_success_no_runtime() {
)
.unwrap();

let record = DataFlowRecord::try_new(flat_desc, &"default".into()).unwrap();
let default_runtime: RuntimeId = Uuid::new_v4().into();
let record = DataFlowRecord::try_new(flat_desc, &default_runtime).unwrap();

assert_eq!(
HashMap::from([
("source-0".into(), "default".into()),
("operator-1".into(), "default".into()),
("sink-2".into(), "default".into())
("source-0".into(), default_runtime.clone()),
("operator-1".into(), default_runtime.clone()),
("sink-2".into(), default_runtime)
]),
record.mapping
);
Expand All @@ -94,16 +96,17 @@ fn test_success_no_runtime() {

#[test]
fn test_success_same_runtime() {
let runtime: RuntimeId = Uuid::new_v4().into();
let desc = format!(
r#"
{}
mapping:
source-0: thing
operator-1: thing
sink-2: thing
source-0: {1}
operator-1: {1}
sink-2: {1}
"#,
BASE_FLOW
BASE_FLOW, runtime,
);

let flat_desc = FlattenedDataFlowDescriptor::try_flatten(
Expand All @@ -112,13 +115,13 @@ mapping:
)
.unwrap();

let record = DataFlowRecord::try_new(flat_desc, &"default".into()).unwrap();
let record = DataFlowRecord::try_new(flat_desc, &Uuid::new_v4().into()).unwrap();

assert_eq!(
HashMap::from([
("source-0".into(), "thing".into()),
("operator-1".into(), "thing".into()),
("sink-2".into(), "thing".into())
("source-0".into(), runtime.clone()),
("operator-1".into(), runtime.clone()),
("sink-2".into(), runtime)
]),
record.mapping
);
Expand All @@ -130,15 +133,19 @@ mapping:

#[test]
fn test_success_different_runtime() {
let runtime_thing: RuntimeId = Uuid::new_v4().into();
let runtime_edge: RuntimeId = Uuid::new_v4().into();
let default_runtime: RuntimeId = Uuid::new_v4().into();

let desc = format!(
r#"
{}
mapping:
source-0: thing
operator-1: edge
source-0: {}
operator-1: {}
"#,
BASE_FLOW
BASE_FLOW, runtime_thing, runtime_edge
);

let flat_desc = FlattenedDataFlowDescriptor::try_flatten(
Expand All @@ -147,13 +154,13 @@ mapping:
)
.unwrap();

let record = DataFlowRecord::try_new(flat_desc, &"default".into()).unwrap();
let record = DataFlowRecord::try_new(flat_desc, &default_runtime).unwrap();

assert_eq!(
HashMap::from([
("source-0".into(), "thing".into()),
("operator-1".into(), "edge".into()),
("sink-2".into(), "default".into())
("source-0".into(), runtime_thing),
("operator-1".into(), runtime_edge),
("sink-2".into(), default_runtime)
]),
record.mapping
);
Expand Down

0 comments on commit 04902ab

Please sign in to comment.