Cherry-pick 4 PRs related to tikv-importer to release-2.1 (tikv#4369)
* import: increase default region-split-size to 512 MiB (tikv#4347)

This makes the uploaded SSTs larger, which reduces the number of SST
files that need to be ingested and improves overall speed.

Signed-off-by: kennytm <kennytm@gmail.com>

* import: store the intermediate SST files on disk instead of memory (tikv#4348)

* import: store the intermediate SST files on disk instead of memory

The bottleneck of Importer is SST ingestion on the TiKV side, not
disk I/O. Storing these SSTs in RAM just increases the chance of
Importer getting killed by OOM.

Signed-off-by: kennytm <kennytm@gmail.com>

* import: fix tikv#4257, restrict memory usage by RocksDB (tikv#4350)

* import: put index and filter in block cache to restrict memory usage

Signed-off-by: Lonng <chris@lonng.org>
Signed-off-by: kennytm <kennytm@gmail.com>

* import: wait after split before scatter (tikv#4352)

Signed-off-by: kennytm <kennytm@gmail.com>
kennytm authored and huachaohuang committed Mar 14, 2019
1 parent e02e08b commit 229c423
Showing 6 changed files with 51 additions and 12 deletions.
2 changes: 1 addition & 1 deletion etc/tikv-importer.toml
@@ -46,7 +46,7 @@ num-import-jobs = 24
 # maximum duration to prepare regions.
 # max-prepare-duration = "5m"
 # split regions into this size according to the importing data.
-# region-split-size = "96MB"
+# region-split-size = "512MB"
 # stream channel window size, stream will be blocked on channel full.
 # stream-channel-window = 128
 # maximum number of open engines
8 changes: 8 additions & 0 deletions src/import/client.rs
@@ -52,6 +52,10 @@ pub trait ImportClient: Send + Sync + Clone + 'static {
     fn ingest_sst(&self, _: u64, _: IngestRequest) -> Result<IngestResponse> {
         unimplemented!()
     }
+
+    fn has_region_id(&self, _: u64) -> Result<bool> {
+        unimplemented!()
+    }
 }
 
 pub struct Client {
@@ -209,6 +213,10 @@ impl ImportClient for Client {
         let res = client.ingest_opt(&req, self.option(Duration::from_secs(30)));
         self.post_resolve(store_id, res.map_err(Error::from))
     }
+
+    fn has_region_id(&self, id: u64) -> Result<bool> {
+        Ok(self.pd.get_region_by_id(id).wait()?.is_some())
+    }
 }
 
 pub struct UploadStream<'a> {
3 changes: 1 addition & 2 deletions src/import/config.rs
@@ -14,7 +14,6 @@
 use std::error::Error;
 use std::result::Result;
 
-use raftstore::coprocessor::config::SPLIT_SIZE_MB;
 use util::config::{ReadableDuration, ReadableSize};
 
 #[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
@@ -39,7 +38,7 @@ impl Default for Config {
             num_import_jobs: 8,
             num_import_sst_jobs: 2,
             max_prepare_duration: ReadableDuration::minutes(5),
-            region_split_size: ReadableSize::mb(SPLIT_SIZE_MB),
+            region_split_size: ReadableSize::mb(512),
             stream_channel_window: 128,
             max_open_engines: 8,
         }
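As a quick sanity check on the new default, a minimal test sketch; the Config type and ReadableSize helper come from the diff above, but this test itself is not part of the commit:

#[cfg(test)]
mod split_size_default_tests {
    use super::*;

    #[test]
    fn test_default_region_split_size() {
        // After this change, the built-in default matches the commented-out
        // value in etc/tikv-importer.toml ("512MB").
        let cfg = Config::default();
        assert_eq!(cfg.region_split_size, ReadableSize::mb(512));
    }
}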
20 changes: 11 additions & 9 deletions src/import/engine.rs
@@ -16,7 +16,7 @@ use std::fmt;
 use std::i32;
 use std::io::Read;
 use std::ops::Deref;
-use std::path::Path;
+use std::path::{Path, MAIN_SEPARATOR};
 use std::sync::Arc;
 
 use uuid::Uuid;
@@ -93,7 +93,7 @@ impl Engine {
     }
 
     pub fn new_sst_writer(&self) -> Result<SSTWriter> {
-        SSTWriter::new(&self.opts)
+        SSTWriter::new(&self.opts, self.db.path())
     }
 
     pub fn get_size_properties(&self) -> Result<SizeProperties> {
@@ -161,18 +161,19 @@ pub struct SSTWriter {
 }
 
 impl SSTWriter {
-    pub fn new(cfg: &DbConfig) -> Result<SSTWriter> {
-        let env = Arc::new(Env::new_mem());
+    pub fn new(cfg: &DbConfig, path: &str) -> Result<SSTWriter> {
+        let env = Arc::new(Env::default());
+        let uuid = Uuid::new_v4().to_string();
 
         let mut default_opts = cfg.defaultcf.build_opt();
         default_opts.set_env(Arc::clone(&env));
         let mut default = SstFileWriter::new(EnvOptions::new(), default_opts);
-        default.open(CF_DEFAULT)?;
+        default.open(&format!("{}{}.{}:default", path, MAIN_SEPARATOR, uuid))?;
 
         let mut write_opts = cfg.writecf.build_opt();
         write_opts.set_env(Arc::clone(&env));
         let mut write = SstFileWriter::new(EnvOptions::new(), write_opts);
-        write.open(CF_WRITE)?;
+        write.open(&format!("{}{}.{}:write", path, MAIN_SEPARATOR, uuid))?;
 
         Ok(SSTWriter {
             env,
@@ -256,9 +257,10 @@ fn tune_dboptions_for_bulk_load(opts: &DbConfig) -> (DBOptions, CFOptions) {
     // RocksDB preserves `max_background_jobs/4` for flush.
     db_opts.set_max_background_jobs(opts.max_background_jobs);
 
+    // Put index and filter in block cache to restrict memory usage.
     let mut block_base_opts = BlockBasedOptions::new();
-    // Use a large block size for sequential access.
-    block_base_opts.set_block_size(MB as usize);
+    block_base_opts.set_lru_cache(128 * MB as usize, -1, 0, 0.0);
+    block_base_opts.set_cache_index_and_filter_blocks(true);
     let mut cf_opts = ColumnFamilyOptions::new();
     cf_opts.set_block_based_table_factory(&block_base_opts);
    cf_opts.compression_per_level(&opts.defaultcf.compression_per_level);
@@ -351,7 +353,7 @@ mod tests {
 
        let n = 10;
        let commit_ts = 10;
-       let mut w = SSTWriter::new(&cfg).unwrap();
+       let mut w = SSTWriter::new(&cfg, temp_dir.path().to_str().unwrap()).unwrap();
 
        // Write some keys.
        let value = vec![1u8; value_size];
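For illustration, a self-contained sketch (not part of this commit) of the on-disk names the intermediate SSTs now get; the directory and UUID below are made-up stand-ins for the engine's db path and Uuid::new_v4():

use std::path::MAIN_SEPARATOR;

fn main() {
    // Hypothetical stand-ins for self.db.path() and Uuid::new_v4().
    let db_path = "/data/import/engine-0";
    let uuid = "6f2c0e1a-0000-0000-0000-000000000000";
    // Mirrors the format strings used in SSTWriter::new above.
    let default_sst = format!("{}{}.{}:default", db_path, MAIN_SEPARATOR, uuid);
    let write_sst = format!("{}{}.{}:write", db_path, MAIN_SEPARATOR, uuid);
    // On Linux this prints:
    //   /data/import/engine-0/.6f2c0e1a-0000-0000-0000-000000000000:default
    //   /data/import/engine-0/.6f2c0e1a-0000-0000-0000-000000000000:write
    println!("{}\n{}", default_sst, write_sst);
}

The tune_dboptions_for_bulk_load change attacks memory usage from the other side: with set_cache_index_and_filter_blocks(true), index and filter blocks are charged against the 128 MiB LRU block cache instead of growing unbounded outside it.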
25 changes: 25 additions & 0 deletions src/import/prepare.rs
@@ -31,6 +31,14 @@ use super::{Config, Error, Result};
 const MAX_RETRY_TIMES: u64 = 3;
 const RETRY_INTERVAL_SECS: u64 = 1;
 
+const SCATTER_WAIT_MAX_RETRY_TIMES: u64 = 125;
+const SCATTER_WAIT_INTERVAL_MILLIS: u64 = 8;
+
+/// PrepareJob is responsible for improving cluster data balance.
+///
+/// Its main tasks are to:
+/// 1. split the data into ranges according to region size and region distribution;
+/// 2. split and scatter the cluster's regions before a large amount of data is imported.
 pub struct PrepareJob<Client> {
     tag: String,
     cfg: Config,
@@ -176,6 +184,23 @@ impl<Client: ImportClient> PrepareRangeJob<Client> {
         }
         match self.split_region(&region) {
             Ok(new_region) => {
+                // We need to wait a few milliseconds here, because PD may not
+                // have received any heartbeat from the newly split region
+                // yet. Until that heartbeat arrives, PD does not have the new
+                // region's metadata, and so it cannot create a scatter
+                // operator for the region.
+                for i in 0..SCATTER_WAIT_MAX_RETRY_TIMES {
+                    if self.client.has_region_id(new_region.region.id)? {
+                        if i > 0 {
+                            debug!("waited between split and scatter; retry times => {}", i);
+                        }
+                        break;
+                    } else if i == SCATTER_WAIT_MAX_RETRY_TIMES - 1 {
+                        warn!("split region still failed after exhausting all retries");
+                    } else {
+                        thread::sleep(Duration::from_millis(SCATTER_WAIT_INTERVAL_MILLIS));
+                    }
+                }
                 self.scatter_region(&new_region)?;
                 Ok(true)
             }
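The loop above bounds the extra wait at SCATTER_WAIT_MAX_RETRY_TIMES × SCATTER_WAIT_INTERVAL_MILLIS = 125 × 8 ms = 1 s per region. A minimal standalone sketch of the same pattern, where has_region stands in for ImportClient::has_region_id:

use std::thread;
use std::time::Duration;

/// Polls `has_region` until it reports true or ~1 s has elapsed.
/// Returns whether PD learned about the region in time.
fn wait_for_region(mut has_region: impl FnMut() -> bool) -> bool {
    const SCATTER_WAIT_MAX_RETRY_TIMES: u64 = 125;
    const SCATTER_WAIT_INTERVAL_MILLIS: u64 = 8;
    for _ in 0..SCATTER_WAIT_MAX_RETRY_TIMES {
        if has_region() {
            return true; // PD has the new region's metadata; safe to scatter.
        }
        thread::sleep(Duration::from_millis(SCATTER_WAIT_INTERVAL_MILLIS));
    }
    false // gave up after 125 * 8 ms = 1 s
}

fn main() {
    // Simulate PD learning about the region after a few heartbeats.
    let mut polls = 0;
    let ready = wait_for_region(|| {
        polls += 1;
        polls >= 3
    });
    assert!(ready);
}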
5 changes: 5 additions & 0 deletions src/import/test_helpers.rs
@@ -162,4 +162,9 @@ impl ImportClient for MockClient {
         regions.insert(region.get_id(), region.region.clone());
         Ok(())
     }
+
+    fn has_region_id(&self, region_id: u64) -> Result<bool> {
+        let regions = self.regions.lock().unwrap();
+        Ok(regions.contains_key(&region_id))
+    }
 }
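A self-contained sketch of the lookup the mock implements; TinyMock is a made-up stand-in for MockClient, whose real constructor and region setup live elsewhere in this file:

use std::collections::HashMap;
use std::sync::Mutex;

struct TinyMock {
    regions: Mutex<HashMap<u64, ()>>,
}

impl TinyMock {
    // Same shape as MockClient::has_region_id above, minus the Result.
    fn has_region_id(&self, region_id: u64) -> bool {
        self.regions.lock().unwrap().contains_key(&region_id)
    }
}

fn main() {
    let mock = TinyMock { regions: Mutex::new(HashMap::new()) };
    assert!(!mock.has_region_id(1)); // region unknown, as PD before the heartbeat
    mock.regions.lock().unwrap().insert(1, ());
    assert!(mock.has_region_id(1)); // region registered, lookup succeeds
}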
