Skip to content

Commit

Permalink
Fix and improves (#153)
Browse files Browse the repository at this point in the history
* Fix ExpireContainer<T> insert should protect by lock
* `RingChannel` now will busy yield for a while before waiting in semaphore, to improve performance
* `Executor`, `WorkPool` now use `RingChannel` for async task delivery
* Fix libaio may lost requests if `io_submit` failed
* Improve metric-meter accuracy
* http `URL` provides `host_no_port()`
* `set_photon_thread_stack_allocator` able to hook photon threads allocation, helps with GC adaption
* `client_function_test` should not initial et_poller if arch is not linux
* FIX `set_photon_thread_stack_allocator` implemention

---------

Signed-off-by: Coldwings <coldwings@me.com>
  • Loading branch information
Coldwings committed Jul 4, 2023
1 parent 7ebf6ba commit a73a22c
Show file tree
Hide file tree
Showing 19 changed files with 245 additions and 115 deletions.
87 changes: 40 additions & 47 deletions common/executor/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,26 @@ namespace photon {

class ExecutorImpl {
public:
using CBList = LockfreeMPMCRingQueue<Delegate<void>, 32UL * 1024>;
using CBList =
common::RingChannel<LockfreeMPMCRingQueue<Delegate<void>, 32UL * 1024>>;
std::unique_ptr<std::thread> th;
photon::thread *pth = nullptr;
EventLoop *loop = nullptr;
CBList queue;
photon::ThreadPoolBase *pool;
bool quiting;
photon::semaphore sem;
std::atomic_bool waiting;

ExecutorImpl() {
loop = new_event_loop({this, &ExecutorImpl::wait_for_event},
{this, &ExecutorImpl::on_event});
th.reset(new std::thread(&ExecutorImpl::do_loop, this));
quiting = false;
waiting = true;
while (!loop || loop->state() != loop->WAITING) ::sched_yield();
}

~ExecutorImpl() {
photon::thread_interrupt(pth);
th->join();
ExecutorImpl(int init_ev, int init_io) {
th.reset(
new std::thread(&ExecutorImpl::launch, this, init_ev, init_io));
}

int wait_for_event(EventLoop *) {
std::atomic_thread_fence(std::memory_order_acquire);
sem.wait(1);
waiting.store(true, std::memory_order_release);
return quiting ? -1 : 1;
ExecutorImpl() {}

~ExecutorImpl() {
queue.send({});
if (th)
th->join();
else
while (pool) photon::thread_yield();
}

struct CallArg {
Expand All @@ -60,51 +51,53 @@ class ExecutorImpl {
return nullptr;
}

int on_event(EventLoop *) {
void main_loop() {
CallArg arg;
arg.backth = photon::CURRENT;
size_t cnt = 0;
while (!queue.empty()) {
arg.task = queue.recv<PhotonPause>();
for (;;) {
arg.task = queue.recv();
if (!arg.task) {
return;
}
auto th =
pool->thread_create(&ExecutorImpl::do_event, (void *)&arg);
photon::thread_yield_to(th);
cnt++;
}
return 0;
}

void do_loop() {
photon::init(INIT_EVENT_DEFAULT, photon::INIT_IO_DEFAULT);
DEFER(photon::fini());
pth = photon::CURRENT;
LOG_INFO("worker start");
pool = photon::new_thread_pool(32);
loop->async_run();
photon::thread_usleep(-1);
LOG_INFO("worker start");
main_loop();
LOG_INFO("worker finished");
while (!queue.empty()) {
photon::thread_usleep(1000);
}
quiting = true;
sem.signal(1);
delete loop;
photon::delete_thread_pool(pool);
pool = nullptr;
}

void launch(int init_ev, int init_io) {
photon::init(init_ev, init_io);
DEFER(photon::fini());
do_loop();
}
};

ExecutorImpl *_new_executor() { return new ExecutorImpl(); }
Executor::Executor(int init_ev, int init_io)
: e(new ExecutorImpl(init_ev, init_io)) {}

Executor::Executor(create_on_current_vcpu) : e(new ExecutorImpl()) {}

void _delete_executor(ExecutorImpl *e) { delete e; }
Executor::~Executor() { delete e; }

void _issue(ExecutorImpl *e, Delegate<void> act) {
void Executor::_issue(ExecutorImpl *e, Delegate<void> act) {
e->queue.send<ThreadPause>(act);
bool cond = true;
if (e->waiting.compare_exchange_weak(cond, false,
std::memory_order_acq_rel)) {
e->sem.signal(1);
}
}

Executor *Executor::export_as_executor() {
auto ret = new Executor(create_on_current_vcpu());
auto th = photon::thread_create11(&ExecutorImpl::do_loop, ret->e);
photon::thread_yield_to(th);
return ret;
}

} // namespace photon
25 changes: 16 additions & 9 deletions common/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.

#include <photon/common/callback.h>
#include <photon/common/executor/stdlock.h>
#include <photon/photon.h>

#include <atomic>
#include <type_traits>
Expand All @@ -26,14 +27,12 @@ namespace photon {

class ExecutorImpl;

ExecutorImpl *_new_executor();
void _delete_executor(ExecutorImpl *e);
void _issue(ExecutorImpl *e, Delegate<void> cb);

class Executor {
public:
ExecutorImpl *e = _new_executor();
~Executor() { _delete_executor(e); }
ExecutorImpl *e;
Executor(int init_ev = photon::INIT_EVENT_DEFAULT,
int init_io = photon::INIT_IO_DEFAULT);
~Executor();

template <
typename Context = StdContext, typename Func,
Expand Down Expand Up @@ -68,8 +67,8 @@ class Executor {
// The task object will be delete after work done
template <typename Context = StdContext, typename Func>
void async_perform(Func *task) {
void (*func)(void*);
func = [](void* task_) {
void (*func)(void *);
func = [](void *task_) {
using Task = decltype(task);
auto t = (Task)task_;
(*t)();
Expand All @@ -78,8 +77,14 @@ class Executor {
_issue(e, {func, task});
}

static Executor* export_as_executor();

protected:
static constexpr int64_t kCondWaitMaxTime = 1000L * 1000;
static constexpr int64_t kCondWaitMaxTime = 100L * 1000;

struct create_on_current_vcpu {};

Executor(create_on_current_vcpu);

template <typename Context>
struct AsyncOp {
Expand Down Expand Up @@ -107,6 +112,8 @@ class Executor {
wait_for_completion();
}
};

static void _issue(ExecutorImpl *e, Delegate<void> cb);
};

} // namespace photon
37 changes: 37 additions & 0 deletions common/executor/test/test_export_as_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include <gtest/gtest.h>
#include <photon/common/executor/executor.h>
#include <photon/common/utility.h>
#include <photon/common/alog.h>
#include <photon/photon.h>
#include <photon/thread/thread.h>

#include <thread>

TEST(enter_as_executor, test) {
// set global default logger output to null
set_log_output(log_output_null);

// create own logger
ALogLogger logger;
// set log_output to stdout, log level as info
logger.log_output = log_output_stdout;
logger.log_level = ALOG_INFO;

photon::init();
DEFER(photon::fini());
// do some ready work in this photon environment
auto vcpu = photon::get_vcpu();
auto e = photon::Executor::export_as_executor();
DEFER(delete e);

std::thread([&logger, e, vcpu] {
e->perform([&logger, vcpu] {
logger << LOG_INFO("Hello from a normal thread");
auto cvcpu = photon::get_vcpu();
logger << LOG_INFO("executor at `, current on `, `", vcpu, cvcpu,
vcpu == cvcpu ? "EQ" : "NEQ");
});
}).detach();

photon::thread_usleep(1UL * 1024 * 1024);
}
3 changes: 2 additions & 1 deletion common/expirecontainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class ExpireContainerBase : public Object {
intrusive_list<Item> _list;
uint64_t _expiration;
photon::Timer _timer;
photon::spinlock _lock;
photon::spinlock _lock; // protect _list/_set operations

using ItemPtr = Item*;
struct ItemHash {
Expand Down Expand Up @@ -177,6 +177,7 @@ class ExpireContainer : public ExpireContainerBase {
template <typename... Gs>
iterator insert(const InterfaceKey& key, Gs&&... xs) {
auto item = new Item(key, std::forward<Gs>(xs)...);
SCOPED_LOCK(_lock);
auto pr = Base::insert(item);
if (!pr.second) {
delete item;
Expand Down
34 changes: 27 additions & 7 deletions common/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,10 @@ namespace common {
* photon thread when queue is empty, and once it got object by recv, it will
* trying using `thread_yield` instead of semaphore, to get better performance
* and load balancing.
* Watch out that `recv` should run in photon environment (because it has to)
* use photon semaphore to be notified that new item has sended. `send` could
* running in photon or std::thread environment (needs to set template `Pause` as
* `ThreadPause`).
*
* @tparam QueueType shoulde be one of LockfreeMPMCRingQueue,
* LockfreeBatchMPMCRingQueue, or LockfreeSPSCRingQueue, with their own template
Expand All @@ -541,7 +545,8 @@ class RingChannel : public QueueType {
protected:
photon::semaphore queue_sem;
std::atomic<uint64_t> idler{0};
static constexpr uint64_t BUSY_YIELD_TIMEOUT = 1024;
uint64_t m_busy_yield_turn;
uint64_t m_busy_yield_timeout;

using T = decltype(std::declval<QueueType>().recv());

Expand All @@ -553,21 +558,36 @@ class RingChannel : public QueueType {
using QueueType::read_available;
using QueueType::write_available;

/**
* @brief Construct a new Ring Channel object
*
* @param busy_yield_timeout setting yield timeout, default is template
* parameter DEFAULT_BUSY_YIELD_TIMEOUT. Ring Channel will try busy yield
* in `busy_yield_timeout` usecs.
*/
RingChannel(uint64_t busy_yield_turn = 64,
uint64_t busy_yield_timeout = 1024)
: m_busy_yield_turn(busy_yield_turn),
m_busy_yield_timeout(busy_yield_timeout) {}

template <typename Pause = ThreadPause>
void send(const T& x) {
while (!push(x)) {
if (!full()) photon::thread_yield();
if (!full()) Pause::pause();
}
uint64_t idle = idler.exchange(0, std::memory_order_acq_rel);
queue_sem.signal(idle);
queue_sem.signal(idler.load(std::memory_order_acquire));
}
T recv() {
T x;
Timeout yield_timeout(BUSY_YIELD_TIMEOUT);
Timeout yield_timeout(m_busy_yield_timeout);
int yield_turn = m_busy_yield_turn;
idler.fetch_add(1, std::memory_order_acq_rel);
DEFER(idler.fetch_sub(1, std::memory_order_acq_rel));
while (!pop(x)) {
if (photon::now < yield_timeout.expire()) {
if (yield_turn > 0 && photon::now < yield_timeout.expire()) {
yield_turn--;
photon::thread_yield();
} else {
idler.fetch_add(1, std::memory_order_acq_rel);
queue_sem.wait(1);
}
}
Expand Down
10 changes: 5 additions & 5 deletions common/metric-meter/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ class AverageCounter {
if (now - time > m_interval * 2) {
reset();
} else if (now - time > m_interval) {
sum = photon::sat_sub(sum, sum * (now - time) / m_interval);
cnt = photon::sat_sub(cnt, cnt * (now - time) / m_interval);
time = now;
sum = photon::sat_sub(sum, sum * (now - time - m_interval) / m_interval);
cnt = photon::sat_sub(cnt, cnt * (now - time - m_interval) / m_interval);
time = now - m_interval;
}
}
void put(int64_t val) {
Expand Down Expand Up @@ -95,8 +95,8 @@ class QPSCounter {
reset();
} else if (now - time > m_interval) {
counter =
photon::sat_sub(counter, counter * (now - time) / m_interval);
time = now;
photon::sat_sub(counter, counter * (now - time - m_interval) / m_interval);
time = now - m_interval;
}
}
void put(int64_t val = 1) {
Expand Down
Loading

0 comments on commit a73a22c

Please sign in to comment.