From 8385dd44e8d321947f324f675f1ca69dad6ca003 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 10 Oct 2024 20:07:17 +0800 Subject: [PATCH 1/2] feat: add update cache ability to ZPopMax and ZPopMin --- include/pika_cache.h | 2 + include/pika_zset.h | 4 ++ src/cache/include/cache.h | 2 + src/cache/src/zset.cc | 78 +++++++++++++++++++++++++++++++++++++++ src/pika_cache.cc | 28 ++++++++++++++ src/pika_command.cc | 4 +- src/pika_zset.cc | 22 +++++++++++ 7 files changed, 138 insertions(+), 2 deletions(-) diff --git a/include/pika_cache.h b/include/pika_cache.h index d82627ced7..18a55e651c 100644 --- a/include/pika_cache.h +++ b/include/pika_cache.h @@ -172,6 +172,8 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this< rocksdb::Status ZLexcount(std::string& key, std::string& min, std::string& max, uint64_t* len, const std::shared_ptr& db); rocksdb::Status ZRemrangebylex(std::string& key, std::string& min, std::string& max, const std::shared_ptr& db); + rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector* score_members, const std::shared_ptr& db); + rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector* score_members, const std::shared_ptr& db); // Bit Commands rocksdb::Status SetBit(std::string& key, size_t offset, int64_t value); diff --git a/include/pika_zset.h b/include/pika_zset.h index a74ee026fc..b4e5726233 100644 --- a/include/pika_zset.h +++ b/include/pika_zset.h @@ -603,6 +603,8 @@ class ZPopmaxCmd : public Cmd { void Do() override; void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; + void DoThroughDB() override; + void DoUpdateCache() override; Cmd* Clone() override { return new ZPopmaxCmd(*this); } private: @@ -623,6 +625,8 @@ class ZPopminCmd : public Cmd { void Do() override; void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; + void DoThroughDB() override; + void DoUpdateCache() override; Cmd* Clone() override { return new ZPopminCmd(*this); } private: diff --git a/src/cache/include/cache.h b/src/cache/include/cache.h index 5f5d6a2959..20c93715f0 100644 --- a/src/cache/include/cache.h +++ b/src/cache/include/cache.h @@ -147,6 +147,8 @@ class RedisCache { std::vector *members); Status ZLexcount(std::string& key, std::string &min, std::string &max, uint64_t *len); Status ZRemrangebylex(std::string& key, std::string &min, std::string &max); + Status ZPopMin(std::string& key, int64_t count, std::vector* score_members); + Status ZPopMax(std::string& key, int64_t count, std::vector* score_members); // Bit Commands Status SetBit(std::string& key, size_t offset, int64_t value); diff --git a/src/cache/src/zset.cc b/src/cache/src/zset.cc index 9a83c018c5..9835713418 100644 --- a/src/cache/src/zset.cc +++ b/src/cache/src/zset.cc @@ -405,5 +405,83 @@ Status RedisCache::ZRemrangebylex(std::string& key, std::string &min, std::strin return Status::OK(); } +Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector* score_members) { + zitem* items = nullptr; + unsigned long items_size = 0; + robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + DEFER { + DecrObjectsRefCount(kobj); + }; + + int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size); + if (C_OK != ret) { + if (REDIS_KEY_NOT_EXIST == ret) { + return Status::NotFound("key not in cache"); + } + return Status::Corruption("RcZrange failed"); + } + + unsigned long to_return = std::min(static_cast(count), items_size); + for (unsigned long i = 0; i < to_return; ++i) { + storage::ScoreMember sm; + sm.score = items[i].score; + sm.member.assign(items[i].member, sdslen(items[i].member)); + score_members->push_back(sm); + } + + robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size); + for (unsigned long i = 0; i < items_size; ++i) { + members_obj[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member))); + } + DEFER { + FreeObjectList(members_obj, items_size); + }; + + RcZRem(cache_, kobj, members_obj, to_return); + + FreeZitemList(items, items_size); + return Status::OK(); +} + +Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector* score_members) { + zitem* items = nullptr; + unsigned long items_size = 0; + robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + DEFER { + DecrObjectsRefCount(kobj); + }; + + int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size); + if (C_OK != ret) { + if (REDIS_KEY_NOT_EXIST == ret) { + return Status::NotFound("key not in cache"); + } + return Status::Corruption("RcZrange failed"); + } + + unsigned long to_return = std::min(static_cast(count), items_size); + for (unsigned long i = items_size - to_return; i < items_size; ++i) { + storage::ScoreMember sm; + sm.score = items[i].score; + sm.member.assign(items[i].member, sdslen(items[i].member)); + score_members->push_back(sm); + } + + robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size); + for (unsigned long i = items_size - 1; i >= 0; --i) { + members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member))); + } + + DEFER { + FreeObjectList(members_obj, items_size); + }; + + RcZRem(cache_, kobj, members_obj, to_return); + + FreeZitemList(items, items_size); + return Status::OK(); +} + + } // namespace cache /* EOF */ diff --git a/src/pika_cache.cc b/src/pika_cache.cc index b7d1f45eb1..ebec3f29b4 100644 --- a/src/pika_cache.cc +++ b/src/pika_cache.cc @@ -1465,6 +1465,34 @@ Status PikaCache::ZRemrangebylex(std::string& key, std::string &min, std::string } } +Status PikaCache::ZPopMin(std::string& key, int64_t count, std::vector* score_members, const std::shared_ptr& db) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + + auto cache_obj = caches_[cache_index]; + Status s; + + if (cache_obj->Exists(key)) { + return cache_obj->ZPopMin(key, count, score_members); + } else { + return Status::NotFound("key not in cache"); + } +} + +Status PikaCache::ZPopMax(std::string& key, int64_t count, std::vector* score_members, const std::shared_ptr& db) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + + auto cache_obj = caches_[cache_index]; + Status s; + + if (cache_obj->Exists(key)) { + return cache_obj->ZPopMax(key, count, score_members); + } else { + return Status::NotFound("key not in cache"); + } +} + /*----------------------------------------------------------------------------- * Bit Commands *----------------------------------------------------------------------------*/ diff --git a/src/pika_command.cc b/src/pika_command.cc index 8ba3e7ba6c..6c099f6ecf 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -600,11 +600,11 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameZRemrangebylex, std::move(zremrangebylexptr))); ////ZPopmax std::unique_ptr zpopmaxptr = std::make_unique( - kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast); + kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameZPopmax, std::move(zpopmaxptr))); ////ZPopmin std::unique_ptr zpopminptr = std::make_unique( - kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast); + kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache); cmd_table->insert(std::pair>(kCmdNameZPopmin, std::move(zpopminptr))); // Set diff --git a/src/pika_zset.cc b/src/pika_zset.cc index 6b62dbf93b..87b45723f0 100644 --- a/src/pika_zset.cc +++ b/src/pika_zset.cc @@ -1507,6 +1507,17 @@ void ZPopmaxCmd::Do() { } } +void ZPopmaxCmd::DoThroughDB(){ + Do(); +} + +void ZPopmaxCmd::DoUpdateCache(){ + std::vector score_members; + if(s_.ok() || s_.IsNotFound()){ + db_->cache()->ZPopMax(key_, count_, &score_members, db_); + } +} + void ZPopminCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameZPopmin); @@ -1523,6 +1534,17 @@ void ZPopminCmd::DoInitial() { } } +void ZPopminCmd::DoThroughDB(){ + Do(); +} + +void ZPopminCmd::DoUpdateCache(){ + std::vector score_members; + if(s_.ok() || s_.IsNotFound()){ + db_->cache()->ZPopMin(key_, count_, &score_members, db_); + } +} + void ZPopminCmd::Do() { std::vector score_members; rocksdb::Status s = db_->storage()->ZPopMin(key_, count_, &score_members); From 7c6c24c433f54f0045a083640bd02954716069f5 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 10 Oct 2024 20:16:49 +0800 Subject: [PATCH 2/2] feat:zpopmin && zpopmax to cache --- include/pika_cache.h | 26 +++++++++++++++----------- src/cache/src/zset.cc | 33 ++++++++++++--------------------- src/pika_cache.cc | 6 ++++-- src/pika_zset.cc | 12 ++++++------ tests/integration/zset_test.go | 32 ++++++++++++++++++++++++++++++++ 5 files changed, 69 insertions(+), 40 deletions(-) diff --git a/include/pika_cache.h b/include/pika_cache.h index 18a55e651c..a8495defe3 100644 --- a/include/pika_cache.h +++ b/include/pika_cache.h @@ -149,9 +149,10 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this< rocksdb::Status ZCard(std::string& key, uint32_t* len, const std::shared_ptr& db); rocksdb::Status ZCount(std::string& key, std::string& min, std::string& max, uint64_t* len, ZCountCmd* cmd); rocksdb::Status ZIncrby(std::string& key, std::string& member, double increment); - rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string& member, double increment, ZIncrbyCmd* cmd, const std::shared_ptr& db); - rocksdb::Status ZRange(std::string& key, int64_t start, int64_t stop, std::vector* score_members, - const std::shared_ptr& db); + rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string& member, double increment, ZIncrbyCmd* cmd, + const std::shared_ptr& db); + rocksdb::Status ZRange(std::string& key, int64_t start, int64_t stop, + std::vector* score_members, const std::shared_ptr& db); rocksdb::Status ZRangebyscore(std::string& key, std::string& min, std::string& max, std::vector* score_members, ZRangebyscoreCmd* cmd); rocksdb::Status ZRank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr& db); @@ -159,21 +160,24 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this< rocksdb::Status ZRemrangebyrank(std::string& key, std::string& min, std::string& max, int32_t ele_deleted = 0, const std::shared_ptr& db = nullptr); rocksdb::Status ZRemrangebyscore(std::string& key, std::string& min, std::string& max, const std::shared_ptr& db); - rocksdb::Status ZRevrange(std::string& key, int64_t start, int64_t stop, std::vector* score_members, - const std::shared_ptr& db); + rocksdb::Status ZRevrange(std::string& key, int64_t start, int64_t stop, + std::vector* score_members, const std::shared_ptr& db); rocksdb::Status ZRevrangebyscore(std::string& key, std::string& min, std::string& max, std::vector* score_members, ZRevrangebyscoreCmd* cmd, const std::shared_ptr& db); - rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max, std::vector* members, - const std::shared_ptr& db); - rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr& db); + rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max, + std::vector* members, const std::shared_ptr& db); + rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr& db); rocksdb::Status ZScore(std::string& key, std::string& member, double* score, const std::shared_ptr& db); - rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector* members, const std::shared_ptr& db); + rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector* members, + const std::shared_ptr& db); rocksdb::Status ZLexcount(std::string& key, std::string& min, std::string& max, uint64_t* len, const std::shared_ptr& db); rocksdb::Status ZRemrangebylex(std::string& key, std::string& min, std::string& max, const std::shared_ptr& db); - rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector* score_members, const std::shared_ptr& db); - rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector* score_members, const std::shared_ptr& db); + rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector* score_members, + const std::shared_ptr& db); + rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector* score_members, + const std::shared_ptr& db); // Bit Commands rocksdb::Status SetBit(std::string& key, size_t offset, int64_t value); diff --git a/src/cache/src/zset.cc b/src/cache/src/zset.cc index 9835713418..7dcc89f14a 100644 --- a/src/cache/src/zset.cc +++ b/src/cache/src/zset.cc @@ -405,13 +405,11 @@ Status RedisCache::ZRemrangebylex(std::string& key, std::string &min, std::strin return Status::OK(); } -Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector* score_members) { - zitem* items = nullptr; +Status RedisCache::ZPopMin(std::string &key, int64_t count, std::vector *score_members) { + zitem *items = nullptr; unsigned long items_size = 0; - robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); - DEFER { - DecrObjectsRefCount(kobj); - }; + robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + DEFER { DecrObjectsRefCount(kobj); }; int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size); if (C_OK != ret) { @@ -429,13 +427,11 @@ Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vectorpush_back(sm); } - robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size); + robj **members_obj = (robj **)zcallocate(sizeof(robj *) * items_size); for (unsigned long i = 0; i < items_size; ++i) { members_obj[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member))); } - DEFER { - FreeObjectList(members_obj, items_size); - }; + DEFER { FreeObjectList(members_obj, items_size); }; RcZRem(cache_, kobj, members_obj, to_return); @@ -443,13 +439,11 @@ Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector* score_members) { - zitem* items = nullptr; +Status RedisCache::ZPopMax(std::string &key, int64_t count, std::vector *score_members) { + zitem *items = nullptr; unsigned long items_size = 0; - robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); - DEFER { - DecrObjectsRefCount(kobj); - }; + robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + DEFER { DecrObjectsRefCount(kobj); }; int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size); if (C_OK != ret) { @@ -467,14 +461,12 @@ Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vectorpush_back(sm); } - robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size); + robj **members_obj = (robj **)zcallocate(sizeof(robj *) * items_size); for (unsigned long i = items_size - 1; i >= 0; --i) { members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member))); } - DEFER { - FreeObjectList(members_obj, items_size); - }; + DEFER { FreeObjectList(members_obj, items_size); }; RcZRem(cache_, kobj, members_obj, to_return); @@ -482,6 +474,5 @@ Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector* score_members, const std::shared_ptr& db) { +Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector *score_members, + const std::shared_ptr &db) { int cache_index = CacheIndex(key); std::lock_guard lm(*cache_mutexs_[cache_index]); @@ -1479,7 +1480,8 @@ Status PikaCache::ZPopMin(std::string& key, int64_t count, std::vector* score_members, const std::shared_ptr& db) { +Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector *score_members, + const std::shared_ptr &db) { int cache_index = CacheIndex(key); std::lock_guard lm(*cache_mutexs_[cache_index]); diff --git a/src/pika_zset.cc b/src/pika_zset.cc index 87b45723f0..29d1cb2355 100644 --- a/src/pika_zset.cc +++ b/src/pika_zset.cc @@ -1507,13 +1507,13 @@ void ZPopmaxCmd::Do() { } } -void ZPopmaxCmd::DoThroughDB(){ +void ZPopmaxCmd::DoThroughDB() { Do(); } -void ZPopmaxCmd::DoUpdateCache(){ +void ZPopmaxCmd::DoUpdateCache() { std::vector score_members; - if(s_.ok() || s_.IsNotFound()){ + if (s_.ok() || s_.IsNotFound()) { db_->cache()->ZPopMax(key_, count_, &score_members, db_); } } @@ -1534,13 +1534,13 @@ void ZPopminCmd::DoInitial() { } } -void ZPopminCmd::DoThroughDB(){ +void ZPopminCmd::DoThroughDB() { Do(); } -void ZPopminCmd::DoUpdateCache(){ +void ZPopminCmd::DoUpdateCache() { std::vector score_members; - if(s_.ok() || s_.IsNotFound()){ + if (s_.ok() || s_.IsNotFound()) { db_->cache()->ZPopMin(key_, count_, &score_members, db_); } } diff --git a/tests/integration/zset_test.go b/tests/integration/zset_test.go index a141f0ed2a..77b72e0bf1 100644 --- a/tests/integration/zset_test.go +++ b/tests/integration/zset_test.go @@ -1092,6 +1092,38 @@ var _ = Describe("Zset Commands", func() { Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'zpopmin' command"))) }) + It("should Zpopmin test", func() { + err := client.ZAdd(ctx, "zpopzset1", redis.Z{ + Score: 1, + Member: "m1", + }).Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.ZAdd(ctx, "zpopzset1", redis.Z{ + Score: 3, + Member: "m3", + }).Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.ZAdd(ctx, "zpopzset1", redis.Z{ + Score: 4, + Member: "m4", + }).Err() + Expect(err).NotTo(HaveOccurred()) + + max, err := client.ZPopMax(ctx, "zpopzset1", 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(max).To(Equal([]redis.Z{{Score: 4, Member: "m4"}})) + + min, err := client.ZPopMin(ctx, "zpopzset1", 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(min).To(Equal([]redis.Z{{Score: 1, Member: "m1"}})) + + rangeResult, err := client.ZRange(ctx, "zpopzset1", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(rangeResult).To(Equal([]string{"m3"})) + }) + It("should ZRange", func() { err := client.ZAdd(ctx, "zset", redis.Z{Score: 1, Member: "one"}).Err() Expect(err).NotTo(HaveOccurred())