Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use redis to share information across workers #78

Merged
merged 1 commit into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ opentelemetry-otlp.workspace = true
opentelemetry.workspace = true
prometheus-client = "0.19"
rand = "0.8.5"
redis = { version = "0.23.2", features = ["tokio-comp"] }
reqwest.workspace = true
schemars = "0.8.12"
serde.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion runner/src/scenario/ceramic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub mod model_reuse;
mod models;
pub mod new_streams;
pub mod query;
mod util;
pub mod util;
pub mod write_only;

use crate::goose_try;
Expand Down
148 changes: 148 additions & 0 deletions runner/src/scenario/ceramic/model_reuse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use crate::goose_try;
use crate::scenario::ceramic::models::LargeModel;
use crate::scenario::ceramic::util::{goose_error, index_model, setup_model, setup_model_instance};
use crate::scenario::ceramic::{CeramicClient, Credentials};
use crate::scenario::get_redis_client;
use ceramic_http_client::api::StreamsResponseOrError;
use ceramic_http_client::ceramic_event::{JwkSigner, StreamId};
use ceramic_http_client::{CeramicHttpClient, ModelAccountRelation, ModelDefinition};
use goose::prelude::*;
use redis::AsyncCommands;
use std::str::FromStr;
use std::{sync::Arc, time::Duration};
use tracing::instrument;

#[derive(Clone)]
pub struct LoadTestUserData {
cli: CeramicHttpClient<JwkSigner>,
redis_cli: redis::Client,
model_id: StreamId,
}

const MODEL_ID_KEY: &str = "model_reuse_model_id";
const MODEL_INSTANCE_ID_KEY: &str = "model_reuse_model_instance_id";

pub async fn scenario() -> Result<Scenario, GooseError> {
let creds = Credentials::from_env().await.map_err(goose_error)?;
let cli = CeramicHttpClient::new(creds.signer);
let redis_cli = get_redis_client().await?;

let test_start = Transaction::new(Arc::new(move |user| {
Box::pin(setup(user, cli.clone(), redis_cli.clone()))
}))
.set_name("setup")
.set_on_start();

let create_instance_tx = transaction!(create_instance).set_name("create_instance");
let get_instance_tx = transaction!(get_instance).set_name("get_instance");

Ok(scenario!("CeramicModelReuseScenario")
// After each transactions runs, sleep randomly from 1 to 5 seconds.
.set_wait_time(Duration::from_secs(1), Duration::from_secs(5))?
.register_transaction(test_start)
.register_transaction(create_instance_tx)
.register_transaction(get_instance_tx))
}

async fn get_model_id(conn: &mut redis::aio::Connection) -> StreamId {
loop {
if conn.exists(MODEL_ID_KEY).await.unwrap() {
let id: String = conn.get(MODEL_ID_KEY).await.unwrap();
return StreamId::from_str(&id).unwrap();
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}

#[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)]
async fn setup(
user: &mut GooseUser,
cli: CeramicClient,
redis_cli: redis::Client,
) -> TransactionResult {
let mut conn = redis_cli.get_async_connection().await.unwrap();
let model_id = if user.weighted_users_index == 0 {
let model_definition = ModelDefinition::new::<LargeModel>(
"model_reuse_query_model",
ModelAccountRelation::List,
)
.unwrap();
let model_id = setup_model(user, &cli, model_definition).await?;
index_model(user, &cli, &model_id).await?;

let _: () = conn.set(MODEL_ID_KEY, model_id.to_string()).await.unwrap();

model_id
} else {
get_model_id(&mut conn).await
};

let user_data = LoadTestUserData {
cli,
redis_cli,
model_id,
};

user.set_session_data(user_data);

Ok(())
}

async fn create_instance(user: &mut GooseUser) -> TransactionResult {
let user_data: LoadTestUserData = {
let data: &LoadTestUserData = user.get_session_data_unchecked();
data.clone()
};
let cli = &user_data.cli;
let mut conn = user_data.redis_cli.get_async_connection().await.unwrap();

let id = setup_model_instance(
user,
cli,
&user_data.model_id,
&LargeModel {
creator: "keramik".to_string(),
name: "model-reuse-model-instance".to_string(),
description: "a".to_string(),
tpe: 10,
},
)
.await?;

let _: () = conn
.rpush(MODEL_INSTANCE_ID_KEY, id.to_string())
.await
.unwrap();

Ok(())
}

async fn get_model_instance_id(conn: &mut redis::aio::Connection) -> StreamId {
loop {
let len: usize = conn.llen(MODEL_INSTANCE_ID_KEY).await.unwrap();
if len > 0 {
let id: String = conn.lpop(MODEL_INSTANCE_ID_KEY, None).await.unwrap();
return StreamId::from_str(&id).unwrap();
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}

async fn get_instance(user: &mut GooseUser) -> TransactionResult {
let user_data: &LoadTestUserData = user.get_session_data_unchecked();
let cli: &CeramicClient = &user_data.cli;
let mut redis_conn = user_data.redis_cli.get_async_connection().await.unwrap();
let model_instance_id = get_model_instance_id(&mut redis_conn).await;
let url = user.build_url(&format!("{}/{}", cli.streams_endpoint(), model_instance_id,))?;
let mut goose = user.get(&url).await?;
let resp: StreamsResponseOrError = goose.response?.json().await?;
goose_try!(
user,
"get",
&mut goose.request,
resp.resolve("get_instance")
)?;
Ok(())
}
9 changes: 9 additions & 0 deletions runner/src/scenario/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
use crate::scenario::ceramic::util::goose_error;
use goose::GooseError;

pub mod ceramic;
pub mod ipfs_block_fetch;

pub async fn get_redis_client() -> Result<redis::Client, GooseError> {
let redis_host =
std::env::var("REDIS_CONNECTION_STRING").unwrap_or("redis://redis:6379".to_string());
redis::Client::open(redis_host).map_err(|e| goose_error(e.into()))
}
7 changes: 6 additions & 1 deletion runner/src/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub enum Scenario {
CeramicNewStreams,
/// Simple Query Scenario
CeramicQuery,
/// Scenario to reuse the same model id and query instances across workers
CeramicModelReuse,
}

impl Scenario {
Expand All @@ -78,6 +80,7 @@ impl Scenario {
Scenario::CeramicWriteOnly => "ceramic_write_only",
Scenario::CeramicNewStreams => "ceramic_new_streams",
Scenario::CeramicQuery => "ceramic_query",
Scenario::CeramicModelReuse => "ceramic_model_reuse",
}
}

Expand All @@ -87,7 +90,8 @@ impl Scenario {
Self::CeramicSimple
| Self::CeramicWriteOnly
| Self::CeramicNewStreams
| Self::CeramicQuery => match peer {
| Self::CeramicQuery
| Self::CeramicModelReuse => match peer {
Peer::Ceramic(peer) => Ok(peer.ceramic_addr.clone()),
Peer::Ipfs(_) => Err(anyhow!(
"cannot use non ceramic peer as target for simulation {}",
Expand Down Expand Up @@ -125,6 +129,7 @@ pub async fn simulate(opts: Opts) -> Result<()> {
Scenario::CeramicWriteOnly => ceramic::write_only::scenario().await?,
Scenario::CeramicNewStreams => ceramic::new_streams::scenario().await?,
Scenario::CeramicQuery => ceramic::query::scenario().await?,
Scenario::CeramicModelReuse => ceramic::model_reuse::scenario().await?,
};
let config = if opts.manager {
manager_config(peers.len(), opts.users, opts.run_time)
Expand Down