diff --git a/Cargo.toml b/Cargo.toml index d0b2e16c..df350c56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "zenoh-flow-nodes", "zenoh-flow-plugin", "zenoh-flow-records", + "zenoh-flow-runtime", "zfctl", ] @@ -56,12 +57,13 @@ tracing = { version = "0.1", features = ["log"] } uhlc = "0.6" url = "2.2" uuid = { version = "1.1", features = ["serde", "v4"] } -zenoh = { version = "0.7.2-rc" } +zenoh = { version = "0.7.2-rc", features = ["shared-memory"] } zenoh-collections = { version = "0.7.2-rc" } zenoh-core = { version = "0.7.2-rc" } zenoh-ext = { version = "0.7.2-rc" } zenoh-flow-commons = { path = "./zenoh-flow-commons" } zenoh-flow-descriptors = { path = "./zenoh-flow-descriptors" } +zenoh-flow-nodes = { path = "./zenoh-flow-nodes" } zenoh-flow-records = { path = "./zenoh-flow-records" } zenoh-keyexpr = { version = "0.7.2-rc" } zenoh-plugin-trait = { version = "0.7.2-rc", default-features = false } diff --git a/zenoh-flow-records/src/dataflow.rs b/zenoh-flow-records/src/dataflow.rs index 9c889236..fc2cba2d 100644 --- a/zenoh-flow-records/src/dataflow.rs +++ b/zenoh-flow-records/src/dataflow.rs @@ -18,7 +18,10 @@ use crate::{ }; use anyhow::anyhow; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use zenoh_flow_commons::{NodeId, RecordId, Result, RuntimeId}; use zenoh_flow_descriptors::{ FlattenedDataFlowDescriptor, InputDescriptor, LinkDescriptor, OutputDescriptor, @@ -208,6 +211,43 @@ Caused by: pub fn name(&self) -> &Arc { &self.name } + + /// Returns the list of nodes that are set to run on the specified runtime. + pub fn get_nodes_for_runtime(&self, runtime: &RuntimeId) -> HashSet<&NodeId> { + let mut nodes: HashSet<&NodeId> = HashSet::default(); + nodes.extend( + self.sources + .values() + .filter(|source| source.runtime() == runtime) + .map(|op| &op.id), + ); + nodes.extend( + self.operators + .values() + .filter(|operator| operator.runtime() == runtime) + .map(|op| &op.id), + ); + nodes.extend( + self.sinks + .values() + .filter(|sink| sink.runtime() == runtime) + .map(|sink| &sink.id), + ); + nodes.extend( + self.senders + .values() + .filter(|sender| sender.runtime() == runtime) + .map(|sender| &sender.id), + ); + nodes.extend( + self.receivers + .values() + .filter(|receiver| receiver.runtime() == runtime) + .map(|receiver| &receiver.id), + ); + + nodes + } } #[cfg(test)] diff --git a/zenoh-flow-runtime/Cargo.toml b/zenoh-flow-runtime/Cargo.toml new file mode 100644 index 00000000..61b426b4 --- /dev/null +++ b/zenoh-flow-runtime/Cargo.toml @@ -0,0 +1,44 @@ +# +# Copyright (c) 2021 - 2023 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +[package] +authors = { workspace = true } +categories = { workspace = true } +description = "Internal crate for Zenoh-Flow." +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +name = "zenoh-flow-runtime" +repository = { workspace = true } +version = { workspace = true } + + +[dependencies] +anyhow = { workspace = true } +async-std = { workspace = true } +async-trait = { workspace = true } +bincode = { version = "1.3" } +flume = { workspace = true } +futures = { workspace = true } +libloading = "0.8" +serde = { workspace = true } +tracing = { workspace = true } +uhlc = { workspace = true } +url = { workspace = true } +uuid = { workspace = true } +zenoh = { workspace = true } +zenoh-flow-commons = { workspace = true } +zenoh-flow-descriptors = { workspace = true } +zenoh-flow-nodes = { workspace = true } +zenoh-flow-records = { workspace = true } diff --git a/zenoh-flow-runtime/src/instance.rs b/zenoh-flow-runtime/src/instance.rs new file mode 100644 index 00000000..3c7383f4 --- /dev/null +++ b/zenoh-flow-runtime/src/instance.rs @@ -0,0 +1,88 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::runners::Runner; +use std::{ + collections::{HashMap, HashSet}, + ops::Deref, +}; +use zenoh_flow_commons::NodeId; +use zenoh_flow_records::DataFlowRecord; + +pub struct DataFlowInstance { + pub(crate) record: DataFlowRecord, + pub(crate) runners: HashMap, +} + +impl Deref for DataFlowInstance { + type Target = DataFlowRecord; + + fn deref(&self) -> &Self::Target { + &self.record + } +} + +impl DataFlowInstance { + pub fn start_sources(&mut self) { + self.start_nodes(self.sources.keys().cloned().collect()); + } + + pub async fn stop_sources(&mut self) { + self.stop_nodes(self.sources.keys().cloned().collect()) + .await; + } + + pub fn start_operators(&mut self) { + self.start_nodes(self.receivers.keys().cloned().collect()); + self.start_nodes(self.operators.keys().cloned().collect()); + self.start_nodes(self.senders.keys().cloned().collect()); + } + + pub async fn stop_operators(&mut self) { + self.stop_nodes(self.senders.keys().cloned().collect()) + .await; + self.stop_nodes(self.operators.keys().cloned().collect()) + .await; + self.stop_nodes(self.receivers.keys().cloned().collect()) + .await; + } + + pub fn start_sinks(&mut self) { + self.start_nodes(self.sinks.keys().cloned().collect()); + } + + pub async fn stop_sinks(&mut self) { + self.stop_nodes(self.sinks.keys().cloned().collect()).await; + } + + fn start_nodes(&mut self, nodes: HashSet) { + for (_, runner) in self + .runners + .iter_mut() + .filter(|(runner_id, _)| nodes.contains(runner_id)) + { + runner.start() + } + } + + async fn stop_nodes(&mut self, nodes: HashSet) { + for (_, runner) in self + .runners + .iter_mut() + .filter(|(runner_id, _)| nodes.contains(runner_id)) + { + runner.stop().await + } + } +} diff --git a/zenoh-flow-runtime/src/lib.rs b/zenoh-flow-runtime/src/lib.rs new file mode 100644 index 00000000..f37700e8 --- /dev/null +++ b/zenoh-flow-runtime/src/lib.rs @@ -0,0 +1,24 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +mod instance; + +mod loader; + +mod shared_memory; + +mod runners; + +mod runtime; +pub use runtime::Runtime; diff --git a/zenoh-flow-runtime/src/loader/configuration.rs b/zenoh-flow-runtime/src/loader/configuration.rs new file mode 100644 index 00000000..975e65e1 --- /dev/null +++ b/zenoh-flow-runtime/src/loader/configuration.rs @@ -0,0 +1,137 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::path::PathBuf; + +use anyhow::bail; +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::Result; + +use super::NodeSymbol; + +/// Extensible support for different implementations +/// This represents the configuration for an extension. +/// +/// +/// Example: +/// +/// ```yaml +/// name: python +/// file_extension: py +/// source_lib: ./target/release/libpy_source.so +/// sink_lib: ./target/release/libpy_sink.so +/// operator_lib: ./target/release/libpy_op.so +/// config_lib_key: python-script +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExtensibleImplementation { + pub(crate) name: String, + pub(crate) file_extension: String, + pub(crate) source_lib: PathBuf, + pub(crate) sink_lib: PathBuf, + pub(crate) operator_lib: PathBuf, + pub(crate) config_lib_key: String, +} + +/// Loader configuration files, it includes the extensions. +/// +/// Example: +/// +/// ```yaml +/// extensions: +/// - name: python +/// file_extension: py +/// source_lib: ./target/release/libpy_source.so +/// sink_lib: ./target/release/libpy_sink.so +/// operator_lib: ./target/release/libpy_op.so +/// config_lib_key: python-script +/// ``` +/// +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LoaderConfig { + // NOTE This has to be a vector as we are reading it from a file. + extensions: Vec, +} + +impl LoaderConfig { + /// Creates an empty `LoaderConfig`. + pub fn new() -> Self { + Self { extensions: vec![] } + } + + /// Adds the given extension. + /// + /// # Errors + /// It returns an error variant if the extension is already present. + pub fn try_add_extension(&mut self, ext: ExtensibleImplementation) -> Result<()> { + if self + .extensions + .iter() + .any(|e| e.file_extension == ext.file_extension) + { + bail!( + "Extension < {} > already has an associated configuration", + ext.file_extension + ) + } + self.extensions.push(ext); + Ok(()) + } + + /// Removes the given extension. + pub fn remove_extension(&mut self, name: &str) -> Option { + if let Some(index) = self.extensions.iter().position(|e| e.name == name) { + let ext = self.extensions.remove(index); + return Some(ext); + } + None + } + + /// Gets the extension that matches the given `file_extension`. + pub fn get_extension_by_file_extension( + &self, + file_extension: &str, + ) -> Option<&ExtensibleImplementation> { + self.extensions + .iter() + .find(|e| e.file_extension == file_extension) + } + + pub(crate) fn get_library_path( + &self, + file_extension: &str, + symbol: &NodeSymbol, + ) -> Option<&PathBuf> { + self.get_extension_by_file_extension(file_extension) + .map(|extension| match symbol { + NodeSymbol::Source => &extension.source_lib, + NodeSymbol::Operator => &extension.operator_lib, + NodeSymbol::Sink => &extension.sink_lib, + }) + } + + /// Gets the extension that matches the given `name`. + pub fn get_extension_by_name(&self, name: &str) -> Option<&ExtensibleImplementation> { + if let Some(ext) = self.extensions.iter().find(|e| e.name == name) { + return Some(ext); + } + None + } +} + +impl Default for LoaderConfig { + fn default() -> Self { + Self::new() + } +} diff --git a/zenoh-flow-runtime/src/loader/mod.rs b/zenoh-flow-runtime/src/loader/mod.rs new file mode 100644 index 00000000..98d94a3f --- /dev/null +++ b/zenoh-flow-runtime/src/loader/mod.rs @@ -0,0 +1,180 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +mod configuration; +// mod uri; + +use self::configuration::LoaderConfig; +use anyhow::{anyhow, bail, Context}; +use libloading::Library; +use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc}; +use url::Url; +use zenoh_flow_commons::Result; +use zenoh_flow_nodes::{NodeDeclaration, CORE_VERSION, RUSTC_VERSION}; + +/// NodeSymbol groups the symbol we must find in the shared library we load. +pub(crate) enum NodeSymbol { + Source, + Operator, + Sink, +} + +impl NodeSymbol { + /// Returns the bytes representation of the symbol. + /// + /// They are of the form: + /// + /// `b"_zf_export_\0"` + /// + /// Where `` is either `operator`, `source`, or `sink`. + pub(crate) fn to_bytes(&self) -> &[u8] { + match self { + NodeSymbol::Source => b"_zf_export_source\0", + NodeSymbol::Operator => b"_zf_export_operator\0", + NodeSymbol::Sink => b"_zf_export_sink\0", + } + } +} + +/// The dynamic library loader. +/// Before loading it verifies if the versions are compatible +/// and if the symbols are presents. +/// It loads the files in different way depending on the operating system. +/// In particular the scope of the symbols is different between Unix and +/// Windows. +/// In Unix system the symbols are loaded with the flags: +/// +/// - `RTLD_NOW` load all the symbols when loading the library. +/// - `RTLD_LOCAL` keep all the symbols local. +pub struct Loader { + pub(crate) config: LoaderConfig, + pub(crate) libraries: HashMap, Library>, +} + +impl Loader { + /// Creates a new `Loader` with the given `config`. + pub fn new(config: LoaderConfig) -> Self { + Self { + config, + libraries: HashMap::default(), + } + } + + pub(crate) fn try_load_constructor( + &mut self, + uri: &Arc, + node_symbol: &NodeSymbol, + ) -> Result { + if let Some(library) = self.libraries.get(uri) { + return self.try_get_constructor(library, node_symbol); + } + + let url = Url::parse(uri).context(format!("Failed to parse uri:\n{}", uri))?; + + let library = match url.scheme() { + "file" => self + .try_load_library_from_uri(url.path(), node_symbol) + .context(format!("Failed to load library from file:\n{}", url.path()))?, + _ => bail!( + "Unsupported scheme < {} > while trying to load node:\n{}", + url.scheme(), + uri + ), + }; + + let constructor = self.try_get_constructor::(&library, node_symbol); + self.libraries.insert(uri.clone(), library); + + constructor + } + + pub(crate) fn try_load_library_from_uri( + &self, + path: &str, + node_symbol: &NodeSymbol, + ) -> Result { + let path_buf = PathBuf::from_str(path) + .context(format!("Failed to convert path to a `PathBuf`:\n{}", path))?; + + let library_path = match path_buf.extension().and_then(|ext| ext.to_str()) { + Some(extension) => { + if extension == std::env::consts::DLL_EXTENSION { + &path_buf + } else { + self.config + .get_library_path(extension, node_symbol) + .ok_or_else(|| { + anyhow!( + "Cannot load library, no extension found for files of type < {} > :\n{}", + extension, + path_buf.display() + ) + })? + } + } + None => bail!( + "Cannot load library, missing file extension:\n{}", + path_buf.display() + ), + }; + + let library_path = std::fs::canonicalize(library_path).context(format!( + "Failed to canonicalize path (did you put an absolute path?):\n{}", + path_buf.display() + ))?; + + #[cfg(any(target_family = "unix", target_family = "windows"))] + Ok(unsafe { + Library::new(&library_path).context(format!( + "libloading::Library::new failed:\n{}", + library_path.display() + ))? + }) + } + + fn try_get_constructor(&self, library: &Library, node_symbol: &NodeSymbol) -> Result { + let decl = unsafe { + library + .get::<*mut NodeDeclaration>(node_symbol.to_bytes())? + .read() + }; + + // version checks to prevent accidental ABI incompatibilities + if decl.rustc_version != RUSTC_VERSION || decl.core_version != CORE_VERSION { + if decl.rustc_version != RUSTC_VERSION { + bail!( + r#" +It appears that the node was not compiled with the same version of the Rust compiler than Zenoh-Flow: +- (expected, Zenoh-Flow): {} +- (found, Node): {} +"#, + RUSTC_VERSION, + decl.rustc_version, + ) + } + + bail!( + r#" +It appears that the node was not compiled with the same version of Zenoh-Flow than that of this Zenoh-Flow runtime: +- (expected, this Zenoh-Flow runtime): {} +- (found, Node): {} +"#, + CORE_VERSION, + decl.core_version, + ) + } + + Ok(decl.constructor) + } +} diff --git a/zenoh-flow-runtime/src/loader/uri.rs b/zenoh-flow-runtime/src/loader/uri.rs new file mode 100644 index 00000000..68bbe568 --- /dev/null +++ b/zenoh-flow-runtime/src/loader/uri.rs @@ -0,0 +1,52 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{path::PathBuf, str::FromStr}; + +use anyhow::{bail, Context}; +use libloading::Library; +use url::Url; +use zenoh_flow_commons::{Configuration, Result}; + +use super::NodeSymbol; + +fn try_load_node_from_file(path: &str) -> Result<(Library, N)> { + let path_buf = PathBuf::from_str(path) + .context(format!("Failed to convert path to a `PathBuf`:\n{}", path))?; + + let path = std::fs::canonicalize(&path_buf).context(format!( + "Failed to canonicalize path (did you put an absolute path?):\n{}", + path_buf.display() + ))?; + + let library_path = match path.extension().and_then(|ext| ext.to_str()) { + Some(extension) => { + if extension == std::env::consts::DLL_EXTENSION { + path + } else { + todo!() + } + } + None => bail!( + "Cannot load library, missing file extension:\n{}", + path.display() + ), + }; + + todo!() +} + +fn try_load_built_in_node(resource: &str) -> Result<(Library, N)> { + todo!() +} diff --git a/zenoh-flow-runtime/src/runners/builtin/mod.rs b/zenoh-flow-runtime/src/runners/builtin/mod.rs new file mode 100644 index 00000000..bd148cb1 --- /dev/null +++ b/zenoh-flow-runtime/src/runners/builtin/mod.rs @@ -0,0 +1,15 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub(crate) mod zenoh; diff --git a/zenoh-flow-runtime/src/runners/builtin/zenoh/mod.rs b/zenoh-flow-runtime/src/runners/builtin/zenoh/mod.rs new file mode 100644 index 00000000..abd2ad0c --- /dev/null +++ b/zenoh-flow-runtime/src/runners/builtin/zenoh/mod.rs @@ -0,0 +1,16 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub(crate) mod sink; +pub(crate) mod source; diff --git a/zenoh-flow-runtime/src/runners/builtin/zenoh/sink.rs b/zenoh-flow-runtime/src/runners/builtin/zenoh/sink.rs new file mode 100644 index 00000000..859238bf --- /dev/null +++ b/zenoh-flow-runtime/src/runners/builtin/zenoh/sink.rs @@ -0,0 +1,208 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::shared_memory::SharedMemory; +use anyhow::{anyhow, Context}; +use async_std::sync::Mutex; +use futures::{future::select_all, Future}; +use std::{collections::HashMap, pin::Pin, sync::Arc}; +use zenoh::{prelude::r#async::*, publication::Publisher}; +use zenoh_flow_commons::{NodeId, PortId, Result, SharedMemoryConfiguration}; +use zenoh_flow_nodes::prelude::{InputRaw, Inputs, LinkMessage, Node}; + +/// Internal type of pending futures for the ZenohSink +type ZFInputFut = Pin)> + Send + Sync>>; + +fn wait_flow_input(id: PortId, input: &InputRaw) -> ZFInputFut { + let input = input.clone(); + Box::pin(async move { (id, input.recv().await) }) +} + +/// The builtin Zenoh Sink +/// It can publish to multiple KEs and can have multiple outputs +/// It expects a configuration in the format +/// +/// : +/// : +/// +/// It expects the input(s) defined in the configuration to be connected. +pub(crate) struct ZenohSink<'a> { + id: NodeId, + inputs: HashMap, + publishers: HashMap>, + key_exprs: HashMap, + state: Arc>, +} + +/// The ZenohSinkState stores in a single structure all the fields protected by a lock. +/// +/// The fields are: +/// - `futs` contains the pending futures waiting for inputs on their channel; +/// - `buffer` holds a growable vector of bytes in which the result of the serialization of the data +/// is stored. +struct State { + pub(crate) futs: Vec, + pub(crate) payload_buffer: Vec, + pub(crate) shm: SharedMemory, +} + +impl<'a> ZenohSink<'a> { + pub(crate) fn get(&self, port: &PortId) -> (&OwnedKeyExpr, &InputRaw, &Publisher<'a>) { + let key_expr = self.key_exprs.get(port).unwrap(); + let input_raw = self.inputs.get(port).unwrap(); + let publisher = self.publishers.get(port).unwrap(); + + (key_expr, input_raw, publisher) + } + + pub(crate) async fn try_new( + id: NodeId, + session: Arc, + key_exprs: &HashMap, + shm_configuration: &SharedMemoryConfiguration, + mut inputs: Inputs, + ) -> Result> { + let mut raw_inputs = HashMap::with_capacity(key_exprs.len()); + let mut publishers = HashMap::with_capacity(key_exprs.len()); + + for (port, key_expr) in key_exprs.clone().into_iter() { + raw_inputs.insert( + port.clone(), + inputs + .take(port.as_ref()) + .context(format!( + r#" +[built-in zenoh sink: {}][port: {}] Zenoh-Flow encountered a fatal internal error. +No Input was created for port: < {1} > (key expression: {}). +"#, + id, port, key_expr, + ))? + .raw(), + ); + + publishers.insert( + port.clone(), + session + .declare_publisher(key_expr.clone()) + .res() + .await + .map_err(|e| { + anyhow!( + r#" +[built-in zenoh sink: {}][port: {}] Zenoh-Flow encountered a fatal internal error. +Zenoh failed to declare a publisher on < {} >. +Caused by: + +{:?}"#, + id, + port, + key_expr, + e + ) + })?, + ); + } + + let futs: Vec<_> = raw_inputs + .iter() + .map(|(id, input)| wait_flow_input(id.clone(), input)) + .collect(); + + let shm = SharedMemory::new(&id, session.clone(), shm_configuration); + + Ok(Self { + id, + inputs: raw_inputs, + publishers, + key_exprs: key_exprs.clone(), + state: Arc::new(Mutex::new(State { + shm, + futs, + payload_buffer: Vec::new(), + })), + }) + } +} + +#[async_trait::async_trait] +impl<'a> Node for ZenohSink<'a> { + async fn iteration(&self) -> Result<()> { + // Getting the list of futures to poll in a temporary variable (that `select_all` can take ownership of) + let mut state = self.state.lock().await; + let tmp = std::mem::take(&mut state.futs); + let mut payload_buffer = std::mem::take(&mut state.payload_buffer); + + let ((id, message), _index, mut remaining) = select_all(tmp).await; + + let (key_expr, input, publisher) = self.get(&id); + + match message { + Ok(LinkMessage::Data(data)) => { + // NOTE: In most of cases sending through the shared memory should suffice. + // + // This holds true EVEN IF THERE IS NO SHARED MEMORY. Zenoh will, by default, automatically fallback to + // a "regular" put if there is no shared-memory channel. + // + // The only case where sending through it would fail is if it is impossible to allocate enough space in + // the shared memory. + // + // This can happen if: + // - not enough memory is allocated on the shared memory manager (data is bigger than the allocated + // memory), + // - the memory is full (is there a slow subscriber? some congestion on the network?). + if let Err(e) = state + .shm + .try_send_payload(key_expr, data, &mut payload_buffer) + .await + { + tracing::warn!( + r#" +[built-in zenoh sink: {}][port: {}] Failed to send the data via Zenoh's shared memory. + +Caused by: +{:?} +"#, + self.id, + key_expr, + e + ); + tracing::warn!( + "[built-in zenoh sink: {}][port: {}] Attempting to send via a non-shared memory channel.", + self.id, + key_expr + ); + + publisher + .put(payload_buffer) + .res() + .await + .map_err(|e| anyhow!("{:?}", e))? + } + } + Ok(_) => (), // Not the right message, ignore it. + Err(e) => tracing::error!( + "[built-in zenoh sink: {}][port: {}] Channel returned an error: {e:?}", + self.id, + key_expr + ), + } + + remaining.push(wait_flow_input(id, input)); + + // Set back the complete list for the next iteration + state.futs = remaining; + + Ok(()) + } +} diff --git a/zenoh-flow-runtime/src/runners/builtin/zenoh/source.rs b/zenoh-flow-runtime/src/runners/builtin/zenoh/source.rs new file mode 100644 index 00000000..a663bf1a --- /dev/null +++ b/zenoh-flow-runtime/src/runners/builtin/zenoh/source.rs @@ -0,0 +1,157 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use anyhow::{anyhow, Context as ac}; +use async_std::sync::Mutex; +use futures::future::select_all; +use futures::Future; +use std::sync::Arc; +use std::{collections::HashMap, pin::Pin}; +use zenoh::prelude::r#async::*; +use zenoh::sample::Sample; +use zenoh::{subscriber::FlumeSubscriber, Session}; +use zenoh_flow_commons::{NodeId, PortId, Result}; +use zenoh_flow_nodes::prelude::OutputRaw; +use zenoh_flow_nodes::prelude::{Node, Outputs}; + +/// Internal type of pending futures for the ZenohSource +pub(crate) type ZSubFut = Pin)> + Send + Sync>>; + +fn wait_zenoh_sub(id: PortId, sub: &FlumeSubscriber<'_>) -> ZSubFut { + let sub = sub.receiver.clone(); + Box::pin(async move { (id, sub.recv_async().await.map_err(|e| e.into())) }) +} + +pub(crate) struct ZenohSource<'a> { + id: NodeId, + outputs: HashMap, + subscribers: HashMap>, + futs: Arc>>, +} + +impl<'a> ZenohSource<'a> { + pub(crate) async fn try_new( + id: &NodeId, + session: Arc, + key_exprs: &HashMap, + mut outputs: Outputs, + ) -> Result> { + let mut raw_outputs = HashMap::with_capacity(key_exprs.len()); + let mut subscribers = HashMap::with_capacity(key_exprs.len()); + + for (port, key_expr) in key_exprs.iter() { + raw_outputs.insert( + port.clone(), + outputs + .take(port.as_ref()) + .context( + r#" +Zenoh-Flow encountered a fatal internal error. + +The Zenoh built-in Source < {} > wants to subscribe to the key expression < {} > but +no channel for this key expression was created. +"#, + )? + .raw(), + ); + + subscribers.insert( + port.clone(), + session + .declare_subscriber(key_expr) + .res() + .await + .map_err(|e| { + anyhow!( + r#" +Zenoh-Flow encountered a fatal internal error. + +Zenoh failed to declare a subscriber on < {} > for the Zenoh built-in Source < {} >. +Caused by: + +{:?}"#, + key_expr, + id, + e + ) + })?, + ); + } + + let futs = subscribers + .iter() + .map(|(id, sub)| wait_zenoh_sub(id.clone(), sub)) + .collect::>(); + + Ok(Self { + id: id.clone(), + outputs: raw_outputs, + subscribers, + futs: Arc::new(Mutex::new(futs)), + }) + } +} + +#[async_trait::async_trait] +impl<'a> Node for ZenohSource<'a> { + async fn iteration(&self) -> Result<()> { + // Getting the list of futures to poll in a temporary variable (that `select_all` can take + // ownership of) + let mut futs = self.futs.lock().await; + let tmp = std::mem::take(&mut (*futs)); + + let ((id, result), _index, mut remaining) = select_all(tmp).await; + + match result { + Ok(sample) => { + let data = sample.payload.contiguous().to_vec(); + let ke = sample.key_expr; + tracing::trace!( + "[ZenohSource] Received data from {ke:?} Len: {} for output: {id}", + data.len() + ); + let output = self.outputs.get(&id).ok_or(anyhow!( + "[{}] Built-in Zenoh source, unable to find output < {} >", + self.id, + id + ))?; + output.send(data, None).await?; + } + Err(e) => tracing::error!("[ZenohSource] got a Zenoh error from output {id} : {e:?}"), + } + + // Add back the subscriber that got polled + let sub = self + .subscribers + .get(&id) + .ok_or_else(|| anyhow!("[{}] Cannot find port < {} >", self.id, id))?; + + let next_sub = sub.receiver.clone(); + let source_id = self.id.clone(); + remaining.push(Box::pin(async move { + ( + id.clone(), + next_sub.recv_async().await.context(format!( + "[{}] Zenoh subscriber < {} > failed", + source_id, id + )), + ) + })); + + // Setting back a complete list for the next iteration + *futs = remaining; + + Ok(()) + } +} diff --git a/zenoh-flow-runtime/src/runners/connectors.rs b/zenoh-flow-runtime/src/runners/connectors.rs new file mode 100644 index 00000000..a84fe08d --- /dev/null +++ b/zenoh-flow-runtime/src/runners/connectors.rs @@ -0,0 +1,207 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use anyhow::{anyhow, bail}; +use async_std::sync::Mutex; +use std::sync::Arc; +use zenoh::{prelude::r#async::*, subscriber::FlumeSubscriber}; +use zenoh_flow_commons::{NodeId, Result, SharedMemoryConfiguration}; +use zenoh_flow_nodes::prelude::{InputRaw, Inputs, LinkMessage, Node, OutputRaw, Outputs}; +use zenoh_flow_records::{ReceiverRecord, SenderRecord}; + +use crate::shared_memory::SharedMemory; + +pub(crate) struct ZenohConnectorSender { + id: NodeId, + input: InputRaw, + key_expr: OwnedKeyExpr, + session: Arc, + state: Arc>, +} + +struct State { + pub(crate) payload_buffer: Vec, + pub(crate) message_buffer: Vec, + pub(crate) shm: SharedMemory, +} + +impl ZenohConnectorSender { + pub(crate) fn try_new( + session: Arc, + shm_config: &SharedMemoryConfiguration, + record: SenderRecord, + mut inputs: Inputs, + ) -> Result { + let input = inputs + .take(&record.resource) + // TODO@J-Loudet + .ok_or_else(|| anyhow!(""))? + .raw(); + + Ok(Self { + input, + key_expr: record.resource, + state: Arc::new(Mutex::new(State { + payload_buffer: Vec::new(), + message_buffer: Vec::new(), + shm: SharedMemory::new(&record.id, session.clone(), shm_config), + })), + id: record.id, + session, + }) + } +} + +#[async_trait::async_trait] +impl Node for ZenohConnectorSender { + async fn iteration(&self) -> Result<()> { + match self.input.recv().await { + Ok(message) => { + let mut state = self.state.lock().await; + + let mut message_buffer = std::mem::take(&mut state.message_buffer); + let mut payload_buffer = std::mem::take(&mut state.payload_buffer); + + if let Err(e) = state + .shm + .try_send_message( + &self.key_expr, + message, + &mut message_buffer, + &mut payload_buffer, + ) + .await + { + tracing::warn!( + r#" +[connector sender (zenoh): {}][key expr: {}] Failed to send the message via Zenoh's shared memory. + +Caused by: +{:?} +"#, + self.id, + self.key_expr, + e + ); + tracing::warn!( + "[connector sender (zenoh): {}][key expr: {}] Attempting to send via a non-shared memory channel.", + self.id, + self.key_expr + ); + + self.session + .put(&self.key_expr, message_buffer) + .res() + .await + .map_err(|e| { + anyhow!( + r#" +[connector sender (zenoh): {}][key expr: {}] Failed to send the message via a Zenoh publication. + +Caused by: +{:?} +"#, + self.id, + self.key_expr, + e + ) + })?; + } + + Ok(()) + } + + Err(e) => { + tracing::error!( + r#" +[connector sender (zenoh): {}][key expr: {}] Internal channel returned the following error: +{:?} +"#, + self.id, + self.key_expr, + e + ); + Err(e) + } + } + } +} + +pub(crate) struct ZenohConnectorReceiver { + pub(crate) id: NodeId, + pub(crate) key_expr: OwnedKeyExpr, + pub(crate) output_raw: OutputRaw, + pub(crate) subscriber: FlumeSubscriber<'static>, +} + +impl ZenohConnectorReceiver { + pub(crate) async fn try_new( + session: Arc, + record: ReceiverRecord, + mut outputs: Outputs, + ) -> Result { + let ke = session + .declare_keyexpr(record.resource.clone()) + .res() + .await + // TODO@J-Loudet + .map_err(|e| anyhow!("{:?}", e))?; + + let subscriber = session + .declare_subscriber(ke) + .res() + .await + // TODO@J-Loudet + .map_err(|e| anyhow!("{:?}", e))?; + + let output_raw = outputs + .take(&record.resource) + // TODO@J-Loudet + .ok_or_else(|| anyhow!(""))? + .raw(); + + Ok(Self { + id: record.id, + key_expr: record.resource, + output_raw, + subscriber, + }) + } +} + +#[async_trait::async_trait] +impl Node for ZenohConnectorReceiver { + async fn iteration(&self) -> Result<()> { + match self.subscriber.recv_async().await { + Ok(message) => { + let de: LinkMessage = bincode::deserialize(&message.value.payload.contiguous())?; + + self.output_raw.forward(de).await + } + + Err(e) => { + tracing::error!( + r#" +[connector receiver (zenoh): {}][key expr: {}] Zenoh subscriber returned the following error: +{:?} +"#, + self.id, + self.key_expr, + e + ); + bail!("{:?}", e) + } + } + } +} diff --git a/zenoh-flow-runtime/src/runners/mod.rs b/zenoh-flow-runtime/src/runners/mod.rs new file mode 100644 index 00000000..4c02169d --- /dev/null +++ b/zenoh-flow-runtime/src/runners/mod.rs @@ -0,0 +1,116 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub(crate) mod builtin; +pub(crate) mod connectors; + +use async_std::task::JoinHandle; +use futures::future::{AbortHandle, Abortable, Aborted}; +use std::sync::Arc; +use std::time::Instant; +use zenoh_flow_commons::NodeId; +use zenoh_flow_nodes::prelude::Node; + +/// A `Runner` takes care of running a `Node`. +/// +/// It spawns an abortable task in which the `iteration` is called in a loop, indefinitely. +pub(crate) struct Runner { + pub(crate) id: NodeId, + pub(crate) node: Arc, + pub(crate) run_loop_handle: Option>>, + pub(crate) run_loop_abort_handle: Option, +} + +impl Runner { + pub(crate) fn new(id: NodeId, node: Arc) -> Self { + Self { + id, + node, + run_loop_handle: None, + run_loop_abort_handle: None, + } + } + + /// Start the `Runner`, spawning an abortable task. + /// + /// `start` is idempotent and will do nothing if the node is already running. + pub(crate) fn start(&mut self) { + if self.is_running() { + tracing::warn!( + "[{}] Called `start` while node is ALREADY running. Returning.", + self.id + ); + return; + } + + let node = self.node.clone(); + let id = self.id.clone(); + let run_loop = async move { + let mut instant: Instant; + loop { + instant = Instant::now(); + if let Err(e) = node.iteration().await { + tracing::error!("[{}] Iteration error: {:?}", id, e); + return e; + } + + tracing::trace!( + "[{}] iteration took: {}µs", + id, + instant.elapsed().as_micros() + ); + + async_std::task::yield_now().await; + } + }; + + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let handle = async_std::task::spawn(Abortable::new(run_loop, abort_registration)); + + self.run_loop_handle = Some(handle); + self.run_loop_abort_handle = Some(abort_handle); + } + + /// Stop the execution of a `Node`. + /// + /// We will call `abort` on the `AbortHandle` and then `await` the `JoinHandle`. As per its + /// documentation, `abort` will not forcefully interrupt an execution if the corresponding task + /// is being polled on another thread. + /// + /// `stop` is idempotent and will do nothing if the node is not running. + pub(crate) async fn stop(&mut self) { + if !self.is_running() { + tracing::warn!( + "[{}] Called `stop` while node is NOT running. Returning.", + self.id + ); + return; + } + + if let Some(abort_handle) = self.run_loop_abort_handle.take() { + abort_handle.abort(); + if let Some(handle) = self.run_loop_handle.take() { + tracing::trace!("[{}] Handler finished with {:?}", self.id, handle.await); + } + } + } + + /// Tell if the node is running. + /// + /// To do so we check if an `AbortHandle` was set. If so, then a task was spawned and the node + /// is indeed running. + pub(crate) fn is_running(&self) -> bool { + self.run_loop_handle.is_some() + } +} diff --git a/zenoh-flow-runtime/src/runtime.rs b/zenoh-flow-runtime/src/runtime.rs new file mode 100644 index 00000000..05d6394e --- /dev/null +++ b/zenoh-flow-runtime/src/runtime.rs @@ -0,0 +1,354 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{collections::HashMap, sync::Arc}; + +use crate::{ + instance::DataFlowInstance, + loader::{Loader, NodeSymbol}, + runners::{ + builtin::zenoh::{sink::ZenohSink, source::ZenohSource}, + connectors::{ZenohConnectorReceiver, ZenohConnectorSender}, + Runner, + }, +}; +use anyhow::{bail, Context as errContext}; +use uhlc::HLC; +use uuid::Uuid; +use zenoh::Session; +use zenoh_flow_commons::{NodeId, RecordId, Result, RuntimeId, SharedMemoryConfiguration}; +use zenoh_flow_descriptors::{SinkLibrary, SourceLibrary}; +use zenoh_flow_nodes::{ + prelude::{Inputs, Outputs}, + Context, OperatorFn, SinkFn, SourceFn, +}; +use zenoh_flow_records::DataFlowRecord; + +pub(crate) type Channels = HashMap; + +pub struct Runtime { + pub(crate) hlc: Arc, + pub(crate) session: Arc, + pub(crate) runtime_id: RuntimeId, + pub(crate) shared_memory: SharedMemoryConfiguration, + pub(crate) loader: Loader, + pub(crate) flows: HashMap, +} + +impl Runtime { + /// TODO@J-Loudet + pub fn new( + loader: Loader, + hlc: Arc, + session: Arc, + shared_memory: SharedMemoryConfiguration, + ) -> Self { + Self { + runtime_id: Uuid::new_v4().into(), + session, + hlc, + loader, + shared_memory, + flows: HashMap::default(), + } + } + + /// TODO@J-Loudet + pub async fn try_instantiate_data_flow(&mut self, data_flow: DataFlowRecord) -> Result<()> { + let mut channels = self.create_channels(&data_flow)?; + let mut runners = HashMap::::default(); + + let context = Context::new( + data_flow.name().clone(), + data_flow.id().clone(), + self.runtime_id.clone(), + ); + + runners.extend( + self.try_load_operators(&data_flow, &mut channels, context.clone()) + .await?, + ); + runners.extend( + self.try_load_sources(&data_flow, &mut channels, context.clone()) + .await?, + ); + runners.extend( + self.try_load_sinks(&data_flow, &mut channels, context.clone()) + .await?, + ); + runners.extend(self.try_load_receivers(&data_flow, &mut channels).await?); + runners.extend(self.try_load_senders(&data_flow, &mut channels)?); + + Ok(()) + } + + /// TODO@J-Loudet + pub fn get_instance_mut(&mut self, record_id: &RecordId) -> Option<&mut DataFlowInstance> { + self.flows.get_mut(record_id) + } + + /// Create all the channels for the provided `DataFlowRecord`. + /// + /// # Errors + /// + /// The only scenario in which this method fails is if we did not correctly processed the data flow descriptor and + /// ended up having a link with nodes on two different runtime. + fn create_channels(&self, record: &DataFlowRecord) -> Result { + let nodes_runtime = record.get_nodes_for_runtime(&self.runtime_id); + + let mut channels = HashMap::default(); + for link in &record.links { + if !nodes_runtime.contains(&link.from.node) || !nodes_runtime.contains(&link.to.node) { + if nodes_runtime.contains(&link.from.node) || nodes_runtime.contains(&link.to.node) + { + bail!( + r#" +Zenoh-Flow encountered a fatal internal error: a link is connecting two nodes that are on *different* runtime. + +Link: +{} +"#, + link + ); + } + + continue; + } + + let (tx, rx) = flume::unbounded(); + let (_, outputs) = channels + .entry(link.from.node.clone()) + .or_insert_with(|| (Inputs::default(), Outputs::new(self.hlc.clone()))); + outputs.insert(link.from.output.clone(), tx); + + let (inputs, _) = channels + .entry(link.to.node.clone()) + .or_insert_with(|| (Inputs::default(), Outputs::new(self.hlc.clone()))); + inputs.insert(link.to.input.clone(), rx); + } + + Ok(channels) + } + + /// TODO@J-Loudet + async fn try_load_operators( + &mut self, + record: &DataFlowRecord, + channels: &mut Channels, + context: Context, + ) -> Result> { + let mut runners = HashMap::default(); + + for (operator_id, operator) in record + .operators + .iter() + .filter(|(_, operator)| operator.runtime() == &self.runtime_id) + { + let (inputs, outputs) = channels.remove(operator_id).context(format!( + r#" +Zenoh-Flow encountered a fatal internal error. +The channels for the Inputs and Outputs of Operator < {} > were not created. + "#, + &operator_id + ))?; + + let constructor = self + .loader + .try_load_constructor::(&operator.library, &NodeSymbol::Operator)?; + let operator_node = (constructor)( + context.clone(), + operator.configuration.clone(), + inputs, + outputs, + ) + .await?; + runners.insert( + operator_id.clone(), + Runner::new(operator_id.clone(), operator_node), + ); + } + + Ok(runners) + } + + /// TODO@J-Loudet + async fn try_load_sources( + &mut self, + record: &DataFlowRecord, + channels: &mut Channels, + context: Context, + ) -> Result> { + let mut runners = HashMap::default(); + + for (source_id, source) in record + .sources + .iter() + .filter(|(_, source)| source.runtime() == &self.runtime_id) + { + let (_, outputs) = channels.remove(source_id).context(format!( + r#" +Zenoh-Flow encountered a fatal internal error. +The channels for the Outputs of Source < {} > were not created. + "#, + &source_id + ))?; + + let runner = match &source.library { + SourceLibrary::Uri(uri) => { + let constructor = self + .loader + .try_load_constructor::(uri, &NodeSymbol::Source)?; + let source_node = + (constructor)(context.clone(), source.configuration.clone(), outputs) + .await?; + + Runner::new(source.id.clone(), source_node) + } + SourceLibrary::Zenoh(key_exprs) => { + let dyn_source = + ZenohSource::try_new(&source.id, self.session.clone(), key_exprs, outputs) + .await?; + Runner::new(source.id.clone(), Arc::new(dyn_source)) + } + }; + + runners.insert(source_id.clone(), runner); + } + + Ok(runners) + } + + /// TODO@J-Loudet + async fn try_load_sinks( + &mut self, + record: &DataFlowRecord, + channels: &mut Channels, + context: Context, + ) -> Result> { + let mut runners = HashMap::default(); + + for (sink_id, sink) in record + .sinks + .iter() + .filter(|(_, sink)| sink.runtime() == &self.runtime_id) + { + let (inputs, _) = channels.remove(sink_id).context(format!( + r#" +Zenoh-Flow encountered a fatal internal error. +The channels for the Inputs of Sink < {} > were not created. + "#, + &sink_id + ))?; + + let runner = match &sink.library { + SinkLibrary::Uri(uri) => { + let constructor = self + .loader + .try_load_constructor::(uri, &NodeSymbol::Sink)?; + let sink_node = + (constructor)(context.clone(), sink.configuration.clone(), inputs).await?; + + Runner::new(sink.id.clone(), sink_node) + } + SinkLibrary::Zenoh(key_exprs) => { + let zenoh_sink = ZenohSink::try_new( + sink_id.clone(), + self.session.clone(), + key_exprs, + &self.shared_memory, + inputs, + ) + .await?; + + Runner::new(sink_id.clone(), Arc::new(zenoh_sink)) + } + }; + + runners.insert(sink_id.clone(), runner); + } + + Ok(runners) + } + + /// TODO@J-Loudet + async fn try_load_receivers( + &mut self, + record: &DataFlowRecord, + channels: &mut Channels, + ) -> Result> { + let mut runners = HashMap::new(); + + for (receiver_id, receiver) in record + .receivers + .iter() + .filter(|(_, receiver)| receiver.runtime() == &self.runtime_id) + { + let (_, outputs) = channels.remove(receiver_id).context(format!( + r#" +Zenoh-Flow encountered a fatal internal error. +The channels for the Outputs of Connector Receiver < {} > were not created. + "#, + receiver_id + ))?; + + let runner = + ZenohConnectorReceiver::try_new(self.session.clone(), receiver.clone(), outputs) + .await?; + + runners.insert( + receiver_id.clone(), + Runner::new(receiver_id.clone(), Arc::new(runner)), + ); + } + + Ok(runners) + } + + /// TODO@J-Loudet + fn try_load_senders( + &mut self, + record: &DataFlowRecord, + channels: &mut Channels, + ) -> Result> { + let mut runners = HashMap::new(); + + for (sender_id, sender) in record + .senders + .iter() + .filter(|(_, sender)| sender.runtime() == &self.runtime_id) + { + let (inputs, _) = channels.remove(sender_id).context(format!( + r#" +Zenoh-Flow encountered a fatal internal error. +The channels for the Inputs of Connector Sender < {} > were not created. + "#, + sender_id + ))?; + + let runner = ZenohConnectorSender::try_new( + self.session.clone(), + &self.shared_memory, + sender.clone(), + inputs, + )?; + + runners.insert( + sender_id.clone(), + Runner::new(sender_id.clone(), Arc::new(runner)), + ); + } + + Ok(runners) + } +} diff --git a/zenoh-flow-runtime/src/shared_memory.rs b/zenoh-flow-runtime/src/shared_memory.rs new file mode 100644 index 00000000..10675e0c --- /dev/null +++ b/zenoh-flow-runtime/src/shared_memory.rs @@ -0,0 +1,147 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use anyhow::anyhow; +use std::sync::Arc; +use zenoh::shm::SharedMemoryManager; +use zenoh::{prelude::r#async::*, shm::SharedMemoryBuf}; +use zenoh_flow_commons::{NodeId, Result, SharedMemoryConfiguration}; +use zenoh_flow_nodes::prelude::{DataMessage, LinkMessage}; + +pub(crate) struct SharedMemory { + session: Arc, + manager: SharedMemoryManager, + configuration: SharedMemoryConfiguration, +} + +impl SharedMemory { + pub(crate) fn new( + id: &NodeId, + session: Arc, + shm_configuration: &SharedMemoryConfiguration, + ) -> Self { + Self { + session, + manager: SharedMemoryManager::make( + format!("{}>shared-memory-manager", id), + shm_configuration.size, + ) + .unwrap(), + configuration: *shm_configuration, + } + } + + /// This method tries to send the [LinkMessage] via Zenoh's shared memory. + /// + /// # Errors + /// + /// This method can fail for multiple reasons: + /// 1. Zenoh's [SharedMemoryManager] did not manage to allocate a buffer. + /// 2. The serialization of the message into the buffer failed. + /// 3. Zenoh failed to send the message via shared memory. + pub(crate) async fn try_send_message( + &mut self, + key_expr: &str, + message: LinkMessage, + message_buffer: &mut Vec, + payload_buffer: &mut Vec, + ) -> Result<()> { + message.serialize_bincode_into(message_buffer, payload_buffer)?; + self.try_put_buffer(key_expr, message_buffer).await + } + + pub(crate) async fn try_send_payload( + &mut self, + key_expr: &str, + data: DataMessage, + payload_buffer: &mut Vec, + ) -> Result<()> { + data.try_as_bytes_into(payload_buffer)?; + self.try_put_buffer(key_expr, payload_buffer).await + } + + async fn try_put_buffer(&mut self, key_expr: &str, buffer: &mut Vec) -> Result<()> { + let mut shm_buffer = self.try_allocate_buffer(buffer.len()).await?; + let slice = unsafe { shm_buffer.as_mut_slice() }; + slice.clone_from_slice(buffer.as_mut_slice()); + + self.session + .put(key_expr, shm_buffer) + .congestion_control(CongestionControl::Block) + .res() + .await + .map_err(|e| { + anyhow!( + r#"shared memory: Put on < {:?} > failed + +Caused by: + +{:?}"#, + &key_expr, + e + ) + }) + } + + /// This methods attempts, twice, to allocate memory leveraging Zenoh's [SharedMemoryManager]. + /// + /// # Errors + /// + /// If the first call fails, we wait for `backoff` nanoseconds (as configured) and then perform (i) a garbage + /// collection followed by (ii) a defragmentation. Once these two operations have finished, we try once more to + /// allocate memory. + /// + /// If it fails again, we return the error. + pub(crate) async fn try_allocate_buffer(&mut self, size: usize) -> Result { + if let Ok(buffer) = self.try_alloc(size) { + return Ok(buffer); + } + tracing::trace!( + "shared memory: backing off for {} nanoseconds", + self.configuration.backoff + ); + async_std::task::sleep(std::time::Duration::from_nanos(self.configuration.backoff)).await; + + tracing::trace!( + "shared memory: garbage collect recovered {} bytes", + self.manager.garbage_collect() + ); + tracing::trace!( + "shared memory: defragmented {} bytes", + self.manager.defragment() + ); + + self.try_alloc(size) + } + + // Utility method that logs the error if the shared memory manager failed to allocate and converts the error + // into one that is "compatible" with `anyhow`. + fn try_alloc(&mut self, size: usize) -> Result { + let buffer = self.manager.alloc(size); + + buffer.map_err(|e| { + tracing::trace!( + r#" +shared memory: allocation of {} bytes failed + +Caused by: +{:?} +"#, + size, + e + ); + anyhow!("{:?}", e) + }) + } +}