diff --git a/src/db_mgr.cc b/src/db_mgr.cc index 359c464..68184f0 100644 --- a/src/db_mgr.cc +++ b/src/db_mgr.cc @@ -89,16 +89,20 @@ void DBMgr::initInternal(const GlobalConfig& config) { printGlobalConfig(); bool dedicated_async_flusher = - config.numDedicatedFlusherForAsyncReqs - && config.numFlusherThreads > config.numDedicatedFlusherForAsyncReqs; + config.numDedicatedFlusherForAsyncReqs && + config.numFlusherThreads > config.numDedicatedFlusherForAsyncReqs; + for (size_t ii=0; ii= config.numDedicatedFlusherForAsyncReqs) { - flusher->handleAsyncReqs = false; - } + bool handle_async_reqs = + !(dedicated_async_flusher && ii >= config.numDedicatedFlusherForAsyncReqs); + std::string t_name = + handle_async_reqs + ? "flusher_ded_" + std::to_string(ii) + : "flusher_" + std::to_string(ii); + Flusher* flusher = new Flusher(t_name, config); + flusher->handleAsyncReqs = handle_async_reqs; wMgr->addWorker(flusher); flusher->run(); } diff --git a/src/jungle.cc b/src/jungle.cc index 63e8410..749dcba 100644 --- a/src/jungle.cc +++ b/src/jungle.cc @@ -501,6 +501,12 @@ Status DB::flushLogsAsync(const FlushOptions& options, new FlusherQueueElem(this, local_options, seq_num, handler, ctx); db_mgr->flusherQueue()->push(elem); + // If dedicated workers are enabled, invoke those workers only. + const std::string WORKER_PREFIX = + db_mgr->getGlobalConfig()->numDedicatedFlusherForAsyncReqs + ? "flusher_ded" + : "flusher"; + if (options.execDelayUs) { // Delay is given. std::lock_guard l(p->asyncFlushJobLock); @@ -508,10 +514,11 @@ Status DB::flushLogsAsync(const FlushOptions& options, p->asyncFlushJob->isDone() ) { // Schedule a new timer. p->asyncFlushJob = db_mgr->getTpMgr()->addTask( - [db_mgr, lv, this](const simple_thread_pool::TaskResult& ret) { + [db_mgr, lv, this, WORKER_PREFIX] + (const simple_thread_pool::TaskResult& ret) { if (!ret.ok()) return; _log_(lv, p->myLog, "delayed flushing wakes up"); - db_mgr->workerMgr()->invokeWorker("flusher"); + db_mgr->workerMgr()->invokeWorker(WORKER_PREFIX); }, local_options.execDelayUs ); _log_(lv, p->myLog, "scheduled delayed flushing %p, %zu us", @@ -521,7 +528,7 @@ Status DB::flushLogsAsync(const FlushOptions& options, } else { // Immediately invoke. _log_(lv, p->myLog, "invoke flush worker"); - db_mgr->workerMgr()->invokeWorker("flusher"); + db_mgr->workerMgr()->invokeWorker(WORKER_PREFIX); } return Status(); diff --git a/tests/jungle/log_reclaim_test.cc b/tests/jungle/log_reclaim_test.cc index 7436203..74260c7 100644 --- a/tests/jungle/log_reclaim_test.cc +++ b/tests/jungle/log_reclaim_test.cc @@ -1613,6 +1613,53 @@ int snapshot_on_purged_memtable_test() { return 0; } +int dedicated_flusher_test() { + std::string filename; + TEST_SUITE_PREPARE_PATH(filename); + + jungle::Status s; + jungle::DB* db; + + jungle::GlobalConfig g_config; + g_config.numDedicatedFlusherForAsyncReqs = 1; + g_config.numFlusherThreads = 2; + g_config.logFileReclaimerSleep_sec = 1; + jungle::init(g_config); + + // Open DB. + jungle::DBConfig config; + TEST_CUSTOM_DB_CONFIG(config); + config.numL0Partitions = 4; + config.maxEntriesInLogFile = 10; + config.logSectionOnly = true; + config.logFileTtl_sec = 1; + CHK_Z(jungle::DB::open(&db, filename, config)); + + const size_t NUM = 10; + for (size_t ii = 0; ii < NUM; ++ii) { + std::string key_str = "k" + TestSuite::lzStr(8, ii); + std::string val_str = "v" + TestSuite::lzStr(16, ii); + CHK_Z(db->setSN(ii + 1, jungle::KV(key_str, val_str))); + jungle::FlushOptions f_opt; + f_opt.callFsync = true; + f_opt.syncOnly = true; + EventAwaiter awaiter; + TestSuite::Timer tt; + CHK_Z(db->flushLogsAsync(f_opt, + [&](jungle::Status s, void* c) { awaiter.invoke(); }, + nullptr)); + awaiter.wait(); + TestSuite::_msg("%lu us elapsed\n", tt.getTimeUs()); + } + + CHK_Z(jungle::DB::close(db)); + + CHK_Z(jungle::shutdown()); + + TEST_SUITE_CLEANUP_PATH(); + return 0; +} + } using namespace log_reclaim_test; @@ -1695,6 +1742,9 @@ int main(int argc, char** argv) { snapshot_on_purged_memtable_test); + ts.doTest("dedicated flusher test", + dedicated_flusher_test); + #if 0 ts.doTest("reload empty files test", reload_with_empty_files_test_load);