Skip to content

Commit

Permalink
feat: Use redis to share information across workers
Browse files Browse the repository at this point in the history
  • Loading branch information
dbcfd committed Sep 13, 2023
1 parent ae9223a commit c130444
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 2 deletions.
42 changes: 42 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ opentelemetry-otlp.workspace = true
opentelemetry.workspace = true
prometheus-client = "0.19"
rand = "0.8.5"
redis = { version = "0.23.2", features = ["tokio-comp"] }
reqwest.workspace = true
schemars = "0.8.12"
serde.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion runner/src/scenario/ceramic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub mod model_reuse;
mod models;
pub mod new_streams;
pub mod query;
mod util;
pub mod util;
pub mod write_only;

use crate::goose_try;
Expand Down
148 changes: 148 additions & 0 deletions runner/src/scenario/ceramic/model_reuse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use crate::goose_try;
use crate::scenario::ceramic::models::LargeModel;
use crate::scenario::ceramic::util::{goose_error, index_model, setup_model, setup_model_instance};
use crate::scenario::ceramic::{CeramicClient, Credentials};
use crate::scenario::get_redis_client;
use ceramic_http_client::api::StreamsResponseOrError;
use ceramic_http_client::ceramic_event::{JwkSigner, StreamId};
use ceramic_http_client::{CeramicHttpClient, ModelAccountRelation, ModelDefinition};
use goose::prelude::*;
use redis::AsyncCommands;
use std::str::FromStr;
use std::{sync::Arc, time::Duration};
use tracing::instrument;

#[derive(Clone)]
pub struct LoadTestUserData {
cli: CeramicHttpClient<JwkSigner>,
redis_cli: redis::Client,
model_id: StreamId,
}

const MODEL_ID_KEY: &str = "model_reuse_model_id";
const MODEL_INSTANCE_ID_KEY: &str = "model_reuse_model_instance_id";

pub async fn scenario() -> Result<Scenario, GooseError> {
let creds = Credentials::from_env().await.map_err(goose_error)?;
let cli = CeramicHttpClient::new(creds.signer);
let redis_cli = get_redis_client().await?;

let test_start = Transaction::new(Arc::new(move |user| {
Box::pin(setup(user, cli.clone(), redis_cli.clone()))
}))
.set_name("setup")
.set_on_start();

let create_instance_tx = transaction!(create_instance).set_name("create_instance");
let get_instance_tx = transaction!(get_instance).set_name("get_instance");

Ok(scenario!("CeramicModelReuseScenario")
// After each transactions runs, sleep randomly from 1 to 5 seconds.
.set_wait_time(Duration::from_secs(1), Duration::from_secs(5))?
.register_transaction(test_start)
.register_transaction(create_instance_tx)
.register_transaction(get_instance_tx))
}

