From a97bd666ad97ec4cb734cc2764354aea747e2a79 Mon Sep 17 00:00:00 2001 From: Coldwings Date: Tue, 16 Jul 2024 16:25:21 +0800 Subject: [PATCH] Makes queue transport sharedptr directly Signed-off-by: Coldwings --- common/lockfree_queue.h | 48 +++++++++++-------- common/objectcachev2.h | 103 ++++++++++++++++++++-------------------- 2 files changed, 80 insertions(+), 71 deletions(-) diff --git a/common/lockfree_queue.h b/common/lockfree_queue.h index 7fda6da9..333bca63 100644 --- a/common/lockfree_queue.h +++ b/common/lockfree_queue.h @@ -90,16 +90,23 @@ struct PhotonPause : PauseBase { } }; +template +struct is_shared_ptr : std::false_type {}; +template +struct is_shared_ptr> : std::true_type {}; + template class LockfreeRingQueueBase { public: #if __cplusplus < 201402L - static_assert(std::has_trivial_copy_constructor::value && - std::has_trivial_copy_assign::value, + static_assert((std::has_trivial_copy_constructor::value && + std::has_trivial_copy_assign::value) || + is_shared_ptr::value, "T should be trivially copyable"); #else - static_assert(std::is_trivially_copy_constructible::value && - std::is_trivially_copy_assignable::value, + static_assert((std::is_trivially_copy_constructible::value && + std::is_trivially_copy_assignable::value) || + is_shared_ptr::value, "T should be trivially copyable"); #endif @@ -286,15 +293,15 @@ class LockfreeBatchMPMCRingQueue : public LockfreeRingQueueBase { using Base::empty; using Base::full; - size_t push_batch(const T *x, size_t n) { + size_t push_batch(const T* x, size_t n) { size_t rh, wt; wt = tail.load(std::memory_order_relaxed); for (;;) { rh = head.load(std::memory_order_acquire); auto wn = std::min(n, Base::capacity - (wt - rh)); - if (wn == 0) - return 0; - if (!tail.compare_exchange_strong(wt, wt + wn, std::memory_order_acq_rel)) + if (wn == 0) return 0; + if (!tail.compare_exchange_strong(wt, wt + wn, + std::memory_order_acq_rel)) continue; auto first_idx = idx(wt); auto part_length = Base::capacity - first_idx; @@ -303,28 +310,28 @@ class LockfreeBatchMPMCRingQueue : public LockfreeRingQueueBase { } else { if (likely(part_length)) memcpy(&slots[first_idx], x, sizeof(T) * (part_length)); - memcpy(&slots[0], x + part_length, sizeof(T) * (wn - part_length)); + memcpy(&slots[0], x + part_length, + sizeof(T) * (wn - part_length)); } auto wh = wt; - while (!write_head.compare_exchange_strong(wh, wt + wn, std::memory_order_acq_rel)) + while (!write_head.compare_exchange_strong( + wh, wt + wn, std::memory_order_acq_rel)) wh = wt; return wn; } } - bool push(const T &x) { - return push_batch(&x, 1) == 1; - } + bool push(const T& x) { return push_batch(&x, 1) == 1; } - size_t pop_batch(T *x, size_t n) { + size_t pop_batch(T* x, size_t n) { size_t rt, wh; rt = read_tail.load(std::memory_order_relaxed); for (;;) { wh = write_head.load(std::memory_order_acquire); auto rn = std::min(n, wh - rt); - if (rn == 0) - return 0; - if (!read_tail.compare_exchange_strong(rt, rt + rn, std::memory_order_acq_rel)) + if (rn == 0) return 0; + if (!read_tail.compare_exchange_strong(rt, rt + rn, + std::memory_order_acq_rel)) continue; auto first_idx = idx(rt); auto part_length = Base::capacity - first_idx; @@ -333,10 +340,12 @@ class LockfreeBatchMPMCRingQueue : public LockfreeRingQueueBase { } else { if (likely(part_length)) memcpy(x, &slots[first_idx], sizeof(T) * (part_length)); - memcpy(x + part_length, &slots[0], sizeof(T) * (rn - part_length)); + memcpy(x + part_length, &slots[0], + sizeof(T) * (rn - part_length)); } auto rh = rt; - while (!head.compare_exchange_strong(rh, rt + rn, std::memory_order_acq_rel)) + while (!head.compare_exchange_strong(rh, rt + rn, + std::memory_order_acq_rel)) rh = rt; return rn; } @@ -585,4 +594,3 @@ class RingChannel : public QueueType { } // namespace photon #undef size_t - diff --git a/common/objectcachev2.h b/common/objectcachev2.h index 20e7422a..0085e3f9 100644 --- a/common/objectcachev2.h +++ b/common/objectcachev2.h @@ -47,16 +47,18 @@ class ObjectCacheV2 { // Returned RefPtr is the old one // other reader will get new one after updated - std::shared_ptr update(V* val, uint64_t ts = 0, - std::shared_ptr* newptr = nullptr) { - auto r = std::shared_ptr(val); - if (newptr) - std::atomic_store(newptr, r); - *newptr = r; + + std::shared_ptr update(std::shared_ptr& r, uint64_t ts = 0) { box->lastcreate = ts; r = std::atomic_exchange(&box->ref, r); return r; } + std::shared_ptr update(V* val, uint64_t ts = 0, + std::shared_ptr* newptr = nullptr) { + auto r = std::shared_ptr(val); + if (newptr) *newptr = r; + return update(r, ts); + } explicit Updater(Box* b) : box(b) {} @@ -78,53 +80,56 @@ class ObjectCacheV2 { ~Box() {} Updater writer() { return Updater(this); } - std::shared_ptr reader() { - return std::atomic_load(&ref); - } + std::shared_ptr reader() { return std::atomic_load(&ref); } }; struct ItemHash { - size_t operator()(const Box* x) const { return std::hash()(x->key); } + size_t operator()(const Box& x) const { return std::hash()(x.key); } }; struct ItemEqual { - bool operator()(const Box* a, const Box* b) const { - return a->key == b->key; + bool operator()(const Box& a, const Box& b) const { + return a.key == b.key; } }; photon::thread* reclaimer = nullptr; - photon::common::RingChannel< - LockfreeMPMCRingQueue*, 4096>> + photon::common::RingChannel, 4096>> reclaim_queue; photon::spinlock maplock; - std::unordered_set map; + std::unordered_set map; intrusive_list cycle_list; uint64_t lifespan; photon::Timer _timer; + bool _exit = false; + + std::shared_ptr __update(Box& item, std::shared_ptr val) { + auto ret_val = val; + auto old_val = item.writer().update(val, photon::now); + reclaim_queue.template send(old_val); + return ret_val; + } - std::shared_ptr __update(Box* item, V* val) { - std::shared_ptr ret; - reclaim_queue.template send(new std::shared_ptr( - item->writer().update(val, photon::now, &ret))); - return ret; + std::shared_ptr __update(Box& item, V* val) { + std::shared_ptr ptr(val); + return __update(item, ptr); } template - Box* __find_or_create_item(KeyType&& key) { + Box& __find_or_create_item(KeyType&& key) { Box keyitem(key); - auto pkey = &keyitem; Box* item = nullptr; SCOPED_LOCK(maplock); - auto it = map.find(pkey); + auto it = map.find(keyitem); if (it == map.end()) { - item = new Box(std::forward(key)); - map.emplace(item); + // item = new Box(std::forward(key)); + auto rt = map.emplace(key); + if (rt.second) item = (Box*)&*rt.first; } else - item = *it; + item = (Box*)&*it; assert(item); item->timestamp = photon::now; cycle_list.pop(item); cycle_list.push_back(item); - return item; + return *item; } uint64_t __expire() { @@ -142,13 +147,11 @@ class ObjectCacheV2 { auto x = cycle_list.front(); while (x && (photon::sat_add(x->timestamp, lifespan) < now)) { cycle_list.pop(x); - __update(x, nullptr); - map.erase(x); - delete_list.push_back(x); + __update(*x, nullptr); + map.erase(*x); x = cycle_list.front(); } } - delete_list.delete_all(); return 0; } @@ -180,7 +183,7 @@ class ObjectCacheV2 { ~Borrow() { if (_recycle) { - _oc->__update(_item, nullptr); + _oc->__update(*_item, nullptr); } } @@ -195,29 +198,30 @@ class ObjectCacheV2 { template Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) { - auto item = __find_or_create_item(std::forward(key)); - auto r = item->reader(); + auto& item = __find_or_create_item(std::forward(key)); + auto r = item.reader(); while (!r) { - if (item->boxlock.try_lock() == 0) { - DEFER(item->boxlock.unlock()); - r = item->reader(); + if (item.boxlock.try_lock() == 0) { + DEFER(item.boxlock.unlock()); + r = item.reader(); if (!r) { - if (photon::sat_add(item->lastcreate, cooldown) <= + if (photon::sat_add(item.lastcreate, cooldown) <= photon::now) { r = __update(item, ctor()); } - return Borrow(this, item, std::move(r)); + return Borrow(this, &item, std::move(r)); } } photon::thread_yield(); - r = item->reader(); + r = item.reader(); } - return Borrow(this, item, std::move(r)); + return Borrow(this, &item, std::move(r)); } template Borrow borrow(KeyType&& key) { - return borrow(std::forward(key), [&]() { return new V(); }); + return borrow(std::forward(key), + [&]() { return std::make_shared(); }); } template @@ -230,15 +234,11 @@ class ObjectCacheV2 { ObjectCacheV2(uint64_t lifespan) : lifespan(lifespan), _timer(1UL * 1000 * 1000, {this, &ObjectCacheV2::__expire}, true, - photon::DEFAULT_STACK_SIZE) { + photon::DEFAULT_STACK_SIZE), + _exit(false) { reclaimer = photon::thread_create11([this] { - while (true) { - auto r = reclaim_queue.recv(); - if (!r) { - break; - } else { - delete r; - } + while (!_exit) { + reclaim_queue.recv(); } }); photon::thread_enable_join(reclaimer); @@ -247,11 +247,12 @@ class ObjectCacheV2 { ~ObjectCacheV2() { _timer.stop(); if (reclaimer) { + _exit = true; reclaim_queue.template send(nullptr); photon::thread_join((photon::join_handle*)reclaimer); reclaimer = nullptr; } SCOPED_LOCK(maplock); - cycle_list.delete_all(); + cycle_list.node = nullptr; } }; \ No newline at end of file