Skip to content

Commit

Permalink
Fix to bug of invoking dedicated thread
Browse files Browse the repository at this point in the history
* Since workers are inserted into `unordered_map`, there is no guarantee
that the first invoked worker is the dedicated one. We should have
separate worker name to distinguish the dedicated ones.
  • Loading branch information
greensky00 committed Aug 31, 2024
1 parent fb7904f commit e297128
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 10 deletions.
18 changes: 11 additions & 7 deletions src/db_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,20 @@ void DBMgr::initInternal(const GlobalConfig& config) {
printGlobalConfig();

bool dedicated_async_flusher =
config.numDedicatedFlusherForAsyncReqs
&& config.numFlusherThreads > config.numDedicatedFlusherForAsyncReqs;
config.numDedicatedFlusherForAsyncReqs &&
config.numFlusherThreads > config.numDedicatedFlusherForAsyncReqs;

for (size_t ii=0; ii<config.numFlusherThreads; ++ii) {
std::string t_name = "flusher_" + std::to_string(ii);
Flusher* flusher = new Flusher(t_name, config);
// If dedicated flusher is enabled, only the first
// `numDedicatedFlusherForAsyncReqs` flushers will handle async reqs.
if (dedicated_async_flusher && ii >= config.numDedicatedFlusherForAsyncReqs) {
flusher->handleAsyncReqs = false;
}
bool handle_async_reqs =
!(dedicated_async_flusher && ii >= config.numDedicatedFlusherForAsyncReqs);
std::string t_name =
handle_async_reqs
? "flusher_ded_" + std::to_string(ii)
: "flusher_" + std::to_string(ii);
Flusher* flusher = new Flusher(t_name, config);
flusher->handleAsyncReqs = handle_async_reqs;
wMgr->addWorker(flusher);
flusher->run();
}
Expand Down
13 changes: 10 additions & 3 deletions src/jungle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -501,17 +501,24 @@ Status DB::flushLogsAsync(const FlushOptions& options,
new FlusherQueueElem(this, local_options, seq_num, handler, ctx);
db_mgr->flusherQueue()->push(elem);

// If dedicated workers are enabled, invoke those workers only.
const std::string WORKER_PREFIX =
db_mgr->getGlobalConfig()->numDedicatedFlusherForAsyncReqs
? "flusher_ded"
: "flusher";

if (options.execDelayUs) {
// Delay is given.
std::lock_guard<std::mutex> l(p->asyncFlushJobLock);
if ( !p->asyncFlushJob ||
p->asyncFlushJob->isDone() ) {
// Schedule a new timer.
p->asyncFlushJob = db_mgr->getTpMgr()->addTask(
[db_mgr, lv, this](const simple_thread_pool::TaskResult& ret) {
[db_mgr, lv, this, WORKER_PREFIX]
(const simple_thread_pool::TaskResult& ret) {
if (!ret.ok()) return;
_log_(lv, p->myLog, "delayed flushing wakes up");
db_mgr->workerMgr()->invokeWorker("flusher");
db_mgr->workerMgr()->invokeWorker(WORKER_PREFIX);
},
local_options.execDelayUs );
_log_(lv, p->myLog, "scheduled delayed flushing %p, %zu us",
Expand All @@ -521,7 +528,7 @@ Status DB::flushLogsAsync(const FlushOptions& options,
} else {
// Immediately invoke.
_log_(lv, p->myLog, "invoke flush worker");
db_mgr->workerMgr()->invokeWorker("flusher");
db_mgr->workerMgr()->invokeWorker(WORKER_PREFIX);
}

return Status();
Expand Down
50 changes: 50 additions & 0 deletions tests/jungle/log_reclaim_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,53 @@ int snapshot_on_purged_memtable_test() {
return 0;
}

int dedicated_flusher_test() {
std::string filename;
TEST_SUITE_PREPARE_PATH(filename);

jungle::Status s;
jungle::DB* db;

jungle::GlobalConfig g_config;
g_config.numDedicatedFlusherForAsyncReqs = 1;
g_config.numFlusherThreads = 2;
g_config.logFileReclaimerSleep_sec = 1;
jungle::init(g_config);

// Open DB.
jungle::DBConfig config;
TEST_CUSTOM_DB_CONFIG(config);
config.numL0Partitions = 4;
config.maxEntriesInLogFile = 10;
config.logSectionOnly = true;
config.logFileTtl_sec = 1;
CHK_Z(jungle::DB::open(&db, filename, config));

const size_t NUM = 10;
for (size_t ii = 0; ii < NUM; ++ii) {
std::string key_str = "k" + TestSuite::lzStr(8, ii);
std::string val_str = "v" + TestSuite::lzStr(16, ii);
CHK_Z(db->setSN(ii + 1, jungle::KV(key_str, val_str)));
jungle::FlushOptions f_opt;
f_opt.callFsync = true;
f_opt.syncOnly = true;
EventAwaiter awaiter;
TestSuite::Timer tt;
CHK_Z(db->flushLogsAsync(f_opt,
[&](jungle::Status s, void* c) { awaiter.invoke(); },
nullptr));
awaiter.wait();
TestSuite::_msg("%lu us elapsed\n", tt.getTimeUs());
}

CHK_Z(jungle::DB::close(db));

CHK_Z(jungle::shutdown());

TEST_SUITE_CLEANUP_PATH();
return 0;
}


} using namespace log_reclaim_test;

Expand Down Expand Up @@ -1695,6 +1742,9 @@ int main(int argc, char** argv) {
snapshot_on_purged_memtable_test);


ts.doTest("dedicated flusher test",
dedicated_flusher_test);

#if 0
ts.doTest("reload empty files test",
reload_with_empty_files_test_load);
Expand Down

0 comments on commit e297128

Please sign in to comment.