Skip to content

Commit

Permalink
objcache2
Browse files Browse the repository at this point in the history
Signed-off-by: Coldwings <coldwings@me.com>
  • Loading branch information
Coldwings committed Jul 9, 2024
1 parent 7c59780 commit cb68d2e
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 20 deletions.
255 changes: 255 additions & 0 deletions common/objectcachev2.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
#pragma once

#include <photon/common/alog.h>
#include <photon/common/lockfree_queue.h>
#include <photon/photon.h>
#include <photon/thread/list.h>
#include <photon/thread/thread11.h>
#include <photon/thread/timer.h>
#include <photon/thread/workerpool.h>

#include <memory>
#include <unordered_set>

/**
A simplify object cache implementation.
`ObjectCacheV2` shows better performance compare to `ObjectCache`, with several
improvements:
1. Less lock in both read and write.
2. Object destruction always goes in background photon thread called reclaimer.
3. Self adjustable timeout for reclaimer. No needs to set cycler timer.
4. New `update` API, able to immediatly substitude objects.
5. `ObjectCacheV2` no longer support acquire/release API.
It should work as `ObjectCache` in code do not depends on acquire/release API.
**/

template <typename K, typename VPtr>
class ObjectCacheV2 {
protected:
using V = std::remove_pointer_t<VPtr>;

struct Box : public intrusive_list_node<Box> {
const K key;
std::shared_ptr<V> ref;
photon::spinlock boxlock;
photon::spinlock reflock;
uint64_t lastcreate = 0;
uint64_t timestamp = 0;

struct Updater {
Box* box = nullptr;

~Updater() {}

// Returned RefPtr is the old one
// other reader will get new one after updated
std::shared_ptr<V> update(V* val, uint64_t ts = 0) {
while (box->reflock.try_lock() != 0) {
photon::thread_yield();
}
DEFER(box->reflock.unlock());
auto r = std::shared_ptr<V>(val);
if (ts) box->lastcreate = ts;
std::swap(r, box->ref);
return r;
}

explicit Updater(Box* b) : box(b) {}

Updater(const Updater&) = delete;
Updater(Updater&& rhs) : box(nullptr) { *this = std::move(rhs); }
Updater& operator=(const Updater&) = delete;
Updater& operator=(Updater&& rhs) {
std::swap(box, rhs.box);
return *this;
}
operator bool() const { return box != nullptr; }
};

Box() = default;
template <typename KeyType>
explicit Box(KeyType&& key)
: key(std::forward<KeyType>(key)), ref(nullptr) {}

~Box() {}

Updater writer() { return Updater(this); }
std::shared_ptr<V> reader() {
SCOPED_LOCK(reflock);
return ref;
}
};
struct ItemHash {
size_t operator()(const Box* x) const { return std::hash<K>()(x->key); }
};
struct ItemEqual {
bool operator()(const Box* a, const Box* b) const {
return a->key == b->key;
}
};

photon::thread* reclaimer = nullptr;
photon::common::RingChannel<
LockfreeMPMCRingQueue<std::shared_ptr<V>*, 4096>>
reclaim_queue;
photon::spinlock maplock;
std::unordered_set<Box*, ItemHash, ItemEqual> map;
intrusive_list<Box> cycle_list;
uint64_t lifespan;
photon::Timer _timer;

void __update(Box* item, V* val) {
reclaim_queue.template send<PhotonPause>(
new std::shared_ptr<V>(item->writer().update(val, photon::now)));
}

template <typename KeyType>
Box* __find_or_create_item(const KeyType& key) {
Box keyitem(key);
auto pkey = &keyitem;
Box* item = nullptr;
SCOPED_LOCK(maplock);
auto it = map.find(pkey);
if (it == map.end()) {
item = new Box(key);
map.emplace(item);
} else
item = *it;
assert(item);
item->timestamp = photon::now;
cycle_list.pop(item);
cycle_list.push_back(item);
return item;
}

uint64_t __expire() {
intrusive_list<Box> delete_list;
uint64_t now = photon::now;
{
SCOPED_LOCK(maplock);
if (cycle_list.empty()) return 0;
if (photon::sat_add(cycle_list.front()->timestamp, lifespan) >
now) {
return photon::sat_add(cycle_list.front()->timestamp,
lifespan) -
now;
}
auto x = cycle_list.front();
while (x && (photon::sat_add(x->timestamp, lifespan) < now)) {
cycle_list.pop(x);
__update(x, nullptr);
map.erase(x);
delete_list.push_back(x);
x = cycle_list.front();
}
}
delete_list.delete_all();
return 0;
}

public:
struct Borrow {
ObjectCacheV2* _oc = nullptr;
Box* _item = nullptr;
std::shared_ptr<V> _reader;
bool _recycle = false;

Borrow() : _reader(nullptr) {}

Borrow(ObjectCacheV2* oc, Box* item, std::shared_ptr<V>&& reader)
: _oc(oc),
_item(item),
_reader(std::move(reader)),
_recycle(false) {}

Borrow(Borrow&& rhs) : _reader(nullptr) { *this = std::move(rhs); }

Borrow& operator=(Borrow&& rhs) {
std::swap(_oc, rhs._oc);
std::swap(_item, rhs._item);
std::swap(_reader, rhs._reader);
std::swap(_recycle, rhs._recycle);
return *this;
}

~Borrow() {
if (_recycle) {
_oc->__update(_item, nullptr);
}
}

bool recycle() { return _recycle; }

bool recycle(bool x) { return _recycle = x; }

V& operator*() const { return *_reader; }
V* operator->() const { return &*_reader; }
operator bool() const { return (bool)_reader; }
};

template <typename KeyType, typename Ctor>
Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) {
auto item = __find_or_create_item(std::forward<KeyType>(key));
auto r = item->reader();
while (!r) {
if (item->boxlock.try_lock() == 0) {
DEFER(item->boxlock.unlock());
auto lr = item->reader();
if (!lr) {
if (photon::sat_add(item->lastcreate, cooldown) <=
photon::now) {
__update(item, ctor());
}
return Borrow(this, item, item->reader());
}
}
photon::thread_yield();
r = item->reader();
}
return Borrow(this, item, item->reader());
}

