Skip to content

Commit

Permalink
feat(runtime): (WIP) independent runtime crate
Browse files Browse the repository at this point in the history
The `zenoh-flow-runtime` crate holds the logic to manage a data flow, i.e. (i)
to dynamically load the shared libraries, (ii) call the constructor of each
node and (iii) start/stop each node.

The built-in Zenoh Source & Sink are also implemented here.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
J-Loudet committed Nov 8, 2023
1 parent f272e84 commit 6421e23
Show file tree
Hide file tree
Showing 16 changed files with 1,789 additions and 2 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"zenoh-flow-nodes",
"zenoh-flow-plugin",
"zenoh-flow-records",
"zenoh-flow-runtime",
"zfctl",
]

Expand Down Expand Up @@ -56,12 +57,13 @@ tracing = { version = "0.1", features = ["log"] }
uhlc = "0.6"
url = "2.2"
uuid = { version = "1.1", features = ["serde", "v4"] }
zenoh = { version = "0.7.2-rc" }
zenoh = { version = "0.7.2-rc", features = ["shared-memory"] }
zenoh-collections = { version = "0.7.2-rc" }
zenoh-core = { version = "0.7.2-rc" }
zenoh-ext = { version = "0.7.2-rc" }
zenoh-flow-commons = { path = "./zenoh-flow-commons" }
zenoh-flow-descriptors = { path = "./zenoh-flow-descriptors" }
zenoh-flow-nodes = { path = "./zenoh-flow-nodes" }
zenoh-flow-records = { path = "./zenoh-flow-records" }
zenoh-keyexpr = { version = "0.7.2-rc" }
zenoh-plugin-trait = { version = "0.7.2-rc", default-features = false }
Expand Down
42 changes: 41 additions & 1 deletion zenoh-flow-records/src/dataflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use crate::{
};
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use zenoh_flow_commons::{NodeId, RecordId, Result, RuntimeId};
use zenoh_flow_descriptors::{
FlattenedDataFlowDescriptor, InputDescriptor, LinkDescriptor, OutputDescriptor,
Expand Down Expand Up @@ -208,6 +211,43 @@ Caused by:
pub fn name(&self) -> &Arc<str> {
&self.name
}

/// Returns the list of nodes that are set to run on the specified runtime.
pub fn get_nodes_for_runtime(&self, runtime: &RuntimeId) -> HashSet<&NodeId> {
let mut nodes: HashSet<&NodeId> = HashSet::default();
nodes.extend(
self.sources
.values()
.filter(|source| source.runtime() == runtime)
.map(|op| &op.id),
);
nodes.extend(
self.operators
.values()
.filter(|operator| operator.runtime() == runtime)
.map(|op| &op.id),
);
nodes.extend(
self.sinks
.values()
.filter(|sink| sink.runtime() == runtime)
.map(|sink| &sink.id),
);
nodes.extend(
self.senders
.values()
.filter(|sender| sender.runtime() == runtime)
.map(|sender| &sender.id),
);
nodes.extend(
self.receivers
.values()
.filter(|receiver| receiver.runtime() == runtime)
.map(|receiver| &receiver.id),
);

nodes
}
}

#[cfg(test)]
Expand Down
44 changes: 44 additions & 0 deletions zenoh-flow-runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright (c) 2021 - 2023 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#

[package]
authors = { workspace = true }
categories = { workspace = true }
description = "Internal crate for Zenoh-Flow."
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
name = "zenoh-flow-runtime"
repository = { workspace = true }
version = { workspace = true }


[dependencies]
anyhow = { workspace = true }
async-std = { workspace = true }
async-trait = { workspace = true }
bincode = { version = "1.3" }
flume = { workspace = true }
futures = { workspace = true }
libloading = "0.8"
serde = { workspace = true }
tracing = { workspace = true }
uhlc = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
zenoh = { workspace = true }
zenoh-flow-commons = { workspace = true }
zenoh-flow-descriptors = { workspace = true }
zenoh-flow-nodes = { workspace = true }
zenoh-flow-records = { workspace = true }
88 changes: 88 additions & 0 deletions zenoh-flow-runtime/src/instance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use crate::runners::Runner;
use std::{
collections::{HashMap, HashSet},
ops::Deref,
};
use zenoh_flow_commons::NodeId;
use zenoh_flow_records::DataFlowRecord;

pub struct DataFlowInstance {
pub(crate) record: DataFlowRecord,
pub(crate) runners: HashMap<NodeId, Runner>,
}

impl Deref for DataFlowInstance {
type Target = DataFlowRecord;

fn deref(&self) -> &Self::Target {
&self.record
}
}

