diff --git a/src/config.cpp b/src/config.cpp index a8217e7bc..fbe97dd0f 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -361,7 +361,7 @@ bool initializeStorageProvider(const char **err) // Create The Storage Factory (if necessary) serverLog(LL_NOTICE, "Initializing FLASH storage provider (this may take a long time)"); adjustOpenFilesLimit(); - g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncworkqueue); + g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncreadworkqueue, &g_pserver->asyncwriteworkqueue); #else serverLog(LL_WARNING, "To use the flash storage provider please compile KeyDB with ENABLE_FLASH=yes"); serverLog(LL_WARNING, "Exiting due to the use of an unsupported storage provider"); diff --git a/src/db.cpp b/src/db.cpp index 9a0f35684..a861244b9 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -3416,6 +3416,7 @@ void redisDbPersistentData::prefetchKeysFlash(std::unordered_set &setc) blockClient(c, BLOCKED_STORAGE); } tok->setc = std::move(setcBlocked); + tok->type = StorageToken::TokenType::SingleRead; tok->db = this; } return; diff --git a/src/server.cpp b/src/server.cpp index 4792d0fd9..abdf73dfe 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4158,7 +4158,12 @@ void InitServerLast() { set_jemalloc_bg_thread(cserver.jemalloc_bg_thread); g_pserver->initial_memory_usage = zmalloc_used_memory(); - g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads*10); + g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads); + + g_pserver->asyncreadworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads*10); + + //Process one write/commit at a time to ensure consistency + g_pserver->asyncwriteworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(1); // Allocate the repl backlog diff --git a/src/server.h b/src/server.h index ef21fd930..88f3031ce 100644 --- a/src/server.h +++ b/src/server.h @@ -2705,6 +2705,8 @@ struct redisServer { uint64_t mvcc_tstamp; AsyncWorkQueue *asyncworkqueue; + AsyncWorkQueue *asyncreadworkqueue; + AsyncWorkQueue *asyncwriteworkqueue; /* System hardware info */ size_t system_memory_size; /* Total memory in system as reported by OS */ diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 56f00821f..73e709a8b 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -266,7 +266,7 @@ StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el tok->tspdb = m_spdb; m_spbatch = nullptr; m_lock.unlock(); - (*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok]{ + (*m_pfactory->m_wwqueue)->AddWorkFunction([this, el,callback,tok]{ tok->tspdb->Write(WriteOptions(),tok->tspbatch.get()->GetWriteBatch()); aePostFunction(el,callback,tok); }); @@ -316,7 +316,7 @@ StorageToken *RocksDBStorageProvider::begin_retrieve(struct aeEventLoop *el, aeP auto opts = ReadOptions(); opts.async_io = true; - (*m_pfactory->m_wqueue)->AddWorkFunction([this, el, callback, tok, opts]{ + (*m_pfactory->m_rwqueue)->AddWorkFunction([this, el, callback, tok, opts]{ std::vector veckeysStr; std::vector veckeys; std::vector vecvals; diff --git a/src/storage/rocksdbfactor_internal.h b/src/storage/rocksdbfactor_internal.h index addff77ce..0fa34eec5 100644 --- a/src/storage/rocksdbfactor_internal.h +++ b/src/storage/rocksdbfactor_internal.h @@ -11,9 +11,10 @@ class RocksDBStorageFactory : public IStorageFactory bool m_fCreatedTempFolder = false; public: - AsyncWorkQueue **m_wqueue; + AsyncWorkQueue **m_rwqueue; + AsyncWorkQueue **m_wwqueue; - RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue); + RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue); ~RocksDBStorageFactory(); virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override; diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 0ebfb93e1..2e02010be 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -35,9 +35,9 @@ rocksdb::Options DefaultRocksDBOptions() { return options; } -IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue) +IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue) { - return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, wqueue); + return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, rwqueue, wwqueue); } rocksdb::Options RocksDBStorageFactory::RocksDbOptions() @@ -52,8 +52,8 @@ rocksdb::Options RocksDBStorageFactory::RocksDbOptions() return options; } -RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue) - : m_path(dbfile), m_wqueue(wqueue) +RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue) + : m_path(dbfile), m_rwqueue(rwqueue), m_wwqueue(wwqueue) { dbnum++; // create an extra db for metadata // Get the count of column families in the actual database diff --git a/src/storage/rocksdbfactory.h b/src/storage/rocksdbfactory.h index c137a79e4..e12881475 100644 --- a/src/storage/rocksdbfactory.h +++ b/src/storage/rocksdbfactory.h @@ -1,3 +1,3 @@ #pragma once -class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue); \ No newline at end of file +class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue); \ No newline at end of file