Skip to content

Commit

Permalink
cache: update (#1990)
Browse files Browse the repository at this point in the history
* cache: update
  • Loading branch information
ForestLH authored Oct 10, 2023
1 parent 0f536a1 commit 0b2e47c
Show file tree
Hide file tree
Showing 26 changed files with 644 additions and 674 deletions.
6 changes: 4 additions & 2 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ class FlushallCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }
void Execute() override;

private:
void DoInitial() override;
Expand Down Expand Up @@ -212,7 +211,8 @@ class InfoCmd : public Cmd {
kInfo,
kInfoAll,
kInfoDebug,
kInfoCommandStats
kInfoCommandStats,
kInfoCache
};

InfoCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
Expand Down Expand Up @@ -240,6 +240,7 @@ class InfoCmd : public Cmd {
const static std::string kRocksDBSection;
const static std::string kDebugSection;
const static std::string kCommandStatsSection;
const static std::string kCacheSection;

void DoInitial() override;
void Clear() override {
Expand All @@ -260,6 +261,7 @@ class InfoCmd : public Cmd {
void InfoRocksDB(std::string& info);
void InfoDebug(std::string& info);
void InfoCommandStats(std::string& info);
void InfoCache(std::string& info);
};

class ShutdownCmd : public Cmd {
Expand Down
229 changes: 113 additions & 116 deletions include/pika_cache.h

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion include/pika_cache_load_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
#include <unordered_map>
#include <vector>

#include "include/pika_cache.h"
#include "include/pika_slot.h"
#include "include/pika_define.h"
#include "net/include/net_thread.h"
#include "pika_slot.h"
#include "storage/storage.h"

#define CACHE_LOAD_QUEUE_MAX_SIZE 2048
Expand Down
40 changes: 4 additions & 36 deletions include/pika_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,48 +10,16 @@
#include <memory>

#include "include/pika_cache.h"
#include "include/pika_define.h"
#include "dory/include/RedisCache.h"

class PikaCacheManager : public pstd::noncopyable {
public:
struct CacheInfo {
int status;
uint32_t cache_num;
long long keys_num;
size_t used_memory;
long long hits;
long long misses;
uint64_t async_load_keys_num;
uint32_t waitting_load_keys_num;
CacheInfo()
: status(PIKA_CACHE_STATUS_NONE),
cache_num(0),
keys_num(0),
used_memory(0),
hits(0),
misses(0),
async_load_keys_num(0),
waitting_load_keys_num(0) {}
void clear() {
status = PIKA_CACHE_STATUS_NONE;
cache_num = 0;
keys_num = 0;
used_memory = 0;
hits = 0;
misses = 0;
async_load_keys_num = 0;
waitting_load_keys_num = 0;
}
};

PikaCacheManager(std::vector<DBStruct> dbs);
~PikaCacheManager();
std::shared_ptr<PikaCache> GetCache(const std::string& db_name, int slot_index);
PikaCacheManager();
~PikaCacheManager() = default;
void Init(const std::map<std::string, std::shared_ptr<DB>>& dbs);
void ProcessCronTask();
void FlushDB(const std::string& db_name);
double HitRatio();
void ClearHitRatio();
PikaCache::CacheInfo Info();
private:
std::shared_mutex mu_;
std::unordered_map<std::string, std::shared_ptr<PikaCache>> caches_;
Expand Down
12 changes: 8 additions & 4 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include "include/pika_slot.h"
#include "net/src/dispatch_thread.h"
#include "include/pika_cache.h"
#include "include/pika_cache_manager.h"

class SyncMasterSlot;
class SyncSlaveSlot;
Expand Down Expand Up @@ -232,7 +230,8 @@ enum CmdFlagsMask {
kCmdFlagsMaskAdminRequire = 256,
kCmdFlagsMaskPreDo = 512,
kCmdFlagsMaskCacheDo = 1024,
kCmdFlagsMaskPostDo = 2048,
kCmdFlagsMaskUpdateCache = 2048,
kCmdFlagsMaskOnlyDoCache = 4096,
kCmdFlagsMaskSlot = 1536,
};

Expand Down Expand Up @@ -260,7 +259,8 @@ enum CmdFlags {
kCmdFlagsDoNotSpecifySlot = 0, // default do not specify slot
kCmdFlagsSingleSlot = 512,
kCmdFlagsMultiSlot = 1024,
kCmdFlagsPreDo = 2048,
kCmdFlagsUpdateCache = 2048,
kCmdFlagsOnlyDoCache = 4096
};

void inline RedisAppendContent(std::string& str, const std::string& value);
Expand Down Expand Up @@ -453,6 +453,8 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
virtual void ProcessMultiSlotCmd();
virtual void ProcessDoNotSpecifySlotCmd();
virtual void Do(std::shared_ptr<Slot> slot = nullptr) = 0;
virtual void DoFromCache(std::shared_ptr<Slot> slot = nullptr) {}
virtual void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) {}
virtual Cmd* Clone() = 0;
// used for execute multikey command into different slots
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) = 0;
Expand All @@ -468,6 +470,8 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool is_admin_require() const;
bool is_single_slot() const;
bool is_multi_slot() const;
bool is_need_update_cache() const;
bool is_only_from_cache() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
uint64_t GetDoDuration() const { return do_duration_; };

Expand Down
10 changes: 8 additions & 2 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class SetCmd : public Cmd {
res.push_back(key_);
return res;
}
void Execute() override;
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new SetCmd(*this); }
Expand Down Expand Up @@ -54,14 +54,17 @@ class GetCmd : public Cmd {
res.push_back(key_);
return res;
}
void Execute() override;
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void DoFromCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new GetCmd(*this); }

