From 15f9a322d03677171a2f63d2fb1c803cf5c83159 Mon Sep 17 00:00:00 2001 From: Youssef Ibrahim Date: Thu, 17 Oct 2024 04:44:43 -0700 Subject: [PATCH] delete streaming_clone_warmup Summary: This binary is not used anywhere in production. Let's delete it. Reviewed By: lmvasquezg Differential Revision: D64532911 fbshipit-source-id: 9d5a6112b4771b62cc557f49e0fe0f8cdda31a63 --- eden/mononoke/Cargo.toml | 12 +- eden/mononoke/TARGETS | 27 -- .../cmds/streaming_clone_warmup/main.rs | 306 ------------------ eden/mononoke/facebook/fbpkg/TARGETS | 7 - 4 files changed, 1 insertion(+), 351 deletions(-) delete mode 100644 eden/mononoke/cmds/streaming_clone_warmup/main.rs diff --git a/eden/mononoke/Cargo.toml b/eden/mononoke/Cargo.toml index a7f5e038232f9..a04b85a7f7067 100644 --- a/eden/mononoke/Cargo.toml +++ b/eden/mononoke/Cargo.toml @@ -1,4 +1,4 @@ -# @generated by autocargo from //eden/mononoke:[backfill_mapping,bonsai_verify,check_git_wc,packer,revlogrepo,sqlblob_gc,streaming_clone_warmup] +# @generated by autocargo from //eden/mononoke:[backfill_mapping,bonsai_verify,check_git_wc,packer,revlogrepo,sqlblob_gc] [package] name = "eden_mononoke" @@ -32,10 +32,6 @@ path = "cmds/revlogrepo.rs" name = "sqlblob_gc" path = "cmds/sqlblob_gc/main.rs" -[[bin]] -name = "streaming_clone_warmup" -path = "cmds/streaming_clone_warmup/main.rs" - [dependencies] anyhow = "1.0.86" ascii = "1.0" @@ -48,13 +44,10 @@ bonsai_hg_mapping = { version = "0.1.0", path = "bonsai_hg_mapping" } bonsai_svnrev_mapping = { version = "0.1.0", path = "bonsai_svnrev_mapping" } borrowed = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } bytesize = "1.1" -cacheblob = { version = "0.1.0", path = "blobstore/cacheblob" } -cached_config = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } check_git_wc = { version = "0.1.0", path = "git/check_git_wc" } clap = { version = "4.5.20", features = ["derive", "env", "string", "unicode", "wrap_help"] } clap_old = { package = "clap", version = "2.34.0" } cloned = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } -cmdlib = { version = "0.1.0", path = "cmdlib" } cmdlib_logging = { version = "0.1.0", path = "cmdlib/log" } commit_graph = { version = "0.1.0", path = "repo_attributes/commit_graph/commit_graph" } context = { version = "0.1.0", path = "server/context" } @@ -81,10 +74,7 @@ scuba_ext = { version = "0.1.0", path = "common/scuba_ext" } serde = { version = "1.0.185", features = ["derive", "rc"] } serde_derive = "1.0.185" slog = { version = "2.7", features = ["max_level_trace", "nested-values"] } -sql_construct = { version = "0.1.0", path = "common/sql_construct" } -sql_ext = { version = "0.1.0", path = "common/rust/sql_ext" } sqlblob = { version = "0.1.0", path = "blobstore/sqlblob" } -streaming_clone = { version = "0.1.0", path = "repo_client/streaming_clone" } tokio = { version = "1.37.0", features = ["full", "test-util", "tracing"] } toml = "0.8.4" diff --git a/eden/mononoke/TARGETS b/eden/mononoke/TARGETS index 94efaf38bef6a..5ccec7c919c03 100644 --- a/eden/mononoke/TARGETS +++ b/eden/mononoke/TARGETS @@ -261,33 +261,6 @@ rust_binary( ], ) -rust_binary( - name = "streaming_clone_warmup", - srcs = glob(["cmds/streaming_clone_warmup/**/*.rs"]), - named_deps = { - "clap-old": "fbsource//third-party/rust:clap-2", - }, - test_deps = [], - deps = [ - "fbsource//third-party/rust:anyhow", - "fbsource//third-party/rust:futures", - "fbsource//third-party/rust:slog", - "fbsource//third-party/rust:tokio", - "//common/rust/shed/cached_config:cached_config", - "//common/rust/shed/fbinit:fbinit", - "//eden/mononoke/blobrepo:repo_blobstore", - "//eden/mononoke/blobstore:blobstore_factory", - "//eden/mononoke/blobstore:cacheblob", - "//eden/mononoke/cmdlib:cmdlib", - "//eden/mononoke/common/rust/sql_ext:sql_ext", - "//eden/mononoke/common/scuba_ext:scuba_ext", - "//eden/mononoke/common/sql_construct:sql_construct", - "//eden/mononoke/metaconfig:metaconfig_types", - "//eden/mononoke/repo_client:streaming_clone", - "//eden/mononoke/server/context:context", - ], -) - rust_binary( name = "sqlblob_gc", srcs = glob(["cmds/sqlblob_gc/**/*.rs"]), diff --git a/eden/mononoke/cmds/streaming_clone_warmup/main.rs b/eden/mononoke/cmds/streaming_clone_warmup/main.rs deleted file mode 100644 index ffe073a7fee4a..0000000000000 --- a/eden/mononoke/cmds/streaming_clone_warmup/main.rs +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * - * This software may be used and distributed according to the terms of the - * GNU General Public License version 2. - */ - -use std::sync::Arc; -use std::time::Duration; -use std::time::Instant; - -use anyhow::anyhow; -use anyhow::Error; -use blobstore_factory::make_blobstore; -use blobstore_factory::BlobstoreOptions; -use blobstore_factory::ReadOnlyStorage; -use cacheblob::new_memcache_blobstore; -use cached_config::ConfigStore; -use clap_old::Arg; -use cmdlib::args; -use cmdlib::args::MononokeMatches; -use cmdlib::helpers; -use context::CoreContext; -use fbinit::FacebookInit; -use futures::future; -use futures::future::try_join; -use futures::future::try_join_all; -use futures::future::TryFutureExt; -use futures::stream; -use futures::stream::StreamExt; -use futures::stream::TryStreamExt; -use metaconfig_types::RepoConfig; -use repo_blobstore::RepoBlobstore; -use scuba_ext::MononokeScubaSampleBuilder; -use slog::error; -use slog::info; -use sql_construct::SqlConstructFromMetadataDatabaseConfig; -use sql_ext::facebook::MysqlOptions; -use streaming_clone::RevlogStreamingChunks; -use streaming_clone::StreamingClone; -use streaming_clone::StreamingCloneBuilder; -use tokio::time; - -const REPO_ARG: &str = "repo"; -const REPO_WITH_TAGS_ARG: &str = "repo-with-tags"; -const PERIOD_ARG: &str = "warmup-period"; - -#[fbinit::main] -fn main(fb: FacebookInit) -> Result<(), Error> { - let app = args::MononokeAppBuilder::new("Utility to keep streaming clone data warm") - .with_advanced_args_hidden() - .with_fb303_args() - .build() - .about("Utility to keep streaming clone data warm") - .arg( - Arg::with_name(REPO_ARG) - .long(REPO_ARG) - .takes_value(true) - .required(false) - .multiple(true) - .help("Repository name to warm-up, and empty tag is assumed"), - ) - .arg( - Arg::with_name(REPO_WITH_TAGS_ARG) - .long(REPO_WITH_TAGS_ARG) - .takes_value(true) - .required(false) - .multiple(true) - .help("Repository name with a list of tags to warmup in format REPO=tag1,tag2."), - ) - .arg( - Arg::with_name(PERIOD_ARG) - .long(PERIOD_ARG) - .takes_value(true) - .required(false) - .default_value("900") - .help("Period of warmup runs in secods"), - ); - let (matches, _runtime) = app.get_matches(fb)?; - - let logger = matches.logger(); - let ctx = CoreContext::new_with_logger(fb, logger.clone()); - helpers::block_execute( - run(ctx, &matches), - fb, - &std::env::var("TW_JOB_NAME").unwrap_or_else(|_| "streaming_clone_warmup".to_string()), - logger, - &matches, - cmdlib::monitoring::AliveService, - ) -} - -async fn run<'a>(ctx: CoreContext, matches: &'a MononokeMatches<'a>) -> Result<(), Error> { - let period_secs: u64 = matches - .value_of(PERIOD_ARG) - .ok_or_else(|| anyhow!("--{} argument is required", PERIOD_ARG))? - .parse()?; - let period = Duration::from_secs(period_secs); - - let mut reponames_with_tags = vec![]; - if let Some(values) = matches.values_of(REPO_ARG) { - // Assume empty tag - reponames_with_tags.extend( - values - .map(ToString::to_string) - .map(|reponame| (reponame, None)), - ); - } - - if let Some(values) = matches.values_of(REPO_WITH_TAGS_ARG) { - for value in values { - let (reponame, tags) = split_repo_with_tags(value)?; - for tag in tags { - reponames_with_tags.push((reponame.clone(), Some(tag))); - } - } - } - - if reponames_with_tags.is_empty() { - error!(ctx.logger(), "At least one repo had to be specified"); - return Ok(()); - } - - let config_store = matches.config_store(); - let mysql_options = matches.mysql_options(); - let blobstore_options = matches.blobstore_options(); - let configs = args::load_repo_configs(config_store, matches)?; - - let mut warmers = Vec::new(); - for (reponame, tag) in reponames_with_tags { - let config = configs - .repos - .get(&reponame) - .ok_or_else(|| anyhow!("unknown repository: {}", reponame))?; - let warmer = StreamingCloneWarmup::new( - ctx.clone(), - reponame, - tag, - config, - mysql_options, - blobstore_options.clone(), - config_store, - ) - .await?; - warmers.push(warmer); - } - - let offset_delay = period / warmers.len() as u32; - let mut tasks = Vec::new(); - for (index, warmer) in warmers.into_iter().enumerate() { - let ctx = ctx.clone(); - tasks.push(async move { - // spread fetches over period, to reduce memory consumption - time::sleep(offset_delay * index as u32).await; - warmer.warmer_task(ctx.clone(), period).await?; - Ok::<_, Error>(()) - }); - } - try_join_all(tasks).await?; - Ok(()) -} - -fn split_repo_with_tags(s: &str) -> Result<(String, Vec), Error> { - if let Some((reponame, tags)) = s.split_once('=') { - let tags = tags.split(',').map(|s| s.to_string()).collect(); - - Ok((reponame.to_string(), tags)) - } else { - Err(anyhow!("invalid format for repo with tags: {}", s)) - } -} - -struct StreamingCloneWarmup { - streaming_clone: StreamingClone, - reponame: String, - tag: Option, -} - -impl StreamingCloneWarmup { - async fn new( - ctx: CoreContext, - reponame: String, - tag: Option, - config: &RepoConfig, - mysql_options: &MysqlOptions, - blobstore_options: BlobstoreOptions, - config_store: &ConfigStore, - ) -> Result { - // Create blobstore that contains streaming clone chunks, without cachelib - // layer (we want to hit memcache even if it is available in cachelib), and - // with memcache layer identical to production setup. - let blobstore = make_blobstore( - ctx.fb, - config.storage_config.blobstore.clone(), - mysql_options, - ReadOnlyStorage(true), - &blobstore_options, - ctx.logger(), - config_store, - &blobstore_factory::default_scrub_handler(), - None, - ) - .await?; - let blobstore = new_memcache_blobstore(ctx.fb, blobstore, "multiplexed", "")?; - let repo_blobstore = Arc::new(RepoBlobstore::new( - Arc::new(blobstore), - None, - config.repoid, - MononokeScubaSampleBuilder::with_discard(), - )); - - // Because we want to use our custom blobstore, we must construct the - // streaming clone attribute directly. - let streaming_clone = StreamingCloneBuilder::with_metadata_database_config( - ctx.fb, - &config.storage_config.metadata, - mysql_options, - true, /*read-only*/ - ) - .await? - .build(config.repoid, repo_blobstore); - - Ok(Self { - streaming_clone, - reponame, - tag, - }) - } - - /// Periodically fetch streaming clone data - async fn warmer_task(&self, ctx: CoreContext, period: Duration) -> Result<(), Error> { - if let Some(ref tag) = self.tag { - info!(ctx.logger(), "[{}:{}] warmer started", self.reponame, tag); - } else { - info!(ctx.logger(), "[{}] warmer started", self.reponame); - }; - - loop { - let tag = None; - let start = Instant::now(); - let chunks = self - .streaming_clone - .fetch_changelog(ctx.clone(), tag) - .await?; - info!( - ctx.logger(), - "[{}] index fetched in: {:.2?}", - self.reponame, - start.elapsed() - ); - - let size = chunks_warmup(ctx.clone(), chunks).await? as f32; - let duration = start.elapsed(); - info!( - ctx.logger(), - "[{}] fetching complete in: time:{:.2?} speed:{:.1?} b/s size: {}", - self.reponame, - duration, - size / duration.as_secs_f32(), - size, - ); - - // sleep if needed - if duration < period { - let delay = period - duration; - info!( - ctx.logger(), - "[{}] sleeping for: {:?}", self.reponame, delay - ); - time::sleep(delay).await; - } - } - } -} - -async fn chunks_warmup(ctx: CoreContext, chunks: RevlogStreamingChunks) -> Result { - let RevlogStreamingChunks { - index_blobs, - data_blobs, - index_size: index_size_expected, - data_size: data_size_expected, - } = chunks; - - let index = stream::iter(index_blobs.into_iter().map(|f| f.map_ok(|b| b.len()))) - .buffer_unordered(100) - .try_fold(0usize, |acc, size| future::ok(acc + size)); - - let data = stream::iter(data_blobs.into_iter().map(|f| f.map_ok(|b| b.len()))) - .buffer_unordered(100) - .try_fold(0usize, |acc, size| future::ok(acc + size)); - - let (index_size, data_size) = try_join(index, data).await?; - if index_size_expected != index_size { - error!( - ctx.logger(), - "incorrect index size: expected:{} received:{}", index_size_expected, index_size - ); - } - if data_size_expected != data_size { - error!( - ctx.logger(), - "incorrect data size: expected:{} received:{}", data_size_expected, data_size - ); - } - Ok(index_size + data_size) -} diff --git a/eden/mononoke/facebook/fbpkg/TARGETS b/eden/mononoke/facebook/fbpkg/TARGETS index 52c38c0004476..2a00f6fae4b08 100644 --- a/eden/mononoke/facebook/fbpkg/TARGETS +++ b/eden/mononoke/facebook/fbpkg/TARGETS @@ -116,13 +116,6 @@ mononoke_fbpkg( }, ) -mononoke_fbpkg( - name = "mononoke.streaming_clone_warmup", - path_actions = { - "streaming_clone_warmup": "//eden/mononoke:streaming_clone_warmup", - }, -) - mononoke_fbpkg( name = "mononoke.walker", path_actions = {