-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add generator code #203
Merged
Merged
Changes from 7 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
bfdd077
feature: add generator code
025596d
chore: formatting
0210290
feat: operator to create a separate job for load generation
08d6c03
feat: add api access for load gen
8248082
fix: command name
4f8f9c3
chore: review comment suggestions for better rust coding
188e094
feat: docs
0bdd901
fix: naming + clenaups
cffbab2
fix: update cargo
1c79e8c
fix: update before build
c16d5b7
fix: add update before all targets
ec53b9c
chore: review comments
bff0702
feat/lgen-operator
samika98 b5ab8ab
chore: renaming + updating deps
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
# Load Generation | ||
|
||
To run a load generator, you need to create a `LoadGenerator` resource. This resource is similar to a `Simulation` resource. However, the load generation can last for up to a week. They are used to generate sustained load on the system for a longer period of time. | ||
|
||
## Parameters | ||
|
||
- **`scenario`**: The scenario to run. Supported scenarios are: | ||
- `CreateModelInstancesSynced`: Requires at least two ceramic instances. Creates models on one node and has the other node sync them. | ||
- **`runTime`**: The duration to run the load generator, in hours. | ||
- **`image`**: The image to use for the load generator. This is the same as the `image` in the `Simulation` resource. | ||
- **`throttleRequests`**: WIP, not ready yet. The number of requests to send per second. This is the same as the `throttleRequests` in the `Simulation` resource. | ||
- **`tasks`**: The number of tasks to run. Increasing the number of tasks will increase the load on the node. A value of 2 generates a steady load of 20 requests per second. Values between 2-100 are recommended. Keep in mind the increase of tasks to throughput is non-linear. A value of 100 generates what we consider high load, which is 200 TPS. | ||
|
||
## Sample configuration | ||
|
||
```yaml | ||
apiVersion: "keramik.3box.io/v1alpha1" | ||
kind: LoadGenerator | ||
metadata: | ||
name: load-gen | ||
# Namespace of the network you wish to run against | ||
namespace: keramik-<unique-name>-small | ||
spec: | ||
scenario: CreateModelInstancesSynced | ||
runTime: 3 | ||
image: "keramik/runner:dev" | ||
throttleRequests: 20 | ||
tasks: 2 | ||
``` | ||
|
||
If you want to run this against a defined network, set the namespace to the same as the network. In this example, the namespace is set to the same network applied when [the network was set up](./setup_network.md). | ||
|
||
The load generator will automatically stop once the `runTime` is up. You should be able to see some success and error metrics at the end of the run. To see the metrics, you can use the `kubectl` command to get the logs of the load generator: | ||
|
||
|
||
```shell | ||
kubectl logs load-gen-<unique-string-for-each-run> -n keramik-<unique-name>-small | ||
``` | ||
You can get the name of the load-gen pod by running: | ||
|
||
```shell | ||
kubectl get pods -n keramik-<unique-name>-small | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,251 @@ | ||
use std::collections::HashMap; | ||
use std::path::PathBuf; | ||
|
||
use crate::load_generator::utils::{ | ||
CeramicConfig, CeramicDidType, CeramicScenarioParameters, StableLoadUser, | ||
}; | ||
use crate::utils::parse_peers_info; | ||
use crate::CommandResult; | ||
use anyhow::Result; | ||
use ceramic_core::StreamId; | ||
use clap::Args; | ||
use keramik_common::peer_info::Peer; | ||
use tokio::time::{Duration, Instant}; | ||
|
||
// TODO : Use this to envoke a particular scenario, currently we only have one | ||
// so this is unused | ||
#[allow(dead_code)] | ||
#[derive(Clone, Debug)] | ||
pub enum WeekLongSimulationScenarios { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about a different term instead of |
||
CreateModelInstancesSynced, | ||
} | ||
|
||
impl std::str::FromStr for WeekLongSimulationScenarios { | ||
type Err = String; | ||
|
||
fn from_str(s: &str) -> Result<Self, Self::Err> { | ||
match s { | ||
"CreateModelInstancesSynced" => { | ||
Ok(WeekLongSimulationScenarios::CreateModelInstancesSynced) | ||
} | ||
_ => Err(format!("Invalid scenario: {}", s)), | ||
} | ||
} | ||
} | ||
|
||
/// Options to Simulate command | ||
#[derive(Args, Debug, Clone)] | ||
pub struct WeekLongSimulationOpts { | ||
/// Simulation scenario to run. | ||
#[arg(long, env = "GENERATOR_SCENARIO")] | ||
scenario: WeekLongSimulationScenarios, | ||
|
||
/// Path to file containing the list of peers. | ||
/// File should contian JSON encoding of Vec<Peer>. | ||
#[arg(long, env = "GENERATOR_PEERS_PATH")] | ||
peers: PathBuf, | ||
|
||
/// Implmentation details: A task corresponds to a tokio task responsible | ||
/// for making requests. They should have low memory overhead, so you can | ||
/// create many tasks and then use `throttle_requests_rate` to constrain the overall | ||
/// throughput on the node (specifically the HTTP requests made). | ||
#[arg(long, default_value_t = 25, env = "GENERATOR_TASKS")] | ||
tasks: usize, | ||
|
||
/// Duration of the simulation in hours | ||
#[arg(long, env = "GENERATOR_RUN_TIME", default_value = "5h")] | ||
run_time: String, | ||
|
||
/// Unique value per test run to ensure uniqueness across different generator runs | ||
#[arg(long, env = "GENERATOR_NONCE")] | ||
nonce: u64, | ||
|
||
/// Option to throttle requests (per second) for load control | ||
#[arg(long, env = "GENERATOR_THROTTLE_REQUESTS_RATE")] | ||
throttle_requests_rate: Option<usize>, | ||
} | ||
|
||
//TODO : Use week long simulation scenario and separate out the logic which is ties to a particular scenario | ||
// TODO : This specific behavior is for createModelInstancesSynced scenario | ||
pub async fn simulate_load(opts: WeekLongSimulationOpts) -> Result<CommandResult> { | ||
let state = WeekLongSimulationState::try_from_opts(opts).await?; | ||
|
||
// Create two configs to simulate two independent nodes, each having it's own ceramic client | ||
let config_1 = state.initialize_config().await?; | ||
dav1do marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let config_2 = state.initialize_config().await?; | ||
|
||
let peer_addr_1 = state.peers[0] | ||
.ceramic_addr() | ||
.expect("Peer does not have a ceramic address"); | ||
let peer_addr_2 = state.peers[1] | ||
.ceramic_addr() | ||
.expect("Peer does not have a ceramic address"); | ||
|
||
// Create two users to simulate two independent nodes | ||
let stable_load_user_1 = | ||
StableLoadUser::setup_stability_test(config_1.admin_cli, Some(peer_addr_1.to_string())) | ||
.await; | ||
let stable_load_user_2 = | ||
StableLoadUser::setup_stability_test(config_2.admin_cli, Some(peer_addr_2.to_string())) | ||
.await; | ||
|
||
// Generate a model for the users to create | ||
let model = stable_load_user_1 | ||
.ceramic_utils | ||
.generate_random_model() | ||
.await?; | ||
|
||
// Index the model on the second node | ||
stable_load_user_2.ceramic_utils.index_model(&model).await?; | ||
|
||
let run_time: u64 = state | ||
.run_time | ||
.parse() | ||
.expect("Failed to parse run_time as u64"); | ||
|
||
println!("Model: {:?}", model); | ||
let model_instance_creation_result = | ||
create_model_instances_continuously(stable_load_user_1, model, run_time, state.tasks).await; | ||
println!( | ||
"Model instance creation result: {:?}", | ||
model_instance_creation_result | ||
); | ||
|
||
Ok(CommandResult::Success) | ||
} | ||
|
||
/** | ||
* Create model instances continuously | ||
* | ||
* @param stable_load_user The user to create the model instances | ||
* @param model The model schema to create model instances from | ||
* @param duration_in_hours The duration to run the simulation in hours | ||
* @return The result of the simulation | ||
*/ | ||
pub async fn create_model_instances_continuously( | ||
stable_load_user: StableLoadUser, | ||
model: StreamId, | ||
duration_in_hours: u64, | ||
tasks_count: usize, | ||
) -> Result<()> { | ||
let start_time = Instant::now(); | ||
|
||
let duration = Duration::from_secs(duration_in_hours * 60 * 60); | ||
dav1do marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let mut count = 0; | ||
let mut error_map: HashMap<String, u64> = HashMap::new(); | ||
// TODO : Make the rps configurable | ||
// TODO : Make the channel size configurable | ||
// TODO : Make the number of tasks configurable : tasks are currently 100 - | ||
dav1do marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// increasing tasks can help increase throughput | ||
let (tx, mut rx) = tokio::sync::mpsc::channel(10000); | ||
let mut tasks = tokio::task::JoinSet::new(); | ||
for i in 0..tasks_count { | ||
let user_clone = stable_load_user.clone(); | ||
let model = model.clone(); | ||
let tx = tx.clone(); | ||
tasks.spawn(async move { | ||
loop { | ||
if start_time.elapsed() > duration { | ||
println!("loop {i} Duration expired"); | ||
break; | ||
} | ||
match tokio::time::timeout( | ||
Duration::from_secs(5), | ||
user_clone.ceramic_utils.create_random_mid(&model), | ||
) | ||
.await | ||
{ | ||
Ok(Ok(mid)) => match tx.send(Ok(mid.to_string())).await { | ||
Ok(_) => {} | ||
Err(e) => { | ||
eprintln!("Failed to send MID: {}", e); | ||
} | ||
}, | ||
Ok(Err(e)) => match tx.send(Err(e.to_string())).await { | ||
Ok(_) => {} | ||
Err(e) => { | ||
eprintln!("Failed to send error: {}", e); | ||
} | ||
}, | ||
Err(e) => match tx.send(Err(e.to_string())).await { | ||
Ok(_) => {} | ||
Err(e) => { | ||
eprintln!("Failed to send error: {}", e); | ||
} | ||
}, | ||
} | ||
} | ||
}); | ||
} | ||
// Drop the tx sender, since the exit condition below requires the senders to be dropped for termination | ||
drop(tx); | ||
dav1do marked this conversation as resolved.
Show resolved
Hide resolved
|
||
loop { | ||
let mut mid_vec: Vec<Result<String, String>> = Vec::with_capacity(10); | ||
if rx.recv_many(&mut mid_vec, 10).await > 0 { | ||
for mid in mid_vec { | ||
match mid { | ||
Ok(_) => { | ||
count += 1; | ||
} | ||
Err(err) => { | ||
*error_map.entry(err).or_insert(0) += 1; | ||
} | ||
} | ||
} | ||
} | ||
// Add a small buffer to the duration to account for the time it takes to send the MIDs | ||
if start_time.elapsed() > duration + Duration::from_secs(5) { | ||
tasks.abort_all(); | ||
break; | ||
} | ||
} | ||
// After the loop, print the error map | ||
// TODO : Add observability to this, report these errors/counts | ||
println!("Error counts:"); | ||
for (error, count) in &error_map { | ||
println!("Error: {}, Count: {}", error, count); | ||
} | ||
println!("Created {} MIDs in {} hours", count, duration_in_hours); | ||
println!( | ||
"Failed to create {} MIDs in {} hours", | ||
error_map.values().sum::<u64>(), | ||
duration_in_hours | ||
); | ||
Ok(()) | ||
} | ||
|
||
struct WeekLongSimulationState { | ||
pub peers: Vec<Peer>, | ||
pub run_time: String, | ||
pub tasks: usize, | ||
} | ||
|
||
impl WeekLongSimulationState { | ||
/** | ||
* Try to create a new instance of the WeekLongSimulationState from the given options | ||
* | ||
* @param opts The options to use | ||
* @return The created instance | ||
*/ | ||
async fn try_from_opts(opts: WeekLongSimulationOpts) -> Result<Self> { | ||
Ok(Self { | ||
peers: parse_peers_info(opts.peers.clone()).await?, | ||
run_time: opts.run_time, | ||
tasks: opts.tasks, | ||
}) | ||
} | ||
|
||
/** | ||
* Initialize the configuration for the WeekLongSimulationState | ||
* | ||
* @return The created configuration | ||
*/ | ||
async fn initialize_config(&self) -> Result<CeramicConfig> { | ||
// Create a CeramicScenarioParameters instance with default values | ||
let params = CeramicScenarioParameters { | ||
did_type: CeramicDidType::EnvInjected, | ||
}; | ||
|
||
CeramicConfig::initialize_config(params).await | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
pub mod gen; | ||
pub mod utils; |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to run update as part of these tasks. Adding an update step is fine, but we should run
make update
and commit the lockfile in a commit. I suspect this will cause unstaged changes in CI that could fail the pipeline.