Skip to content

Commit

Permalink
Makes queue transport sharedptr directly
Browse files Browse the repository at this point in the history
Signed-off-by: Coldwings <coldwings@me.com>
  • Loading branch information
Coldwings committed Jul 16, 2024
1 parent b07d10a commit a97bd66
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 71 deletions.
48 changes: 28 additions & 20 deletions common/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,23 @@ struct PhotonPause : PauseBase {
}
};

template <typename T>
struct is_shared_ptr : std::false_type {};
template <typename T>
struct is_shared_ptr<std::shared_ptr<T>> : std::true_type {};

template <typename T, size_t N>
class LockfreeRingQueueBase {
public:
#if __cplusplus < 201402L
static_assert(std::has_trivial_copy_constructor<T>::value &&
std::has_trivial_copy_assign<T>::value,
static_assert((std::has_trivial_copy_constructor<T>::value &&
std::has_trivial_copy_assign<T>::value) ||
is_shared_ptr<T>::value,
"T should be trivially copyable");
#else
static_assert(std::is_trivially_copy_constructible<T>::value &&
std::is_trivially_copy_assignable<T>::value,
static_assert((std::is_trivially_copy_constructible<T>::value &&
std::is_trivially_copy_assignable<T>::value) ||
is_shared_ptr<T>::value,
"T should be trivially copyable");
#endif

Expand Down Expand Up @@ -286,15 +293,15 @@ class LockfreeBatchMPMCRingQueue : public LockfreeRingQueueBase<T, N> {
using Base::empty;
using Base::full;

size_t push_batch(const T *x, size_t n) {
size_t push_batch(const T* x, size_t n) {
size_t rh, wt;
wt = tail.load(std::memory_order_relaxed);
for (;;) {
rh = head.load(std::memory_order_acquire);
auto wn = std::min(n, Base::capacity - (wt - rh));
if (wn == 0)
return 0;
if (!tail.compare_exchange_strong(wt, wt + wn, std::memory_order_acq_rel))
if (wn == 0) return 0;
if (!tail.compare_exchange_strong(wt, wt + wn,
std::memory_order_acq_rel))
continue;
auto first_idx = idx(wt);
auto part_length = Base::capacity - first_idx;
Expand All @@ -303,28 +310,28 @@ class LockfreeBatchMPMCRingQueue : public LockfreeRingQueueBase<T, N> {
} else {
if (likely(part_length))
memcpy(&slots[first_idx], x, sizeof(T) * (part_length));
memcpy(&slots[0], x + part_length, sizeof(T) * (wn - part_length));
memcpy(&slots[0], x + part_length,
sizeof(T) * (wn - part_length));
}
auto wh = wt;
while (!write_head.compare_exchange_strong(wh, wt + wn, std::memory_order_acq_rel))
while (!write_head.compare_exchange_strong(
wh, wt + wn, std::memory_order_acq_rel))
wh = wt;
return wn;
}
}

bool push(const T &x) {
return push_batch(&x, 1) == 1;
}
bool push(const T& x) { return push_batch(&x, 1) == 1; }

size_t pop_batch(T *x, size_t n) {
size_t pop_batch(T* x, size_t n) {
size_t rt, wh;
rt = read_tail.load(std::memory_order_relaxed);
for (;;) {
wh = write_head.load(std::memory_order_acquire);
auto rn = std::min(n, wh - rt);
if (rn == 0)
return 0;
if (!read_tail.compare_exchange_strong(rt, rt + rn, std::memory_order_acq_rel))
if (rn == 0) return 0;
if (!read_tail.compare_exchange_strong(rt, rt + rn,
std::memory_order_acq_rel))
continue;
auto first_idx = idx(rt);
auto part_length = Base::capacity - first_idx;
Expand All @@ -333,10 +340,12 @@ class LockfreeBatchMPMCRingQueue : public LockfreeRingQueueBase<T, N> {
} else {
if (likely(part_length))
memcpy(x, &slots[first_idx], sizeof(T) * (part_length));
memcpy(x + part_length, &slots[0], sizeof(T) * (rn - part_length));
memcpy(x + part_length, &slots[0],
sizeof(T) * (rn - part_length));
}
auto rh = rt;
while (!head.compare_exchange_strong(rh, rt + rn, std::memory_order_acq_rel))
while (!head.compare_exchange_strong(rh, rt + rn,
std::memory_order_acq_rel))
rh = rt;
return rn;
}
Expand Down Expand Up @@ -585,4 +594,3 @@ class RingChannel : public QueueType {
} // namespace photon

#undef size_t

103 changes: 52 additions & 51 deletions common/objectcachev2.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ class ObjectCacheV2 {

// Returned RefPtr is the old one
// other reader will get new one after updated
std::shared_ptr<V> update(V* val, uint64_t ts = 0,
std::shared_ptr<V>* newptr = nullptr) {
auto r = std::shared_ptr<V>(val);
if (newptr)
std::atomic_store(newptr, r);
*newptr = r;

std::shared_ptr<V> update(std::shared_ptr<V>& r, uint64_t ts = 0) {
box->lastcreate = ts;
r = std::atomic_exchange(&box->ref, r);
return r;
}
std::shared_ptr<V> update(V* val, uint64_t ts = 0,
std::shared_ptr<V>* newptr = nullptr) {
auto r = std::shared_ptr<V>(val);
if (newptr) *newptr = r;
return update(r, ts);
}

explicit Updater(Box* b) : box(b) {}

Expand All @@ -78,53 +80,56 @@ class ObjectCacheV2 {
~Box() {}

Updater writer() { return Updater(this); }
std::shared_ptr<V> reader() {
return std::atomic_load(&ref);
}
std::shared_ptr<V> reader() { return std::atomic_load(&ref); }
};
struct ItemHash {
size_t operator()(const Box* x) const { return std::hash<K>()(x->key); }
size_t operator()(const Box& x) const { return std::hash<K>()(x.key); }
};
struct ItemEqual {
bool operator()(const Box* a, const Box* b) const {
return a->key == b->key;
bool operator()(const Box& a, const Box& b) const {
return a.key == b.key;
}
};

photon::thread* reclaimer = nullptr;
photon::common::RingChannel<
LockfreeMPMCRingQueue<std::shared_ptr<V>*, 4096>>
photon::common::RingChannel<LockfreeMPMCRingQueue<std::shared_ptr<V>, 4096>>
reclaim_queue;
photon::spinlock maplock;
std::unordered_set<Box*, ItemHash, ItemEqual> map;
std::unordered_set<Box, ItemHash, ItemEqual> map;
intrusive_list<Box> cycle_list;
uint64_t lifespan;
photon::Timer _timer;
bool _exit = false;

std::shared_ptr<V> __update(Box& item, std::shared_ptr<V> val) {
auto ret_val = val;
auto old_val = item.writer().update(val, photon::now);
reclaim_queue.template send<PhotonPause>(old_val);
return ret_val;
}

std::shared_ptr<V> __update(Box* item, V* val) {
std::shared_ptr<V> ret;
reclaim_queue.template send<PhotonPause>(new std::shared_ptr<V>(
item->writer().update(val, photon::now, &ret)));
return ret;
std::shared_ptr<V> __update(Box& item, V* val) {
std::shared_ptr<V> ptr(val);
return __update(item, ptr);
}

template <typename KeyType>
Box* __find_or_create_item(KeyType&& key) {
Box& __find_or_create_item(KeyType&& key) {
Box keyitem(key);
auto pkey = &keyitem;
Box* item = nullptr;
SCOPED_LOCK(maplock);
auto it = map.find(pkey);
auto it = map.find(keyitem);
if (it == map.end()) {
item = new Box(std::forward<KeyType>(key));
map.emplace(item);
// item = new Box(std::forward<KeyType>(key));
auto rt = map.emplace(key);
if (rt.second) item = (Box*)&*rt.first;
} else
item = *it;
item = (Box*)&*it;
assert(item);
item->timestamp = photon::now;
cycle_list.pop(item);
cycle_list.push_back(item);
return item;
return *item;
}

uint64_t __expire() {
Expand All @@ -142,13 +147,11 @@ class ObjectCacheV2 {
auto x = cycle_list.front();
while (x && (photon::sat_add(x->timestamp, lifespan) < now)) {
cycle_list.pop(x);
__update(x, nullptr);
map.erase(x);
delete_list.push_back(x);
__update(*x, nullptr);
map.erase(*x);
x = cycle_list.front();
}
}
delete_list.delete_all();
return 0;
}

Expand Down Expand Up @@ -180,7 +183,7 @@ class ObjectCacheV2 {

~Borrow() {
if (_recycle) {
_oc->__update(_item, nullptr);
_oc->__update(*_item, nullptr);
}
}

Expand All @@ -195,29 +198,30 @@ class ObjectCacheV2 {

template <typename KeyType, typename Ctor>
Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) {
auto item = __find_or_create_item(std::forward<KeyType>(key));
auto r = item->reader();
auto& item = __find_or_create_item(std::forward<KeyType>(key));
auto r = item.reader();
while (!r) {
if (item->boxlock.try_lock() == 0) {
DEFER(item->boxlock.unlock());
r = item->reader();
if (item.boxlock.try_lock() == 0) {
DEFER(item.boxlock.unlock());
r = item.reader();
if (!r) {
if (photon::sat_add(item->lastcreate, cooldown) <=
if (photon::sat_add(item.lastcreate, cooldown) <=
photon::now) {
r = __update(item, ctor());
}
return Borrow(this, item, std::move(r));
return Borrow(this, &item, std::move(r));
}
}
photon::thread_yield();
r = item->reader();
r = item.reader();
}
return Borrow(this, item, std::move(r));
return Borrow(this, &item, std::move(r));
}

template <typename KeyType>
Borrow borrow(KeyType&& key) {
return borrow(std::forward<KeyType>(key), [&]() { return new V(); });
return borrow(std::forward<KeyType>(key),
[&]() { return std::make_shared<V>(); });
}

template <typename KeyType, typename Ctor>
Expand All @@ -230,15 +234,11 @@ class ObjectCacheV2 {
ObjectCacheV2(uint64_t lifespan)
: lifespan(lifespan),
_timer(1UL * 1000 * 1000, {this, &ObjectCacheV2::__expire}, true,
photon::DEFAULT_STACK_SIZE) {
photon::DEFAULT_STACK_SIZE),
_exit(false) {
reclaimer = photon::thread_create11([this] {
while (true) {
auto r = reclaim_queue.recv();
if (!r) {
break;
} else {
delete r;
}
while (!_exit) {
reclaim_queue.recv();
}
});
photon::thread_enable_join(reclaimer);
Expand All @@ -247,11 +247,12 @@ class ObjectCacheV2 {
~ObjectCacheV2() {
_timer.stop();
if (reclaimer) {
_exit = true;
reclaim_queue.template send<PhotonPause>(nullptr);
photon::thread_join((photon::join_handle*)reclaimer);
reclaimer = nullptr;
}
SCOPED_LOCK(maplock);
cycle_list.delete_all();
cycle_list.node = nullptr;
}
};

0 comments on commit a97bd66

Please sign in to comment.