diff --git a/common/objectcachev2.h b/common/objectcachev2.h new file mode 100644 index 00000000..96b606cc --- /dev/null +++ b/common/objectcachev2.h @@ -0,0 +1,256 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +/** + +A simpler but effective object cache implementation. + +`ObjectCacheV2` shows better performance compare to `ObjectCache`, with several +improvements: + +1. Less lock in both read and write. +2. Object destruction always goes in background photon thread called reclaimer. +3. Self adjustable timeout for reclaimer. No needs to set cycler timer. +4. New `update` API, able to immediately substitute objects. +5. `ObjectCacheV2` no longer support acquire/release API. + +It should work as `ObjectCache` in code do not depends on acquire/release API. + +**/ + +template +class ObjectCacheV2 { +protected: + using V = std::remove_pointer_t; + + struct Box : public intrusive_list_node { + const K key; + std::shared_ptr ref; + photon::spinlock boxlock; + photon::spinlock reflock; + uint64_t lastcreate = 0; + uint64_t timestamp = 0; + + struct Updater { + Box* box = nullptr; + + ~Updater() {} + + // Returned RefPtr is the old one + // other reader will get new one after updated + std::shared_ptr update(V* val, uint64_t ts = 0) { + while (box->reflock.try_lock() != 0) { + photon::thread_yield(); + } + DEFER(box->reflock.unlock()); + auto r = std::shared_ptr(val); + if (ts) box->lastcreate = ts; + std::swap(r, box->ref); + return r; + } + + explicit Updater(Box* b) : box(b) {} + + Updater(const Updater&) = delete; + Updater(Updater&& rhs) : box(nullptr) { *this = std::move(rhs); } + Updater& operator=(const Updater&) = delete; + Updater& operator=(Updater&& rhs) { + std::swap(box, rhs.box); + return *this; + } + operator bool() const { return box != nullptr; } + }; + + Box() = default; + template + explicit Box(KeyType&& key) + : key(std::forward(key)), ref(nullptr) {} + + ~Box() {} + + Updater writer() { return Updater(this); } + std::shared_ptr reader() { + SCOPED_LOCK(reflock); + return ref; + } + }; + struct ItemHash { + size_t operator()(const Box* x) const { return std::hash()(x->key); } + }; + struct ItemEqual { + bool operator()(const Box* a, const Box* b) const { + return a->key == b->key; + } + }; + + photon::thread* reclaimer = nullptr; + photon::common::RingChannel< + LockfreeMPMCRingQueue*, 4096>> + reclaim_queue; + photon::spinlock maplock; + std::unordered_set map; + intrusive_list cycle_list; + uint64_t lifespan; + photon::Timer _timer; + + void __update(Box* item, V* val) { + reclaim_queue.template send( + new std::shared_ptr(item->writer().update(val, photon::now))); + } + + template + Box* __find_or_create_item(const KeyType& key) { + Box keyitem(key); + auto pkey = &keyitem; + Box* item = nullptr; + SCOPED_LOCK(maplock); + auto it = map.find(pkey); + if (it == map.end()) { + item = new Box(key); + map.emplace(item); + } else + item = *it; + assert(item); + item->timestamp = photon::now; + cycle_list.pop(item); + cycle_list.push_back(item); + return item; + } + + uint64_t __expire() { + intrusive_list delete_list; + uint64_t now = photon::now; + { + SCOPED_LOCK(maplock); + if (cycle_list.empty()) return 0; + if (photon::sat_add(cycle_list.front()->timestamp, lifespan) > + now) { + return photon::sat_add(cycle_list.front()->timestamp, + lifespan) - + now; + } + auto x = cycle_list.front(); + while (x && (photon::sat_add(x->timestamp, lifespan) < now)) { + cycle_list.pop(x); + __update(x, nullptr); + map.erase(x); + delete_list.push_back(x); + x = cycle_list.front(); + } + } + delete_list.delete_all(); + return 0; + } + +public: + struct Borrow { + ObjectCacheV2* _oc = nullptr; + Box* _item = nullptr; + std::shared_ptr _reader; + bool _recycle = false; + + Borrow() : _reader(nullptr) {} + + Borrow(ObjectCacheV2* oc, Box* item, std::shared_ptr&& reader) + : _oc(oc), + _item(item), + _reader(std::move(reader)), + _recycle(false) {} + + Borrow(Borrow&& rhs) : _reader(nullptr) { *this = std::move(rhs); } + + Borrow& operator=(Borrow&& rhs) { + std::swap(_oc, rhs._oc); + std::swap(_item, rhs._item); + std::swap(_reader, rhs._reader); + std::swap(_recycle, rhs._recycle); + return *this; + } + + ~Borrow() { + if (_recycle) { + _oc->__update(_item, nullptr); + } + } + + bool recycle() { return _recycle; } + + bool recycle(bool x) { return _recycle = x; } + + V& operator*() const { return *_reader; } + V* operator->() const { return &*_reader; } + operator bool() const { return (bool)_reader; } + }; + + template + Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) { + auto item = __find_or_create_item(std::forward(key)); + auto r = item->reader(); + while (!r) { + if (item->boxlock.try_lock() == 0) { + DEFER(item->boxlock.unlock()); + auto lr = item->reader(); + if (!lr) { + if (photon::sat_add(item->lastcreate, cooldown) <= + photon::now) { + __update(item, ctor()); + } + return Borrow(this, item, item->reader()); + } + } + photon::thread_yield(); + r = item->reader(); + } + return Borrow(this, item, item->reader()); + } + + template + Borrow borrow(KeyType&& key) { + return borrow(&key, [&]() { return new V(); }); + } + + template + Borrow update(KeyType&& key, Ctor&& ctor) { + auto item = __find_or_create_item(std::forward(key)); + __update(item, ctor()); + return Borrow(this, item, item->reader()); + } + + ObjectCacheV2(uint64_t lifespan) + : lifespan(lifespan), + _timer(1UL * 1000 * 1000, {this, &ObjectCacheV2::__expire}, true, + photon::DEFAULT_STACK_SIZE) { + reclaimer = photon::thread_create11([this] { + while (true) { + auto r = reclaim_queue.recv(); + if (!r) { + break; + } else { + delete r; + } + } + }); + photon::thread_enable_join(reclaimer); + } + + ~ObjectCacheV2() { + _timer.stop(); + if (reclaimer) { + reclaim_queue.template send(nullptr); + photon::thread_join((photon::join_handle*)reclaimer); + reclaimer = nullptr; + } + SCOPED_LOCK(maplock); + cycle_list.delete_all(); + } +}; \ No newline at end of file diff --git a/common/test/CMakeLists.txt b/common/test/CMakeLists.txt index a4febc05..5e7819f5 100644 --- a/common/test/CMakeLists.txt +++ b/common/test/CMakeLists.txt @@ -5,6 +5,10 @@ add_executable(test-objcache test_objcache.cpp) target_link_libraries(test-objcache PRIVATE photon_shared) add_test(NAME test-objcache COMMAND $) +add_executable(perf-objcache perf_objcache.cpp) +target_link_libraries(perf-objcache PRIVATE photon_shared) +add_test(NAME perf-objcache COMMAND $) + add_executable(test-common test.cpp) target_link_libraries(test-common PRIVATE photon_shared) add_test(NAME test-common COMMAND $) diff --git a/common/test/perf_objcache.cpp b/common/test/perf_objcache.cpp index 5832c245..257af4f1 100644 --- a/common/test/perf_objcache.cpp +++ b/common/test/perf_objcache.cpp @@ -2,16 +2,15 @@ #include #include +#include #include #include #include #include -#include #include -#include -#include "../../test/ci-tools.h" #include "../expirecontainer.h" +#include "../objectcachev2.h" constexpr size_t count = 10UL * 1024; @@ -29,11 +28,15 @@ void ready() { LOG_INFO("Random key generated"); } +template