diff --git a/include/pika_cache.h b/include/pika_cache.h index eb16dac0d3..aaa5c1844c 100644 --- a/include/pika_cache.h +++ b/include/pika_cache.h @@ -166,12 +166,17 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this< const std::shared_ptr& db); rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max, std::vector* members, const std::shared_ptr& db); - rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr& db); + rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr& db); rocksdb::Status ZScore(std::string& key, std::string& member, double* score, const std::shared_ptr& db); - rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector* members, const std::shared_ptr& db); + rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector* members, + const std::shared_ptr& db); rocksdb::Status ZLexcount(std::string& key, std::string& min, std::string& max, uint64_t* len, const std::shared_ptr& db); rocksdb::Status ZRemrangebylex(std::string& key, std::string& min, std::string& max, const std::shared_ptr& db); + rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector* score_members, + const std::shared_ptr& db); + rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector* score_members, + const std::shared_ptr& db); // Bit Commands rocksdb::Status SetBit(std::string& key, size_t offset, int64_t value); diff --git a/include/pika_zset.h b/include/pika_zset.h index a74ee026fc..b4e5726233 100644 --- a/include/pika_zset.h +++ b/include/pika_zset.h @@ -603,6 +603,8 @@ class ZPopmaxCmd : public Cmd { void Do() override; void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; + void DoThroughDB() override; + void DoUpdateCache() override; Cmd* Clone() override { return new ZPopmaxCmd(*this); } private: @@ -623,6 +625,8 @@ class ZPopminCmd : public Cmd { void Do() override; void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; + void DoThroughDB() override; + void DoUpdateCache() override; Cmd* Clone() override { return new ZPopminCmd(*this); } private: diff --git a/src/cache/include/cache.h b/src/cache/include/cache.h index 869cb5aa1b..efd4c4881b 100644 --- a/src/cache/include/cache.h +++ b/src/cache/include/cache.h @@ -147,6 +147,8 @@ class RedisCache { std::vector *members); Status ZLexcount(std::string& key, std::string &min, std::string &max, uint64_t *len); Status ZRemrangebylex(std::string& key, std::string &min, std::string &max); + Status ZPopMin(std::string& key, int64_t count, std::vector* score_members); + Status ZPopMax(std::string& key, int64_t count, std::vector* score_members); // Bit Commands Status SetBit(std::string& key, size_t offset, int64_t value); diff --git a/src/cache/src/zset.cc b/src/cache/src/zset.cc index 3333cc6854..7afcdef7fc 100644 --- a/src/cache/src/zset.cc +++ b/src/cache/src/zset.cc @@ -405,5 +405,83 @@ Status RedisCache::ZRemrangebylex(std::string& key, std::string &min, std::strin return Status::OK(); } + +Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector* score_members) { + zitem* items = nullptr; + unsigned long items_size = 0; + robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + DEFER { + DecrObjectsRefCount(kobj); + }; + + int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size); + if (C_OK != ret) { + if (REDIS_KEY_NOT_EXIST == ret) { + return Status::NotFound("key not in cache"); + } + return Status::Corruption("RcZrange failed"); + } + + unsigned long to_return = std::min(static_cast(count), items_size); + for (unsigned long i = 0; i < to_return; ++i) { + storage::ScoreMember sm; + sm.score = items[i].score; + sm.member.assign(items[i].member, sdslen(items[i].member)); + score_members->push_back(sm); + } + + robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size); + for (unsigned long i = 0; i < items_size; ++i) { + members_obj[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member))); + } + DEFER { + FreeObjectList(members_obj, items_size); + }; + + RcZRem(cache_, kobj, members_obj, to_return); + + FreeZitemList(items, items_size); + return Status::OK(); +} + +Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector* score_members) { + zitem* items = nullptr; + unsigned long items_size = 0; + robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + DEFER { + DecrObjectsRefCount(kobj); + }; + + int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size); + if (C_OK != ret) { + if (REDIS_KEY_NOT_EXIST == ret) { + return Status::NotFound("key not in cache"); + } + return Status::Corruption("RcZrange failed"); + } + + unsigned long to_return = std::min(static_cast(count), items_size); + for (unsigned long i = items_size - to_return; i < items_size; ++i) { + storage::ScoreMember sm; + sm.score = items[i].score; + sm.member.assign(items[i].member, sdslen(items[i].member)); + score_members->push_back(sm); + } + + robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size); + for (unsigned long i = items_size - 1; i >= 0; --i) { + members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member))); + } + + DEFER { + FreeObjectList(members_obj, items_size); + }; + + RcZRem(cache_, kobj, members_obj, to_return); + + FreeZitemList(items, items_size); + return Status::OK(); +} + } // namespace cache /* EOF */ diff --git a/src/pika_cache.cc b/src/pika_cache.cc index 9866a9f74a..31b30beddf 100644 --- a/src/pika_cache.cc +++ b/src/pika_cache.cc @@ -1523,6 +1523,37 @@ Status PikaCache::ZRemrangebylex(std::string& key, std::string &min, std::string } } +Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector *score_members, + const std::shared_ptr &db) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + + auto cache_obj = caches_[cache_index]; + Status s; + + if (cache_obj->Exists(key)) { + return cache_obj->ZPopMin(key, count, score_members); + } else { + return Status::NotFound("key not in cache"); + } +} + +Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector *score_members, + const std::shared_ptr &db) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + + auto cache_obj = caches_[cache_index]; + Status s; + + if (cache_obj->Exists(key)) { + return cache_obj->ZPopMax(key, count, score_members); + } else { + return Status::NotFound("key not in cache"); + } +} + + /*----------------------------------------------------------------------------- * Bit Commands *----------------------------------------------------------------------------*/ diff --git a/src/pika_command.cc b/src/pika_command.cc index 853aa44ea5..cb09788315 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -605,11 +605,11 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameZRemrangebylex, std::move(zremrangebylexptr))); ////ZPopmax std::unique_ptr zpopmaxptr = std::make_unique( - kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast); + kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameZPopmax, std::move(zpopmaxptr))); ////ZPopmin std::unique_ptr zpopminptr = std::make_unique( - kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast); + kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameZPopmin, std::move(zpopminptr))); // Set diff --git a/src/pika_zset.cc b/src/pika_zset.cc index 7ce26854ce..305cd09016 100644 --- a/src/pika_zset.cc +++ b/src/pika_zset.cc @@ -1462,6 +1462,17 @@ void ZPopmaxCmd::Do() { } } +void ZPopmaxCmd::DoThroughDB(){ + Do(); +} + +void ZPopmaxCmd::DoUpdateCache(){ + std::vector score_members; + if(s_.ok() || s_.IsNotFound()){ + db_->cache()->ZPopMax(key_, count_, &score_members, db_); + } +} + void ZPopminCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameZPopmin); @@ -1478,6 +1489,17 @@ void ZPopminCmd::DoInitial() { } } +void ZPopminCmd::DoThroughDB(){ + Do(); +} + +void ZPopminCmd::DoUpdateCache(){ + std::vector score_members; + if(s_.ok() || s_.IsNotFound()){ + db_->cache()->ZPopMin(key_, count_, &score_members, db_); + } +} + void ZPopminCmd::Do() { std::vector score_members; rocksdb::Status s = db_->storage()->ZPopMin(key_, count_, &score_members); diff --git a/tests/integration/zset_test.go b/tests/integration/zset_test.go index c742287cae..7cc81bbcef 100644 --- a/tests/integration/zset_test.go +++ b/tests/integration/zset_test.go @@ -1440,6 +1440,38 @@ var _ = Describe("Zset Commands", func() { }})) }) + It("should Zpopmin test", func() { + err := client.ZAdd(ctx, "zpopzset1", redis.Z{ + Score: 1, + Member: "m1", + }).Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.ZAdd(ctx, "zpopzset1", redis.Z{ + Score: 3, + Member: "m3", + }).Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.ZAdd(ctx, "zpopzset1", redis.Z{ + Score: 4, + Member: "m4", + }).Err() + Expect(err).NotTo(HaveOccurred()) + + max, err := client.ZPopMax(ctx, "zpopzset1", 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(max).To(Equal([]redis.Z{{Score: 4, Member: "m4"}})) + + min, err := client.ZPopMin(ctx, "zpopzset1", 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(min).To(Equal([]redis.Z{{Score: 1, Member: "m1"}})) + + rangeResult, err := client.ZRange(ctx, "zpopzset1", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(rangeResult).To(Equal([]string{"m3"})) + }) + It("should ZRemRangeByRank", func() { err := client.ZAdd(ctx, "zset", redis.Z{Score: 1, Member: "one"}).Err() Expect(err).NotTo(HaveOccurred())