Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and improves #153

Merged
merged 9 commits into from
Jul 4, 2023
87 changes: 40 additions & 47 deletions common/executor/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,26 @@ namespace photon {

class ExecutorImpl {
public:
using CBList = LockfreeMPMCRingQueue<Delegate<void>, 32UL * 1024>;
using CBList =
common::RingChannel<LockfreeMPMCRingQueue<Delegate<void>, 32UL * 1024>>;
std::unique_ptr<std::thread> th;
photon::thread *pth = nullptr;
EventLoop *loop = nullptr;
CBList queue;
photon::ThreadPoolBase *pool;
bool quiting;
photon::semaphore sem;
std::atomic_bool waiting;

ExecutorImpl() {
loop = new_event_loop({this, &ExecutorImpl::wait_for_event},
{this, &ExecutorImpl::on_event});
th.reset(new std::thread(&ExecutorImpl::do_loop, this));
quiting = false;
waiting = true;
while (!loop || loop->state() != loop->WAITING) ::sched_yield();
}

~ExecutorImpl() {
photon::thread_interrupt(pth);
th->join();
ExecutorImpl(int init_ev, int init_io) {
th.reset(
new std::thread(&ExecutorImpl::launch, this, init_ev, init_io));
}

int wait_for_event(EventLoop *) {
std::atomic_thread_fence(std::memory_order_acquire);
sem.wait(1);
waiting.store(true, std::memory_order_release);
return quiting ? -1 : 1;
ExecutorImpl() {}

~ExecutorImpl() {
queue.send({});
if (th)
th->join();
else
while (pool) photon::thread_yield();
}

struct CallArg {
Expand All @@ -60,51 +51,53 @@ class ExecutorImpl {
return nullptr;
}

int on_event(EventLoop *) {
void main_loop() {
CallArg arg;
arg.backth = photon::CURRENT;
size_t cnt = 0;
while (!queue.empty()) {
arg.task = queue.recv<PhotonPause>();
for (;;) {
arg.task = queue.recv();
if (!arg.task) {
return;
}
auto th =
pool->thread_create(&ExecutorImpl::do_event, (void *)&arg);
photon::thread_yield_to(th);
cnt++;
}
return 0;
}

void do_loop() {
photon::init(INIT_EVENT_DEFAULT, photon::INIT_IO_DEFAULT);
DEFER(photon::fini());
pth = photon::CURRENT;
LOG_INFO("worker start");
pool = photon::new_thread_pool(32);
loop->async_run();
photon::thread_usleep(-1);
LOG_INFO("worker start");
main_loop();
LOG_INFO("worker finished");
while (!queue.empty()) {
photon::thread_usleep(1000);
}
quiting = true;
sem.signal(1);
delete loop;
photon::delete_thread_pool(pool);
pool = nullptr;
}

void launch(int init_ev, int init_io) {
photon::init(init_ev, init_io);
DEFER(photon::fini());
do_loop();
}
};

ExecutorImpl *_new_executor() { return new ExecutorImpl(); }
Executor::Executor(int init_ev, int init_io)
: e(new ExecutorImpl(init_ev, init_io)) {}

Executor::Executor(create_on_current_vcpu) : e(new ExecutorImpl()) {}

void _delete_executor(ExecutorImpl *e) { delete e; }
Executor::~Executor() { delete e; }

void _issue(ExecutorImpl *e, Delegate<void> act) {
void Executor::_issue(ExecutorImpl *e, Delegate<void> act) {
e->queue.send<ThreadPause>(act);
bool cond = true;
if (e->waiting.compare_exchange_weak(cond, false,
std::memory_order_acq_rel)) {
e->sem.signal(1);
}
}

Executor *Executor::export_as_executor() {
auto ret = new Executor(create_on_current_vcpu());
auto th = photon::thread_create11(&ExecutorImpl::do_loop, ret->e);
photon::thread_yield_to(th);
return ret;
}

} // namespace photon
25 changes: 16 additions & 9 deletions common/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.

#include <photon/common/callback.h>
#include <photon/common/executor/stdlock.h>
#include <photon/photon.h>

#include <atomic>
#include <type_traits>
Expand All @@ -26,14 +27,12 @@ namespace photon {

class ExecutorImpl;

ExecutorImpl *_new_executor();
void _delete_executor(ExecutorImpl *e);
void _issue(ExecutorImpl *e, Delegate<void> cb);

class Executor {
public:
ExecutorImpl *e = _new_executor();
~Executor() { _delete_executor(e); }
ExecutorImpl *e;
Executor(int init_ev = photon::INIT_EVENT_DEFAULT,
int init_io = photon::INIT_IO_DEFAULT);
~Executor();

template <
typename Context = StdContext, typename Func,
Expand Down Expand Up @@ -68,8 +67,8 @@ class Executor {
// The task object will be delete after work done
template <typename Context = StdContext, typename Func>
void async_perform(Func *task) {
void (*func)(void*);
func = [](void* task_) {
void (*func)(void *);
func = [](void *task_) {
using Task = decltype(task);
auto t = (Task)task_;
(*t)();
Expand All @@ -78,8 +77,14 @@ class Executor {
_issue(e, {func, task});
}

static Executor* export_as_executor();

protected:
static constexpr int64_t kCondWaitMaxTime = 1000L * 1000;
static constexpr int64_t kCondWaitMaxTime = 100L * 1000;

struct create_on_current_vcpu {};

Executor(create_on_current_vcpu);

template <typename Context>
struct AsyncOp {
Expand Down Expand Up @@ -107,6 +112,8 @@ class Executor {
wait_for_completion();
}
};

static void _issue(ExecutorImpl *e, Delegate<void> cb);
};

} // namespace photon
37 changes: 37 additions & 0 deletions common/executor/test/test_export_as_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include <gtest/gtest.h>
#include <photon/common/executor/executor.h>
#include <photon/common/utility.h>
#include <photon/common/alog.h>
#include <photon/photon.h>
#include <photon/thread/thread.h>

#include <thread>

TEST(enter_as_executor, test) {
// set global default logger output to null
set_log_output(log_output_null);

// create own logger
ALogLogger logger;
// set log_output to stdout, log level as info
logger.log_output = log_output_stdout;
logger.log_level = ALOG_INFO;

photon::init();
DEFER(photon::fini());
// do some ready work in this photon environment
auto vcpu = photon::get_vcpu();
auto e = photon::Executor::export_as_executor();
DEFER(delete e);

std::thread([&logger, e, vcpu] {
e->perform([&logger, vcpu] {
logger << LOG_INFO("Hello from a normal thread");
auto cvcpu = photon::get_vcpu();
logger << LOG_INFO("executor at `, current on `, `", vcpu, cvcpu,
vcpu == cvcpu ? "EQ" : "NEQ");
});
}).detach();

photon::thread_usleep(1UL * 1024 * 1024);
}
3 changes: 2 additions & 1 deletion common/expirecontainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class ExpireContainerBase : public Object {
intrusive_list<Item> _list;
uint64_t _expiration;
photon::Timer _timer;
photon::spinlock _lock;
photon::spinlock _lock; // protect _list/_set operations

using ItemPtr = Item*;
struct ItemHash {
Expand Down Expand Up @@ -177,6 +177,7 @@ class ExpireContainer : public ExpireContainerBase {
template <typename... Gs>
iterator insert(const InterfaceKey& key, Gs&&... xs) {
auto item = new Item(key, std::forward<Gs>(xs)...);
SCOPED_LOCK(_lock);
auto pr = Base::insert(item);
if (!pr.second) {
delete item;
Expand Down
34 changes: 27 additions & 7 deletions common/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,10 @@ namespace common {
* photon thread when queue is empty, and once it got object by recv, it will
* trying using `thread_yield` instead of semaphore, to get better performance
* and load balancing.
* Watch out that `recv` should run in photon environment (because it has to)
* use photon semaphore to be notified that new item has sended. `send` could
* running in photon or std::thread environment (needs to set template `Pause` as
* `ThreadPause`).
*
* @tparam QueueType shoulde be one of LockfreeMPMCRingQueue,
* LockfreeBatchMPMCRingQueue, or LockfreeSPSCRingQueue, with their own template
Expand All @@ -541,7 +545,8 @@ class RingChannel : public QueueType {
protected:
photon::semaphore queue_sem;
std::atomic<uint64_t> idler{0};
static constexpr uint64_t BUSY_YIELD_TIMEOUT = 1024;
uint64_t m_busy_yield_turn;
uint64_t m_busy_yield_timeout;

using T = decltype(std::declval<QueueType>().recv());

Expand All @@ -553,21 +558,36 @@ class RingChannel : public QueueType {
using QueueType::read_available;
using QueueType::write_available;

/**
* @brief Construct a new Ring Channel object
*
* @param busy_yield_timeout setting yield timeout, default is template
* parameter DEFAULT_BUSY_YIELD_TIMEOUT. Ring Channel will try busy yield
* in `busy_yield_timeout` usecs.
*/
RingChannel(uint64_t busy_yield_turn = 64,
uint64_t busy_yield_timeout = 1024)
: m_busy_yield_turn(busy_yield_turn),
m_busy_yield_timeout(busy_yield_timeout) {}

template <typename Pause = ThreadPause>
void send(const T& x) {
while (!push(x)) {
if (!full()) photon::thread_yield();
if (!full()) Pause::pause();
}
uint64_t idle = idler.exchange(0, std::memory_order_acq_rel);
queue_sem.signal(idle);
queue_sem.signal(idler.load(std::memory_order_acquire));
}
T recv() {
T x;
Timeout yield_timeout(BUSY_YIELD_TIMEOUT);
Timeout yield_timeout(m_busy_yield_timeout);
int yield_turn = m_busy_yield_turn;
idler.fetch_add(1, std::memory_order_acq_rel);
DEFER(idler.fetch_sub(1, std::memory_order_acq_rel));
while (!pop(x)) {
if (photon::now < yield_timeout.expire()) {
if (yield_turn > 0 && photon::now < yield_timeout.expire()) {
yield_turn--;
photon::thread_yield();
} else {
idler.fetch_add(1, std::memory_order_acq_rel);
queue_sem.wait(1);
}
}
Expand Down
10 changes: 5 additions & 5 deletions common/metric-meter/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ class AverageCounter {
if (now - time > m_interval * 2) {
reset();
} else if (now - time > m_interval) {
sum = photon::sat_sub(sum, sum * (now - time) / m_interval);
cnt = photon::sat_sub(cnt, cnt * (now - time) / m_interval);
time = now;
sum = photon::sat_sub(sum, sum * (now - time - m_interval) / m_interval);
cnt = photon::sat_sub(cnt, cnt * (now - time - m_interval) / m_interval);
time = now - m_interval;
}
}
void put(int64_t val) {
Expand Down Expand Up @@ -95,8 +95,8 @@ class QPSCounter {
reset();
} else if (now - time > m_interval) {
counter =
photon::sat_sub(counter, counter * (now - time) / m_interval);
time = now;
photon::sat_sub(counter, counter * (now - time - m_interval) / m_interval);
time = now - m_interval;
}
}
void put(int64_t val = 1) {
Expand Down
Loading