Show resource providers available at a node #215

Merged 4 commits on Oct 18, 2024
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -76,6 +76,10 @@ below.
| `ollama` | Interact via an LLM ChatBot deployed on an external [ollama](https://ollama.com/) server | host, port, messages_number_limit, provider (separate section) | model | [click](examples/ollama/README.md) |
| `redis` | Update a value on an external [Redis](https://redis.io/) server | redis_provider | url, key | [click](examples/redis/README.md) |

With `edgeless_node_d --available-resources` you can find the list of resource
providers that a node supports, along with the version, output channels, and
configuration parameters for each resource provider.
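
For example, the plain-text listing for the `file-log` provider might look as follows (format and configuration descriptions taken from the code added in this PR; the order of configuration parameters is not guaranteed, since they are stored in a map). Passing `--output-json` emits the same information as JSON:

```text
class_type: file-log
version: 1.0
outputs: []
configurations:
 - add-source-id: If specified adds the InstanceId of the source component
 - add-timestamp: If specified adds a timestamp
```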

### Functions

Functions are _stateful_: a given function instance is assigned to exactly one
43 changes: 36 additions & 7 deletions documentation/benchmark.md
@@ -10,12 +10,13 @@ The tool supports different arrival models and workflow types.

Arrival models (option `--arrival-model`):

| Arrival model | Description |
| ------------- | --------------------------------------------------------------------------------------------------------- |
| poisson | Inter-arrival between consecutive workflows and durations are exponentially distributed. |
| incremental | One new workflow arrive every new inter-arrival time. |
| incr-and-keep | Add workflows incrementally until the warm up period finishes, then keep until the end of the experiment. |
| single | Add a single workflow. |
| Arrival model | Description |
| ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------- |
| poisson       | Inter-arrival times between consecutive workflows and workflow lifetimes are exponentially distributed.                                           |
| incremental   | One new workflow arrives after every inter-arrival time, with constant lifetime.                                                                  |
| incr-and-keep | Add workflows, with constant lifetimes, incrementally until the warm-up period finishes, then keep them until the end of the experiment.          |
| single        | Add a single workflow that lasts for the entire experiment.                                                                                       |
| trace         | Read the arrival and end times of workflows from a file specified with `--workload-trace`, one workflow per line in the format `arrival,end_time`. |
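
The `arrival,end_time` trace format can be illustrated with a short sketch; `parse_trace_line` is a hypothetical helper for exposition, not part of `edgeless_benchmark`:

```rust
// Sketch: parse one line of a `--workload-trace` file in the
// `arrival,end_time` format (times as seconds since experiment start).
fn parse_trace_line(line: &str) -> Option<(f64, f64)> {
    let mut parts = line.trim().split(',');
    let arrival: f64 = parts.next()?.trim().parse().ok()?;
    let end_time: f64 = parts.next()?.trim().parse().ok()?;
    // Reject extra fields or a workflow that ends before it arrives.
    if parts.next().is_some() || end_time < arrival {
        return None;
    }
    Some((arrival, end_time))
}

fn main() {
    // One workflow arriving at t=1.5 s and ending at t=20.0 s.
    println!("{:?}", parse_trace_line("1.5,20.0"));
}
```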

Workflow types (option `--wf-type`):

@@ -26,6 +27,19 @@ Workflow types (option `--wf_type`):
| vector-mul-chain | A chain of functions, each performing the multiplication of an internal random matrix of 32-bit floating point numbers by the input vector received from the caller. | workflow,function |
| map-reduce | A workflow consisting of a random number of stages, where each stage is composed of a random number of processing blocks. Before going to the next stage, the output from all the processing blocks in the stage before must be received. | workflow |

For all workflow types except `single`, a template can be generated by specifying `--wf-type "NAME;template"`.
For example, by running:

```shell
target/debug/edgeless_benchmark --wf-type "map-reduce;template" > map_reduce.json
```

A template will be generated in `map_reduce.json`, which can then be loaded with:

```shell
target/debug/edgeless_benchmark --wf-type "map-reduce;map_reduce.json"
```
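
A minimal sketch of how a `"NAME;ARG"` value could be split into the workflow-type name and its argument; the helper name and exact semantics are assumptions for illustration, not the benchmark's actual code:

```rust
// Sketch: split a `--wf-type` value of the form "NAME;ARG" into the
// workflow-type name and an optional argument ("template" or a JSON file).
fn split_wf_type(spec: &str) -> (&str, Option<&str>) {
    match spec.split_once(';') {
        Some((name, arg)) => (name, Some(arg)),
        None => (spec, None),
    }
}

fn main() {
    let (name, arg) = split_wf_type("map-reduce;template");
    println!("{} {:?}", name, arg);
}
```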

The duration of the experiment is configurable via a command-line option,
as are the seed used to generate pseudo-random numbers (which makes
experiments repeatable) and the duration of the warm-up period.
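
Why a fixed seed makes experiments repeatable can be sketched with a toy generator (`edgeless_benchmark`'s actual RNG is different):

```rust
// A tiny linear congruential generator: starting it twice from the same
// seed yields the same pseudo-random sequence, hence repeatable runs.
struct Lcg(u64);

impl Lcg {
    fn next(&mut self) -> u64 {
        // Constants from Knuth's MMIX LCG.
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        self.0
    }
}

fn main() {
    let mut a = Lcg(42);
    let mut b = Lcg(42);
    // Identical seeds produce identical draws.
    assert_eq!(a.next(), b.next());
}
```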
@@ -119,11 +133,26 @@ In one shell start the EDGELESS in-a-box:
target/debug/edgeless_inabox
```

Then create the JSON file specifying the characteristics of the vector-mul-chain
workflow:

```shell
cat << EOF > vector_mul_chain.json
{
"min_chain_length": 5,
"max_chain_length": 5,
"min_input_size": 1000,
"max_input_size": 2000,
"function_wasm_path": "functions/vector_mul/vector_mul.wasm"
}
EOF
```

In another shell, run the following benchmark, which lasts 30 seconds:

```shell
target/debug/edgeless_benchmark \
-w "vector-mul-chain;5;5;1000;2000;functions/vector_mul/vector_mul.wasm" \
-w "vector-mul-chain;vector_mul_chain.json" \
--dataset-path "dataset/myexp-" \
--additional-fields "a,b" \
--additional-header "h_a,h_b" \
54 changes: 54 additions & 0 deletions edgeless_node/src/bin/edgeless_node_d.rs
@@ -2,6 +2,16 @@
// SPDX-FileCopyrightText: © 2023 Claudio Cicconetti <c.cicconetti@iit.cnr.it>
// SPDX-License-Identifier: MIT
use clap::Parser;
use edgeless_node::resources::dda::DdaResourceSpec;
use edgeless_node::resources::file_log::FileLogResourceSpec;
use edgeless_node::resources::http_egress::HttpEgressResourceSpec;
use edgeless_node::resources::http_ingress::HttpIngressResourceSpec;
#[cfg(feature = "rdkafka")]
use edgeless_node::resources::kafka_egress::KafkaEgressResourceSpec;
use edgeless_node::resources::ollama::OllamasResourceSpec;
use edgeless_node::resources::redis::RedisResourceSpec;
use edgeless_node::resources::resource_provider_specs::ResourceProviderSpecOutput;
use edgeless_node::resources::resource_provider_specs::ResourceProviderSpecs;

#[derive(Debug, clap::Parser)]
#[command(long_about = None)]
@@ -10,6 +20,10 @@ struct Args {
config_file: String,
#[arg(short, long, default_value_t = String::from(""))]
template: String,
#[arg(long, default_value_t = false)]
available_resources: bool,
#[arg(long, default_value_t = false)]
output_json: bool,
}

fn main() -> anyhow::Result<()> {
@@ -20,6 +34,46 @@ fn main() -> anyhow::Result<()> {
// console_subscriber::init();

let args = Args::parse();
if args.available_resources {
#[allow(unused_mut)]
let mut specs: Vec<Box<dyn ResourceProviderSpecs>> = vec![
Box::new(DdaResourceSpec {}),
Box::new(FileLogResourceSpec {}),
Box::new(HttpEgressResourceSpec {}),
Box::new(HttpIngressResourceSpec {}),
Box::new(OllamasResourceSpec {}),
Box::new(RedisResourceSpec {}),
];
#[cfg(feature = "rdkafka")]
specs.push(Box::new(KafkaEgressResourceSpec {}));

if args.output_json {
println!(
"{}",
serde_json::to_string(&specs.iter().map(|x| x.to_output()).collect::<Vec<ResourceProviderSpecOutput>>())
.expect("could not serialize available resources to JSON")
);
} else {
for spec in specs {
println!("class_type: {}", spec.class_type());
println!("version: {}", spec.version());
println!("outputs: [{}]", spec.outputs().join(","));
if !spec.configurations().is_empty() {
println!("configurations:");
println!(
"{}",
spec.configurations()
.iter()
.map(|(field, desc)| format!(" - {}: {}", field, desc))
.collect::<Vec<String>>()
.join("\n")
)
}
println!();
}
}
return Ok(());
}
if !args.template.is_empty() {
edgeless_api::util::create_template(&args.template, edgeless_node::edgeless_node_default_conf().as_str())?;
return Ok(());
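
For reference, the `ResourceProviderSpecs` trait implemented by the new spec structs is not shown in this diff; reconstructed from its call sites above, it plausibly looks like the following sketch (the `to_output` method used for JSON output is omitted, and `ExampleSpec` is a toy stand-in mirroring `FileLogResourceSpec`):

```rust
use std::collections::HashMap;

// Hypothetical reconstruction of the trait in
// resources/resource_provider_specs.rs, inferred from how it is used.
trait ResourceProviderSpecs {
    fn class_type(&self) -> String;
    fn outputs(&self) -> Vec<String>;
    fn configurations(&self) -> HashMap<String, String>;
    fn version(&self) -> String;
}

struct ExampleSpec;

impl ResourceProviderSpecs for ExampleSpec {
    fn class_type(&self) -> String {
        String::from("file-log")
    }

    fn outputs(&self) -> Vec<String> {
        vec![]
    }

    fn configurations(&self) -> HashMap<String, String> {
        HashMap::from([(
            String::from("add-timestamp"),
            String::from("If specified adds a timestamp"),
        )])
    }

    fn version(&self) -> String {
        String::from("1.0")
    }
}

fn main() {
    let spec = ExampleSpec;
    println!("{} v{}", spec.class_type(), spec.version());
}
```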
29 changes: 15 additions & 14 deletions edgeless_node/src/lib.rs
@@ -5,6 +5,7 @@

use edgeless_api::orc::OrchestratorAPI;
use futures::Future;
use resources::resource_provider_specs::ResourceProviderSpecs;

pub mod agent;
pub mod base_runtime;
@@ -297,7 +298,7 @@ async fn fill_resources(
if let Some(settings) = settings {
if let (Some(http_ingress_url), Some(provider_id)) = (&settings.http_ingress_url, &settings.http_ingress_provider) {
if !http_ingress_url.is_empty() && !provider_id.is_empty() {
let class_type = "http-ingress".to_string();
let class_type = resources::http_ingress::HttpIngressResourceSpec {}.class_type();
log::info!("Creating resource '{}' at {}", provider_id, http_ingress_url);
ret.insert(
provider_id.clone(),
@@ -314,15 +315,15 @@
provider_specifications.push(edgeless_api::node_registration::ResourceProviderSpecification {
provider_id: provider_id.clone(),
class_type,
outputs: vec!["new_request".to_string()],
outputs: resources::http_ingress::HttpIngressResourceSpec {}.outputs(),
});
}
}

if let Some(provider_id) = &settings.http_egress_provider {
if !provider_id.is_empty() {
log::info!("Creating resource '{}'", provider_id);
let class_type = "http-egress".to_string();
let class_type = resources::http_egress::HttpEgressResourceSpec {}.class_type();
ret.insert(
provider_id.clone(),
agent::ResourceDesc {
@@ -339,15 +340,15 @@
provider_specifications.push(edgeless_api::node_registration::ResourceProviderSpecification {
provider_id: provider_id.clone(),
class_type,
outputs: vec![],
outputs: resources::http_egress::HttpEgressResourceSpec {}.outputs(),
});
}
}

if let Some(provider_id) = &settings.file_log_provider {
if !provider_id.is_empty() {
log::info!("Creating resource '{}'", provider_id);
let class_type = "file-log".to_string();
let class_type = resources::file_log::FileLogResourceSpec {}.class_type();
ret.insert(
provider_id.clone(),
agent::ResourceDesc {
@@ -364,15 +365,15 @@
provider_specifications.push(edgeless_api::node_registration::ResourceProviderSpecification {
provider_id: provider_id.clone(),
class_type,
outputs: vec![],
outputs: resources::file_log::FileLogResourceSpec {}.outputs(),
});
}
}

if let Some(provider_id) = &settings.redis_provider {
if !provider_id.is_empty() {
log::info!("Creating resource '{}'", provider_id);
let class_type = "redis".to_string();
let class_type = resources::redis::RedisResourceSpec {}.class_type();
ret.insert(
provider_id.clone(),
agent::ResourceDesc {
@@ -389,15 +390,15 @@
provider_specifications.push(edgeless_api::node_registration::ResourceProviderSpecification {
provider_id: provider_id.clone(),
class_type,
outputs: vec![],
outputs: resources::redis::RedisResourceSpec {}.outputs(),
});
}
}

if let Some(provider_id) = &settings.dda_provider {
if !provider_id.is_empty() {
log::info!("Creating dda resource provider '{}'", provider_id);
let class_type = "dda".to_string();
let class_type = resources::dda::DdaResourceSpec {}.class_type();
ret.insert(
provider_id.clone(),
agent::ResourceDesc {
@@ -412,7 +413,7 @@
provider_specifications.push(edgeless_api::node_registration::ResourceProviderSpecification {
provider_id: provider_id.clone(),
class_type,
outputs: vec!["new_request".to_string()],
outputs: resources::dda::DdaResourceSpec {}.outputs(),
});
}
}
@@ -426,7 +427,7 @@
settings.port,
settings.messages_number_limit
);
let class_type = "ollama".to_string();
let class_type = resources::ollama::OllamasResourceSpec {}.class_type();
ret.insert(
settings.provider.clone(),
agent::ResourceDesc {
@@ -447,7 +448,7 @@
provider_specifications.push(edgeless_api::node_registration::ResourceProviderSpecification {
provider_id: settings.provider.clone(),
class_type,
outputs: vec!["out".to_string()],
outputs: resources::ollama::OllamasResourceSpec {}.outputs(),
});
}
}
@@ -457,7 +458,7 @@
#[cfg(feature = "rdkafka")]
{
log::info!("Creating resource '{}'", provider_id);
let class_type = "kafka-egress".to_string();
let class_type = resources::kafka_egress::KafkaEgressResourceSpec {}.class_type();
ret.insert(
provider_id.clone(),
agent::ResourceDesc {
Expand All @@ -474,7 +475,7 @@ async fn fill_resources(
provider_specifications.push(edgeless_api::node_registration::ResourceProviderSpecification {
provider_id: provider_id.clone(),
class_type,
outputs: vec![],
outputs: resources::kafka_egress::KafkaEgressResourceSpec {}.outputs(),
});
}
#[cfg(not(feature = "rdkafka"))]
30 changes: 30 additions & 0 deletions edgeless_node/src/resources/dda/mod.rs
@@ -11,6 +11,36 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use uuid::Uuid;

pub struct DdaResourceSpec {}

impl super::resource_provider_specs::ResourceProviderSpecs for DdaResourceSpec {
fn class_type(&self) -> String {
String::from("dda")
}

fn outputs(&self) -> Vec<String> {
vec![String::from("out")]
}

fn configurations(&self) -> std::collections::HashMap<String, String> {
std::collections::HashMap::from([
(String::from("dda_url"), String::from("URL of the DDA")),
(
String::from("dda_com_subscription_mapping"),
String::from("JSON encoding the DDA subscription mapping"),
),
(
String::from("dda_com_publication_mapping"),
String::from("JSON encoding the DDA publication mapping"),
),
])
}

fn version(&self) -> String {
String::from("1.0")
}
}

// imports the generated proto file for dda
pub mod dda_com {
tonic::include_proto!("dda.com.v1");
26 changes: 26 additions & 0 deletions edgeless_node/src/resources/file_log.rs
@@ -4,6 +4,32 @@
use edgeless_dataplane::core::Message;
use std::io::prelude::*;

pub struct FileLogResourceSpec {}

impl super::resource_provider_specs::ResourceProviderSpecs for FileLogResourceSpec {
fn class_type(&self) -> String {
String::from("file-log")
}

fn outputs(&self) -> Vec<String> {
vec![]
}

fn configurations(&self) -> std::collections::HashMap<String, String> {
std::collections::HashMap::from([
(
String::from("add-source-id"),
String::from("If specified adds the InstanceId of the source component"),
),
(String::from("add-timestamp"), String::from("If specified adds a timestamp")),
])
}

fn version(&self) -> String {
String::from("1.0")
}
}

#[derive(Clone)]
pub struct FileLogResourceProvider {
inner: std::sync::Arc<tokio::sync::Mutex<FileLogResourceProviderInner>>,
20 changes: 20 additions & 0 deletions edgeless_node/src/resources/http_egress.rs
@@ -3,6 +3,26 @@
// SPDX-License-Identifier: MIT
use edgeless_dataplane::core::Message;

pub struct HttpEgressResourceSpec {}

impl super::resource_provider_specs::ResourceProviderSpecs for HttpEgressResourceSpec {
fn class_type(&self) -> String {
String::from("http-egress")
}

fn outputs(&self) -> Vec<String> {
vec![]
}

fn configurations(&self) -> std::collections::HashMap<String, String> {
std::collections::HashMap::new()
}

fn version(&self) -> String {
String::from("1.0")
}
}

#[derive(Clone)]
pub struct EgressResourceProvider {
inner: std::sync::Arc<tokio::sync::Mutex<EgressResourceProviderInner>>,