Skip to content

Commit

Permalink
refactor asyncwrite and asyncread work queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Yu Zhao (Yuri) authored and JohnSully committed Jan 10, 2024
1 parent 1d59efb commit e1935a7
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ bool initializeStorageProvider(const char **err)
// Create The Storage Factory (if necessary)
serverLog(LL_NOTICE, "Initializing FLASH storage provider (this may take a long time)");
adjustOpenFilesLimit();
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncworkqueue);
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncreadworkqueue, &g_pserver->asyncwriteworkqueue);
#else
serverLog(LL_WARNING, "To use the flash storage provider please compile KeyDB with ENABLE_FLASH=yes");
serverLog(LL_WARNING, "Exiting due to the use of an unsupported storage provider");
Expand Down
1 change: 1 addition & 0 deletions src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3416,6 +3416,7 @@ void redisDbPersistentData::prefetchKeysFlash(std::unordered_set<client*> &setc)
blockClient(c, BLOCKED_STORAGE);
}
tok->setc = std::move(setcBlocked);
tok->type = StorageToken::TokenType::SingleRead;
tok->db = this;
}
return;
Expand Down
7 changes: 6 additions & 1 deletion src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4158,7 +4158,12 @@ void InitServerLast() {
set_jemalloc_bg_thread(cserver.jemalloc_bg_thread);
g_pserver->initial_memory_usage = zmalloc_used_memory();

g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads*10);
g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads);

g_pserver->asyncreadworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads*10);

//Process one write/commit at a time to ensure consistency
g_pserver->asyncwriteworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(1);

// Allocate the repl backlog

Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2705,6 +2705,8 @@ struct redisServer {
uint64_t mvcc_tstamp;

AsyncWorkQueue *asyncworkqueue;
AsyncWorkQueue *asyncreadworkqueue;
AsyncWorkQueue *asyncwriteworkqueue;

/* System hardware info */
size_t system_memory_size; /* Total memory in system as reported by OS */
Expand Down
4 changes: 2 additions & 2 deletions src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el
tok->tspdb = m_spdb;
m_spbatch = nullptr;
m_lock.unlock();
(*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok]{
(*m_pfactory->m_wwqueue)->AddWorkFunction([this, el,callback,tok]{
tok->tspdb->Write(WriteOptions(),tok->tspbatch.get()->GetWriteBatch());
aePostFunction(el,callback,tok);
});
Expand Down Expand Up @@ -316,7 +316,7 @@ StorageToken *RocksDBStorageProvider::begin_retrieve(struct aeEventLoop *el, aeP

auto opts = ReadOptions();
opts.async_io = true;
(*m_pfactory->m_wqueue)->AddWorkFunction([this, el, callback, tok, opts]{
(*m_pfactory->m_rwqueue)->AddWorkFunction([this, el, callback, tok, opts]{
std::vector<std::string> veckeysStr;
std::vector<rocksdb::Slice> veckeys;
std::vector<rocksdb::PinnableSlice> vecvals;
Expand Down
5 changes: 3 additions & 2 deletions src/storage/rocksdbfactor_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ class RocksDBStorageFactory : public IStorageFactory
bool m_fCreatedTempFolder = false;

public:
AsyncWorkQueue **m_wqueue;
AsyncWorkQueue **m_rwqueue;
AsyncWorkQueue **m_wwqueue;

RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue);
RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue);
~RocksDBStorageFactory();

virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override;
Expand Down
8 changes: 4 additions & 4 deletions src/storage/rocksdbfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ rocksdb::Options DefaultRocksDBOptions() {
return options;
}

IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue)
IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue)
{
return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, wqueue);
return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, rwqueue, wwqueue);
}

rocksdb::Options RocksDBStorageFactory::RocksDbOptions()
Expand All @@ -52,8 +52,8 @@ rocksdb::Options RocksDBStorageFactory::RocksDbOptions()
return options;
}

RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue)
: m_path(dbfile), m_wqueue(wqueue)
RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue)
: m_path(dbfile), m_rwqueue(rwqueue), m_wwqueue(wwqueue)
{
dbnum++; // create an extra db for metadata
// Get the count of column families in the actual database
Expand Down
2 changes: 1 addition & 1 deletion src/storage/rocksdbfactory.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#pragma once

class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue);
class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue);

0 comments on commit e1935a7

Please sign in to comment.