diff --git a/Cargo.lock b/Cargo.lock
index 0040375c..319ecef3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2847,6 +2847,7 @@ dependencies = [
  "multibase 0.9.1",
  "multihash 0.19.1",
  "multihash-codetable",
+ "once_cell",
  "opentelemetry",
  "rand",
  "redis",
diff --git a/operator/src/simulation/controller.rs b/operator/src/simulation/controller.rs
index 5b413b65..23587498 100644
--- a/operator/src/simulation/controller.rs
+++ b/operator/src/simulation/controller.rs
@@ -183,6 +183,9 @@ async fn reconcile_(
         throttle_requests: spec.throttle_requests,
         success_request_target: spec.success_request_target,
         log_level: spec.log_level.clone(),
+        anchor_wait_time: spec.anchor_wait_time,
+        cas_network: spec.cas_network.clone(),
+        cas_controller: spec.cas_controller.clone(),
     };

     apply_manager(cx.clone(), &ns, simulation.clone(), manager_config).await?;
@@ -765,4 +768,103 @@ mod tests {
            .expect("reconciler");
        timeout_after_1s(mocksrv).await;
    }
+
+    #[tokio::test]
+    #[traced_test]
+    async fn reconcile_anchor_wait_time() {
+        let mock_rpc_client = MockIpfsRpcClientTest::new();
+        let (testctx, api_handle) = Context::test(mock_rpc_client);
+        let fakeserver = ApiServerVerifier::new(api_handle);
+        let simulation = Simulation::test().with_spec(SimulationSpec {
+            anchor_wait_time: Some(15), // Set the anchor wait time to 15 seconds
+            ..Default::default()
+        });
+        let mut stub = Stub::default();
+        stub.manager_job.patch(expect![[r#"
+            --- original
+            +++ modified
+            @@ -87,6 +87,10 @@
+                                 "name": "ceramic-admin"
+                             }
+                         }
+            +        },
+            +        {
+            +            "name": "SIMULATE_ANCHOR_WAIT_TIME",
+            +            "value": "15"
+                     }
+                 ],
+                 "image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest",
+        "#]]);
+        let mocksrv = stub.run(fakeserver);
+        reconcile(Arc::new(simulation), testctx)
+            .await
+            .expect("reconciler");
+        timeout_after_1s(mocksrv).await;
+    }
+
+    #[tokio::test]
+    #[traced_test]
+    async fn reconcile_cas_network() {
+        let mock_rpc_client = MockIpfsRpcClientTest::new();
+        let (testctx, api_handle) = Context::test(mock_rpc_client);
+        let fakeserver = ApiServerVerifier::new(api_handle);
+        let simulation = Simulation::test().with_spec(SimulationSpec {
+            cas_network: Some("test-cas-network".to_owned()),
+            ..Default::default()
+        });
+        let mut stub = Stub::default();
+        stub.manager_job.patch(expect![[r#"
+            --- original
+            +++ modified
+            @@ -87,6 +87,10 @@
+                                 "name": "ceramic-admin"
+                             }
+                         }
+            +        },
+            +        {
+            +            "name": "SIMULATE_CAS_NETWORK",
+            +            "value": "test-cas-network"
+                     }
+                 ],
+                 "image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest",
+        "#]]);
+        let mocksrv = stub.run(fakeserver);
+        reconcile(Arc::new(simulation), testctx)
+            .await
+            .expect("reconciler");
+        timeout_after_1s(mocksrv).await;
+    }
+
+    #[tokio::test]
+    #[traced_test]
+    async fn reconcile_cas_controller() {
+        let mock_rpc_client = MockIpfsRpcClientTest::new();
+        let (testctx, api_handle) = Context::test(mock_rpc_client);
+        let fakeserver = ApiServerVerifier::new(api_handle);
+        let simulation = Simulation::test().with_spec(SimulationSpec {
+            cas_controller: Some("test-cas-controller".to_owned()),
+            ..Default::default()
+        });
+        let mut stub = Stub::default();
+        stub.manager_job.patch(expect![[r#"
+            --- original
+            +++ modified
+            @@ -87,6 +87,10 @@
+                                 "name": "ceramic-admin"
+                             }
+                         }
+            +        },
+            +        {
+            +            "name": "SIMULATE_CAS_CONTROLLER",
+            +            "value": "test-cas-controller"
+                     }
+                 ],
+                 "image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest",
+        "#]]);
+        let mocksrv = stub.run(fakeserver);
+        reconcile(Arc::new(simulation), testctx)
+            .await
+            .expect("reconciler");
+        timeout_after_1s(mocksrv).await;
+    }
 }
diff --git a/operator/src/simulation/manager.rs b/operator/src/simulation/manager.rs
index 590b21b8..963e271a 100644
--- a/operator/src/simulation/manager.rs
+++ b/operator/src/simulation/manager.rs
@@ -38,6 +38,9 @@ pub struct ManagerConfig {
     pub job_image_config: JobImageConfig,
     pub success_request_target: Option<usize>,
     pub log_level: Option<String>,
+    pub anchor_wait_time: Option<u32>,
+    pub cas_network: Option<String>,
+    pub cas_controller: Option<String>,
 }

 pub fn manager_job_spec(config: ManagerConfig) -> JobSpec {
@@ -131,6 +134,27 @@ pub fn manager_job_spec(config: ManagerConfig) -> JobSpec {
             ..Default::default()
         })
     }
+    if let Some(anchor_wait_time) = config.anchor_wait_time {
+        env_vars.push(EnvVar {
+            name: "SIMULATE_ANCHOR_WAIT_TIME".to_owned(),
+            value: Some(anchor_wait_time.to_string()),
+            ..Default::default()
+        })
+    }
+    if let Some(cas_network) = config.cas_network {
+        env_vars.push(EnvVar {
+            name: "SIMULATE_CAS_NETWORK".to_owned(),
+            value: Some(cas_network),
+            ..Default::default()
+        })
+    }
+    if let Some(cas_controller) = config.cas_controller {
+        env_vars.push(EnvVar {
+            name: "SIMULATE_CAS_CONTROLLER".to_owned(),
+            value: Some(cas_controller),
+            ..Default::default()
+        })
+    }
     if let Some(log_level) = config.log_level {
         env_vars.push(EnvVar {
             name: "SIMULATE_LOG_LEVEL".to_owned(),
diff --git a/operator/src/simulation/spec.rs b/operator/src/simulation/spec.rs
index cc4296c3..f1bc93bc 100644
--- a/operator/src/simulation/spec.rs
+++ b/operator/src/simulation/spec.rs
@@ -39,6 +39,12 @@ pub struct SimulationSpec {
     /// which consumes RAM, and will disappear if there is no persistent volume when the pod exits.
     /// Valid values: 'warn', 'info', 'debug', 'trace'. Defaults to None meaning no logging beyond RUST_LOG.
     pub(crate) log_level: Option<String>,
+    /// Anchor wait time in seconds; use with the ceramic-anchoring-benchmark scenario.
+    pub anchor_wait_time: Option<u32>,
+    /// URL of the CAS network to use for the simulation; use with the cas-benchmark scenario.
+    pub cas_network: Option<String>,
+    /// Controller DID for the simulation; use with the cas-benchmark scenario.
+    pub cas_controller: Option<String>,
 }

 impl Simulation {
diff --git a/runner/Cargo.toml b/runner/Cargo.toml
index b31fe9d4..cb5dbb99 100644
--- a/runner/Cargo.toml
+++ b/runner/Cargo.toml
@@ -30,6 +30,7 @@ serde_ipld_dagcbor = "0.6"
 serde_ipld_dagjson = "0.2"
 schemars.workspace = true
 serde_json.workspace = true
+once_cell = "1.19.0"
 tokio.workspace = true
 tracing.workspace = true
 tracing-subscriber.workspace = true
diff --git a/runner/src/scenario/ceramic/anchor.rs b/runner/src/scenario/ceramic/anchor.rs
index d43a31a1..4c83a335 100644
--- a/runner/src/scenario/ceramic/anchor.rs
+++ b/runner/src/scenario/ceramic/anchor.rs
@@ -74,11 +74,13 @@ pub async fn stream_tip_car(
 pub async fn create_anchor_request_on_cas(
     user: &mut GooseUser,
     conn: Arc<Mutex<MultiplexedConnection>>,
+    cas_network: Option<String>,
+    cas_controller: Option<String>,
 ) -> TransactionResult {
-    let cas_service_url = std::env::var("CAS_SERVICE_URL")
-        .unwrap_or_else(|_| "https://cas-dev.3boxlabs.com".to_string());
-    let node_controller = std::env::var("node_controller")
-        .unwrap_or_else(|_| "did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_string());
+    let cas_service_url =
+        cas_network.unwrap_or_else(|| "https://cas-dev-direct.3boxlabs.com".to_string());
+    let node_controller = cas_controller
+        .unwrap_or_else(|| "did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_string());

     let (stream_id, genesis_cid, genesis_block) = crate::scenario::util::create_stream().unwrap();

     let (root_cid, car_bytes) = stream_tip_car(
@@ -118,13 +120,20 @@ pub async fn create_anchor_request_on_cas(
     Ok(())
 }

-pub async fn cas_benchmark() -> Result<Scenario, GooseError> {
+pub async fn cas_benchmark(
+    cas_network: Option<String>,
+    cas_controller: Option<String>,
+) -> Result<Scenario, GooseError> {
     let redis_cli = get_redis_client().await.unwrap();
     let multiplexed_conn = redis_cli.get_multiplexed_tokio_connection().await.unwrap();
     let conn_mutex = Arc::new(Mutex::new(multiplexed_conn));

     let create_anchor_request = Transaction::new(Arc::new(move |user| {
         let conn_mutex_clone = conn_mutex.clone();
-        Box::pin(async move { create_anchor_request_on_cas(user, conn_mutex_clone).await })
+        let cas_network = cas_network.clone();
+        let cas_controller = cas_controller.clone();
+        Box::pin(async move {
+            create_anchor_request_on_cas(user, conn_mutex_clone, cas_network, cas_controller).await
+        })
     }))
     .set_name("create_anchor_request");
diff --git a/runner/src/scenario/ceramic/mod.rs b/runner/src/scenario/ceramic/mod.rs
index 2b93d59b..0aab8cb8 100644
--- a/runner/src/scenario/ceramic/mod.rs
+++ b/runner/src/scenario/ceramic/mod.rs
@@ -21,6 +21,7 @@ pub type CeramicClient = CeramicHttpClient<JwkSigner>;

 #[derive(Clone)]
 pub struct Credentials {
     pub signer: JwkSigner,
+    #[allow(dead_code)]
     pub did: Document,
 }
diff --git a/runner/src/scenario/ceramic/model_instance.rs b/runner/src/scenario/ceramic/model_instance.rs
index 8adfbabc..292a37c0 100644
--- a/runner/src/scenario/ceramic/model_instance.rs
+++ b/runner/src/scenario/ceramic/model_instance.rs
@@ -73,6 +73,7 @@ pub struct EnvBasedConfig {

 #[derive(Clone, Debug)]
 pub struct GooseUserInfo {
+    #[allow(dead_code)]
     pub global_leader: bool,
     /// True if this user is the lead user on the worker
     pub lead_user: bool,
diff --git a/runner/src/scenario/ceramic/new_streams.rs b/runner/src/scenario/ceramic/new_streams.rs
index a5b7349e..6146c9b4 100644
--- a/runner/src/scenario/ceramic/new_streams.rs
+++ b/runner/src/scenario/ceramic/new_streams.rs
@@ -7,7 +7,7 @@ use tracing::info;

 use crate::scenario::{
     ceramic::{model_instance::CountResponse, models, RandomModelInstance},
-    get_redis_client,
+    get_redis_client, is_goose_lead_worker,
 };

 use super::{
@@ -144,7 +144,7 @@ pub async fn small_large_scenario(
     let instantiate_small_model = Transaction::new(Arc::new(move |user| {
         let small_model_conn_clone = small_model_conn.clone();
         Box::pin(async move {
-            instantiate_small_model(user, params.store_mids, small_model_conn_clone).await
+            instantiate_small_model(user, params.store_mids, small_model_conn_clone, false).await
         })
     }))
     .set_name("instantiate_small_model");
@@ -163,6 +163,36 @@ pub async fn small_large_scenario(
         .register_transaction(instantiate_large_model))
 }

+pub async fn small_scenario(params: CeramicScenarioParameters) -> Result<Scenario, GooseError> {
+    let redis_cli = get_redis_client().await.unwrap();
+    let multiplexed_conn = redis_cli.get_multiplexed_tokio_connection().await.unwrap();
+    let shared_conn = Arc::new(Mutex::new(multiplexed_conn));
+    let config = CeramicModelInstanceTestUser::prep_scenario(params.clone())
+        .await
+        .unwrap();
+
+    let test_start = Transaction::new(Arc::new(move |user| {
+        Box::pin(CeramicModelInstanceTestUser::setup_mid_scenario(
+            user,
+            config.clone(),
+        ))
+    }))
+    .set_name("setup")
+    .set_on_start();
+
+    let instantiate_small_model = Transaction::new(Arc::new(move |user| {
+        let small_model_conn_clone = shared_conn.clone();
+        Box::pin(async move {
+            instantiate_small_model(user, params.store_mids, small_model_conn_clone, true).await
+        })
+    }))
+    .set_name("instantiate_small_model");
+
+    Ok(scenario!("CeramicAnchoringBenchmark")
+        .register_transaction(test_start)
+        .register_transaction(instantiate_small_model))
+}
+
 // the nonce is used to ensure that the metrics stored in redis for the run are unique
 pub async fn benchmark_scenario(
     params: CeramicScenarioParameters,
@@ -207,20 +237,28 @@ async fn instantiate_small_model(
     user: &mut GooseUser,
     store_in_redis: bool,
     conn: Arc<Mutex<MultiplexedConnection>>,
+    one_writer_per_network: bool,
 ) -> TransactionResult {
-    let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned();
-    let response = ModelInstanceRequests::create_model_instance(
-        user,
-        user_data.user_cli(),
-        &user_data.small_model_id,
-        "instantiate_small_model_instance",
-        &models::SmallModel::random(),
-    )
-    .await?;
-    if store_in_redis {
-        let mut conn: tokio::sync::MutexGuard<'_, MultiplexedConnection> = conn.lock().await;
-        let stream_id_string = response.to_string();
-        let _: () = conn.sadd("anchor_mids", stream_id_string).await.unwrap();
+    // When only one writer per network is allowed, only the lead worker creates instances
+    let should_create = !one_writer_per_network || is_goose_lead_worker();
+    if should_create {
+        let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned();
+        let response = ModelInstanceRequests::create_model_instance(
+            user,
+            user_data.user_cli(),
+            &user_data.small_model_id,
+            "instantiate_small_model_instance",
+            &models::SmallModel::random(),
+        )
+        .await?;
+        if store_in_redis {
+            let mut conn: tokio::sync::MutexGuard<'_, MultiplexedConnection> = conn.lock().await;
+            let stream_id_string = response.to_string();
+            let result: Result<(), redis::RedisError> =
+                conn.sadd("anchor_mids", stream_id_string).await;
+            if let Err(e) = result {
+                tracing::error!("Failed to add to anchor_mids: {}", e);
+            }
+        }
     }
     Ok(())
 }
@@ -242,7 +280,11 @@ async fn instantiate_large_model(
     if store_in_redis {
         let mut conn: tokio::sync::MutexGuard<'_, MultiplexedConnection> = conn.lock().await;
         let stream_id_string = response.to_string();
-        let _: () = conn.sadd("anchor_mids", stream_id_string).await.unwrap();
+        let result: Result<(), redis::RedisError> =
+            conn.sadd("anchor_mids", stream_id_string).await;
+        if let Err(e) = result {
+            tracing::error!("Failed to add to anchor_mids: {}", e);
+        }
     }
     Ok(())
 }
diff --git a/runner/src/simulate.rs b/runner/src/simulate.rs
index 18172113..b2e86cfc 100644
--- a/runner/src/simulate.rs
+++ b/runner/src/simulate.rs
@@ -22,9 +22,11 @@ use crate::{
         ceramic::{self, new_streams},
         get_redis_client, ipfs_block_fetch, recon_sync,
     },
-    utils::parse_peers_info,
+    utils::{calculate_sample_size, parse_peers_info, select_sample_set_ids},
     CommandResult,
 };
+use once_cell::sync::Lazy;
+use reqwest::Client;

 // FIXME: is it worth attaching metrics to the peer info?
 const IPFS_SERVICE_METRICS_PORT: &str = "9465";
@@ -32,6 +34,8 @@ const EVENT_SYNC_METRIC_NAME: &str = "ceramic_store_key_value_insert_count_total
 const ANCHOR_REQUEST_MIDS_KEY: &str = "anchor_mids";
 const CAS_ANCHOR_REQUEST_KEY: &str = "anchor_requests";

+static CLIENT: Lazy<Client> = Lazy::new(reqwest::Client::new);
+
 /// Options to Simulate command
 #[derive(Args, Debug)]
 pub struct Opts {
@@ -88,8 +92,21 @@ pub struct Opts {
     /// left to the scenario (requests per second, total requests, rps/node etc).
     #[arg(long, env = "SIMULATE_TARGET_REQUESTS")]
     target_request_rate: Option<usize>,
-    // #[arg(long, env = "SIMULATE_ANCHOR_WAIT_TIME")]
-    // anchor_wait_time: Option<u32>,
+
+    /// Wait time in seconds after which to check anchor correctness.
+    /// Should only be set for the ceramic-anchoring-benchmark scenario.
+    #[arg(long, env = "SIMULATE_ANCHOR_WAIT_TIME")]
+    anchor_wait_time: Option<u32>,
+
+    /// URL of the CAS network to use for the scenario.
+    /// Use with the cas-benchmark scenario.
+    #[arg(long, env = "SIMULATE_CAS_NETWORK")]
+    cas_network: Option<String>,
+
+    /// DID key of the CAS controller to use for the scenario.
+    /// Use with the cas-benchmark scenario.
+    #[arg(long, env = "SIMULATE_CAS_CONTROLLER")]
+    cas_controller: Option<String>,
 }

 #[derive(Debug, Clone, ValueEnum)]
@@ -119,6 +136,13 @@ pub struct Topology {
     pub nonce: u64,
 }

+#[derive(Debug, Clone)]
+pub struct ScenarioOptions {
+    pub anchor_wait_time: Option<usize>,
+    pub cas_network: Option<String>,
+    pub cas_controller: Option<String>,
+}
+
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 #[serde(rename_all = "SCREAMING_SNAKE_CASE")]
 enum AnchorStatus {
@@ -371,7 +395,7 @@ struct ScenarioState {
     run_time: String,
     log_level: Option<String>,
     throttle_requests: Option<usize>,
-    // wait_time: Option<u32>,
+    pub scenario_opts: ScenarioOptions,
 }

 impl ScenarioState {
@@ -402,6 +426,11 @@ impl ScenarioState {
             nonce: opts.nonce,
             users: peers.len() * opts.users,
         };
+        let scenario_opts = ScenarioOptions {
+            anchor_wait_time: opts.anchor_wait_time.map(|t| t as usize),
+            cas_network: opts.cas_network,
+            cas_controller: opts.cas_controller,
+        };
         Ok(Self {
             topo,
             peers,
@@ -413,7 +442,7 @@ impl ScenarioState {
             run_time: opts.run_time,
             log_level: opts.log_level,
             throttle_requests: opts.throttle_requests,
-            // wait_time: opts.anchor_wait_time,
+            scenario_opts,
         })
     }

@@ -425,16 +454,25 @@ impl ScenarioState {
             Scenario::CeramicWriteOnly => {
                 ceramic::write_only::scenario(self.scenario.into()).await?
             }
-            Scenario::CeramicNewStreams | Scenario::CeramicAnchoringBenchmark => {
+            Scenario::CeramicNewStreams => {
                 ceramic::new_streams::small_large_scenario(self.scenario.into()).await?
             }
+            Scenario::CeramicAnchoringBenchmark => {
+                ceramic::new_streams::small_scenario(self.scenario.into()).await?
+            }
             Scenario::CeramicNewStreamsBenchmark => {
                 ceramic::new_streams::benchmark_scenario(self.scenario.into(), self.topo.nonce)
                     .await?
             }
             Scenario::CeramicQuery => ceramic::query::scenario(self.scenario.into()).await?,
             Scenario::ReconEventSync => recon_sync::event_sync_scenario().await?,
-            Scenario::CASBenchmark => ceramic::anchor::cas_benchmark().await?,
+            Scenario::CASBenchmark => {
+                ceramic::anchor::cas_benchmark(
+                    self.scenario_opts.cas_network.clone(),
+                    self.scenario_opts.cas_controller.clone(),
+                )
+                .await?
+            }
         };
         self.collect_before_metrics().await?;
         Ok(scenario)
@@ -637,7 +675,7 @@ impl ScenarioState {
         peer: &Peer,
         stream_id: String,
     ) -> Result<AnchorStatus, anyhow::Error> {
-        let client = reqwest::Client::new();
+        let client = &*CLIENT;
         let ceramic_addr = peer
             .ceramic_addr()
             .ok_or_else(|| anyhow!("Peer does not have a ceramic address"))?;
@@ -665,6 +703,39 @@ impl ScenarioState {
         Ok(metrics.state.anchor_status)
     }

+    pub async fn model_sync_status(
+        &self,
+        peer: &Peer,
+        stream_id: String,
+    ) -> Result<bool, anyhow::Error> {
+        let client = &*CLIENT;
+        let ceramic_addr = peer
+            .ceramic_addr()
+            .ok_or_else(|| anyhow!("Peer does not have a ceramic address"))?;
+
+        let streams_url = format!("{}/{}/{}", ceramic_addr, "api/v0/streams", stream_id);
+
+        let response = client
+            .get(streams_url)
+            .send()
+            .await
+            .map_err(|e| {
+                error!("HTTP request failed: {}", e);
+                e
+            })?
+            .error_for_status()
+            .map_err(|e| {
+                error!("HTTP request returned unsuccessful status: {}", e);
+                e
+            })?;
+
+        let text = response.text().await.map_err(|e| {
+            error!("Failed to read response text: {}", e);
+            e
+        })?;
+        Ok(text.contains(&stream_id))
+    }
+
     pub async fn get_set_from_redis(&self, key: &str) -> Result<Vec<String>, anyhow::Error> {
         // Create a new Redis client
         let client: redis::Client = get_redis_client().await?;
@@ -700,22 +771,60 @@ impl ScenarioState {
         let mut pending_count = 0;
         let mut failed_count = 0;
         let mut not_requested_count = 0;
-        let wait_duration = Duration::from_secs(60 * 60);
-        // TODO_3164_1 : Make this a parameter, pass it in from the scenario config
-        // TODO_3164_3 : Code clean-up : Move redis calls to separate file move it out of simulate.rs
-        // TODO_3164_4 : Code clean-up : Move api call (fetch stream) to model_instance.rs?, maybe rename it
-        sleep(wait_duration).await;
+        let mut synced_count = 0;
+        let mut synced_failures_count = 0;

-        // Pick a peer at random
-        let peer = self.peers.first().unwrap();
+        // TODO_AES_84_2: Make this a nightly perf test, with a green signal when both validations pass.
+
+        // Writes go to the first peer; verify against the last peer, which should have
+        // every stream synced to it.
+        let write_peer = self.peers.first().unwrap();
+        let read_peer = self.peers.last().unwrap();

         let ids = self.get_set_from_redis(ANCHOR_REQUEST_MIDS_KEY).await?;
         info!("Number of MIDs: {}", ids.len());

+        // Calculate the sample size for a 99% confidence level and a 2% margin of error
+        let sample_size = calculate_sample_size(ids.len(), 0.99, 0.02).await;
+        info!("Sample size: {}", sample_size);
+
+        // Select a random sample set of IDs
+        let sample_ids = select_sample_set_ids(&ids, sample_size).await;
+        info!("Sampled IDs count: {}", sample_ids.len());
+
+        // Check that each sampled stream has synced to the read peer
+        for stream_id in sample_ids.iter() {
+            match self.model_sync_status(read_peer, stream_id.clone()).await {
+                Ok(true) => synced_count += 1,
+                Ok(false) => synced_failures_count += 1,
+                Err(e) => {
+                    synced_failures_count += 1;
+                    error!("Failed to get model sync status: {}", e);
+                }
+            }
+        }
+        info!("Model sync failures count: {}", synced_failures_count);
+        info!("Models synced count: {}", synced_count);
+        if synced_failures_count > 0 {
+            return Ok((
+                CommandResult::Failure(anyhow!("Model sync failures count is greater than 0")),
+                None,
+            ));
+        }
+
+        let wait_duration =
+            Duration::from_secs(self.scenario_opts.anchor_wait_time.unwrap_or_default() as u64);
+        info!(
+            "Waiting {} seconds before checking anchor status",
+            wait_duration.as_secs()
+        );
+        sleep(wait_duration).await;
+
         // Make an API call to get the status of request from the chosen peer
-        for stream_id in ids.clone() {
-            info!("Fetching anchor status for streamID {}", stream_id);
-            match self.get_anchor_status(peer, stream_id.clone()).await {
+        for stream_id in sample_ids.iter() {
+            // Fetch the anchor status of each sampled stream from the write peer
+            match self.get_anchor_status(write_peer, stream_id.clone()).await {
                 Ok(AnchorStatus::Anchored) => anchored_count += 1,
                 Ok(AnchorStatus::Pending) => pending_count += 1,
                 Ok(AnchorStatus::Failed) => failed_count += 1,
@@ -746,7 +855,6 @@ impl ScenarioState {
         info!("Anchored count is : {}", anchored_count);
         Ok((CommandResult::Success, None))
     }
-    // TODO_3164_2 : Report these counts to Graphana
 }

 /// Removed from `validate_scenario_success` to make testing easier as constructing the GooseMetrics appropriately is difficult
@@ -1287,7 +1395,9 @@ mod test {
             throttle_requests: None,
             log_level: None,
             target_request_rate,
-            // anchor_wait_time: None,
+            anchor_wait_time: None,
+            cas_network: None,
+            cas_controller: None,
         }
     }
diff --git a/runner/src/utils.rs b/runner/src/utils.rs
index ea0d40e8..30abaaf8 100644
--- a/runner/src/utils.rs
+++ b/runner/src/utils.rs
@@ -2,8 +2,10 @@ use std::path::Path;

 use anyhow::{bail, Result};
 use keramik_common::peer_info::Peer;
+use rand::seq::SliceRandom;
+use rand::thread_rng;
 use tokio::{fs::File, io::AsyncReadExt};
-use tracing::debug;
+use tracing::{debug, error};

 /// Initiate connection from peer to other.
 #[tracing::instrument(skip_all, fields(peer.index, other.index))]
@@ -52,3 +54,57 @@ pub async fn parse_peers_info(path: impl AsRef<Path>) -> Result<Vec<Peer>> {
     f.read_to_string(&mut peers_json).await?;
     Ok(serde_json::from_str(&peers_json)?)
 }
+
+/// Calculates the sample size needed to estimate a population proportion with a given confidence
+/// level and margin of error.
+///
+/// # Parameters
+/// - `population_size`: The size of the entire population (N).
+/// - `confidence_level`: The desired confidence level (e.g., 0.99 for 99% confidence).
+/// - `margin_of_error`: The acceptable margin of error (e.g., 0.02 for a 2% margin).
+///
+/// # Returns
+/// The calculated sample size (n).
+pub async fn calculate_sample_size(
+    population_size: usize,
+    confidence_level: f64,
+    margin_of_error: f64,
+) -> usize {
+    // Z-scores for the supported confidence levels; anything else falls back to 99%
+    let z = if confidence_level == 0.95 {
+        1.96
+    } else if confidence_level == 0.99 {
+        2.576
+    } else {
+        error!(
+            "Unsupported confidence level: {}, defaulting to 0.99",
+            confidence_level
+        );
+        2.576
+    };
+    let p = 0.5; // Assume maximum variability, which yields the most conservative (largest) sample
+    let e = margin_of_error;
+
+    let sample_size = ((z * z * p * (1.0 - p)) / (e * e)).ceil();
+    // Apply the finite population correction (FPC). It matters for small populations
+    // (N < 60000); for larger populations the correction becomes negligible.
+    let finite_population_correction =
+        sample_size / (1.0 + ((sample_size - 1.0) / (population_size as f64)));
+    // Round up to the nearest whole number
+    finite_population_correction.ceil() as usize
+}
+
+/// Selects a random sample of IDs from a given set of IDs.
+///
+/// # Parameters
+/// - `ids`: A slice of IDs to sample from.
+/// - `sample_size`: The number of IDs to sample.
+///
+/// # Returns
+/// A vector containing the sampled IDs.
+pub async fn select_sample_set_ids<T>(ids: &[T], sample_size: usize) -> Vec<T>
+where
+    T: Clone,
+{
+    let mut rng = thread_rng();
+    ids.choose_multiple(&mut rng, sample_size)
+        .cloned()
+        .collect()
+}
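
Note on the sampling math in `calculate_sample_size` (a worked example, not part of the patch): with a 99% confidence level (z = 2.576), maximum variability p = 0.5, and a 2% margin of error e = 0.02, the uncorrected sample size is n0 = ceil(z^2 * p * (1 - p) / e^2) = ceil(6.635776 * 0.25 / 0.0004) = 4148. For a population of N = 10,000 MIDs, the finite population correction gives n = ceil(4148 / (1 + 4147 / 10000)) = ceil(2932.07) = 2933. A minimal test sketch capturing this; the test name is hypothetical and the expected value is derived by hand from the formula above:

    #[tokio::test]
    async fn sample_size_worked_example() {
        // 99% confidence, 2% margin of error, N = 10_000:
        // n0 = ceil(2.576^2 * 0.25 / 0.02^2) = 4148, FPC -> ceil(4148 / 1.4147) = 2933
        assert_eq!(calculate_sample_size(10_000, 0.99, 0.02).await, 2933);
    }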