template <typename KeyType>
Borrow borrow(KeyType&& key) {
return borrow(&key, [&]() { return new V(); });
}

template <typename KeyType, typename Ctor>
Borrow update(KeyType&& key, Ctor&& ctor) {
auto item = __find_or_create_item(std::forward<KeyType>(key));
__update(item, ctor());
return Borrow(this, item, item->reader());
}

ObjectCacheV2(uint64_t lifespan)
: lifespan(lifespan),
_timer(1UL * 1000 * 1000, {this, &ObjectCacheV2::__expire}, true,
photon::DEFAULT_STACK_SIZE) {
reclaimer = photon::thread_create11([this] {
while (true) {
auto r = reclaim_queue.recv();
if (!r) {
break;
} else {
delete r;
}
}
});
photon::thread_enable_join(reclaimer);
}

~ObjectCacheV2() {
_timer.stop();
if (reclaimer) {
reclaim_queue.template send<PhotonPause>(nullptr);
photon::thread_join((photon::join_handle*)reclaimer);
reclaimer = nullptr;
}
SCOPED_LOCK(maplock);
cycle_list.delete_all();
}
};
4 changes: 4 additions & 0 deletions common/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ add_executable(test-objcache test_objcache.cpp)
target_link_libraries(test-objcache PRIVATE photon_shared)
add_test(NAME test-objcache COMMAND $<TARGET_FILE:test-objcache>)

add_executable(perf-objcache perf_objcache.cpp)
target_link_libraries(perf-objcache PRIVATE photon_shared)
add_test(NAME perf-objcache COMMAND $<TARGET_FILE:test-objcache>)

add_executable(test-common test.cpp)
target_link_libraries(test-common PRIVATE photon_shared)
add_test(NAME test-common COMMAND $<TARGET_FILE:test-common>)
Expand Down
50 changes: 30 additions & 20 deletions common/test/perf_objcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
#include <photon/photon.h>
#include <photon/thread/thread.h>

#include <algorithm>
#include <array>
#include <chrono>
#include <random>
#include <thread>
#include <unordered_map>
#include <vector>
#include <algorithm>
#include "../../test/ci-tools.h"

#include "../expirecontainer.h"
#include "../objectcachev2.h"

constexpr size_t count = 10UL * 1024;

Expand All @@ -29,11 +28,13 @@ void ready() {
LOG_INFO("Random key generated");
}

template <template <class, class> class OC>
void *task(void *arg) {
auto oc = (ObjectCache<std::string, std::string *> *)arg;
auto oc = (OC<std::string, std::string *> *)arg;
std::array<uint64_t, count> k = keys;
std::random_shuffle(k.begin(), k.end());

auto start = std::chrono::steady_clock::now();
for (const auto &x : k) {
auto strx = std::to_string(x);
auto b = oc->borrow(strx, [&strx] {
Expand All @@ -42,38 +43,47 @@ void *task(void *arg) {
return new std::string(strx);
});
}
auto done = std::chrono::steady_clock::now();
LOG_INFO("spent ` ms",
std::chrono::duration_cast<std::chrono::milliseconds>(done - start)
.count());
return nullptr;
}

void test_objcache(ObjectCache<std::string, std::string *> &oc) {
template <template <class, class> class OC>
void test_objcache(OC<std::string, std::string *> &oc, const char *name) {
std::vector<photon::join_handle *> jhs;
LOG_INFO("Query to ObjectCache");
auto start = std::chrono::steady_clock::now();
LOG_INFO("Query to ", name);
for (int i = 0; i < 4; i++)
jhs.emplace_back(
photon::thread_enable_join(photon::thread_create(task, &oc)));
photon::thread_enable_join(photon::thread_create(task<OC>, &oc)));
for (auto &x : jhs) photon::thread_join(x);
auto done = std::chrono::steady_clock::now();
LOG_INFO("spent ` ms",
std::chrono::duration_cast<std::chrono::milliseconds>(done - start)
.count());
}

int main() {
photon::init(0, 0);
DEFER(photon::fini());
ObjectCache<std::string, std::string *> oc(-1UL);
ready();
template <template <class, class> class OC>
void test(const char *name) {
OC<std::string, std::string *> oc(-1UL);
std::vector<std::thread> ths;
photon::semaphore sem(0);
for (int i = 0; i < 10; i++) {
ths.emplace_back([&] {
photon::init(0, 0);
DEFER(photon::fini());
test_objcache(oc);
photon::vcpu_init();
DEFER(photon::vcpu_fini());
test_objcache<OC>(oc, name);
sem.signal(1);
});
}
sem.wait(10);
for (auto &x : ths) {
x.join();
}
}

int main() {
photon::vcpu_init();
DEFER(photon::vcpu_fini());
ready();
test<ObjectCache>("ObjectCache");
test<ObjectCacheV2>("ObjectCacheV2");
return 0;
}
Loading

0 comments on commit cb68d2e

Please sign in to comment.