impl DataFlowInstance {
pub fn start_sources(&mut self) {
self.start_nodes(self.sources.keys().cloned().collect());
}

pub async fn stop_sources(&mut self) {
self.stop_nodes(self.sources.keys().cloned().collect())
.await;
}

pub fn start_operators(&mut self) {
self.start_nodes(self.receivers.keys().cloned().collect());
self.start_nodes(self.operators.keys().cloned().collect());
self.start_nodes(self.senders.keys().cloned().collect());
}

pub async fn stop_operators(&mut self) {
self.stop_nodes(self.senders.keys().cloned().collect())
.await;
self.stop_nodes(self.operators.keys().cloned().collect())
.await;
self.stop_nodes(self.receivers.keys().cloned().collect())
.await;
}

pub fn start_sinks(&mut self) {
self.start_nodes(self.sinks.keys().cloned().collect());
}

pub async fn stop_sinks(&mut self) {
self.stop_nodes(self.sinks.keys().cloned().collect()).await;
}

fn start_nodes(&mut self, nodes: HashSet<NodeId>) {
for (_, runner) in self
.runners
.iter_mut()
.filter(|(runner_id, _)| nodes.contains(runner_id))
{
runner.start()
}
}

async fn stop_nodes(&mut self, nodes: HashSet<NodeId>) {
for (_, runner) in self
.runners
.iter_mut()
.filter(|(runner_id, _)| nodes.contains(runner_id))
{
runner.stop().await
}
}
}
24 changes: 24 additions & 0 deletions zenoh-flow-runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

mod instance;

mod loader;

mod shared_memory;

mod runners;

mod runtime;
pub use runtime::Runtime;
137 changes: 137 additions & 0 deletions zenoh-flow-runtime/src/loader/configuration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use std::path::PathBuf;

use anyhow::bail;
use serde::{Deserialize, Serialize};
use zenoh_flow_commons::Result;

use super::NodeSymbol;

/// Extensible support for different implementations
/// This represents the configuration for an extension.
///
///
/// Example:
///
/// ```yaml
/// name: python
/// file_extension: py
/// source_lib: ./target/release/libpy_source.so
/// sink_lib: ./target/release/libpy_sink.so
/// operator_lib: ./target/release/libpy_op.so
/// config_lib_key: python-script
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtensibleImplementation {
pub(crate) name: String,
pub(crate) file_extension: String,
pub(crate) source_lib: PathBuf,
pub(crate) sink_lib: PathBuf,
pub(crate) operator_lib: PathBuf,
pub(crate) config_lib_key: String,
}

/// Loader configuration files, it includes the extensions.
///
/// Example:
///
/// ```yaml
/// extensions:
/// - name: python
/// file_extension: py
/// source_lib: ./target/release/libpy_source.so
/// sink_lib: ./target/release/libpy_sink.so
/// operator_lib: ./target/release/libpy_op.so
/// config_lib_key: python-script
/// ```
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoaderConfig {
// NOTE This has to be a vector as we are reading it from a file.
extensions: Vec<ExtensibleImplementation>,
}

impl LoaderConfig {
/// Creates an empty `LoaderConfig`.
pub fn new() -> Self {
Self { extensions: vec![] }
}

/// Adds the given extension.
///
/// # Errors
/// It returns an error variant if the extension is already present.
pub fn try_add_extension(&mut self, ext: ExtensibleImplementation) -> Result<()> {
if self
.extensions
.iter()
.any(|e| e.file_extension == ext.file_extension)
{
bail!(
"Extension < {} > already has an associated configuration",
ext.file_extension
)
}
self.extensions.push(ext);
Ok(())
}

/// Removes the given extension.
pub fn remove_extension(&mut self, name: &str) -> Option<ExtensibleImplementation> {
if let Some(index) = self.extensions.iter().position(|e| e.name == name) {
let ext = self.extensions.remove(index);
return Some(ext);
}
None
}

/// Gets the extension that matches the given `file_extension`.
pub fn get_extension_by_file_extension(
&self,
file_extension: &str,
) -> Option<&ExtensibleImplementation> {
self.extensions
.iter()
.find(|e| e.file_extension == file_extension)
}

pub(crate) fn get_library_path(
&self,
file_extension: &str,
symbol: &NodeSymbol,
) -> Option<&PathBuf> {
self.get_extension_by_file_extension(file_extension)
.map(|extension| match symbol {
NodeSymbol::Source => &extension.source_lib,
NodeSymbol::Operator => &extension.operator_lib,
NodeSymbol::Sink => &extension.sink_lib,
})
}

/// Gets the extension that matches the given `name`.
pub fn get_extension_by_name(&self, name: &str) -> Option<&ExtensibleImplementation> {
if let Some(ext) = self.extensions.iter().find(|e| e.name == name) {
return Some(ext);
}
None
}
}

impl Default for LoaderConfig {
fn default() -> Self {
Self::new()
}
}
Loading

0 comments on commit 6421e23

Please sign in to comment.