From 229c4239bd892125866f6565e2b820835869c9ef Mon Sep 17 00:00:00 2001
From: kennytm
Date: Thu, 14 Mar 2019 11:11:23 +0800
Subject: [PATCH] Cherry-pick 4 PRs related to tikv-importer to release-2.1
 (#4369)

* import: increase default region-split-size to 512 MiB (#4347)

This makes each uploaded SST larger, which reduces the number of SST
files that need to be ingested and improves overall speed.

Signed-off-by: kennytm

* import: store the intermediate SST files on disk instead of memory (#4348)

The bottleneck of Importer is SST ingestion on the TiKV side, not disk
I/O. Storing these SSTs in RAM just increases the chance that Importer
gets killed by OOM.

Signed-off-by: kennytm

* import: fix #4257, restrict memory usage by RocksDB (#4350)

Put the index and filter blocks in the block cache to restrict memory
usage.

Signed-off-by: Lonng
Signed-off-by: kennytm

* import: wait after split before scatter (#4352)

Signed-off-by: kennytm
---
 etc/tikv-importer.toml     |  2 +-
 src/import/client.rs       |  8 ++++++++
 src/import/config.rs       |  3 +--
 src/import/engine.rs       | 20 +++++++++++---------
 src/import/prepare.rs      | 25 +++++++++++++++++++++++++
 src/import/test_helpers.rs |  5 +++++
 6 files changed, 51 insertions(+), 12 deletions(-)
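
Note on #4352 before the diffs: SCATTER_WAIT_MAX_RETRY_TIMES * SCATTER_WAIT_INTERVAL_MILLIS
is 125 * 8 ms, so the new loop in src/import/prepare.rs waits at most one second for PD to
learn about a newly split region before scattering proceeds anyway. A minimal sketch of that
polling pattern, with ImportClient::has_region_id reduced to a plain closure and error
handling elided; wait_for_region is an illustrative name, not a function in this patch:

    use std::thread;
    use std::time::Duration;

    const SCATTER_WAIT_MAX_RETRY_TIMES: u64 = 125;
    const SCATTER_WAIT_INTERVAL_MILLIS: u64 = 8;

    /// Polls `has_region_id` until PD reports the new region, or until the
    /// retry budget (125 * 8 ms = 1 s) runs out. Returns whether PD saw it.
    fn wait_for_region<F: Fn() -> bool>(has_region_id: F) -> bool {
        for _ in 0..SCATTER_WAIT_MAX_RETRY_TIMES {
            if has_region_id() {
                return true;
            }
            thread::sleep(Duration::from_millis(SCATTER_WAIT_INTERVAL_MILLIS));
        }
        false
    }

    fn main() {
        // With a source that immediately reports the region, no sleep occurs.
        assert!(wait_for_region(|| true));
    }
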
diff --git a/etc/tikv-importer.toml b/etc/tikv-importer.toml
index fde8aa8786f..e83c5f870ca 100644
--- a/etc/tikv-importer.toml
+++ b/etc/tikv-importer.toml
@@ -46,7 +46,7 @@ num-import-jobs = 24
 # maximum duration to prepare regions.
 # max-prepare-duration = "5m"
 # split regions into this size according to the importing data.
-# region-split-size = "96MB"
+# region-split-size = "512MB"
 # stream channel window size, stream will be blocked on channel full.
 # stream-channel-window = 128
 # maximum number of open engines
diff --git a/src/import/client.rs b/src/import/client.rs
index e534c7ef37c..db28124ad1b 100644
--- a/src/import/client.rs
+++ b/src/import/client.rs
@@ -52,6 +52,10 @@ pub trait ImportClient: Send + Sync + Clone + 'static {
     fn ingest_sst(&self, _: u64, _: IngestRequest) -> Result<IngestResponse> {
         unimplemented!()
     }
+
+    fn has_region_id(&self, _: u64) -> Result<bool> {
+        unimplemented!()
+    }
 }
 
 pub struct Client {
@@ -209,6 +213,10 @@ impl ImportClient for Client {
         let res = client.ingest_opt(&req, self.option(Duration::from_secs(30)));
         self.post_resolve(store_id, res.map_err(Error::from))
     }
+
+    fn has_region_id(&self, id: u64) -> Result<bool> {
+        Ok(self.pd.get_region_by_id(id).wait()?.is_some())
+    }
 }
 
 pub struct UploadStream<'a> {
diff --git a/src/import/config.rs b/src/import/config.rs
index 11babe40ab1..72e62be108d 100644
--- a/src/import/config.rs
+++ b/src/import/config.rs
@@ -14,7 +14,6 @@
 use std::error::Error;
 use std::result::Result;
 
-use raftstore::coprocessor::config::SPLIT_SIZE_MB;
 use util::config::{ReadableDuration, ReadableSize};
 
 #[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
@@ -39,7 +38,7 @@ impl Default for Config {
             num_import_jobs: 8,
             num_import_sst_jobs: 2,
             max_prepare_duration: ReadableDuration::minutes(5),
-            region_split_size: ReadableSize::mb(SPLIT_SIZE_MB),
+            region_split_size: ReadableSize::mb(512),
             stream_channel_window: 128,
             max_open_engines: 8,
         }
diff --git a/src/import/engine.rs b/src/import/engine.rs
index f3bd502c696..1ee4762ed05 100644
--- a/src/import/engine.rs
+++ b/src/import/engine.rs
@@ -16,7 +16,7 @@ use std::fmt;
 use std::i32;
 use std::io::Read;
 use std::ops::Deref;
-use std::path::Path;
+use std::path::{Path, MAIN_SEPARATOR};
 use std::sync::Arc;
 
 use uuid::Uuid;
@@ -93,7 +93,7 @@ impl Engine {
     }
 
     pub fn new_sst_writer(&self) -> Result<SSTWriter> {
-        SSTWriter::new(&self.opts)
+        SSTWriter::new(&self.opts, self.db.path())
     }
 
     pub fn get_size_properties(&self) -> Result<SizeProperties> {
@@ -161,18 +161,19 @@ pub struct SSTWriter {
 }
 
 impl SSTWriter {
-    pub fn new(cfg: &DbConfig) -> Result<SSTWriter> {
-        let env = Arc::new(Env::new_mem());
+    pub fn new(cfg: &DbConfig, path: &str) -> Result<SSTWriter> {
+        let env = Arc::new(Env::default());
+        let uuid = Uuid::new_v4().to_string();
 
         let mut default_opts = cfg.defaultcf.build_opt();
         default_opts.set_env(Arc::clone(&env));
         let mut default = SstFileWriter::new(EnvOptions::new(), default_opts);
-        default.open(CF_DEFAULT)?;
+        default.open(&format!("{}{}.{}:default", path, MAIN_SEPARATOR, uuid))?;
 
         let mut write_opts = cfg.writecf.build_opt();
         write_opts.set_env(Arc::clone(&env));
         let mut write = SstFileWriter::new(EnvOptions::new(), write_opts);
-        write.open(CF_WRITE)?;
+        write.open(&format!("{}{}.{}:write", path, MAIN_SEPARATOR, uuid))?;
 
         Ok(SSTWriter {
             env,
@@ -256,9 +257,10 @@ fn tune_dboptions_for_bulk_load(opts: &DbConfig) -> (DBOptions, CFOptions) {
     // RocksDB preserves `max_background_jobs/4` for flush.
     db_opts.set_max_background_jobs(opts.max_background_jobs);
 
+    // Put index and filter in block cache to restrict memory usage.
     let mut block_base_opts = BlockBasedOptions::new();
-    // Use a large block size for sequential access.
-    block_base_opts.set_block_size(MB as usize);
+    block_base_opts.set_lru_cache(128 * MB as usize, -1, 0, 0.0);
+    block_base_opts.set_cache_index_and_filter_blocks(true);
     let mut cf_opts = ColumnFamilyOptions::new();
     cf_opts.set_block_based_table_factory(&block_base_opts);
     cf_opts.compression_per_level(&opts.defaultcf.compression_per_level);
@@ -351,7 +353,7 @@ mod tests {
         let n = 10;
         let commit_ts = 10;
 
-        let mut w = SSTWriter::new(&cfg).unwrap();
+        let mut w = SSTWriter::new(&cfg, temp_dir.path().to_str().unwrap()).unwrap();
 
         // Write some keys.
         let value = vec![1u8; value_size];
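
Note on #4348: SSTWriter::new now takes the engine's RocksDB directory and writes through
the default disk-backed Env instead of Env::new_mem(). A minimal sketch of how the temporary
SST names are composed, matching the format!() calls above; sst_path is a made-up helper,
not code from this patch:

    use std::path::MAIN_SEPARATOR;

    // Mirrors the format!() calls in SSTWriter::new: a dot-prefixed file named
    // ".<uuid>:<cf>" inside the engine directory, one per column family.
    fn sst_path(engine_path: &str, file_uuid: &str, cf: &str) -> String {
        format!("{}{}.{}:{}", engine_path, MAIN_SEPARATOR, file_uuid, cf)
    }

    fn main() {
        // On Unix, where MAIN_SEPARATOR == '/':
        assert_eq!(
            sst_path("/data/import/engine1", "1f2e", "default"),
            "/data/import/engine1/.1f2e:default"
        );
    }

The ":default" and ":write" suffixes appear to keep the two column families' temporary
files distinct within one engine directory, and the fresh UUID keeps concurrent writers
from colliding.
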
diff --git a/src/import/prepare.rs b/src/import/prepare.rs
index fc07651201c..67e8acaad7c 100644
--- a/src/import/prepare.rs
+++ b/src/import/prepare.rs
@@ -31,6 +31,14 @@ use super::{Config, Error, Result};
 const MAX_RETRY_TIMES: u64 = 3;
 const RETRY_INTERVAL_SECS: u64 = 1;
 
+const SCATTER_WAIT_MAX_RETRY_TIMES: u64 = 125;
+const SCATTER_WAIT_INTERVAL_MILLIS: u64 = 8;
+
+/// PrepareJob is responsible for improving cluster data balance.
+///
+/// Its main jobs are:
+/// 1. split data into ranges according to region size and region distribution
+/// 2. split and scatter regions of a cluster before we import a large amount of data
 pub struct PrepareJob {
     tag: String,
     cfg: Config,
@@ -176,6 +184,23 @@ impl PrepareRangeJob {
             }
             match self.split_region(&region) {
                 Ok(new_region) => {
+                    // Wait for a short while, because PD may not have received
+                    // a heartbeat from the newly split region yet, in which
+                    // case it lacks the region's metadata and cannot create a
+                    // scatter operator for it.
+                    for i in 0..SCATTER_WAIT_MAX_RETRY_TIMES {
+                        if self.client.has_region_id(new_region.region.id)? {
+                            if i > 0 {
+                                debug!("waited between split and scatter; retry times => {}", i);
+                            }
+                            break;
+                        } else if i == SCATTER_WAIT_MAX_RETRY_TIMES - 1 {
+                            warn!("new split region not found in PD after all retries; scattering anyway");
+                        } else {
+                            thread::sleep(Duration::from_millis(SCATTER_WAIT_INTERVAL_MILLIS));
+                        }
+                    }
                     self.scatter_region(&new_region)?;
                     Ok(true)
                 }
diff --git a/src/import/test_helpers.rs b/src/import/test_helpers.rs
index b0d5e8f69a8..dbac75cb575 100644
--- a/src/import/test_helpers.rs
+++ b/src/import/test_helpers.rs
@@ -162,4 +162,9 @@ impl ImportClient for MockClient {
         regions.insert(region.get_id(), region.region.clone());
         Ok(())
     }
+
+    fn has_region_id(&self, region_id: u64) -> Result<bool> {
+        let regions = self.regions.lock().unwrap();
+        Ok(regions.contains_key(&region_id))
+    }
 }
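
Note on testability: MockClient::has_region_id above is what lets the split-then-scatter
wait run against an in-memory region map instead of a live PD. A self-contained sketch of
the same idea, where RegionSource and FakeClient are hypothetical stand-ins for the
ImportClient trait and MockClient; only has_region_id mirrors the method added in this patch:

    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    // Stand-in for the slice of ImportClient that the scatter wait uses.
    trait RegionSource {
        fn has_region_id(&self, id: u64) -> bool;
    }

    // Set-backed fake, analogous to MockClient's region map.
    #[derive(Clone)]
    struct FakeClient {
        regions: Arc<Mutex<HashSet<u64>>>,
    }

    impl RegionSource for FakeClient {
        fn has_region_id(&self, id: u64) -> bool {
            self.regions.lock().unwrap().contains(&id)
        }
    }

    fn main() {
        let client = FakeClient {
            regions: Arc::new(Mutex::new(HashSet::new())),
        };
        assert!(!client.has_region_id(42)); // PD has not seen the region yet
        client.regions.lock().unwrap().insert(42); // heartbeat arrives
        assert!(client.has_region_id(42)); // scatter can proceed
    }
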