diff --git a/include/pika_admin.h b/include/pika_admin.h index 4373d5add9..b1d09f0755 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -158,7 +158,6 @@ class FlushallCmd : public Cmd { void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new FlushallCmd(*this); } - void Execute() override; private: void DoInitial() override; @@ -212,7 +211,8 @@ class InfoCmd : public Cmd { kInfo, kInfoAll, kInfoDebug, - kInfoCommandStats + kInfoCommandStats, + kInfoCache }; InfoCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} @@ -240,6 +240,7 @@ class InfoCmd : public Cmd { const static std::string kRocksDBSection; const static std::string kDebugSection; const static std::string kCommandStatsSection; + const static std::string kCacheSection; void DoInitial() override; void Clear() override { @@ -260,6 +261,7 @@ class InfoCmd : public Cmd { void InfoRocksDB(std::string& info); void InfoDebug(std::string& info); void InfoCommandStats(std::string& info); + void InfoCache(std::string& info); }; class ShutdownCmd : public Cmd { diff --git a/include/pika_cache.h b/include/pika_cache.h index 6bde3acafd..5e37fca79a 100644 --- a/include/pika_cache.h +++ b/include/pika_cache.h @@ -6,14 +6,13 @@ #include #include "include/pika_server.h" -#include "pika_define.h" -#include "pika_zset.h" +#include "include/pika_define.h" +#include "include/pika_zset.h" #include "pstd/include/pstd_mutex.h" #include "pstd/include/pstd_status.h" #include "dory/include/RedisCache.h" #include "storage/storage.h" -using Status = pstd::Status; /* * cache status */ @@ -36,7 +35,7 @@ const char PIKA_KEY_TYPE_ZSET = 'z'; enum RangeStatus : int { RangeError = 1, RangeHit, RangeMiss }; class PikaCacheLoadThread; -class PikaCache : public pstd::noncopyable, std::enable_shared_from_this { +class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this { public: struct CacheInfo { int status; @@ -71,151 +70,150 @@ class PikaCache : public pstd::noncopyable, std::enable_shared_from_this slot); ~PikaCache(); - Status Init(uint32_t cache_num, dory::CacheConfig *cache_cfg); - Status Reset(uint32_t cache_num, dory::CacheConfig *cache_cfg = NULL); + rocksdb::Status Init(); + rocksdb::Status Reset(); void ResetConfig(dory::CacheConfig *cache_cfg); void Destroy(void); - void ProcessCronTask(void); void SetCacheStatus(int status); int CacheStatus(void); // Normal Commands - void Info(CacheInfo &info); - long long DbSize(void); + CacheInfo Info(); bool Exists(std::string &key); void FlushSlot(void); void ActiveExpireCycle(); - Status Del(std::string &key); - Status Expire(std::string &key, int64_t ttl); - Status Expireat(std::string &key, int64_t ttl); - Status TTL(std::string &key, int64_t *ttl); - Status Persist(std::string &key); - Status Type(std::string &key, std::string *value); - Status RandomKey(std::string *key); + rocksdb::Status Del(const std::vector &keys); + rocksdb::Status Expire(std::string &key, int64_t ttl); + rocksdb::Status Expireat(std::string &key, int64_t ttl); + rocksdb::Status TTL(std::string &key, int64_t *ttl); + rocksdb::Status Persist(std::string &key); + rocksdb::Status Type(std::string &key, std::string *value); + rocksdb::Status RandomKey(std::string *key); // String Commands - Status Set(std::string &key, std::string &value, int64_t ttl); - Status Setnx(std::string &key, std::string &value, int64_t ttl); - Status SetnxWithoutTTL(std::string &key, std::string &value); - Status Setxx(std::string &key, std::string &value, int64_t ttl); - Status SetxxWithoutTTL(std::string &key, std::string &value); - Status Get(std::string &key, std::string *value); - Status Incrxx(std::string &key); - Status Decrxx(std::string &key); - Status IncrByxx(std::string &key, long long incr); - Status DecrByxx(std::string &key, long long incr); - Status Incrbyfloatxx(std::string &key, long double incr); - Status Appendxx(std::string &key, std::string &value); - Status GetRange(std::string &key, int64_t start, int64_t end, std::string *value); - Status SetRangexx(std::string &key, int64_t start, std::string &value); - Status Strlen(std::string &key, int32_t *len); + rocksdb::Status Set(std::string &key, std::string &value, int64_t ttl); + rocksdb::Status SetWithoutTTL(std::string &key, std::string &value); + rocksdb::Status Setnx(std::string &key, std::string &value, int64_t ttl); + rocksdb::Status SetnxWithoutTTL(std::string &key, std::string &value); + rocksdb::Status Setxx(std::string &key, std::string &value, int64_t ttl); + rocksdb::Status SetxxWithoutTTL(std::string &key, std::string &value); + rocksdb::Status MSet(const std::vector& kvs); + rocksdb::Status Get(std::string &key, std::string *value); + rocksdb::Status MGet(const std::vector& keys, std::vector* vss); + rocksdb::Status Incrxx(std::string &key); + rocksdb::Status Decrxx(std::string &key); + rocksdb::Status IncrByxx(std::string &key, long long incr); + rocksdb::Status DecrByxx(std::string &key, long long incr); + rocksdb::Status Incrbyfloatxx(std::string &key, long double incr); + rocksdb::Status Appendxx(std::string &key, std::string &value); + rocksdb::Status GetRange(std::string &key, int64_t start, int64_t end, std::string *value); + rocksdb::Status SetRangexx(std::string &key, int64_t start, std::string &value); + rocksdb::Status Strlen(std::string &key, int32_t *len); // Hash Commands - Status HDel(std::string &key, std::vector &fields); - Status HSet(std::string &key, std::string &field, std::string &value); - Status HSetIfKeyExist(std::string &key, std::string &field, std::string &value); - Status HSetIfKeyExistAndFieldNotExist(std::string &key, std::string &field, std::string &value); - Status HMSet(std::string &key, std::vector &fvs); - Status HMSetnx(std::string &key, std::vector &fvs, int64_t ttl); - Status HMSetnxWithoutTTL(std::string &key, std::vector &fvs); - Status HMSetxx(std::string &key, std::vector &fvs); - Status HGet(std::string &key, std::string &field, std::string *value); - Status HMGet(std::string &key, std::vector &fields, std::vector *vss); - Status HGetall(std::string &key, std::vector *fvs); - Status HKeys(std::string &key, std::vector *fields); - Status HVals(std::string &key, std::vector *values); - Status HExists(std::string &key, std::string &field); - Status HIncrbyxx(std::string &key, std::string &field, int64_t value); - Status HIncrbyfloatxx(std::string &key, std::string &field, long double value); - Status HLen(std::string &key, unsigned long *len); - Status HStrlen(std::string &key, std::string &field, unsigned long *len); + rocksdb::Status HDel(std::string &key, std::vector &fields); + rocksdb::Status HSet(std::string &key, std::string &field, std::string &value); + rocksdb::Status HSetIfKeyExist(std::string &key, std::string &field, std::string &value); + rocksdb::Status HSetIfKeyExistAndFieldNotExist(std::string &key, std::string &field, std::string &value); + rocksdb::Status HMSet(std::string &key, std::vector &fvs); + rocksdb::Status HMSetnx(std::string &key, std::vector &fvs, int64_t ttl); + rocksdb::Status HMSetnxWithoutTTL(std::string &key, std::vector &fvs); + rocksdb::Status HMSetxx(std::string &key, std::vector &fvs); + rocksdb::Status HGet(std::string &key, std::string &field, std::string *value); + rocksdb::Status HMGet(std::string &key, std::vector &fields, std::vector *vss); + rocksdb::Status HGetall(std::string &key, std::vector *fvs); + rocksdb::Status HKeys(std::string &key, std::vector *fields); + rocksdb::Status HVals(std::string &key, std::vector *values); + rocksdb::Status HExists(std::string &key, std::string &field); + rocksdb::Status HIncrbyxx(std::string &key, std::string &field, int64_t value); + rocksdb::Status HIncrbyfloatxx(std::string &key, std::string &field, long double value); + rocksdb::Status HLen(std::string &key, unsigned long *len); + rocksdb::Status HStrlen(std::string &key, std::string &field, unsigned long *len); // List Commands - Status LIndex(std::string &key, long index, std::string *element); - Status LInsert(std::string &key, storage::BeforeOrAfter &before_or_after, std::string &pivot, std::string &value); - Status LLen(std::string &key, unsigned long *len); - Status LPop(std::string &key, std::string *element); - Status LPush(std::string &key, std::vector &values); - Status LPushx(std::string &key, std::vector &values); - Status LRange(std::string &key, long start, long stop, std::vector *values); - Status LRem(std::string &key, long count, std::string &value); - Status LSet(std::string &key, long index, std::string &value); - Status LTrim(std::string &key, long start, long stop); - Status RPop(std::string &key, std::string *element); - Status RPush(std::string &key, std::vector &values); - Status RPushx(std::string &key, std::vector &values); - Status RPushnx(std::string &key, std::vector &values, int64_t ttl); - Status RPushnxWithoutTTL(std::string &key, std::vector &values); + rocksdb::Status LIndex(std::string &key, long index, std::string *element); + rocksdb::Status LInsert(std::string &key, storage::BeforeOrAfter &before_or_after, std::string &pivot, std::string &value); + rocksdb::Status LLen(std::string &key, unsigned long *len); + rocksdb::Status LPop(std::string &key, std::string *element); + rocksdb::Status LPush(std::string &key, std::vector &values); + rocksdb::Status LPushx(std::string &key, std::vector &values); + rocksdb::Status LRange(std::string &key, long start, long stop, std::vector *values); + rocksdb::Status LRem(std::string &key, long count, std::string &value); + rocksdb::Status LSet(std::string &key, long index, std::string &value); + rocksdb::Status LTrim(std::string &key, long start, long stop); + rocksdb::Status RPop(std::string &key, std::string *element); + rocksdb::Status RPush(std::string &key, std::vector &values); + rocksdb::Status RPushx(std::string &key, std::vector &values); + rocksdb::Status RPushnx(std::string &key, std::vector &values, int64_t ttl); + rocksdb::Status RPushnxWithoutTTL(std::string &key, std::vector &values); // Set Commands - Status SAdd(std::string &key, std::vector &members); - Status SAddIfKeyExist(std::string &key, std::vector &members); - Status SAddnx(std::string &key, std::vector &members, int64_t ttl); - Status SAddnxWithoutTTL(std::string &key, std::vector &members); - Status SCard(std::string &key, unsigned long *len); - Status SIsmember(std::string &key, std::string &member); - Status SMembers(std::string &key, std::vector *members); - Status SRem(std::string &key, std::vector &members); - Status SRandmember(std::string &key, long count, std::vector *members); + rocksdb::Status SAdd(std::string &key, std::vector &members); + rocksdb::Status SAddIfKeyExist(std::string &key, std::vector &members); + rocksdb::Status SAddnx(std::string &key, std::vector &members, int64_t ttl); + rocksdb::Status SAddnxWithoutTTL(std::string &key, std::vector &members); + rocksdb::Status SCard(std::string &key, unsigned long *len); + rocksdb::Status SIsmember(std::string &key, std::string &member); + rocksdb::Status SMembers(std::string &key, std::vector *members); + rocksdb::Status SRem(std::string &key, std::vector &members); + rocksdb::Status SRandmember(std::string &key, long count, std::vector *members); // ZSet Commands - Status ZAdd(std::string &key, std::vector &score_members); - Status ZAddIfKeyExist(std::string &key, std::vector &score_members); - Status ZAddnx(std::string &key, std::vector &score_members, int64_t ttl); - Status ZAddnxWithoutTTL(std::string &key, std::vector &score_members); - Status ZCard(std::string &key, unsigned long *len, const std::shared_ptr &slot); - Status ZCount(std::string &key, std::string &min, std::string &max, unsigned long *len, ZCountCmd *cmd); - Status ZIncrby(std::string &key, std::string &member, double increment); - Status ZIncrbyIfKeyExist(std::string &key, std::string &member, double increment, ZIncrbyCmd *cmd); - Status ZRange(std::string &key, long start, long stop, std::vector *score_members, + rocksdb::Status ZAdd(std::string &key, std::vector &score_members); + rocksdb::Status ZAddIfKeyExist(std::string &key, std::vector &score_members); + rocksdb::Status ZAddnx(std::string &key, std::vector &score_members, int64_t ttl); + rocksdb::Status ZAddnxWithoutTTL(std::string &key, std::vector &score_members); + rocksdb::Status ZCard(std::string &key, unsigned long *len, const std::shared_ptr &slot); + rocksdb::Status ZCount(std::string &key, std::string &min, std::string &max, unsigned long *len, ZCountCmd *cmd); + rocksdb::Status ZIncrby(std::string &key, std::string &member, double increment); + rocksdb::Status ZIncrbyIfKeyExist(std::string &key, std::string &member, double increment, ZIncrbyCmd *cmd); + rocksdb::Status ZRange(std::string &key, long start, long stop, std::vector *score_members, const std::shared_ptr &slot); - Status ZRangebyscore(std::string &key, std::string &min, std::string &max, + rocksdb::Status ZRangebyscore(std::string &key, std::string &min, std::string &max, std::vector *score_members, ZRangebyscoreCmd *cmd); - Status ZRank(std::string &key, std::string &member, long *rank, const std::shared_ptr &slot); - Status ZRem(std::string &key, std::vector &members, std::shared_ptr slot = nullptr); - Status ZRemrangebyrank(std::string &key, std::string &min, std::string &max, int32_t ele_deleted = 0, + rocksdb::Status ZRank(std::string &key, std::string &member, long *rank, const std::shared_ptr &slot); + rocksdb::Status ZRem(std::string &key, std::vector &members, std::shared_ptr slot = nullptr); + rocksdb::Status ZRemrangebyrank(std::string &key, std::string &min, std::string &max, int32_t ele_deleted = 0, const std::shared_ptr &slot = nullptr); - Status ZRemrangebyscore(std::string &key, std::string &min, std::string &max, const std::shared_ptr &slot); - Status ZRevrange(std::string &key, long start, long stop, std::vector *score_members, + rocksdb::Status ZRemrangebyscore(std::string &key, std::string &min, std::string &max, const std::shared_ptr &slot); + rocksdb::Status ZRevrange(std::string &key, long start, long stop, std::vector *score_members, const std::shared_ptr &slot); - Status ZRevrangebyscore(std::string &key, std::string &min, std::string &max, + rocksdb::Status ZRevrangebyscore(std::string &key, std::string &min, std::string &max, std::vector *score_members, ZRevrangebyscoreCmd *cmd); - Status ZRevrangebylex(std::string &key, std::string &min, std::string &max, std::vector *members, + rocksdb::Status ZRevrangebylex(std::string &key, std::string &min, std::string &max, std::vector *members, const std::shared_ptr &slot); - Status ZRevrank(std::string &key, std::string &member, long *rank, const std::shared_ptr &slot); - Status ZScore(std::string &key, std::string &member, double *score, const std::shared_ptr &slot); - Status ZRangebylex(std::string &key, std::string &min, std::string &max, std::vector *members, const std::shared_ptr &slot); - Status ZLexcount(std::string &key, std::string &min, std::string &max, unsigned long *len, + rocksdb::Status ZRevrank(std::string &key, std::string &member, long *rank, const std::shared_ptr &slot); + rocksdb::Status ZScore(std::string &key, std::string &member, double *score, const std::shared_ptr &slot); + rocksdb::Status ZRangebylex(std::string &key, std::string &min, std::string &max, std::vector *members, const std::shared_ptr &slot); + rocksdb::Status ZLexcount(std::string &key, std::string &min, std::string &max, unsigned long *len, const std::shared_ptr &slot); - Status ZRemrangebylex(std::string &key, std::string &min, std::string &max, const std::shared_ptr &slot); + rocksdb::Status ZRemrangebylex(std::string &key, std::string &min, std::string &max, const std::shared_ptr &slot); // Bit Commands - Status SetBit(std::string &key, size_t offset, long value); - Status SetBitIfKeyExist(std::string &key, size_t offset, long value); - Status GetBit(std::string &key, size_t offset, long *value); - Status BitCount(std::string &key, long start, long end, long *value, bool have_offset); - Status BitPos(std::string &key, long bit, long *value); - Status BitPos(std::string &key, long bit, long start, long *value); - Status BitPos(std::string &key, long bit, long start, long end, long *value); + rocksdb::Status SetBit(std::string &key, size_t offset, long value); + rocksdb::Status SetBitIfKeyExist(std::string &key, size_t offset, long value); + rocksdb::Status GetBit(std::string &key, size_t offset, long *value); + rocksdb::Status BitCount(std::string &key, long start, long end, long *value, bool have_offset); + rocksdb::Status BitPos(std::string &key, long bit, long *value); + rocksdb::Status BitPos(std::string &key, long bit, long start, long *value); + rocksdb::Status BitPos(std::string &key, long bit, long start, long end, long *value); // Cache - Status WriteKvToCache(std::string &key, std::string &value, int64_t ttl); - Status WriteHashToCache(std::string &key, std::vector &fvs, int64_t ttl); - Status WriteListToCache(std::string &key, std::vector &values, int64_t ttl); - Status WriteSetToCache(std::string &key, std::vector &members, int64_t ttl); - Status WriteZSetToCache(std::string &key, std::vector &score_members, int64_t ttl); + rocksdb::Status WriteKvToCache(std::string &key, std::string &value, int64_t ttl); + rocksdb::Status WriteHashToCache(std::string &key, std::vector &fvs, int64_t ttl); + rocksdb::Status WriteListToCache(std::string &key, std::vector &values, int64_t ttl); + rocksdb::Status WriteSetToCache(std::string &key, std::vector &members, int64_t ttl); + rocksdb::Status WriteZSetToCache(std::string &key, std::vector &score_members, int64_t ttl); void PushKeyToAsyncLoadQueue(const char key_type, std::string &key); - static bool CheckCacheDBScoreMembers(std::vector &cache_score_members, - std::vector &db_score_members, bool print_result = true); - Status CacheZCard(std::string &key, unsigned long *len); - Status Select(int db_id); +// static bool CheckCacheDBScoreMembers(std::vector &cache_score_members, +// std::vector &db_score_members, bool print_result = true); + rocksdb::Status CacheZCard(std::string &key, unsigned long *len); std::shared_ptr GetSlot() { return slot_; } private: - Status InitWithoutLock(uint32_t cache_num, dory::CacheConfig *cache_cfg); + rocksdb::Status InitWithoutLock(); void DestroyWithoutLock(void); - int CacheIndex(const std::string &key); RangeStatus CheckCacheRange(int32_t cache_len, int32_t db_len, long start, long stop, long &out_start, long &out_stop); RangeStatus CheckCacheRevRange(int32_t cache_len, int32_t db_len, long start, long stop, long &out_start, @@ -228,18 +226,17 @@ class PikaCache : public pstd::noncopyable, std::enable_shared_from_this &slot = nullptr); - Status CleanCacheKeyIfNeeded(dory::RedisCache *cache_obj, std::string &key); + rocksdb::Status CleanCacheKeyIfNeeded(dory::RedisCache *cache_obj, std::string &key); private: std::atomic cache_status_; std::unique_ptr cache_; - uint32_t cache_num_; // currently only take effects to zset int cache_start_pos_; int cache_items_per_key_; std::shared_mutex rwlock_; - PikaCacheLoadThread *cache_load_thread_; // 这个线程保留 + std::unique_ptr cache_load_thread_; std::shared_ptr slot_; }; diff --git a/include/pika_cache_load_thread.h b/include/pika_cache_load_thread.h index 48fdec47a2..7d3b008f71 100644 --- a/include/pika_cache_load_thread.h +++ b/include/pika_cache_load_thread.h @@ -12,9 +12,10 @@ #include #include +#include "include/pika_cache.h" +#include "include/pika_slot.h" #include "include/pika_define.h" #include "net/include/net_thread.h" -#include "pika_slot.h" #include "storage/storage.h" #define CACHE_LOAD_QUEUE_MAX_SIZE 2048 diff --git a/include/pika_cache_manager.h b/include/pika_cache_manager.h index c3a812c061..79be813307 100644 --- a/include/pika_cache_manager.h +++ b/include/pika_cache_manager.h @@ -10,48 +10,16 @@ #include #include "include/pika_cache.h" -#include "include/pika_define.h" -#include "dory/include/RedisCache.h" class PikaCacheManager : public pstd::noncopyable { public: - struct CacheInfo { - int status; - uint32_t cache_num; - long long keys_num; - size_t used_memory; - long long hits; - long long misses; - uint64_t async_load_keys_num; - uint32_t waitting_load_keys_num; - CacheInfo() - : status(PIKA_CACHE_STATUS_NONE), - cache_num(0), - keys_num(0), - used_memory(0), - hits(0), - misses(0), - async_load_keys_num(0), - waitting_load_keys_num(0) {} - void clear() { - status = PIKA_CACHE_STATUS_NONE; - cache_num = 0; - keys_num = 0; - used_memory = 0; - hits = 0; - misses = 0; - async_load_keys_num = 0; - waitting_load_keys_num = 0; - } - }; - - PikaCacheManager(std::vector dbs); - ~PikaCacheManager(); - std::shared_ptr GetCache(const std::string& db_name, int slot_index); + PikaCacheManager(); + ~PikaCacheManager() = default; + void Init(const std::map>& dbs); void ProcessCronTask(); - void FlushDB(const std::string& db_name); double HitRatio(); void ClearHitRatio(); + PikaCache::CacheInfo Info(); private: std::shared_mutex mu_; std::unordered_map> caches_; diff --git a/include/pika_command.h b/include/pika_command.h index 09849ef124..92aa71e2c7 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -17,8 +17,6 @@ #include "include/pika_slot.h" #include "net/src/dispatch_thread.h" -#include "include/pika_cache.h" -#include "include/pika_cache_manager.h" class SyncMasterSlot; class SyncSlaveSlot; @@ -232,7 +230,8 @@ enum CmdFlagsMask { kCmdFlagsMaskAdminRequire = 256, kCmdFlagsMaskPreDo = 512, kCmdFlagsMaskCacheDo = 1024, - kCmdFlagsMaskPostDo = 2048, + kCmdFlagsMaskUpdateCache = 2048, + kCmdFlagsMaskOnlyDoCache = 4096, kCmdFlagsMaskSlot = 1536, }; @@ -260,7 +259,8 @@ enum CmdFlags { kCmdFlagsDoNotSpecifySlot = 0, // default do not specify slot kCmdFlagsSingleSlot = 512, kCmdFlagsMultiSlot = 1024, - kCmdFlagsPreDo = 2048, + kCmdFlagsUpdateCache = 2048, + kCmdFlagsOnlyDoCache = 4096 }; void inline RedisAppendContent(std::string& str, const std::string& value); @@ -453,6 +453,8 @@ class Cmd : public std::enable_shared_from_this { virtual void ProcessMultiSlotCmd(); virtual void ProcessDoNotSpecifySlotCmd(); virtual void Do(std::shared_ptr slot = nullptr) = 0; + virtual void DoFromCache(std::shared_ptr slot = nullptr) {} + virtual void DoUpdateCache(std::shared_ptr slot = nullptr) {} virtual Cmd* Clone() = 0; // used for execute multikey command into different slots virtual void Split(std::shared_ptr slot, const HintKeys& hint_keys) = 0; @@ -468,6 +470,8 @@ class Cmd : public std::enable_shared_from_this { bool is_admin_require() const; bool is_single_slot() const; bool is_multi_slot() const; + bool is_need_update_cache() const; + bool is_only_from_cache() const; bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const; uint64_t GetDoDuration() const { return do_duration_; }; diff --git a/include/pika_kv.h b/include/pika_kv.h index 5edafe60de..7ecd7da65d 100644 --- a/include/pika_kv.h +++ b/include/pika_kv.h @@ -23,8 +23,8 @@ class SetCmd : public Cmd { res.push_back(key_); return res; } - void Execute() override; void Do(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SetCmd(*this); } @@ -54,14 +54,17 @@ class GetCmd : public Cmd { res.push_back(key_); return res; } - void Execute() override; void Do(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new GetCmd(*this); } private: std::string key_; + std::string value_; + int64_t sec_ = 0; void DoInitial() override; }; @@ -90,6 +93,7 @@ class IncrCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new IncrCmd(*this); } @@ -217,7 +221,9 @@ class AppendCmd : public Cmd { class MgetCmd : public Cmd { public: MgetCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){}; + void DoFromCache(std::shared_ptr slot = nullptr) override; void Do(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; std::vector current_key() const override { return keys_; } void Split(std::shared_ptr slot, const HintKeys& hint_keys) override; void Merge() override; diff --git a/include/pika_slot.h b/include/pika_slot.h index 374aca5b73..d0fdd6450c 100644 --- a/include/pika_slot.h +++ b/include/pika_slot.h @@ -14,9 +14,9 @@ #include "storage/storage.h" #include "include/pika_binlog.h" -#include "include/pika_cache.h" class Cmd; +class PikaCache; /* *Keyscan used @@ -56,6 +56,9 @@ class Slot : public std::enable_shared_from_this,public pstd::noncopyable uint32_t GetSlotID() const; std::string GetSlotName() const; std::shared_ptr db() const; + std::shared_ptr cache() const; + + void Init(); void Compact(const storage::DataType& type); @@ -109,7 +112,7 @@ class Slot : public std::enable_shared_from_this,public pstd::noncopyable // class may be shared, using shared_ptr would be a better choice std::shared_ptr lock_mgr_; std::shared_ptr db_; -// std::shared_ptr cache_; + std::shared_ptr cache_; bool full_sync_ = false; diff --git a/include/rsync_client.h b/include/rsync_client.h index 3b2aa94d8b..d1077a8a61 100644 --- a/include/rsync_client.h +++ b/include/rsync_client.h @@ -41,8 +41,11 @@ class Session; class WaitObject; class WaitObjectManager; +using pstd::Status; + + class RsyncClient : public net::Thread { -public: + public: enum State { IDLE, RUNNING, diff --git a/src/dory/include/RedisCache.h b/src/dory/include/RedisCache.h index 37e89a9f00..7921c18640 100644 --- a/src/dory/include/RedisCache.h +++ b/src/dory/include/RedisCache.h @@ -18,7 +18,7 @@ extern "C" { namespace dory { -using Status = pstd::Status; +using Status = rocksdb::Status; class RedisCache { @@ -39,7 +39,7 @@ class RedisCache long long DbSize(void); void FlushDb(void); - Status Del(std::string &key); + Status Del(const std::string &key); Status Expire(std::string &key, int64_t ttl); Status Expireat(std::string &key, int64_t ttl); Status TTL(std::string &key, int64_t *ttl); @@ -49,11 +49,12 @@ class RedisCache // String Commands Status Set(std::string &key, std::string &value, int64_t ttl); + Status SetWithoutTTL(std::string &key, std::string &value); Status Setnx(std::string &key, std::string &value, int64_t ttl); Status SetnxWithoutTTL(std::string &key, std::string &value); Status Setxx(std::string &key, std::string &value, int64_t ttl); Status SetxxWithoutTTL(std::string &key, std::string &value); - Status Get(std::string &key, std::string *value); + Status Get(const std::string &key, std::string *value); Status Incr(std::string &key); Status Decr(std::string &key); Status IncrBy(std::string &key, long long incr); diff --git a/src/dory/src/RedisBit.cc b/src/dory/src/RedisBit.cc index 093f177412..14f0a6c812 100644 --- a/src/dory/src/RedisBit.cc +++ b/src/dory/src/RedisBit.cc @@ -3,7 +3,7 @@ #include "redisdb/object.h" namespace dory { -using pstd::Status; +using rocksdb::Status; Status RedisCache::SetBit(std::string &key, size_t offset, long value) diff --git a/src/dory/src/RedisCache.cc b/src/dory/src/RedisCache.cc index 52c77879fd..59632a18d8 100644 --- a/src/dory/src/RedisCache.cc +++ b/src/dory/src/RedisCache.cc @@ -126,7 +126,7 @@ RedisCache::FlushDb(void) } Status -RedisCache::Del(std::string &key) +RedisCache::Del(const std::string &key) { int ret; robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); diff --git a/src/dory/src/RedisHash.cc b/src/dory/src/RedisHash.cc index 6d2ab3c79c..f5a376ba8d 100644 --- a/src/dory/src/RedisHash.cc +++ b/src/dory/src/RedisHash.cc @@ -110,7 +110,8 @@ RedisCache::HGet(std::string &key, std::string &field, std::string *value) return Status::NotFound("key not in cache"); } else if (REDIS_ITEM_NOT_EXIST == ret) { DecrObjectsRefCount(kobj, fobj); - return Status::ItemNotExist("field not exist"); + // todo(leehao): its better to let pstd::Status to inherit rocksdb::Status + return Status::NotFound("field not exist"); } else { DecrObjectsRefCount(kobj, fobj); return Status::Corruption("RsHGet failed"); @@ -261,7 +262,7 @@ RedisCache::HExists(std::string &key, std::string &field) } DecrObjectsRefCount(kobj, fobj); - return is_exist ? Status::OK() : Status::ItemNotExist("field not exist"); + return is_exist ? Status::OK() : Status::NotFound("field not exist"); } Status diff --git a/src/dory/src/RedisSet.cc b/src/dory/src/RedisSet.cc index 4cf3291866..7a30170c5e 100644 --- a/src/dory/src/RedisSet.cc +++ b/src/dory/src/RedisSet.cc @@ -62,7 +62,7 @@ RedisCache::SIsmember(std::string &key, std::string &member) } DecrObjectsRefCount(kobj, mobj); - return is_member ? Status::OK() : Status::ItemNotExist("member not exist"); + return is_member ? Status::OK() : Status::NotFound("member not exist"); } Status diff --git a/src/dory/src/RedisString.cc b/src/dory/src/RedisString.cc index 82ec575cdb..fd5a73c467 100644 --- a/src/dory/src/RedisString.cc +++ b/src/dory/src/RedisString.cc @@ -23,6 +23,24 @@ RedisCache::Set(std::string &key, std::string &value, int64_t ttl) DecrObjectsRefCount(kobj, vobj, tobj); return Status::OK(); } +Status +RedisCache::SetWithoutTTL(std::string &key, std::string &value) +{ + if (C_OK != RsFreeMemoryIfNeeded(m_RedisDB)) { + return Status::Corruption("[error] Free memory faild !"); + } + + robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + robj *vobj = createObject(OBJ_STRING, sdsnewlen(value.data(), value.size())); + + if (C_OK != RsSet(m_RedisDB, kobj, vobj, NULL)) { + DecrObjectsRefCount(kobj, vobj); + return Status::Corruption("RsSetnx failed, key exists!"); + } + + DecrObjectsRefCount(kobj, vobj); + return Status::OK(); +} Status RedisCache::Setnx(std::string &key, std::string &value, int64_t ttl) @@ -102,7 +120,7 @@ RedisCache::SetxxWithoutTTL(std::string &key, std::string &value) return Status::OK(); } Status -RedisCache::Get(std::string &key, std::string* value) +RedisCache::Get(const std::string &key, std::string* value) { robj *val; int ret; diff --git a/src/dory/src/RedisZset.cc b/src/dory/src/RedisZset.cc index 97d7514a55..a389f5c333 100644 --- a/src/dory/src/RedisZset.cc +++ b/src/dory/src/RedisZset.cc @@ -167,7 +167,7 @@ RedisCache::ZRank(std::string &key, std::string &member, long *rank) return Status::NotFound("key not in cache"); } else if (REDIS_ITEM_NOT_EXIST == ret) { DecrObjectsRefCount(kobj, mobj); - return Status::ItemNotExist("member not exist"); + return Status::NotFound("member not exist"); } else { DecrObjectsRefCount(kobj, mobj); return Status::Corruption("RsZRank failed"); @@ -355,7 +355,7 @@ RedisCache::ZRevrank(std::string &key, std::string &member, long *rank) return Status::NotFound("key not in cache"); } else if (REDIS_ITEM_NOT_EXIST == ret) { DecrObjectsRefCount(kobj, mobj); - return Status::ItemNotExist("member not exist"); + return Status::NotFound("member not exist"); } else { DecrObjectsRefCount(kobj, mobj); return Status::Corruption("RsZRevrank failed"); @@ -378,7 +378,7 @@ RedisCache::ZScore(std::string &key, std::string &member, double *score) return Status::NotFound("key not in cache"); } else if (REDIS_ITEM_NOT_EXIST == ret) { DecrObjectsRefCount(kobj, mobj); - return Status::ItemNotExist("member not exist"); + return Status::NotFound("member not exist"); } else { DecrObjectsRefCount(kobj, mobj); return Status::Corruption("RsZScore failed"); diff --git a/src/pika.cc b/src/pika.cc index 45428d5746..9c52e1cba0 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -7,8 +7,8 @@ #include #include -#include "net/include/net_stats.h" #include "include/build_version.h" +#include "include/pika_cache_manager.h" #include "include/pika_cmd_table_manager.h" #include "include/pika_command.h" #include "include/pika_conf.h" @@ -17,6 +17,7 @@ #include "include/pika_server.h" #include "include/pika_slot_command.h" #include "include/pika_version.h" +#include "net/include/net_stats.h" #include "pstd/include/env.h" #include "pstd/include/pstd_defer.h" @@ -26,7 +27,7 @@ PikaServer* g_pika_server = nullptr; std::unique_ptr g_pika_rm; std::unique_ptr g_pika_cmd_table_manager; -std::shared_ptr g_pika_cache_manager; +std::unique_ptr g_pika_cache_manager; extern std::unique_ptr g_network_statistic; @@ -210,7 +211,9 @@ int main(int argc, char* argv[]) { g_pika_server = new PikaServer(); g_pika_rm = std::make_unique(); g_network_statistic = std::make_unique(); - g_pika_cache_manager = std::make_shared(g_pika_conf->db_structs()); + g_pika_server->InitDBStruct(); + g_pika_cache_manager = std::make_unique(); + g_pika_cache_manager->Init(g_pika_server->GetDB()); if (g_pika_conf->daemonize()) { close_std(); diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 3345149e5f..8565ea44e5 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -15,7 +15,7 @@ #include #include "include/build_version.h" -#include "include/pika_cmd_table_manager.h" +#include "include/pika_cache_manager.h" #include "include/pika_conf.h" #include "include/pika_rm.h" #include "include/pika_server.h" @@ -26,6 +26,7 @@ using pstd::Status; extern PikaServer* g_pika_server; extern std::unique_ptr g_pika_rm; +extern std::unique_ptr g_pika_cache_manager; static std::string ConstructPinginPubSubResp(const PikaCmdArgsType& argv) { if (argv.size() > 2) { @@ -452,15 +453,14 @@ void FlushallCmd::DoInitial() { return; } } -void FlushallCmd::Execute() { - Cmd::Execute(); - -} void FlushallCmd::Do(std::shared_ptr slot) { if (!slot) { LOG(INFO) << "Flushall, but Slot not found"; } else { - slot->FlushDB(); + auto ok = slot->FlushDB(); + if (ok) { + slot->cache()->FlushSlot(); + } } } @@ -513,6 +513,8 @@ void FlushdbCmd::Do(std::shared_ptr slot) { } else { slot->FlushSubDB(db_name_); } + //todo(leehao): flush specified db name + slot->cache()->FlushSlot(); } } @@ -648,6 +650,7 @@ const std::string InfoCmd::kDataSection = "data"; const std::string InfoCmd::kRocksDBSection = "rocksdb"; const std::string InfoCmd::kDebugSection = "debug"; const std::string InfoCmd::kCommandStatsSection = "commandstats"; +const std::string InfoCmd::kCacheSection = "cache"; void InfoCmd::DoInitial() { size_t argc = argv_.size(); @@ -798,6 +801,8 @@ void InfoCmd::Do(std::shared_ptr slot) { case kInfoCommandStats: InfoCommandStats(info); break; + case kInfoCache: + default: // kInfoErr is nothing break; @@ -1281,6 +1286,32 @@ void InfoCmd::InfoCommandStats(std::string& info) { } info.append(tmp_stream.str()); } +// todo(cache): cache info cmd +void InfoCmd::InfoCache(std::string& info) { +// std::stringstream tmp_stream; +// tmp_stream << "# Cache" << "\r\n"; +// if (PIKA_CACHE_NONE == ) { +// tmp_stream << "cache_status:Disable" << "\r\n"; +// } else { +// PikaServer::DisplayCacheInfo cache_info; +// g_pika_server->GetCacheInfo(cache_info); +// tmp_stream << "cache_status:" << CacheStatusToString(cache_info.status) << "\r\n"; +// tmp_stream << "cache_db_num:" << cache_info.cache_num << "\r\n"; +// tmp_stream << "cache_keys:" << cache_info.keys_num << "\r\n"; +// tmp_stream << "cache_memory:" << cache_info.used_memory << "\r\n"; +// tmp_stream << "cache_memory_human:" << (cache_info.used_memory >> 20) << "M\r\n"; +// tmp_stream << "hits:" << cache_info.hits << "\r\n"; +// tmp_stream << "all_cmds:" << cache_info.hits + cache_info.misses << "\r\n"; +// tmp_stream << "hits_per_sec:" << cache_info.hits_per_sec << "\r\n"; +// tmp_stream << "read_cmd_per_sec:" << cache_info.read_cmd_per_sec << "\r\n"; +// tmp_stream << "hitratio_per_sec:" << std::setprecision(4) << cache_info.hitratio_per_sec << "%" <<"\r\n"; +// tmp_stream << "hitratio_all:" << std::setprecision(4) << cache_info.hitratio_all << "%" <<"\r\n"; +// tmp_stream << "load_keys_per_sec:" << cache_info.load_keys_per_sec << "\r\n"; +// tmp_stream << "waitting_load_keys_num:" << cache_info.waitting_load_keys_num << "\r\n"; +// } +// +// info.append(tmp_stream.str()); +} void ConfigCmd::DoInitial() { if (!CheckArg(argv_.size())) { diff --git a/src/pika_cache.cc b/src/pika_cache.cc index fd361802c2..8c673b5d12 100644 --- a/src/pika_cache.cc +++ b/src/pika_cache.cc @@ -1,5 +1,4 @@ #include -#include #include #include @@ -13,15 +12,14 @@ extern PikaServer *g_pika_server; #define EXTEND_CACHE_SIZE(N) (N * 12 / 10) +using Status = rocksdb::Status; + PikaCache::PikaCache(int cache_start_pos, int cache_items_per_key, std::shared_ptr slot) : cache_status_(PIKA_CACHE_STATUS_NONE), - cache_num_(0), cache_start_pos_(cache_start_pos), cache_items_per_key_(EXTEND_CACHE_SIZE(cache_items_per_key)), slot_(slot) { cache_ = std::make_unique(); - cache_load_thread_ = new PikaCacheLoadThread(cache_start_pos_, cache_items_per_key_, shared_from_this()); - cache_load_thread_->StartThread(); } PikaCache::~PikaCache() { @@ -31,20 +29,15 @@ PikaCache::~PikaCache() { } } -Status PikaCache::Init(uint32_t cache_num, dory::CacheConfig *cache_cfg) { +Status PikaCache::Init() { std::unique_lock l(rwlock_); - - if (nullptr == cache_cfg) { - return Status::Corruption("invalid arguments !!!"); - } - return InitWithoutLock(cache_num, cache_cfg); + return InitWithoutLock(); } -Status PikaCache::Reset(uint32_t cache_num, dory::CacheConfig *cache_cfg) { +Status PikaCache::Reset() { std::unique_lock l(rwlock_); - DestroyWithoutLock(); - return InitWithoutLock(cache_num, cache_cfg); + return InitWithoutLock(); } void PikaCache::ResetConfig(dory::CacheConfig *cache_cfg) { @@ -60,14 +53,6 @@ void PikaCache::Destroy(void) { DestroyWithoutLock(); } -void PikaCache::ProcessCronTask(void) { - std::unique_lock l(rwlock_); - for (uint32_t i = 0; i < caches_.size(); ++i) { - std::unique_lock lm(*cache_mutexs_[i]); - caches_[i]->ActiveExpireCycle(); - } -} - void PikaCache::SetCacheStatus(int status) { cache_status_ = status; } int PikaCache::CacheStatus(void) { return cache_status_; } @@ -75,19 +60,13 @@ int PikaCache::CacheStatus(void) { return cache_status_; } /*----------------------------------------------------------------------------- * Normal Commands *----------------------------------------------------------------------------*/ -void PikaCache::Info(CacheInfo &info) { - info.clear(); +PikaCache::CacheInfo PikaCache::Info() { std::unique_lock l(rwlock_); + PikaCache::CacheInfo info; info.status = cache_status_; - info.cache_num = cache_num_; - info.used_memory = dory::RedisCache::GetUsedMemory(); info.async_load_keys_num = cache_load_thread_->AsyncLoadKeysNum(); info.waitting_load_keys_num = cache_load_thread_->WaittingLoadKeysNum(); - dory::RedisCache::GetHitAndMissNum(&info.hits, &info.misses); - for (uint32_t i = 0; i < caches_.size(); ++i) { - std::unique_lock lm(*cache_mutexs_[i]); - info.keys_num += caches_[i]->DbSize(); - } + info.keys_num = cache_->DbSize(); } bool PikaCache::Exists(std::string &key) { @@ -105,9 +84,12 @@ void PikaCache::ActiveExpireCycle() { cache_->ActiveExpireCycle(); } -Status PikaCache::Del(std::string &key) { +Status PikaCache::Del(const std::vector &keys) { std::unique_lock l(rwlock_); - return cache_->Del(key); + for (const auto &key : keys) { + cache_->Del(key); + } + return Status::OK(); } Status PikaCache::Expire(std::string &key, int64_t ttl) { @@ -127,36 +109,17 @@ Status PikaCache::TTL(std::string &key, int64_t *ttl) { Status PikaCache::Persist(std::string &key) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->Persist(key); + return cache_->Persist(key); } Status PikaCache::Type(std::string &key, std::string *value) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->Type(key, value); + return cache_->Type(key, value); } Status PikaCache::RandomKey(std::string *key) { std::unique_lock l(rwlock_); - - Status s; - srand((unsigned)time(nullptr)); - int cache_index = rand() % caches_.size(); - for (unsigned int i = 0; i < caches_.size(); ++i) { - cache_index = (cache_index + i) % caches_.size(); - - std::lock_guard lm(*cache_mutexs_[cache_index]); - s = caches_[cache_index]->RandomKey(key); - if (s.ok()) { - break; - } - } - return s; + return cache_->RandomKey(key); } /*----------------------------------------------------------------------------- @@ -164,52 +127,58 @@ Status PikaCache::RandomKey(std::string *key) { *----------------------------------------------------------------------------*/ Status PikaCache::Set(std::string &key, std::string &value, int64_t ttl) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->Set(key, value, ttl); + return cache_->Set(key, value, ttl); +} +Status PikaCache::SetWithoutTTL(std::string &key, std::string &value) { + std::unique_lock l(rwlock_); + return cache_->SetWithoutTTL(key, value); } Status PikaCache::Setnx(std::string &key, std::string &value, int64_t ttl) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->Setnx(key, value, ttl); + return cache_->Setnx(key, value, ttl); } Status PikaCache::SetnxWithoutTTL(std::string &key, std::string &value) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->SetnxWithoutTTL(key, value); + return cache_->SetnxWithoutTTL(key, value); } Status PikaCache::Setxx(std::string &key, std::string &value, int64_t ttl) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->Setxx(key, value, ttl); + return cache_->Setxx(key, value, ttl); } Status PikaCache::SetxxWithoutTTL(std::string &key, std::string &value) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->SetxxWithoutTTL(key, value); + return cache_->SetxxWithoutTTL(key, value); +} +Status PikaCache::MSet(const std::vector &kvs) { + std::unique_lock l(rwlock_); + for (const auto &item : kvs) { + auto [key, value] = item; + cache_->SetWithoutTTL(key, value); + } + return Status::OK(); } Status PikaCache::Get(std::string &key, std::string *value) { -// std::unique_lock l(rwlock_); std::shared_lock l(rwlock_); + return cache_->Get(key, value); +} - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - caches_[cache_index]-> - return caches_[cache_index]->Get(key, value); +Status PikaCache::MGet(const std::vector &keys, std::vector *vss) { + std::shared_lock l(rwlock_); + vss->resize(keys.size()); + auto ret = Status::OK(); + for (int i = 0; i < keys.size(); ++i) { + auto s = cache_->Get(keys[i], &(*vss)[i].value); + (*vss)[i].status = s; + if (!s.ok()) { + ret = s; + } + } + return ret; } Status PikaCache::Incrxx(std::string &key) { @@ -223,21 +192,18 @@ Status PikaCache::Incrxx(std::string &key) { Status PikaCache::Decrxx(std::string &key) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->Decr(key); + + + if (cache_->Exists(key)) { + return cache_->Decr(key); } return Status::NotFound("key not exist"); } Status PikaCache::IncrByxx(std::string &key, long long incr) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->IncrBy(key, incr); + if (cache_->Exists(key)) { + return cache_->IncrBy(key, incr); } return Status::NotFound("key not exist"); } @@ -245,10 +211,10 @@ Status PikaCache::IncrByxx(std::string &key, long long incr) { Status PikaCache::DecrByxx(std::string &key, long long incr) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->DecrBy(key, incr); + + + if (cache_->Exists(key)) { + return cache_->DecrBy(key, incr); } return Status::NotFound("key not exist"); } @@ -256,21 +222,18 @@ Status PikaCache::DecrByxx(std::string &key, long long incr) { Status PikaCache::Incrbyfloatxx(std::string &key, long double incr) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->Incrbyfloat(key, incr); + + + if (cache_->Exists(key)) { + return cache_->Incrbyfloat(key, incr); } return Status::NotFound("key not exist"); } Status PikaCache::Appendxx(std::string &key, std::string &value) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->Append(key, value); + if (cache_->Exists(key)) { + return cache_->Append(key, value); } return Status::NotFound("key not exist"); } @@ -278,18 +241,18 @@ Status PikaCache::Appendxx(std::string &key, std::string &value) { Status PikaCache::GetRange(std::string &key, int64_t start, int64_t end, std::string *value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->GetRange(key, start, end, value); + + + return cache_->GetRange(key, start, end, value); } Status PikaCache::SetRangexx(std::string &key, int64_t start, std::string &value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->SetRange(key, start, value); + + + if (cache_->Exists(key)) { + return cache_->SetRange(key, start, value); } return Status::NotFound("key not exist"); } @@ -297,9 +260,9 @@ Status PikaCache::SetRangexx(std::string &key, int64_t start, std::string &value Status PikaCache::Strlen(std::string &key, int32_t *len) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->Strlen(key, len); + + + return cache_->Strlen(key, len); } /*----------------------------------------------------------------------------- @@ -308,26 +271,26 @@ Status PikaCache::Strlen(std::string &key, int32_t *len) { Status PikaCache::HDel(std::string &key, std::vector &fields) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->HDel(key, fields); + + + return cache_->HDel(key, fields); } Status PikaCache::HSet(std::string &key, std::string &field, std::string &value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->HSet(key, field, value); + + + return cache_->HSet(key, field, value); } Status PikaCache::HSetIfKeyExist(std::string &key, std::string &field, std::string &value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->HSet(key, field, value); + + + if (cache_->Exists(key)) { + return cache_->HSet(key, field, value); } return Status::NotFound("key not exist"); } @@ -335,10 +298,10 @@ Status PikaCache::HSetIfKeyExist(std::string &key, std::string &field, std::stri Status PikaCache::HSetIfKeyExistAndFieldNotExist(std::string &key, std::string &field, std::string &value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->HSetnx(key, field, value); + + + if (cache_->Exists(key)) { + return cache_->HSetnx(key, field, value); } return Status::NotFound("key not exist"); } @@ -346,19 +309,19 @@ Status PikaCache::HSetIfKeyExistAndFieldNotExist(std::string &key, std::string & Status PikaCache::HMSet(std::string &key, std::vector &fvs) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->HMSet(key, fvs); + + + return cache_->HMSet(key, fvs); } Status PikaCache::HMSetnx(std::string &key, std::vector &fvs, int64_t ttl) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (!caches_[cache_index]->Exists(key)) { - caches_[cache_index]->HMSet(key, fvs); - caches_[cache_index]->Expire(key, ttl); + + + if (!cache_->Exists(key)) { + cache_->HMSet(key, fvs); + cache_->Expire(key, ttl); return Status::OK(); } else { return Status::NotFound("key exist"); @@ -368,10 +331,10 @@ Status PikaCache::HMSetnx(std::string &key, std::vector &fv Status PikaCache::HMSetnxWithoutTTL(std::string &key, std::vector &fvs) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (!caches_[cache_index]->Exists(key)) { - caches_[cache_index]->HMSet(key, fvs); + + + if (!cache_->Exists(key)) { + cache_->HMSet(key, fvs); return Status::OK(); } else { return Status::NotFound("key exist"); @@ -381,10 +344,10 @@ Status PikaCache::HMSetnxWithoutTTL(std::string &key, std::vector &fvs) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->HMSet(key, fvs); + + + if (cache_->Exists(key)) { + return cache_->HMSet(key, fvs); } else { return Status::NotFound("key not exist"); } @@ -393,55 +356,55 @@ Status PikaCache::HMSetxx(std::string &key, std::vector &fv Status PikaCache::HGet(std::string &key, std::string &field, std::string *value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->HGet(key, field, value); + + + return cache_->HGet(key, field, value); } Status PikaCache::HMGet(std::string &key, std::vector &fields, std::vector *vss) { std::unique_lock l(rwlock_); - return caches_[cache_index]->HMGet(key, fields, vss); + return cache_->HMGet(key, fields, vss); } Status PikaCache::HGetall(std::string &key, std::vector *fvs) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->HGetall(key, fvs); + + + return cache_->HGetall(key, fvs); } Status PikaCache::HKeys(std::string &key, std::vector *fields) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->HKeys(key, fields); + + + return cache_->HKeys(key, fields); } Status PikaCache::HVals(std::string &key, std::vector *values) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->HVals(key, values); + + + return cache_->HVals(key, values); } Status PikaCache::HExists(std::string &key, std::string &field) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->HExists(key, field); + + + return cache_->HExists(key, field); } Status PikaCache::HIncrbyxx(std::string &key, std::string &field, int64_t value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->HIncrby(key, field, value); + + + if (cache_->Exists(key)) { + return cache_->HIncrby(key, field, value); } return Status::NotFound("key not exist"); } @@ -449,10 +412,10 @@ Status PikaCache::HIncrbyxx(std::string &key, std::string &field, int64_t value) Status PikaCache::HIncrbyfloatxx(std::string &key, std::string &field, long double value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->HIncrbyfloat(key, field, value); + + + if (cache_->Exists(key)) { + return cache_->HIncrbyfloat(key, field, value); } return Status::NotFound("key not exist"); } @@ -460,17 +423,14 @@ Status PikaCache::HIncrbyfloatxx(std::string &key, std::string &field, long doub Status PikaCache::HLen(std::string &key, unsigned long *len) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->HLen(key, len); + + + return cache_->HLen(key, len); } Status PikaCache::HStrlen(std::string &key, std::string &field, unsigned long *len) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->HStrlen(key, field, len); + return cache_->HStrlen(key, field, len); } /*----------------------------------------------------------------------------- @@ -479,115 +439,108 @@ Status PikaCache::HStrlen(std::string &key, std::string &field, unsigned long *l Status PikaCache::LIndex(std::string &key, long index, std::string *element) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->LIndex(key, index, element); + + + return cache_->LIndex(key, index, element); } Status PikaCache::LInsert(std::string &key, storage::BeforeOrAfter &before_or_after, std::string &pivot, std::string &value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->LInsert(key, before_or_after, pivot, value); + + + return cache_->LInsert(key, before_or_after, pivot, value); } Status PikaCache::LLen(std::string &key, unsigned long *len) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->LLen(key, len); + + + return cache_->LLen(key, len); } Status PikaCache::LPop(std::string &key, std::string *element) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->LPop(key, element); + + + return cache_->LPop(key, element); } Status PikaCache::LPush(std::string &key, std::vector &values) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->LPush(key, values); + + + return cache_->LPush(key, values); } Status PikaCache::LPushx(std::string &key, std::vector &values) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->LPushx(key, values); + return cache_->LPushx(key, values); } Status PikaCache::LRange(std::string &key, long start, long stop, std::vector *values) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->LRange(key, start, stop, values); + return cache_->LRange(key, start, stop, values); } Status PikaCache::LRem(std::string &key, long count, std::string &value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->LRem(key, count, value); + + + return cache_->LRem(key, count, value); } Status PikaCache::LSet(std::string &key, long index, std::string &value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->LSet(key, index, value); + + + return cache_->LSet(key, index, value); } Status PikaCache::LTrim(std::string &key, long start, long stop) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->LTrim(key, start, stop); + + + return cache_->LTrim(key, start, stop); } Status PikaCache::RPop(std::string &key, std::string *element) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->RPop(key, element); + + + return cache_->RPop(key, element); } Status PikaCache::RPush(std::string &key, std::vector &values) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->RPush(key, values); + + + return cache_->RPush(key, values); } Status PikaCache::RPushx(std::string &key, std::vector &values) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->RPushx(key, values); + + + return cache_->RPushx(key, values); } Status PikaCache::RPushnx(std::string &key, std::vector &values, int64_t ttl) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (!caches_[cache_index]->Exists(key)) { - caches_[cache_index]->RPush(key, values); - caches_[cache_index]->Expire(key, ttl); + + if (!cache_->Exists(key)) { + cache_->RPush(key, values); + cache_->Expire(key, ttl); return Status::OK(); } else { return Status::NotFound("key exist"); @@ -597,10 +550,10 @@ Status PikaCache::RPushnx(std::string &key, std::vector &values, in Status PikaCache::RPushnxWithoutTTL(std::string &key, std::vector &values) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (!caches_[cache_index]->Exists(key)) { - caches_[cache_index]->RPush(key, values); + + + if (!cache_->Exists(key)) { + cache_->RPush(key, values); return Status::OK(); } else { return Status::NotFound("key exist"); @@ -612,19 +565,14 @@ Status PikaCache::RPushnxWithoutTTL(std::string &key, std::vector & *----------------------------------------------------------------------------*/ Status PikaCache::SAdd(std::string &key, std::vector &members) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->SAdd(key, members); + return cache_->SAdd(key, members); } Status PikaCache::SAddIfKeyExist(std::string &key, std::vector &members) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->SAdd(key, members); + if (cache_->Exists(key)) { + return cache_->SAdd(key, members); } return Status::NotFound("key not exist"); } @@ -632,11 +580,11 @@ Status PikaCache::SAddIfKeyExist(std::string &key, std::vector &mem Status PikaCache::SAddnx(std::string &key, std::vector &members, int64_t ttl) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (!caches_[cache_index]->Exists(key)) { - caches_[cache_index]->SAdd(key, members); - caches_[cache_index]->Expire(key, ttl); + + + if (!cache_->Exists(key)) { + cache_->SAdd(key, members); + cache_->Expire(key, ttl); return Status::OK(); } else { return Status::NotFound("key exist"); @@ -645,11 +593,8 @@ Status PikaCache::SAddnx(std::string &key, std::vector &members, in Status PikaCache::SAddnxWithoutTTL(std::string &key, std::vector &members) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (!caches_[cache_index]->Exists(key)) { - caches_[cache_index]->SAdd(key, members); + if (!cache_->Exists(key)) { + cache_->SAdd(key, members); return Status::OK(); } else { return Status::NotFound("key exist"); @@ -658,42 +603,27 @@ Status PikaCache::SAddnxWithoutTTL(std::string &key, std::vector &m Status PikaCache::SCard(std::string &key, unsigned long *len) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->SCard(key, len); + return cache_->SCard(key, len); } Status PikaCache::SIsmember(std::string &key, std::string &member) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->SIsmember(key, member); + return cache_->SIsmember(key, member); } Status PikaCache::SMembers(std::string &key, std::vector *members) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->SMembers(key, members); + return cache_->SMembers(key, members); } Status PikaCache::SRem(std::string &key, std::vector &members) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->SRem(key, members); + return cache_->SRem(key, members); } Status PikaCache::SRandmember(std::string &key, long count, std::vector *members) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->SRandmember(key, count, members); + return cache_->SRandmember(key, count, members); } /*----------------------------------------------------------------------------- @@ -702,9 +632,9 @@ Status PikaCache::SRandmember(std::string &key, long count, std::vector &score_members) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->ZAdd(key, score_members); + + + return cache_->ZAdd(key, score_members); } void PikaCache::GetMinMaxScore(std::vector &score_members, double &min, double &max) { @@ -865,11 +795,11 @@ Status PikaCache::CleanCacheKeyIfNeeded(dory::RedisCache *cache_obj, std::string Status PikaCache::ZAddnx(std::string &key, std::vector &score_members, int64_t ttl) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (!caches_[cache_index]->Exists(key)) { - caches_[cache_index]->ZAdd(key, score_members); - caches_[cache_index]->Expire(key, ttl); + + + if (!cache_->Exists(key)) { + cache_->ZAdd(key, score_members); + cache_->Expire(key, ttl); return Status::OK(); } else { return Status::NotFound("key exist"); @@ -879,7 +809,7 @@ Status PikaCache::ZAddnx(std::string &key, std::vector &sc Status PikaCache::ZAddnxWithoutTTL(std::string &key, std::vector &score_members) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); + if (!cache_->Exists(key)) { cache_->ZAdd(key, score_members); return Status::OK(); @@ -897,11 +827,7 @@ Status PikaCache::ZCard(std::string &key, unsigned long *len, const std::shared_ Status PikaCache::CacheZCard(std::string &key, unsigned long *len) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - - return caches_[cache_index]->ZCard(key, len); + return cache_->ZCard(key, len); } RangeStatus PikaCache::CheckCacheRangeByScore(unsigned long cache_len, double cache_min, double cache_max, double min, @@ -966,9 +892,7 @@ RangeStatus PikaCache::CheckCacheRangeByScore(unsigned long cache_len, double ca Status PikaCache::ZCount(std::string &key, std::string &min, std::string &max, unsigned long *len, ZCountCmd *cmd) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - auto cache_obj = caches_[cache_index]; + auto cache_obj = cache_.get(); unsigned long cache_len = 0; cache_obj->ZCard(key, &cache_len); if (cache_len <= 0) { @@ -994,10 +918,7 @@ Status PikaCache::ZCount(std::string &key, std::string &min, std::string &max, u Status PikaCache::ZIncrby(std::string &key, std::string &member, double increment) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->ZIncrby(key, member, increment); + return cache_->ZIncrby(key, member, increment); } bool PikaCache::ReloadCacheKeyIfNeeded(dory::RedisCache *cache_obj, std::string &key, int mem_len, int db_len, @@ -1170,10 +1091,10 @@ Status PikaCache::ZRange(std::string &key, long start, long stop, std::vector &slot) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); + + - auto cache_obj = caches_[cache_index]; + auto cache_obj = cache_.get(); auto db_obj = slot->db(); Status s; if (cache_obj->Exists(key)) { @@ -1203,10 +1124,7 @@ Status PikaCache::ZRangebyscore(std::string &key, std::string &min, std::string std::vector *score_members, ZRangebyscoreCmd *cmd) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - - auto cache_obj = caches_[cache_index]; + auto cache_obj = cache_.get(); unsigned long cache_len = 0; cache_obj->ZCard(key, &cache_len); if (cache_len <= 0) { @@ -1230,11 +1148,8 @@ Status PikaCache::ZRangebyscore(std::string &key, std::string &min, std::string Status PikaCache::ZRank(std::string &key, std::string &member, long *rank, const std::shared_ptr &slot) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - - auto cache_obj = caches_[cache_index]; + + auto cache_obj = cache_.get(); unsigned long cache_len = 0; cache_obj->ZCard(key, &cache_len); if (cache_len <= 0) { @@ -1256,21 +1171,15 @@ Status PikaCache::ZRank(std::string &key, std::string &member, long *rank, const Status PikaCache::ZRem(std::string &key, std::vector &members, std::shared_ptr slot) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - - auto s = caches_[cache_index]->ZRem(key, members); - ReloadCacheKeyIfNeeded(caches_[cache_index], key); + auto s = cache_->ZRem(key, members); + ReloadCacheKeyIfNeeded(cache_.get(), key); return s; } Status PikaCache::ZRemrangebyrank(std::string &key, std::string &min, std::string &max, int32_t ele_deleted, const std::shared_ptr &slot) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - auto cache_obj = caches_[cache_index]; + auto cache_obj = cache_.get(); unsigned long cache_len = 0; cache_obj->ZCard(key, &cache_len); if (cache_len <= 0) { @@ -1326,22 +1235,15 @@ Status PikaCache::ZRemrangebyrank(std::string &key, std::string &min, std::strin Status PikaCache::ZRemrangebyscore(std::string &key, std::string &min, std::string &max, const std::shared_ptr &slot) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::unique_lock lm(*cache_mutexs_[cache_index]); - auto s = caches_[cache_index]->ZRemrangebyscore(key, min, max); - ReloadCacheKeyIfNeeded(caches_[cache_index], key, -1, -1, slot); + auto s = cache_->ZRemrangebyscore(key, min, max); + ReloadCacheKeyIfNeeded(cache_.get(), key, -1, -1, slot); return s; } Status PikaCache::ZRevrange(std::string &key, long start, long stop, std::vector *score_members, const std::shared_ptr &slot) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - - auto cache_obj = caches_[cache_index]; + auto cache_obj = cache_.get(); auto db_obj = slot->db(); Status s; if (cache_obj->Exists(key)) { @@ -1370,11 +1272,7 @@ Status PikaCache::ZRevrange(std::string &key, long start, long stop, std::vector Status PikaCache::ZRevrangebyscore(std::string &key, std::string &min, std::string &max, std::vector *score_members, ZRevrangebyscoreCmd *cmd) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - - auto cache_obj = caches_[cache_index]; + auto cache_obj = cache_.get(); unsigned long cache_len = 0; cache_obj->ZCard(key, &cache_len); if (cache_len <= 0) { @@ -1403,13 +1301,12 @@ Status PikaCache::ZRevrangebyscore(std::string &key, std::string &min, std::stri bool PikaCache::CacheSizeEqsDB(std::string &key, const std::shared_ptr &slot) { int32_t db_len = 0; + slot->DbRWLockWriter(); slot->db()->ZCard(key, &db_len); - + slot->DbRWUnLock(); std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); unsigned long cache_len = 0; - caches_[cache_index]->ZCard(key, &cache_len); + cache_->ZCard(key, &cache_len); return db_len == (int32_t)cache_len; } @@ -1418,11 +1315,7 @@ Status PikaCache::ZRevrangebylex(std::string &key, std::string &min, std::string std::vector *members, const std::shared_ptr &slot) { if (CacheSizeEqsDB(key, slot)) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - - return caches_[cache_index]->ZRevrangebylex(key, min, max, members); + return cache_->ZRevrangebylex(key, min, max, members); } else { return Status::NotFound("key not in cache"); } @@ -1430,10 +1323,7 @@ Status PikaCache::ZRevrangebylex(std::string &key, std::string &min, std::string Status PikaCache::ZRevrank(std::string &key, std::string &member, long *rank, const std::shared_ptr &slot) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - auto cache_obj = caches_[cache_index]; + auto cache_obj = cache_.get(); unsigned long cache_len = 0; cache_obj->ZCard(key, &cache_len); if (cache_len <= 0) { @@ -1454,10 +1344,7 @@ Status PikaCache::ZRevrank(std::string &key, std::string &member, long *rank, co } Status PikaCache::ZScore(std::string &key, std::string &member, double *score, const std::shared_ptr &slot) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - auto s = caches_[cache_index]->ZScore(key, member, score); + auto s = cache_->ZScore(key, member, score); if (!s.ok()) { return Status::NotFound("key or member not in cache"); } @@ -1468,11 +1355,7 @@ Status PikaCache::ZRangebylex(std::string &key, std::string &min, std::string &m const std::shared_ptr &slot) { if (CacheSizeEqsDB(key, slot)) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - - return caches_[cache_index]->ZRangebylex(key, min, max, members); + return cache_->ZRangebylex(key, min, max, members); } else { return Status::NotFound("key not in cache"); } @@ -1482,11 +1365,7 @@ Status PikaCache::ZLexcount(std::string &key, std::string &min, std::string &max const std::shared_ptr &slot) { if (CacheSizeEqsDB(key, slot)) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - - return caches_[cache_index]->ZLexcount(key, min, max, len); + return cache_->ZLexcount(key, min, max, len); } else { return Status::NotFound("key not in cache"); } @@ -1496,11 +1375,7 @@ Status PikaCache::ZRemrangebylex(std::string &key, std::string &min, std::string const std::shared_ptr &slot) { if (CacheSizeEqsDB(key, slot)) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - - return caches_[cache_index]->ZRemrangebylex(key, min, max); + return cache_->ZRemrangebylex(key, min, max); } else { return Status::NotFound("key not in cache"); } @@ -1512,18 +1387,18 @@ Status PikaCache::ZRemrangebylex(std::string &key, std::string &min, std::string Status PikaCache::SetBit(std::string &key, size_t offset, long value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->SetBit(key, offset, value); + + + return cache_->SetBit(key, offset, value); } Status PikaCache::SetBitIfKeyExist(std::string &key, size_t offset, long value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - if (caches_[cache_index]->Exists(key)) { - return caches_[cache_index]->SetBit(key, offset, value); + + + if (cache_->Exists(key)) { + return cache_->SetBit(key, offset, value); } return Status::NotFound("key not exist"); } @@ -1531,65 +1406,46 @@ Status PikaCache::SetBitIfKeyExist(std::string &key, size_t offset, long value) Status PikaCache::GetBit(std::string &key, size_t offset, long *value) { std::unique_lock l(rwlock_); - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->GetBit(key, offset, value); + + + return cache_->GetBit(key, offset, value); } Status PikaCache::BitCount(std::string &key, long start, long end, long *value, bool have_offset) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->BitCount(key, start, end, value, have_offset); + return cache_->BitCount(key, start, end, value, have_offset); } Status PikaCache::BitPos(std::string &key, long bit, long *value) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->BitPos(key, bit, value); + return cache_->BitPos(key, bit, value); } Status PikaCache::BitPos(std::string &key, long bit, long start, long *value) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->BitPos(key, bit, start, value); + return cache_->BitPos(key, bit, start, value); } Status PikaCache::BitPos(std::string &key, long bit, long start, long end, long *value) { std::unique_lock l(rwlock_); - - int cache_index = CacheIndex(key); - std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->BitPos(key, bit, start, end, value); + return cache_->BitPos(key, bit, start, end, value); } -Status PikaCache::InitWithoutLock(uint32_t cache_num, dory::CacheConfig *cache_cfg) { +Status PikaCache::InitWithoutLock() { cache_status_ = PIKA_CACHE_STATUS_INIT; - - cache_num_ = cache_num; - if (nullptr != cache_cfg) { - dory::RedisCache::SetConfig(cache_cfg); + if (cache_ == nullptr) { + cache_ = std::make_unique(); } - - for (uint32_t i = 0; i < cache_num; ++i) { - auto *cache = new dory::RedisCache(); - Status s = cache->Open(); - if (!s.ok()) { - LOG(ERROR) << "PikaCache::InitWithoutLock Open cache failed"; - DestroyWithoutLock(); - cache_status_ = PIKA_CACHE_STATUS_NONE; - return Status::Corruption("create redis cache failed"); - } - caches_.push_back(cache); - cache_mutexs_.push_back(new std::shared_mutex); + Status s = cache_->Open(); + if (!s.ok()) { + LOG(ERROR) << "PikaCache::InitWithoutLock Open cache failed"; + DestroyWithoutLock(); + cache_status_ = PIKA_CACHE_STATUS_NONE; + return Status::Corruption("create redis cache failed"); } + cache_load_thread_ = std::make_unique(cache_start_pos_, cache_items_per_key_, shared_from_this()); + cache_load_thread_->StartThread(); cache_status_ = PIKA_CACHE_STATUS_OK; - return Status::OK(); } @@ -1597,32 +1453,12 @@ void PikaCache::DestroyWithoutLock(void) { cache_status_ = PIKA_CACHE_STATUS_DESTROY; cache_.reset(); } - -// TODO(leehao) 这里貌似应该使用DB来分离,每一个db都对应一个cache,因为cache的设置是一个全局的,所以我觉得这个PikaCache也应该设置成为单例 - - - for (auto iter = caches_.begin(); iter != caches_.end(); ++iter) { - delete *iter; - } - caches_.clear(); - - for (auto iter = cache_mutexs_.begin(); iter != cache_mutexs_.end(); ++iter) { - delete *iter; - } - cache_mutexs_.clear(); -} - -int PikaCache::CacheIndex(const std::string &key) { - uint32_t crc = CRC32Update(0, key.data(), (int)key.size()); - return (int)(crc % caches_.size()); -} - Status PikaCache::WriteKvToCache(std::string &key, std::string &value, int64_t ttl) { if (0 >= ttl) { if (PIKA_TTL_NONE == ttl) { return SetnxWithoutTTL(key, value); } else { - return Del(key); + return Del({key}); } } else { return Setnx(key, value, ttl); @@ -1635,7 +1471,7 @@ Status PikaCache::WriteHashToCache(std::string &key, std::vector &v if (PIKA_TTL_NONE == ttl) { return RPushnxWithoutTTL(key, values); } else { - return Del(key); + return Del({key}); } } else { return RPushnx(key, values, ttl); @@ -1661,7 +1497,7 @@ Status PikaCache::WriteSetToCache(std::string &key, std::vector &me if (PIKA_TTL_NONE == ttl) { return SAddnxWithoutTTL(key, members); } else { - return Del(key); + return Del({key}); } } else { return SAddnx(key, members, ttl); @@ -1674,7 +1510,7 @@ Status PikaCache::WriteZSetToCache(std::string &key, std::vector dbs) : cache_status_(PIKA_CACHE_STATUS_NONE) { - std::shared_lock lg(mu_); - for (const auto& db : dbs) { - for (uint32_t i = 0; i < db.slot_num; ++i) { - auto key = db.db_name + std::to_string(i); - caches_.emplace(key, std::make_shared(0, 0, g_pika_server->GetDBSlotById(db.db_name, i))); - } - } -} - -PikaCacheManager::~PikaCacheManager() { +PikaCacheManager::PikaCacheManager() : cache_status_(PIKA_CACHE_STATUS_NONE) { + dory::CacheConfig cache_config{}; + dory::RedisCache::SetConfig(&cache_config); } -std::shared_ptr PikaCacheManager::GetCache(const std::string& db_name, int slot_index) { +void PikaCacheManager::Init(const std::map>& dbs) { std::shared_lock lg(mu_); - auto key = db_name + std::to_string(slot_index); - if (caches_.count(key) == 0) { - return nullptr; + for (const auto& kv : dbs) { + auto db = kv.second; + for (uint32_t i = 0; i < db->SlotNum(); ++i) { + auto key = db->GetDBName() + std::to_string(i); + caches_[key] = db->GetSlotById(i)->cache(); + } } - return caches_[key]; } void PikaCacheManager::ProcessCronTask() { - std::unique_lock lg(mu_); for (auto& cache : caches_) { cache.second->ActiveExpireCycle(); } -} -void PikaCacheManager::FlushDB(const std::string& db_name) { - std::unique_lock lg(mu_); - for (auto& cache : caches_) { - if (cache.first.compare(0, db_name.size(), db_name) == 0) { - cache.second->FlushSlot(); - } - } + LOG(INFO) << "hit rate:" << HitRatio() << std::endl; } double PikaCacheManager::HitRatio(void) { @@ -60,4 +47,18 @@ double PikaCacheManager::HitRatio(void) { void PikaCacheManager::ClearHitRatio(void) { std::unique_lock l(mu_); dory::RedisCache::ResetHitAndMissNum(); -} \ No newline at end of file +} + +CacheInfo PikaCacheManager::Info() { + CacheInfo info; + std::unique_lock l(mu_); + for (const auto &cache : caches_) { + auto each_info = cache.second->Info(); + info.keys_num += each_info.keys_num; + info.async_load_keys_num += each_info.async_load_keys_num; + } + info.used_memory = dory::RedisCache::GetUsedMemory(); + info.cache_num = caches_.size(); + dory::RedisCache::GetHitAndMissNum(&info.hits, &info.misses); + return info; +} diff --git a/src/pika_command.cc b/src/pika_command.cc index f3919ae687..8538b5f751 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -8,6 +8,7 @@ #include #include "include/pika_admin.h" #include "include/pika_bit.h" +#include "include/pika_cache_manager.h" #include "include/pika_cmd_table_manager.h" #include "include/pika_command.h" #include "include/pika_geo.h" @@ -21,13 +22,14 @@ #include "include/pika_set.h" #include "include/pika_slot_command.h" #include "include/pika_zset.h" +#include "pstd_defer.h" using pstd::Status; extern PikaServer* g_pika_server; extern std::unique_ptr g_pika_rm; extern std::unique_ptr g_pika_cmd_table_manager; -extern std::shared_ptr g_pika_cache_manager; +extern std::shared_ptr c; void InitCmdTable(CmdTable* cmd_table) { // Admin @@ -54,10 +56,10 @@ void InitCmdTable(CmdTable* cmd_table) { std::unique_ptr selectptr = std::make_unique(kCmdNameSelect, 2, kCmdFlagsRead | kCmdFlagsAdmin); cmd_table->insert(std::pair>(kCmdNameSelect, std::move(selectptr))); std::unique_ptr flushallptr = - std::make_unique(kCmdNameFlushall, 1, kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsAdmin); + std::make_unique(kCmdNameFlushall, 1, kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsAdmin | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameFlushall, std::move(flushallptr))); std::unique_ptr flushdbptr = - std::make_unique(kCmdNameFlushdb, -1, kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsAdmin); + std::make_unique(kCmdNameFlushdb, -1, kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsAdmin | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameFlushdb, std::move(flushdbptr))); std::unique_ptr clientptr = std::make_unique(kCmdNameClient, -2, kCmdFlagsRead | kCmdFlagsAdmin); cmd_table->insert(std::pair>(kCmdNameClient, std::move(clientptr))); @@ -165,38 +167,38 @@ void InitCmdTable(CmdTable* cmd_table) { // Kv ////SetCmd std::unique_ptr setptr = - std::make_unique(kCmdNameSet, -3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv); + std::make_unique(kCmdNameSet, -3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameSet, std::move(setptr))); ////GetCmd std::unique_ptr getptr = - std::make_unique(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsKv); + std::make_unique(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameGet, std::move(getptr))); ////DelCmd std::unique_ptr delptr = - std::make_unique(kCmdNameDel, -2, kCmdFlagsWrite | kCmdFlagsMultiSlot | kCmdFlagsKv); + std::make_unique(kCmdNameDel, -2, kCmdFlagsWrite | kCmdFlagsMultiSlot | kCmdFlagsKv | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameDel, std::move(delptr))); std::unique_ptr Unlinkptr = - std::make_unique(kCmdNameUnlink, -2, kCmdFlagsWrite | kCmdFlagsMultiSlot | kCmdFlagsKv); + std::make_unique(kCmdNameUnlink, -2, kCmdFlagsWrite | kCmdFlagsMultiSlot | kCmdFlagsKv | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameUnlink, std::move(Unlinkptr))); ////IncrCmd std::unique_ptr incrptr = - std::make_unique(kCmdNameIncr, 2, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv); + std::make_unique(kCmdNameIncr, 2, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameIncr, std::move(incrptr))); ////IncrbyCmd std::unique_ptr incrbyptr = - std::make_unique(kCmdNameIncrby, 3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv); + std::make_unique(kCmdNameIncrby, 3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameIncrby, std::move(incrbyptr))); ////IncrbyfloatCmd std::unique_ptr incrbyfloatptr = - std::make_unique(kCmdNameIncrbyfloat, 3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv); + std::make_unique(kCmdNameIncrbyfloat, 3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameIncrbyfloat, std::move(incrbyfloatptr))); ////DecrCmd std::unique_ptr decrptr = - std::make_unique(kCmdNameDecr, 2, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv); + std::make_unique(kCmdNameDecr, 2, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameDecr, std::move(decrptr))); ////DecrbyCmd std::unique_ptr decrbyptr = - std::make_unique(kCmdNameDecrby, 3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv); + std::make_unique(kCmdNameDecrby, 3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameDecrby, std::move(decrbyptr))); ////GetsetCmd std::unique_ptr getsetptr = @@ -801,7 +803,7 @@ void Cmd::ProcessCommand(const std::shared_ptr& slot, const std::shared_pt void Cmd::InternalProcessCommand(const std::shared_ptr& slot, const std::shared_ptr& sync_slot, const HintKeys& hint_keys) { pstd::lock::MultiRecordLock record_lock(slot->LockMgr()); - if (is_write()) { + if (is_write() || is_need_update_cache()) { record_lock.Lock(current_key()); } @@ -816,7 +818,7 @@ void Cmd::InternalProcessCommand(const std::shared_ptr& slot, const std::s DoBinlog(sync_slot); - if (is_write()) { + if (is_write() || is_need_update_cache()) { record_lock.Unlock(current_key()); } } @@ -825,11 +827,21 @@ void Cmd::DoCommand(const std::shared_ptr& slot, const HintKeys& hint_keys if (!is_suspend()) { slot->DbRWLockReader(); } + DEFER { + if (!is_suspend()) { + slot->DbRWUnLock(); + } + }; + DoFromCache(slot); + if (is_only_from_cache() && res_.ok()) { + return; + } + res_.clear(); Do(slot); - - if (!is_suspend()) { - slot->DbRWUnLock(); + //TODO(pia_cache): should write a new function for determine whether the cache needs to be updated + if (res_.ok()) { + DoUpdateCache(slot); } } @@ -929,6 +941,8 @@ bool Cmd::is_suspend() const { return ((flag_ & kCmdFlagsMaskSuspend) == kCmdFla bool Cmd::is_admin_require() const { return ((flag_ & kCmdFlagsMaskAdminRequire) == kCmdFlagsAdminRequire); } bool Cmd::is_single_slot() const { return ((flag_ & kCmdFlagsMaskSlot) == kCmdFlagsSingleSlot); } bool Cmd::is_multi_slot() const { return ((flag_ & kCmdFlagsMaskSlot) == kCmdFlagsMultiSlot); } +bool Cmd::is_need_update_cache() const { return ((flag_ & kCmdFlagsMaskUpdateCache) == kCmdFlagsUpdateCache); } +bool Cmd::is_only_from_cache() const { return ((flag_ & kCmdFlagsMaskOnlyDoCache) == kCmdFlagsOnlyDoCache); } bool Cmd::HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const { return true; } diff --git a/src/pika_db.cc b/src/pika_db.cc index a1d83d24af..1f2dd67469 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -97,6 +97,8 @@ Status DB::AddSlots(const std::set& slot_ids) { for (const uint32_t& id : slot_ids) { slots_.emplace(id, std::make_shared(db_name_, id, db_path_)); + slots_[id]->Init(); + } return Status::OK(); } diff --git a/src/pika_kv.cc b/src/pika_kv.cc index bd229af831..b25e5107db 100644 --- a/src/pika_kv.cc +++ b/src/pika_kv.cc @@ -8,6 +8,7 @@ #include "pstd/include/pstd_string.h" #include "include/pika_binlog_transverter.h" +#include "include/pika_cache_manager.h" #include "include/pika_conf.h" #include "include/pika_slot_command.h" @@ -61,13 +62,6 @@ void SetCmd::DoInitial() { index++; } } -void SetCmd::Execute() { - Cmd::Execute(); - auto cache = g_pika_cache_manager->GetCache(db_name_); - if (cache != nullptr) { - auto s = cache->Set(key_, value_, static_cast(sec_)); - } -} void SetCmd::Do(std::shared_ptr slot) { rocksdb::Status s; @@ -106,6 +100,27 @@ void SetCmd::Do(std::shared_ptr slot) { } } +void SetCmd::DoUpdateCache(std::shared_ptr slot) { + switch (condition_) { + case SetCmd::kXX: + slot->cache()->Setxx(key_, value_, sec_); + break; + case SetCmd::kNX: + slot->cache()->Setnx(key_, value_, sec_); + break; + case SetCmd::kVX: + // todo(leehao): cache + // slot->cache()->Setvx(key_, target_, value_, sec_); + break; + case SetCmd::kEXORPX: + // slot->cache()->Setex(key_, value_, static_cast(sec_)); + break; + default: + slot->cache()->SetWithoutTTL(key_, value_); + break; + } +} + std::string SetCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum, uint64_t offset) { if (condition_ == SetCmd::kEXORPX) { @@ -144,24 +159,22 @@ void GetCmd::DoInitial() { } key_ = argv_[1]; } -void GetCmd::Execute() { - auto cache = g_pika_cache_manager->GetCache(db_name_); - std::string value; - auto s = cache->Get(key_, &value); + +void GetCmd::DoFromCache(std::shared_ptr slot) { + auto s = slot->cache()->Get(key_, &value_); if (s.ok()) { - res_.AppendStringLenUint64(value.size()); - res_.AppendContent(value); + res_.AppendStringLenUint64(value_.size()); + res_.AppendContent(value_); return; } - Cmd::Execute(); + res_.AppendInteger(-1); } void GetCmd::Do(std::shared_ptr slot) { - std::string value; - rocksdb::Status s = slot->db()->Get(key_, &value); + auto s = slot->db()->GetWithTTL(key_, &value_, &sec_); if (s.ok()) { - res_.AppendStringLenUint64(value.size()); - res_.AppendContent(value); + res_.AppendStringLenUint64(value_.size()); + res_.AppendContent(value_); } else if (s.IsNotFound()) { res_.AppendStringLen(-1); } else { @@ -169,6 +182,10 @@ void GetCmd::Do(std::shared_ptr slot) { } } +void GetCmd::DoUpdateCache(std::shared_ptr slot) { + slot->cache()->Set(key_, value_, sec_); +} + void DelCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, name()); @@ -190,6 +207,9 @@ void DelCmd::Do(std::shared_ptr slot) { } else { res_.SetRes(CmdRes::kErrOther, "delete error"); } + if (count >= 0 && is_need_update_cache()) { + slot->cache()->Del(keys_); + } } void DelCmd::Split(std::shared_ptr slot, const HintKeys& hint_keys) { @@ -200,6 +220,7 @@ void DelCmd::Split(std::shared_ptr slot, const HintKeys& hint_keys) { } else { res_.SetRes(CmdRes::kErrOther, "delete error"); } + slot->cache()->Del(hint_keys.keys); } void DelCmd::Merge() { res_.AppendInteger(split_res_); } @@ -236,6 +257,10 @@ void IncrCmd::Do(std::shared_ptr slot) { } } +void IncrCmd::DoUpdateCache(std::shared_ptr slot) { + slot->cache()->IncrByxx(key_, 1); +} + void IncrbyCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameIncrby); @@ -260,6 +285,9 @@ void IncrbyCmd::Do(std::shared_ptr slot) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } + if (res_.ok() && is_need_update_cache()) { + slot->cache()->IncrByxx(key_, by_); + } } void IncrbyfloatCmd::DoInitial() { @@ -334,6 +362,9 @@ void DecrbyCmd::Do(std::shared_ptr slot) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } + if (res_.ok() && is_need_update_cache()) { + slot->cache()->DecrByxx(key_, by_); + } } void GetsetCmd::DoInitial() { @@ -359,6 +390,9 @@ void GetsetCmd::Do(std::shared_ptr slot) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } + if (res_.ok() && is_need_update_cache()) { + slot->cache()->SetWithoutTTL(key_, new_value_); + } } void AppendCmd::DoInitial() { @@ -379,6 +413,9 @@ void AppendCmd::Do(std::shared_ptr slot) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } + if (res_.ok() && is_need_update_cache()) { + slot->cache()->Appendxx(key_, value_); + } } void MgetCmd::DoInitial() { @@ -390,11 +427,28 @@ void MgetCmd::DoInitial() { keys_.erase(keys_.begin()); split_res_.resize(keys_.size()); } +void MgetCmd::DoFromCache(std::shared_ptr slot) { + std::vector vss; + auto status = slot->cache()->MGet(keys_, &vss); + if (!status.ok()) { + res_.SetRes(CmdRes::kNotFound); + return; + } + res_.AppendArrayLenUint64(vss.size()); + for (const auto& vs : vss) { + if (vs.status.ok()) { + res_.AppendStringLenUint64(vs.value.size()); + res_.AppendContent(vs.value); + } else { + res_.AppendContent("$-1"); + } + } +} void MgetCmd::Do(std::shared_ptr slot) { std::vector vss; - rocksdb::Status s = slot->db()->MGet(keys_, &vss); - if (s.ok()) { + auto status = slot->db()->MGet(keys_, &vss); + if (status.ok()) { res_.AppendArrayLenUint64(vss.size()); for (const auto& vs : vss) { if (vs.status.ok()) { @@ -405,10 +459,13 @@ void MgetCmd::Do(std::shared_ptr slot) { } } } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, status.ToString()); } } +void MgetCmd::DoUpdateCache(std::shared_ptr slot) { +} + void MgetCmd::Split(std::shared_ptr slot, const HintKeys& hint_keys) { std::vector vss; const std::vector& keys = hint_keys.keys; @@ -506,6 +563,9 @@ void SetnxCmd::Do(std::shared_ptr slot) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } + if (res_.ok() && is_need_update_cache()) { + slot->cache()->SetnxWithoutTTL(key_, value_); + } } std::string SetnxCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum, @@ -551,6 +611,9 @@ void SetexCmd::Do(std::shared_ptr slot) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } + if (res_.ok() && is_need_update_cache()) { + slot->cache()->Set(key_, value_, sec_); + } } std::string SetexCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum, @@ -600,6 +663,9 @@ void PsetexCmd::Do(std::shared_ptr slot) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } + if (res_.ok() && is_need_update_cache()) { + slot->cache()->Set(key_, value_, usec_ / 1000); + } } std::string PsetexCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum, @@ -674,6 +740,9 @@ void MsetCmd::Do(std::shared_ptr slot) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } + if (res_.ok() && is_need_update_cache()) { + slot->cache()->MSet(kvs_); + } } void MsetCmd::Split(std::shared_ptr slot, const HintKeys& hint_keys) { diff --git a/src/pika_server.cc b/src/pika_server.cc index 0b0e016148..8119322892 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -28,12 +28,14 @@ #include "include/pika_instant.h" #include "include/pika_server.h" #include "include/pika_rm.h" +#include "include/pika_cache_manager.h" using pstd::Status; extern PikaServer* g_pika_server; extern std::unique_ptr g_pika_rm; extern std::unique_ptr g_pika_cmd_table_manager; extern std::unique_ptr g_network_statistic; +extern std::unique_ptr g_pika_cache_manager; void DoPurgeDir(void* arg) { std::unique_ptr path(static_cast(arg)); @@ -149,9 +151,6 @@ void PikaServer::Start() { } */ - // We Init DB Struct Before Start The following thread - InitDBStruct(); - ret = pika_client_processor_->Start(); if (ret != net::kSuccess) { dbs_.clear(); @@ -1298,6 +1297,7 @@ void PikaServer::DoTimingTask() { ResetLastSecQuerynum(); // Auto update network instantaneous metric AutoUpdateNetworkMetric(); + g_pika_cache_manager->ProcessCronTask(); } void PikaServer::AutoCompactRange() { diff --git a/src/pika_slot.cc b/src/pika_slot.cc index a92f1ca7f1..cbcfd3429b 100644 --- a/src/pika_slot.cc +++ b/src/pika_slot.cc @@ -19,7 +19,7 @@ using pstd::Status; extern PikaServer* g_pika_server; extern std::unique_ptr g_pika_rm; -extern std::shared_ptr g_pika_cache_manager; +extern std::unique_ptr g_pika_cache_manager; std::string SlotPath(const std::string& db_path, uint32_t slot_id) { char buf[100]; @@ -112,6 +112,12 @@ std::string Slot::GetSlotName() const { return slot_name_; } std::shared_ptr Slot::db() const { return db_; } +std::shared_ptr Slot::cache() const { return cache_; } + +void Slot::Init() { + cache_ = std::make_shared(0, 0, shared_from_this()); + cache_->Init(); +} void Slot::Compact(const storage::DataType& type) { if (!opened_) { return; diff --git a/src/pstd/include/pstd_status.h b/src/pstd/include/pstd_status.h index d98ac260b5..c7c44f6dca 100644 --- a/src/pstd/include/pstd_status.h +++ b/src/pstd/include/pstd_status.h @@ -39,8 +39,7 @@ class Status { static Status Busy(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kBusy, msg, msg2); } - // TODO(leeHao) : mock for add cache from xcache - static Status ItemNotExist(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIncomplete, msg, msg2); } + static Status ItemNotExist(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kItemNotExist, msg, msg2); } // Returns true if the status indicates success. bool ok() const { return !state_; } @@ -102,7 +101,8 @@ class Status { kComplete = 8, kTimeout = 9, kAuthFailed = 10, - kBusy = 11 + kBusy = 11, + kItemNotExist = 12 }; Code code() const { return !state_ ? kOk : static_cast(state_[4]); } diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 7cefba8466..37962ced3c 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -373,6 +373,10 @@ Status Storage::LRange(const Slice& key, int64_t start, int64_t stop, std::vecto return lists_db_->LRange(key, start, stop, ret); } +Status Storage::LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::vector* ret, int64_t *ttl) { + return lists_db_->LRangeWithTTL(key, start, stop, ret, ttl); +} + Status Storage::LTrim(const Slice& key, int64_t start, int64_t stop) { return lists_db_->LTrim(key, start, stop); } Status Storage::LLen(const Slice& key, uint64_t* len) { return lists_db_->LLen(key, len); }