From dc46653bc9838c30c8007649588ed72c80d27d60 Mon Sep 17 00:00:00 2001 From: Andrea Campi Date: Thu, 17 Oct 2024 02:29:27 -0700 Subject: [PATCH] Implement sharded mode execution Summary: ## This stack Running the worker for all repos is causing OOMs; sharding should help. ## This diff With the preparation we've done earlier in the stack, we can now enable sharded execution. Reviewed By: singhsrb Differential Revision: D64402510 fbshipit-source-id: 9b0a57a0423d47a5c48233fea63bd50e0cc5af1e --- .../async_requests_client/src/lib.rs | 4 +- .../mononoke/async_requests/worker/Cargo.toml | 4 +- eden/mononoke/async_requests/worker/TARGETS | 3 + .../async_requests/worker/src/main.rs | 195 +++++++++++++++--- 4 files changed, 170 insertions(+), 36 deletions(-) diff --git a/eden/mononoke/async_requests/async_requests_client/src/lib.rs b/eden/mononoke/async_requests/async_requests_client/src/lib.rs index e3d8791db686d..7985f08748d00 100644 --- a/eden/mononoke/async_requests/async_requests_client/src/lib.rs +++ b/eden/mononoke/async_requests/async_requests_client/src/lib.rs @@ -19,7 +19,7 @@ use mononoke_api::MononokeRepo; use mononoke_api::RepositoryId; use mononoke_app::MononokeApp; use requests_table::SqlLongRunningRequestsQueue; -use slog::info; +use slog::debug; use sql_construct::SqlConstructFromDatabaseConfig; use sql_ext::facebook::MysqlOptions; @@ -46,7 +46,7 @@ pub async fn open_sql_connection( ) -> Result { let config = app.repo_configs().common.async_requests_config.clone(); if let Some(config) = config.db_config { - info!( + debug!( app.logger(), "Initializing async_requests with an explicit config" ); diff --git a/eden/mononoke/async_requests/worker/Cargo.toml b/eden/mononoke/async_requests/worker/Cargo.toml index 72a5cb4a4c84b..74280fdd02c11 100644 --- a/eden/mononoke/async_requests/worker/Cargo.toml +++ b/eden/mononoke/async_requests/worker/Cargo.toml @@ -14,6 +14,7 @@ async-trait = "0.1.71" async_requests = { version = "0.1.0", path = "../lib" } 
async_requests_client = { version = "0.1.0", path = "../async_requests_client" } async_requests_types_thrift = { version = "0.1.0", path = "../if" } +blobstore = { version = "0.1.0", path = "../../blobstore" } clap = { version = "4.5.20", features = ["derive", "env", "string", "unicode", "wrap_help"] } cloned = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } cmdlib_logging = { version = "0.1.0", path = "../../cmdlib/log" } @@ -31,6 +32,8 @@ mononoke_api = { version = "0.1.0", path = "../../mononoke_api" } mononoke_app = { version = "0.1.0", path = "../../cmdlib/mononoke_app" } mononoke_types = { version = "0.1.0", path = "../../mononoke_types" } repo_authorization = { version = "0.1.0", path = "../../repo_authorization" } +requests_table = { version = "0.1.0", path = "../requests_table" } +sharding_ext = { version = "0.1.0", path = "../../cmdlib/sharding_ext" } slog = { version = "2.7", features = ["max_level_trace", "nested-values"] } source_control = { version = "0.1.0", path = "../../scs/if" } stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } @@ -39,4 +42,3 @@ tokio = { version = "1.37.0", features = ["full", "test-util", "tracing"] } [dev-dependencies] fbinit-tokio = { version = "0.1.2", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } mononoke_macros = { version = "0.1.0", path = "../../mononoke_macros" } -requests_table = { version = "0.1.0", path = "../requests_table" } diff --git a/eden/mononoke/async_requests/worker/TARGETS b/eden/mononoke/async_requests/worker/TARGETS index df72f3f8294be..d5ce68181468f 100644 --- a/eden/mononoke/async_requests/worker/TARGETS +++ b/eden/mononoke/async_requests/worker/TARGETS @@ -25,12 +25,15 @@ rust_binary( "//common/rust/shed/stats:stats", "//eden/mononoke/async_requests:async_requests", "//eden/mononoke/async_requests:async_requests_client", + 
"//eden/mononoke/async_requests:requests_table", "//eden/mononoke/async_requests/if:async_requests_types-thrift-rust", + "//eden/mononoke/blobstore:blobstore", "//eden/mononoke/blobstore:ephemeral_blobstore", "//eden/mononoke/cmdlib:cmdlib_logging", "//eden/mononoke/cmdlib:environment", "//eden/mononoke/cmdlib/mononoke_app:mononoke_app", "//eden/mononoke/cmdlib/sharding:executor_lib", + "//eden/mononoke/cmdlib/sharding_ext:sharding_ext", "//eden/mononoke/megarepo_api:megarepo_api", "//eden/mononoke/megarepo_api:megarepo_error", "//eden/mononoke/metaconfig:metaconfig_types", diff --git a/eden/mononoke/async_requests/worker/src/main.rs b/eden/mononoke/async_requests/worker/src/main.rs index c5ba6d552b555..349bbe05a0b27 100644 --- a/eden/mononoke/async_requests/worker/src/main.rs +++ b/eden/mononoke/async_requests/worker/src/main.rs @@ -14,17 +14,27 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; +use anyhow::Context; use anyhow::Result; +use async_requests::AsyncMethodRequestQueue; +use async_requests_client::open_blobstore; +use async_requests_client::open_sql_connection; +use async_trait::async_trait; use clap::Parser; +use cloned::cloned; use cmdlib_logging::ScribeLoggingArgs; +use context::CoreContext; use context::SessionContainer; use environment::BookmarkCacheDerivedData; use environment::BookmarkCacheKind; use environment::BookmarkCacheOptions; +use executor_lib::args::ShardedExecutorArgs; +use executor_lib::RepoShardedProcess; use executor_lib::RepoShardedProcessExecutor; use fbinit::FacebookInit; use megarepo_api::MegarepoApi; use metaconfig_types::ShardedService; +use mononoke_api::Mononoke; use mononoke_api::Repo; use mononoke_app::args::HooksAppExtension; use mononoke_app::args::RepoFilterAppExtension; @@ -33,12 +43,21 @@ use mononoke_app::args::WarmBookmarksCacheExtension; use mononoke_app::fb303::AliveService; use mononoke_app::fb303::Fb303AppExtension; use mononoke_app::MononokeAppBuilder; +use 
mononoke_app::MononokeReposManager; +use requests_table::SqlLongRunningRequestsQueue; +use sharding_ext::RepoShard; +use slog::info; const SERVICE_NAME: &str = "async_requests_worker"; +const SM_CLEANUP_TIMEOUT_SECS: u64 = 60; + /// Processes the megarepo async requests #[derive(Parser)] struct AsyncRequestsWorkerArgs { + #[clap(flatten)] + pub sharded_executor_args: ShardedExecutorArgs, + #[clap(flatten)] shutdown_timeout_args: ShutdownTimeoutArgs, #[clap(flatten)] @@ -55,6 +74,75 @@ struct AsyncRequestsWorkerArgs { process_all_repos: bool, } +pub struct WorkerProcess { + ctx: Arc, + args: Arc, + repos_mgr: Arc>, + mononoke: Arc>, + megarepo: Arc>, + sql_connection: Arc, + blobstore: Arc, + will_exit: Arc, +} + +impl WorkerProcess { + pub(crate) fn new( + ctx: Arc, + args: Arc, + repos_mgr: Arc>, + mononoke: Arc>, + megarepo: Arc>, + sql_connection: Arc, + blobstore: Arc, + will_exit: Arc, + ) -> Self { + Self { + ctx, + args, + repos_mgr, + mononoke, + megarepo, + sql_connection, + blobstore, + will_exit, + } + } +} + +#[async_trait] +impl RepoShardedProcess for WorkerProcess { + async fn setup(&self, repo: &RepoShard) -> Result> { + let repo_name = repo.repo_name.as_str(); + let logger = self.repos_mgr.repo_logger(repo_name); + info!(&logger, "Setting up repo {}", repo_name); + + let repo = self + .repos_mgr + .add_repo(repo_name) + .await + .with_context(|| format!("Failure in setting up repo {}", repo_name))?; + let repos = vec![repo.repo_identity.id()]; + info!(&logger, "Completed setup for repos {:?}", repos); + + let queue = Arc::new(AsyncMethodRequestQueue::new( + self.sql_connection.clone(), + self.blobstore.clone(), + Some(repos), + )); + + let executor = worker::AsyncMethodRequestWorker::new( + self.args.clone(), + self.ctx.clone(), + queue, + self.mononoke.clone(), + self.megarepo.clone(), + self.will_exit.clone(), + ) + .await?; + Ok(Arc::new(executor)) + } +} + #[fbinit::main] fn main(fb: FacebookInit) -> Result<()> { let app = 
MononokeAppBuilder::new(fb) @@ -69,50 +157,91 @@ fn main(fb: FacebookInit) -> Result<()> { .build::()?; let args: Arc = Arc::new(app.args()?); - let env = app.environment(); + let logger = app.logger().clone(); let runtime = app.runtime().clone(); let session = SessionContainer::new_with_defaults(env.fb); - let ctx = session.new_context(app.logger().clone(), env.scuba_sample_builder.clone()); - - let mononoke = Arc::new( - runtime - .block_on(app.open_managed_repos::(Some(ShardedService::AsyncRequestsWorker)))? - .make_mononoke_api()?, - ); - let repos = mononoke.known_repo_ids(); + let ctx = Arc::new(session.new_context(app.logger().clone(), env.scuba_sample_builder.clone())); + + let service_name = Some(ShardedService::AsyncRequestsWorker); + let repos_mgr = Arc::new(runtime.block_on(app.open_managed_repos(service_name))?); + let mononoke = Arc::new(repos_mgr.make_mononoke_api()?); let megarepo = Arc::new(MegarepoApi::new(&app, mononoke.clone())?); + let sql_connection = Arc::new(runtime.block_on(open_sql_connection(fb, &app))?); + let blobstore = runtime.block_on(open_blobstore(fb, &app))?; let will_exit = Arc::new(AtomicBool::new(false)); - let filter_repos = if args.process_all_repos { - None - } else { - Some(repos) - }; - let queue = Arc::new(runtime.block_on(async_requests_client::build(fb, &app, filter_repos))?); - let worker = runtime.block_on(worker::AsyncMethodRequestWorker::new( - args.clone(), - Arc::new(ctx), - queue, - mononoke, - megarepo, - will_exit.clone(), - ))?; app.start_monitoring(SERVICE_NAME, AliveService)?; app.start_stats_aggregation()?; - let run_worker = { move |_app| async move { worker.execute().await } }; - - app.run_until_terminated( - run_worker, - move || will_exit.store(true, Ordering::Relaxed), - args.shutdown_timeout_args.shutdown_grace_period, - async { - // the code to gracefully stop things goes here + if let Some(mut executor) = args.sharded_executor_args.clone().build_executor( + app.fb, + runtime.clone(), + &logger, + || 
{ + Arc::new(WorkerProcess::new( + ctx.clone(), + args.clone(), + repos_mgr.clone(), + mononoke.clone(), + megarepo.clone(), + sql_connection.clone(), + blobstore.clone(), + will_exit.clone(), + )) }, - args.shutdown_timeout_args.shutdown_timeout, - )?; + true, // enable shard (repo) level healing + SM_CLEANUP_TIMEOUT_SECS, + )? { + info!(logger, "Starting sharded process"); + // The Sharded Process Executor needs to branch off and execute + // on its own dedicated task spawned off the common tokio runtime. + runtime.spawn({ + let logger = logger.clone(); + { + cloned!(will_exit); + async move { executor.block_and_execute(&logger, will_exit).await } + } + }); + + app.wait_until_terminated( + move || will_exit.store(true, Ordering::Relaxed), + args.shutdown_timeout_args.shutdown_grace_period, + async { + info!(logger, "Shutdown"); + }, + args.shutdown_timeout_args.shutdown_timeout, + )?; + } else { + let logger = logger.clone(); + let queue = Arc::new(AsyncMethodRequestQueue::new( + sql_connection, + blobstore, + None, + )); + + info!(logger, "Starting unsharded executor for all repos"); + let executor = runtime.block_on(worker::AsyncMethodRequestWorker::new( + args.clone(), + ctx.clone(), + queue.clone(), + mononoke.clone(), + megarepo.clone(), + will_exit.clone(), + ))?; + let run_worker = { move |_app| async move { executor.execute().await } }; + + app.run_until_terminated( + run_worker, + move || will_exit.store(true, Ordering::Relaxed), + args.shutdown_timeout_args.shutdown_grace_period, + async { + info!(logger, "Shutdown"); + }, + args.shutdown_timeout_args.shutdown_timeout, + )?; + } Ok(()) }