private:
std::string key_;
std::string value_;
int64_t sec_ = 0;
void DoInitial() override;
};

Expand Down Expand Up @@ -90,6 +93,7 @@ class IncrCmd : public Cmd {
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new IncrCmd(*this); }
Expand Down Expand Up @@ -217,7 +221,9 @@ class AppendCmd : public Cmd {
class MgetCmd : public Cmd {
public:
MgetCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
void DoFromCache(std::shared_ptr<Slot> slot = nullptr) override;
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
std::vector<std::string> current_key() const override { return keys_; }
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override;
void Merge() override;
Expand Down
7 changes: 5 additions & 2 deletions include/pika_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
#include "storage/storage.h"

#include "include/pika_binlog.h"
#include "include/pika_cache.h"

class Cmd;
class PikaCache;

/*
*Keyscan used
Expand Down Expand Up @@ -56,6 +56,9 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
uint32_t GetSlotID() const;
std::string GetSlotName() const;
std::shared_ptr<storage::Storage> db() const;
std::shared_ptr<PikaCache> cache() const;

void Init();

void Compact(const storage::DataType& type);

Expand Down Expand Up @@ -109,7 +112,7 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
// class may be shared, using shared_ptr would be a better choice
std::shared_ptr<pstd::lock::LockMgr> lock_mgr_;
std::shared_ptr<storage::Storage> db_;
// std::shared_ptr<PikaCache> cache_;
std::shared_ptr<PikaCache> cache_;

bool full_sync_ = false;

Expand Down
5 changes: 4 additions & 1 deletion include/rsync_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ class Session;
class WaitObject;
class WaitObjectManager;

using pstd::Status;


class RsyncClient : public net::Thread {
public:
public:
enum State {
IDLE,
RUNNING,
Expand Down
7 changes: 4 additions & 3 deletions src/dory/include/RedisCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ extern "C" {

namespace dory {

using Status = pstd::Status;
using Status = rocksdb::Status;

class RedisCache
{
Expand All @@ -39,7 +39,7 @@ class RedisCache
long long DbSize(void);
void FlushDb(void);

Status Del(std::string &key);
Status Del(const std::string &key);
Status Expire(std::string &key, int64_t ttl);
Status Expireat(std::string &key, int64_t ttl);
Status TTL(std::string &key, int64_t *ttl);
Expand All @@ -49,11 +49,12 @@ class RedisCache

// String Commands
Status Set(std::string &key, std::string &value, int64_t ttl);
Status SetWithoutTTL(std::string &key, std::string &value);
Status Setnx(std::string &key, std::string &value, int64_t ttl);
Status SetnxWithoutTTL(std::string &key, std::string &value);
Status Setxx(std::string &key, std::string &value, int64_t ttl);
Status SetxxWithoutTTL(std::string &key, std::string &value);
Status Get(std::string &key, std::string *value);
Status Get(const std::string &key, std::string *value);
Status Incr(std::string &key);
Status Decr(std::string &key);
Status IncrBy(std::string &key, long long incr);
Expand Down
2 changes: 1 addition & 1 deletion src/dory/src/RedisBit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "redisdb/object.h"

namespace dory {
using pstd::Status;
using rocksdb::Status;

Status
RedisCache::SetBit(std::string &key, size_t offset, long value)
Expand Down
2 changes: 1 addition & 1 deletion src/dory/src/RedisCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ RedisCache::FlushDb(void)
}

Status
RedisCache::Del(std::string &key)
RedisCache::Del(const std::string &key)
{
int ret;
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
Expand Down
5 changes: 3 additions & 2 deletions src/dory/src/RedisHash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ RedisCache::HGet(std::string &key, std::string &field, std::string *value)
return Status::NotFound("key not in cache");
} else if (REDIS_ITEM_NOT_EXIST == ret) {
DecrObjectsRefCount(kobj, fobj);
return Status::ItemNotExist("field not exist");
// todo(leehao): its better to let pstd::Status to inherit rocksdb::Status
return Status::NotFound("field not exist");
} else {
DecrObjectsRefCount(kobj, fobj);
return Status::Corruption("RsHGet failed");
Expand Down Expand Up @@ -261,7 +262,7 @@ RedisCache::HExists(std::string &key, std::string &field)
}

DecrObjectsRefCount(kobj, fobj);
return is_exist ? Status::OK() : Status::ItemNotExist("field not exist");
return is_exist ? Status::OK() : Status::NotFound("field not exist");
}

Status
Expand Down
2 changes: 1 addition & 1 deletion src/dory/src/RedisSet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ RedisCache::SIsmember(std::string &key, std::string &member)
}

DecrObjectsRefCount(kobj, mobj);
return is_member ? Status::OK() : Status::ItemNotExist("member not exist");
return is_member ? Status::OK() : Status::NotFound("member not exist");
}

Status
Expand Down
20 changes: 19 additions & 1 deletion src/dory/src/RedisString.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ RedisCache::Set(std::string &key, std::string &value, int64_t ttl)
DecrObjectsRefCount(kobj, vobj, tobj);
return Status::OK();
}
Status
RedisCache::SetWithoutTTL(std::string &key, std::string &value)
{
if (C_OK != RsFreeMemoryIfNeeded(m_RedisDB)) {
return Status::Corruption("[error] Free memory faild !");
}

robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
robj *vobj = createObject(OBJ_STRING, sdsnewlen(value.data(), value.size()));

if (C_OK != RsSet(m_RedisDB, kobj, vobj, NULL)) {
DecrObjectsRefCount(kobj, vobj);
return Status::Corruption("RsSetnx failed, key exists!");
}

DecrObjectsRefCount(kobj, vobj);
return Status::OK();
}

Status
RedisCache::Setnx(std::string &key, std::string &value, int64_t ttl)
Expand Down Expand Up @@ -102,7 +120,7 @@ RedisCache::SetxxWithoutTTL(std::string &key, std::string &value)
return Status::OK();
}
Status
RedisCache::Get(std::string &key, std::string* value)
RedisCache::Get(const std::string &key, std::string* value)
{
robj *val;
int ret;
Expand Down
6 changes: 3 additions & 3 deletions src/dory/src/RedisZset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ RedisCache::ZRank(std::string &key, std::string &member, long *rank)
return Status::NotFound("key not in cache");
} else if (REDIS_ITEM_NOT_EXIST == ret) {
DecrObjectsRefCount(kobj, mobj);
return Status::ItemNotExist("member not exist");
return Status::NotFound("member not exist");
} else {
DecrObjectsRefCount(kobj, mobj);
return Status::Corruption("RsZRank failed");
Expand Down Expand Up @@ -355,7 +355,7 @@ RedisCache::ZRevrank(std::string &key, std::string &member, long *rank)
return Status::NotFound("key not in cache");
} else if (REDIS_ITEM_NOT_EXIST == ret) {
DecrObjectsRefCount(kobj, mobj);
return Status::ItemNotExist("member not exist");
return Status::NotFound("member not exist");
} else {
DecrObjectsRefCount(kobj, mobj);
return Status::Corruption("RsZRevrank failed");
Expand All @@ -378,7 +378,7 @@ RedisCache::ZScore(std::string &key, std::string &member, double *score)
return Status::NotFound("key not in cache");
} else if (REDIS_ITEM_NOT_EXIST == ret) {
DecrObjectsRefCount(kobj, mobj);
return Status::ItemNotExist("member not exist");
return Status::NotFound("member not exist");
} else {
DecrObjectsRefCount(kobj, mobj);
return Status::Corruption("RsZScore failed");
Expand Down
Loading

0 comments on commit 0b2e47c

Please sign in to comment.