From 345601a6cb95a31c00fc66cffff9260b485d2fb9 Mon Sep 17 00:00:00 2001 From: Danny Browning Date: Tue, 5 Sep 2023 18:15:28 -0600 Subject: [PATCH] feat: Use redis to share information across workers --- Cargo.lock | 42 ++++++ runner/Cargo.toml | 1 + runner/src/scenario/ceramic/mod.rs | 3 +- runner/src/scenario/ceramic/model_reuse.rs | 148 +++++++++++++++++++++ runner/src/scenario/mod.rs | 9 ++ runner/src/simulate.rs | 7 +- 6 files changed, 208 insertions(+), 2 deletions(-) create mode 100644 runner/src/scenario/ceramic/model_reuse.rs diff --git a/Cargo.lock b/Cargo.lock index bf492cd5..cf29f734 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1055,6 +1055,20 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "337cdbf3f1a0e643b4a7d1a2ffa39d22342fb6ee25739b5cfb997c28b3586422" +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "concurrent-queue" version = "2.2.0" @@ -2853,6 +2867,7 @@ dependencies = [ "opentelemetry-otlp", "prometheus-client", "rand 0.8.5", + "redis", "reqwest", "schemars", "serde", @@ -4412,6 +4427,27 @@ dependencies = [ "locspan-derive", ] +[[package]] +name = "redis" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba" +dependencies = [ + "async-trait", + "bytes", + "combine", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.4.9", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -4992,6 +5028,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "sha2" version = "0.8.2" diff --git a/runner/Cargo.toml b/runner/Cargo.toml index 678b9073..6606f0d5 100644 --- a/runner/Cargo.toml +++ b/runner/Cargo.toml @@ -22,6 +22,7 @@ opentelemetry-otlp.workspace = true opentelemetry.workspace = true prometheus-client = "0.19" rand = "0.8.5" +redis = { version = "0.23.2", features = ["tokio-comp"] } reqwest.workspace = true schemars = "0.8.12" serde.workspace = true diff --git a/runner/src/scenario/ceramic/mod.rs b/runner/src/scenario/ceramic/mod.rs index 08b963bb..02114ee3 100644 --- a/runner/src/scenario/ceramic/mod.rs +++ b/runner/src/scenario/ceramic/mod.rs @@ -1,7 +1,8 @@ +pub mod model_reuse; mod models; pub mod new_streams; pub mod query; -mod util; +pub mod util; pub mod write_only; use crate::goose_try; diff --git a/runner/src/scenario/ceramic/model_reuse.rs b/runner/src/scenario/ceramic/model_reuse.rs new file mode 100644 index 00000000..9049ed69 --- /dev/null +++ b/runner/src/scenario/ceramic/model_reuse.rs @@ -0,0 +1,148 @@ +use crate::goose_try; +use crate::scenario::ceramic::models::LargeModel; +use crate::scenario::ceramic::util::{goose_error, index_model, setup_model, setup_model_instance}; +use crate::scenario::ceramic::{CeramicClient, Credentials}; +use crate::scenario::get_redis_client; +use ceramic_http_client::api::StreamsResponseOrError; +use ceramic_http_client::ceramic_event::{JwkSigner, StreamId}; +use ceramic_http_client::{CeramicHttpClient, ModelAccountRelation, ModelDefinition}; +use goose::prelude::*; +use redis::AsyncCommands; +use std::str::FromStr; +use std::{sync::Arc, time::Duration}; +use tracing::instrument; + +#[derive(Clone)] +pub struct LoadTestUserData { + cli: CeramicHttpClient, + redis_cli: redis::Client, + model_id: StreamId, +} + +const MODEL_ID_KEY: &str = "model_reuse_model_id"; +const MODEL_INSTANCE_ID_KEY: &str = "model_reuse_model_instance_id"; + +pub async fn scenario() -> Result { + let creds = Credentials::from_env().await.map_err(goose_error)?; + let cli = CeramicHttpClient::new(creds.signer); + let redis_cli = get_redis_client().await?; + + let test_start = Transaction::new(Arc::new(move |user| { + Box::pin(setup(user, cli.clone(), redis_cli.clone())) + })) + .set_name("setup") + .set_on_start(); + + let create_instance_tx = transaction!(create_instance).set_name("create_instance"); + let get_instance_tx = transaction!(get_instance).set_name("get_instance"); + + Ok(scenario!("CeramicModelReuseScenario") + // After each transactions runs, sleep randomly from 1 to 5 seconds. + .set_wait_time(Duration::from_secs(1), Duration::from_secs(5))? + .register_transaction(test_start) + .register_transaction(create_instance_tx) + .register_transaction(get_instance_tx)) +} + +async fn get_model_id(conn: &mut redis::aio::Connection) -> StreamId { + loop { + if conn.exists(MODEL_ID_KEY).await.unwrap() { + let id: String = conn.get(MODEL_ID_KEY).await.unwrap(); + return StreamId::from_str(&id).unwrap(); + } else { + tokio::time::sleep(Duration::from_millis(100)).await; + } + } +} + +#[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)] +async fn setup( + user: &mut GooseUser, + cli: CeramicClient, + redis_cli: redis::Client, +) -> TransactionResult { + let mut conn = redis_cli.get_async_connection().await.unwrap(); + let model_id = if user.weighted_users_index == 0 { + let model_definition = ModelDefinition::new::( + "model_reuse_query_model", + ModelAccountRelation::List, + ) + .unwrap(); + let model_id = setup_model(user, &cli, model_definition).await?; + index_model(user, &cli, &model_id).await?; + + let _: () = conn.set(MODEL_ID_KEY, model_id.to_string()).await.unwrap(); + + model_id + } else { + get_model_id(&mut conn).await + }; + + let user_data = LoadTestUserData { + cli, + redis_cli, + model_id, + }; + + user.set_session_data(user_data); + + Ok(()) +} + +async fn create_instance(user: &mut GooseUser) -> TransactionResult { + let user_data: LoadTestUserData = { + let data: &LoadTestUserData = user.get_session_data_unchecked(); + data.clone() + }; + let cli = &user_data.cli; + let mut conn = user_data.redis_cli.get_async_connection().await.unwrap(); + + let id = setup_model_instance( + user, + cli, + &user_data.model_id, + &LargeModel { + creator: "keramik".to_string(), + name: "model-reuse-model-instance".to_string(), + description: "a".to_string(), + tpe: 10, + }, + ) + .await?; + + let _: () = conn + .rpush(MODEL_INSTANCE_ID_KEY, id.to_string()) + .await + .unwrap(); + + Ok(()) +} + +async fn get_model_instance_id(conn: &mut redis::aio::Connection) -> StreamId { + loop { + let len: usize = conn.llen(MODEL_INSTANCE_ID_KEY).await.unwrap(); + if len > 0 { + let id: String = conn.lpop(MODEL_INSTANCE_ID_KEY, None).await.unwrap(); + return StreamId::from_str(&id).unwrap(); + } else { + tokio::time::sleep(Duration::from_millis(100)).await; + } + } +} + +async fn get_instance(user: &mut GooseUser) -> TransactionResult { + let user_data: &LoadTestUserData = user.get_session_data_unchecked(); + let cli: &CeramicClient = &user_data.cli; + let mut redis_conn = user_data.redis_cli.get_async_connection().await.unwrap(); + let model_instance_id = get_model_instance_id(&mut redis_conn).await; + let url = user.build_url(&format!("{}/{}", cli.streams_endpoint(), model_instance_id,))?; + let mut goose = user.get(&url).await?; + let resp: StreamsResponseOrError = goose.response?.json().await?; + goose_try!( + user, + "get", + &mut goose.request, + resp.resolve("get_instance") + )?; + Ok(()) +} diff --git a/runner/src/scenario/mod.rs b/runner/src/scenario/mod.rs index b42e2efc..fbeb1cd9 100644 --- a/runner/src/scenario/mod.rs +++ b/runner/src/scenario/mod.rs @@ -1,2 +1,11 @@ +use crate::scenario::ceramic::util::goose_error; +use goose::GooseError; + pub mod ceramic; pub mod ipfs_block_fetch; + +pub async fn get_redis_client() -> Result { + let redis_host = + std::env::var("REDIS_CONNECTION_STRING").unwrap_or("redis://redis:6379".to_string()); + redis::Client::open(redis_host).map_err(|e| goose_error(e.into())) +} diff --git a/runner/src/simulate.rs b/runner/src/simulate.rs index 71c9f661..d952896a 100644 --- a/runner/src/simulate.rs +++ b/runner/src/simulate.rs @@ -68,6 +68,8 @@ pub enum Scenario { CeramicNewStreams, /// Simple Query Scenario CeramicQuery, + /// Scenario to reuse the same model id and query instances across workers + CeramicModelReuse, } impl Scenario { @@ -78,6 +80,7 @@ impl Scenario { Scenario::CeramicWriteOnly => "ceramic_write_only", Scenario::CeramicNewStreams => "ceramic_new_streams", Scenario::CeramicQuery => "ceramic_query", + Scenario::CeramicModelReuse => "ceramic_model_reuse", } } @@ -87,7 +90,8 @@ impl Scenario { Self::CeramicSimple | Self::CeramicWriteOnly | Self::CeramicNewStreams - | Self::CeramicQuery => match peer { + | Self::CeramicQuery + | Self::CeramicModelReuse => match peer { Peer::Ceramic(peer) => Ok(peer.ceramic_addr.clone()), Peer::Ipfs(_) => Err(anyhow!( "cannot use non ceramic peer as target for simulation {}", @@ -125,6 +129,7 @@ pub async fn simulate(opts: Opts) -> Result<()> { Scenario::CeramicWriteOnly => ceramic::write_only::scenario().await?, Scenario::CeramicNewStreams => ceramic::new_streams::scenario().await?, Scenario::CeramicQuery => ceramic::query::scenario().await?, + Scenario::CeramicModelReuse => ceramic::model_reuse::scenario().await?, }; let config = if opts.manager { manager_config(peers.len(), opts.users, opts.run_time)