async fn get_model_id(conn: &mut redis::aio::Connection) -> StreamId {
loop {
if conn.exists(MODEL_ID_KEY).await.unwrap() {
let id: String = conn.get(MODEL_ID_KEY).await.unwrap();
return StreamId::from_str(&id).unwrap();
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}

#[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)]
async fn setup(
user: &mut GooseUser,
cli: CeramicClient,
redis_cli: redis::Client,
) -> TransactionResult {
let mut conn = redis_cli.get_async_connection().await.unwrap();
let model_id = if user.weighted_users_index == 0 {
let model_definition = ModelDefinition::new::<LargeModel>(
"model_reuse_query_model",
ModelAccountRelation::List,
)
.unwrap();
let model_id = setup_model(user, &cli, model_definition).await?;
index_model(user, &cli, &model_id).await?;

let _: () = conn.set(MODEL_ID_KEY, model_id.to_string()).await.unwrap();

model_id
} else {
get_model_id(&mut conn).await
};

let user_data = LoadTestUserData {
cli,
redis_cli,
model_id,
};

user.set_session_data(user_data);

Ok(())
}

async fn create_instance(user: &mut GooseUser) -> TransactionResult {
let user_data: LoadTestUserData = {
let data: &LoadTestUserData = user.get_session_data_unchecked();
data.clone()
};
let cli = &user_data.cli;
let mut conn = user_data.redis_cli.get_async_connection().await.unwrap();

let id = setup_model_instance(
user,
cli,
&user_data.model_id,
&LargeModel {
creator: "keramik".to_string(),
name: "model-reuse-model-instance".to_string(),
description: "a".to_string(),
tpe: 10,
},
)
.await?;

let _: () = conn
.rpush(MODEL_INSTANCE_ID_KEY, id.to_string())
.await
.unwrap();

Ok(())
}

async fn get_model_instance_id(conn: &mut redis::aio::Connection) -> StreamId {
loop {
let len: usize = conn.llen(MODEL_INSTANCE_ID_KEY).await.unwrap();
if len > 0 {
let id: String = conn.lpop(MODEL_INSTANCE_ID_KEY, None).await.unwrap();
return StreamId::from_str(&id).unwrap();
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}

async fn get_instance(user: &mut GooseUser) -> TransactionResult {
let user_data: &LoadTestUserData = user.get_session_data_unchecked();
let cli: &CeramicClient = &user_data.cli;
let mut redis_conn = user_data.redis_cli.get_async_connection().await.unwrap();
let model_instance_id = get_model_instance_id(&mut redis_conn).await;
let url = user.build_url(&format!("{}/{}", cli.streams_endpoint(), model_instance_id,))?;
let mut goose = user.get(&url).await?;
let resp: StreamsResponseOrError = goose.response?.json().await?;
goose_try!(
user,
"get",
&mut goose.request,
resp.resolve("get_instance")
)?;
Ok(())
}
9 changes: 9 additions & 0 deletions runner/src/scenario/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
use crate::scenario::ceramic::util::goose_error;
use goose::GooseError;

pub mod ceramic;
pub mod ipfs_block_fetch;

pub async fn get_redis_client() -> Result<redis::Client, GooseError> {
let redis_host =
std::env::var("REDIS_CONNECTION_STRING").unwrap_or("redis://redis:6379".to_string());
redis::Client::open(redis_host).map_err(|e| goose_error(e.into()))
}
7 changes: 6 additions & 1 deletion runner/src/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub enum Scenario {
CeramicNewStreams,
/// Simple Query Scenario
CeramicQuery,
/// Scenario to reuse the same model id and query instances across workers
CeramicModelReuse,
}

impl Scenario {
Expand All @@ -78,6 +80,7 @@ impl Scenario {
Scenario::CeramicWriteOnly => "ceramic_write_only",
Scenario::CeramicNewStreams => "ceramic_new_streams",
Scenario::CeramicQuery => "ceramic_query",
Scenario::CeramicModelReuse => "ceramic_model_reuse",
}
}

Expand All @@ -87,7 +90,8 @@ impl Scenario {
Self::CeramicSimple
| Self::CeramicWriteOnly
| Self::CeramicNewStreams
| Self::CeramicQuery => match peer {
| Self::CeramicQuery
| Self::CeramicModelReuse => match peer {
Peer::Ceramic(peer) => Ok(peer.ceramic_addr.clone()),
Peer::Ipfs(_) => Err(anyhow!(
"cannot use non ceramic peer as target for simulation {}",
Expand Down Expand Up @@ -125,6 +129,7 @@ pub async fn simulate(opts: Opts) -> Result<()> {
Scenario::CeramicWriteOnly => ceramic::write_only::scenario().await?,
Scenario::CeramicNewStreams => ceramic::new_streams::scenario().await?,
Scenario::CeramicQuery => ceramic::query::scenario().await?,
Scenario::CeramicModelReuse => ceramic::model_reuse::scenario().await?,
};
let config = if opts.manager {
manager_config(peers.len(), opts.users, opts.run_time)
Expand Down

0 comments on commit c130444

Please sign in to comment.