forked from alibaba/PhotonLibOS
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
400 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,255 @@ | ||
#pragma once | ||
|
||
#include <photon/common/alog.h> | ||
#include <photon/common/lockfree_queue.h> | ||
#include <photon/photon.h> | ||
#include <photon/thread/list.h> | ||
#include <photon/thread/thread11.h> | ||
#include <photon/thread/timer.h> | ||
#include <photon/thread/workerpool.h> | ||
|
||
#include <memory> | ||
#include <unordered_set> | ||
|
||
/** | ||
A simplify object cache implementation. | ||
`ObjectCacheV2` shows better performance compare to `ObjectCache`, with several | ||
improvements: | ||
1. Less lock in both read and write. | ||
2. Object destruction always goes in background photon thread called reclaimer. | ||
3. Self adjustable timeout for reclaimer. No needs to set cycler timer. | ||
4. New `update` API, able to immediatly substitude objects. | ||
5. `ObjectCacheV2` no longer support acquire/release API. | ||
It should work as `ObjectCache` in code do not depends on acquire/release API. | ||
**/ | ||
|
||
template <typename K, typename VPtr> | ||
class ObjectCacheV2 { | ||
protected: | ||
using V = std::remove_pointer_t<VPtr>; | ||
|
||
struct Box : public intrusive_list_node<Box> { | ||
const K key; | ||
std::shared_ptr<V> ref; | ||
photon::spinlock boxlock; | ||
photon::spinlock reflock; | ||
uint64_t lastcreate = 0; | ||
uint64_t timestamp = 0; | ||
|
||
struct Updater { | ||
Box* box = nullptr; | ||
|
||
~Updater() {} | ||
|
||
// Returned RefPtr is the old one | ||
// other reader will get new one after updated | ||
std::shared_ptr<V> update(V* val, uint64_t ts = 0) { | ||
while (box->reflock.try_lock() != 0) { | ||
photon::thread_yield(); | ||
} | ||
DEFER(box->reflock.unlock()); | ||
auto r = std::shared_ptr<V>(val); | ||
if (ts) box->lastcreate = ts; | ||
std::swap(r, box->ref); | ||
return r; | ||
} | ||
|
||
explicit Updater(Box* b) : box(b) {} | ||
|
||
Updater(const Updater&) = delete; | ||
Updater(Updater&& rhs) : box(nullptr) { *this = std::move(rhs); } | ||
Updater& operator=(const Updater&) = delete; | ||
Updater& operator=(Updater&& rhs) { | ||
std::swap(box, rhs.box); | ||
return *this; | ||
} | ||
operator bool() const { return box != nullptr; } | ||
}; | ||
|
||
Box() = default; | ||
template <typename KeyType> | ||
explicit Box(KeyType&& key) | ||
: key(std::forward<KeyType>(key)), ref(nullptr) {} | ||
|
||
~Box() {} | ||
|
||
Updater writer() { return Updater(this); } | ||
std::shared_ptr<V> reader() { | ||
SCOPED_LOCK(reflock); | ||
return ref; | ||
} | ||
}; | ||
struct ItemHash { | ||
size_t operator()(const Box* x) const { return std::hash<K>()(x->key); } | ||
}; | ||
struct ItemEqual { | ||
bool operator()(const Box* a, const Box* b) const { | ||
return a->key == b->key; | ||
} | ||
}; | ||
|
||
photon::thread* reclaimer = nullptr; | ||
photon::common::RingChannel< | ||
LockfreeMPMCRingQueue<std::shared_ptr<V>*, 4096>> | ||
reclaim_queue; | ||
photon::spinlock maplock; | ||
std::unordered_set<Box*, ItemHash, ItemEqual> map; | ||
intrusive_list<Box> cycle_list; | ||
uint64_t lifespan; | ||
photon::Timer _timer; | ||
|
||
void __update(Box* item, V* val) { | ||
reclaim_queue.template send<PhotonPause>( | ||
new std::shared_ptr<V>(item->writer().update(val, photon::now))); | ||
} | ||
|
||
template <typename KeyType> | ||
Box* __find_or_create_item(const KeyType& key) { | ||
Box keyitem(key); | ||
auto pkey = &keyitem; | ||
Box* item = nullptr; | ||
SCOPED_LOCK(maplock); | ||
auto it = map.find(pkey); | ||
if (it == map.end()) { | ||
item = new Box(key); | ||
map.emplace(item); | ||
} else | ||
item = *it; | ||
assert(item); | ||
item->timestamp = photon::now; | ||
cycle_list.pop(item); | ||
cycle_list.push_back(item); | ||
return item; | ||
} | ||
|
||
uint64_t __expire() { | ||
intrusive_list<Box> delete_list; | ||
uint64_t now = photon::now; | ||
{ | ||
SCOPED_LOCK(maplock); | ||
if (cycle_list.empty()) return 0; | ||
if (photon::sat_add(cycle_list.front()->timestamp, lifespan) > | ||
now) { | ||
return photon::sat_add(cycle_list.front()->timestamp, | ||
lifespan) - | ||
now; | ||
} | ||
auto x = cycle_list.front(); | ||
while (x && (photon::sat_add(x->timestamp, lifespan) < now)) { | ||
cycle_list.pop(x); | ||
__update(x, nullptr); | ||
map.erase(x); | ||
delete_list.push_back(x); | ||
x = cycle_list.front(); | ||
} | ||
} | ||
delete_list.delete_all(); | ||
return 0; | ||
} | ||
|
||
public: | ||
struct Borrow { | ||
ObjectCacheV2* _oc = nullptr; | ||
Box* _item = nullptr; | ||
std::shared_ptr<V> _reader; | ||
bool _recycle = false; | ||
|
||
Borrow() : _reader(nullptr) {} | ||
|
||
Borrow(ObjectCacheV2* oc, Box* item, std::shared_ptr<V>&& reader) | ||
: _oc(oc), | ||
_item(item), | ||
_reader(std::move(reader)), | ||
_recycle(false) {} | ||
|
||
Borrow(Borrow&& rhs) : _reader(nullptr) { *this = std::move(rhs); } | ||
|
||
Borrow& operator=(Borrow&& rhs) { | ||
std::swap(_oc, rhs._oc); | ||
std::swap(_item, rhs._item); | ||
std::swap(_reader, rhs._reader); | ||
std::swap(_recycle, rhs._recycle); | ||
return *this; | ||
} | ||
|
||
~Borrow() { | ||
if (_recycle) { | ||
_oc->__update(_item, nullptr); | ||
} | ||
} | ||
|
||
bool recycle() { return _recycle; } | ||
|
||
bool recycle(bool x) { return _recycle = x; } | ||
|
||
V& operator*() const { return *_reader; } | ||
V* operator->() const { return &*_reader; } | ||
operator bool() const { return (bool)_reader; } | ||
}; | ||
|
||
template <typename KeyType, typename Ctor> | ||
Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) { | ||
auto item = __find_or_create_item(std::forward<KeyType>(key)); | ||
auto r = item->reader(); | ||
while (!r) { | ||
if (item->boxlock.try_lock() == 0) { | ||
DEFER(item->boxlock.unlock()); | ||
auto lr = item->reader(); | ||
if (!lr) { | ||
if (photon::sat_add(item->lastcreate, cooldown) <= | ||
photon::now) { | ||
__update(item, ctor()); | ||
} | ||
return Borrow(this, item, item->reader()); | ||
} | ||
} | ||
photon::thread_yield(); | ||
r = item->reader(); | ||
} | ||
return Borrow(this, item, item->reader()); | ||
} | ||
|
||
template <typename KeyType> | ||
Borrow borrow(KeyType&& key) { | ||
return borrow(&key, [&]() { return new V(); }); | ||
} | ||
|
||
template <typename KeyType, typename Ctor> | ||
Borrow update(KeyType&& key, Ctor&& ctor) { | ||
auto item = __find_or_create_item(std::forward<KeyType>(key)); | ||
__update(item, ctor()); | ||
return Borrow(this, item, item->reader()); | ||
} | ||
|
||
ObjectCacheV2(uint64_t lifespan) | ||
: lifespan(lifespan), | ||
_timer(1UL * 1000 * 1000, {this, &ObjectCacheV2::__expire}, true, | ||
photon::DEFAULT_STACK_SIZE) { | ||
reclaimer = photon::thread_create11([this] { | ||
while (true) { | ||
auto r = reclaim_queue.recv(); | ||
if (!r) { | ||
break; | ||
} else { | ||
delete r; | ||
} | ||
} | ||
}); | ||
photon::thread_enable_join(reclaimer); | ||
} | ||
|
||
~ObjectCacheV2() { | ||
_timer.stop(); | ||
if (reclaimer) { | ||
reclaim_queue.template send<PhotonPause>(nullptr); | ||
photon::thread_join((photon::join_handle*)reclaimer); | ||
reclaimer = nullptr; | ||
} | ||
SCOPED_LOCK(maplock); | ||
cycle_list.delete_all(); | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.