Refactor Resizable Runtime to keep it from blocking TiKV shutdown (tikv#17784) (tikv#17902)

close tikv#17807

A new version of the resizable runtime is added; it has the same performance as the old one but no longer blocks TiKV shutdown.
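
For illustration, here is a rough usage sketch of the new pool as it appears at the call sites in this diff (the real definition lives in tikv_util::resizable_threadpool; the exact signature and the behavior shown here are inferred from the changed code, not authoritative):

    // Sketch only: constructor arguments as seen at the call sites in this commit.
    let mut pool = ResizableRuntime::new(
        4,                                     // initial worker-thread count
        "bkwkr",                               // thread name prefix
        Box::new(utils::create_tokio_runtime), // factory for the underlying tokio Runtime
        // hook run after every resize; the backup endpoint uses it to update a gauge
        Box::new(|new_size: usize| BACKUP_THREAD_POOL_SIZE_GAUGE.set(new_size as i64)),
    );
    pool.spawn(async { /* backup task */ });
    pool.adjust_with(8);        // resize at runtime
    assert_eq!(pool.size(), 8); // size() replaces the old public `size` field
    drop(pool);                 // dropping the pool must no longer block TiKV shutdown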

Signed-off-by: hillium <yujuncen@pingcap.com>

Co-authored-by: RidRisR <79858083+RidRisR@users.noreply.github.com>
Co-authored-by: hillium <yujuncen@pingcap.com>
3 people authored Dec 2, 2024
1 parent b8efdb1 commit 7b96ecf
Showing 13 changed files with 596 additions and 188 deletions.
56 changes: 37 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default.

20 changes: 9 additions & 11 deletions components/backup-stream/src/subscription_manager.rs
@@ -938,17 +938,15 @@ mod test {
fail::cfg("execute_scan_command_sleep_100", "return").unwrap();
for _ in 0..100 {
let wg = wg.clone();
-            assert!(
-                block_on(pool.request(ScanCmd {
-                    region: Default::default(),
-                    handle: Default::default(),
-                    last_checkpoint: Default::default(),
-                    feedback_channel: tx.clone(),
-                    // Note: Maybe make here a Box<dyn FnOnce()> or some other trait?
-                    _work: wg.work(),
-                }))
-                .is_ok()
-            )
+            block_on(pool.request(ScanCmd {
+                region: Default::default(),
+                handle: Default::default(),
+                last_checkpoint: Default::default(),
+                feedback_channel: tx.clone(),
+                // Note: Maybe make here a Box<dyn FnOnce()> or some other trait?
+                _work: wg.work(),
+            }))
+            .unwrap();
}

should_finish_in(move || drop(pool), Duration::from_secs(5));
35 changes: 25 additions & 10 deletions components/backup/src/endpoint.rs
@@ -38,6 +38,7 @@ use tikv_util::{
box_err, debug, error, error_unknown,
future::RescheduleChecker,
impl_display_as_debug, info,
+    resizable_threadpool::ResizableRuntime,
store::find_peer,
time::{Instant, Limiter},
warn,
@@ -49,7 +50,7 @@ use txn_types::{Key, Lock, TimeStamp};
use crate::{
metrics::*,
softlimit::{CpuStatistics, SoftLimit, SoftLimitByCpu},
-    utils::{ControlThreadPool, KeyValueCodec},
+    utils::KeyValueCodec,
writer::{BackupWriterBuilder, CfNameWrap},
Error, *,
};
@@ -702,7 +703,7 @@ impl SoftLimitKeeper {
/// It coordinates backup tasks and dispatches them to different workers.
pub struct Endpoint<E: Engine, R: RegionInfoProvider + Clone + 'static> {
store_id: u64,
-    pool: RefCell<ControlThreadPool>,
+    pool: RefCell<ResizableRuntime>,
io_pool: Runtime,
tablets: LocalTablets<E::Local>,
config_manager: ConfigManager,
@@ -877,7 +878,12 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>,
resource_ctl: Option<Arc<ResourceGroupManager>>,
) -> Endpoint<E, R> {
-        let pool = ControlThreadPool::new();
+        let pool = ResizableRuntime::new(
+            config.num_threads,
+            "bkwkr",
+            Box::new(utils::create_tokio_runtime),
+            Box::new(|new_size| BACKUP_THREAD_POOL_SIZE_GAUGE.set(new_size as i64)),
+        );
let rt = utils::create_tokio_runtime(config.io_thread_size, "backup-io").unwrap();
let config_manager = ConfigManager(Arc::new(RwLock::new(config)));
let softlimit = SoftLimitKeeper::new(config_manager.clone());
@@ -1499,8 +1505,13 @@ pub mod tests {
use std::thread::sleep;

let counter = Arc::new(AtomicU32::new(0));
-        let mut pool = ControlThreadPool::new();
-        pool.adjust_with(3);
+        let mut pool = ResizableRuntime::new(
+            3,
+            "bkwkr",
+            Box::new(utils::create_tokio_runtime),
+            Box::new(|new_size: usize| BACKUP_THREAD_POOL_SIZE_GAUGE.set(new_size as i64)),
+        );

for i in 0..8 {
let ctr = counter.clone();
@@ -2537,20 +2548,20 @@ pub mod tests {
endpoint.get_config_manager().set_num_threads(15);
let (task, _) = Task::new(req.clone(), tx.clone()).unwrap();
endpoint.handle_backup_task(task);
-        assert!(endpoint.pool.borrow().size == 15);
+        assert!(endpoint.pool.borrow().size() == 15);

// shrink thread pool only if there are too many idle threads
endpoint.get_config_manager().set_num_threads(10);
req.set_start_key(vec![b'2']);
let (task, _) = Task::new(req.clone(), tx.clone()).unwrap();
endpoint.handle_backup_task(task);
-        assert!(endpoint.pool.borrow().size == 15);
+        assert!(endpoint.pool.borrow().size() == 10);

endpoint.get_config_manager().set_num_threads(3);
req.set_start_key(vec![b'3']);
let (task, _) = Task::new(req, tx).unwrap();
endpoint.handle_backup_task(task);
-        assert!(endpoint.pool.borrow().size == 3);
+        assert!(endpoint.pool.borrow().size() == 3);

// make sure all tasks can finish properly.
let responses = block_on(rx.collect::<Vec<_>>());
@@ -2559,8 +2570,12 @@
// for testing whether dropping the pool before all tasks finished causes panic.
// but the panic must be checked manually. (It may panic at tokio runtime
// threads)
-        let mut pool = ControlThreadPool::new();
-        pool.adjust_with(1);
+        let mut pool = ResizableRuntime::new(
+            1,
+            "bkwkr",
+            Box::new(utils::create_tokio_runtime),
+            Box::new(|new_size: usize| BACKUP_THREAD_POOL_SIZE_GAUGE.set(new_size as i64)),
+        );
pool.spawn(async { tokio::time::sleep(Duration::from_millis(100)).await });
pool.adjust_with(2);
drop(pool);
72 changes: 1 addition & 71 deletions components/backup/src/utils.rs
@@ -1,88 +1,18 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

-use std::sync::Arc;
-
use api_version::{dispatch_api_version, ApiV2, KeyMode, KvFormat};
use file_system::IoType;
-use futures::Future;
use kvproto::kvrpcpb::ApiVersion;
use tikv_util::{error, sys::thread::ThreadBuildWrapper};
use tokio::{io::Result as TokioResult, runtime::Runtime};
use txn_types::{Key, TimeStamp};

-use crate::{metrics::*, Result};
+use crate::Result;

// BACKUP_V1_TO_V2_TS is used as causal timestamp to backup RawKV api version
// V1/V1Ttl data and save to V2 format. Use 1 other than 0 because 0 is not a
// acceptable value for causal timestamp. See api_version::ApiV2::is_valid_ts.
pub const BACKUP_V1_TO_V2_TS: u64 = 1;
-/// DaemonRuntime is a "background" runtime, which contains "daemon" tasks:
-/// any task spawn into it would run until finish even the runtime isn't
-/// referenced.
-pub struct DaemonRuntime(Option<Runtime>);
-
-impl DaemonRuntime {
-    /// spawn a daemon task to the runtime.
-    pub fn spawn(self: &Arc<Self>, f: impl Future<Output = ()> + Send + 'static) {
-        let wkr = self.clone();
-        self.0.as_ref().unwrap().spawn(async move {
-            f.await;
-            drop(wkr)
-        });
-    }
-
-    /// create a daemon runtime from some runtime.
-    pub fn from_runtime(rt: Runtime) -> Arc<Self> {
-        Arc::new(Self(Some(rt)))
-    }
-}
-
-impl Drop for DaemonRuntime {
-    fn drop(&mut self) {
-        // it is safe because all tasks should be finished.
-        self.0.take().unwrap().shutdown_background()
-    }
-}
-pub struct ControlThreadPool {
-    pub(crate) size: usize,
-    workers: Option<Arc<DaemonRuntime>>,
-}
-
-impl ControlThreadPool {
-    pub fn new() -> Self {
-        ControlThreadPool {
-            size: 0,
-            workers: None,
-        }
-    }
-
-    pub fn spawn<F>(&self, func: F)
-    where
-        F: Future<Output = ()> + Send + 'static,
-    {
-        self.workers
-            .as_ref()
-            .expect("ControlThreadPool: please call adjust_with() before spawn()")
-            .spawn(func);
-    }
-
-    /// Lazily adjust the thread pool's size
-    ///
-    /// Resizing if the thread pool need to expend or there
-    /// are too many idle threads. Otherwise do nothing.
-    pub fn adjust_with(&mut self, new_size: usize) {
-        if self.size >= new_size && self.size - new_size <= 10 {
-            return;
-        }
-        // TODO: after tokio supports adjusting thread pool size(https://github.com/tokio-rs/tokio/issues/3329),
-        // adapt it.
-        let workers = create_tokio_runtime(new_size, "bkwkr")
-            .expect("failed to create tokio runtime for backup worker.");
-        self.workers = Some(DaemonRuntime::from_runtime(workers));
-        self.size = new_size;
-        BACKUP_THREAD_POOL_SIZE_GAUGE.set(new_size as i64);
-    }
-}

/// Create a standard tokio runtime.
/// (which allows io and time reactor, involve thread memory accessor),