Skip to content

Commit

Permalink
Refactoring OutOfOrderEngine, make it able to share between vCPUs
Browse files Browse the repository at this point in the history
  • Loading branch information
Coldwings committed Sep 5, 2024
1 parent 32fd5ed commit 75de9cf
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 83 deletions.
204 changes: 127 additions & 77 deletions rpc/out-of-order-execution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ namespace rpc {
{
public:
unordered_map<uint64_t, OutOfOrderContext*> m_map;
condition_variable m_cond_collected;
mutex m_mutex_w, m_mutex_r;
condition_variable m_cond_collected, m_wait;
mutex m_mutex_w, m_mutex_r, m_mutex_map;
uint64_t m_issuing = 0;
uint64_t m_tag = 0;
bool m_running = true;
Expand Down Expand Up @@ -60,9 +60,9 @@ namespace rpc {
}
int issue_operation(OutOfOrderContext& args) //firing issue
{
SCOPED_LOCK(m_mutex_w);
m_issuing ++;
DEFER(m_issuing --);
SCOPED_LOCK(m_mutex_w);
if (!m_running)
LOG_ERROR_RETURN(ESHUTDOWN, -1, "engine is been shuting down");
if (!args.flag_tag_valid)
Expand All @@ -71,108 +71,158 @@ namespace rpc {
args.tag = ++m_tag; // auto increase if it is not user defined tag
}
args.th = CURRENT;
args.collected = false;
{
SCOPED_LOCK(args.phaselock);
args.phase = OooPhase::BEFORE_ISSUE;
}
args.ret = 0;
auto ret = m_map.insert({args.tag, &args}); //the return value is {iter, bool}
if (!ret.second) // means insert failed because of key already exists
{
auto tag = args.tag;
auto th = (ret.first == m_map.end()) ? nullptr : ret.first->second->th;
LOG_ERROR("failed to insert record into unordered hash map",
VALUE(tag), VALUE(CURRENT), VALUE(th));
if (args.flag_tag_valid) // user set tag, need to tell user it is a failure
LOG_ERROR_RETURN(EINVAL, -1, "the tag in argument is NOT valid");
goto again;
SCOPED_LOCK(m_mutex_map);
auto ret = m_map.insert({args.tag, &args}); //the return value is {iter, bool}
if (!ret.second) // means insert failed because of key already exists
{
auto tag = args.tag;
auto th = (ret.first == m_map.end()) ? nullptr : ret.first->second->th;
LOG_ERROR("failed to insert record into unordered hash map",
VALUE(tag), VALUE(CURRENT), VALUE(th));
if (args.flag_tag_valid) // user set tag, need to tell user it is a failure
LOG_ERROR_RETURN(EINVAL, -1, "the tag in argument is NOT valid");
goto again;
}
}

int ret2 = args.do_issue(&args);
if (ret2 < 0) {
SCOPED_LOCK(m_mutex_map);
m_map.erase(args.tag);
m_cond_collected.notify_one();
LOG_ERROR_RETURN(0, -1, "failed to do_issue()");
}
{
SCOPED_LOCK(args.phaselock);
args.phase = OooPhase::ISSUED;
}
return 0;
}

static void wait_check(void* args) {
OutOfOrderContext& ctx = *(OutOfOrderContext*)args;
ctx.phase = OooPhase::WAITING;
ctx.phaselock.unlock();
};
// Waits until the operation described by `args` (looked up by `args.tag`)
// has its result collected. The visible success paths return `args.ret`;
// the error paths leave through LOG_ERROR_RETURN with -1 or -2.
// NOTE(review): this span comes from a rendered diff in which lines removed
// and lines added by the commit appear together without +/- markers, so it
// is not one compilable body — confirm against the real file before editing.
int wait_completion(OutOfOrderContext& args) // receiving work
{
// lock with param 1 means allow entry without lock
// when interrupted
int lockret = m_mutex_r.lock(args.timeout);
ERRNO err;
DEFER(if (lockret == 0) m_mutex_r.unlock());

// when wait_completion returned,
// always have tag removed from the map
// notify the waiting function (like shutdown())
DEFER(m_cond_collected.notify_one());

if (lockret < 0 && err.no == ETIMEDOUT) {
// Timed out so return as failure
m_map.erase(args.tag);
LOG_ERROR_RETURN(ETIMEDOUT, -1, "timeout wait for completion");
}

auto o_tag = args.tag;
{
auto o_it = m_map.find(o_tag);
if (o_it == m_map.end()) {
LOG_ERROR_RETURN(EINVAL, -1, "issue of ` not found", VALUE(args.tag));
}
if (o_it->second->th != CURRENT)
{
LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT));
}
if (args.collected) {
// my completion has been done
// just collect it, clear the trace,
// then return result
m_map.erase(o_it);
return args.ret;
// check if context issued
SCOPED_LOCK(m_mutex_map);
if (m_map.find(args.tag) == m_map.end()) {
LOG_ERROR_RETURN(EINVAL, -1,
"context not found in map");
}
}
// Hold the lock, but not get the result.
while (true)
DEFER(m_wait.notify_one());
{
int ret = args.do_completion(&args); // this do_completion may receive results for other threads.
// The phase checks and transitions below run under the context's phaselock.
SCOPED_LOCK(args.phaselock);
if (args.phase == OooPhase::BEFORE_ISSUE)
LOG_ERROR_RETURN(EINVAL, -1, "context not issued");
if (args.phase == OooPhase::WAITING)
LOG_ERROR_RETURN(EINVAL, -1, "context already in waiting");
// Loop until m_mutex_r is acquired via try_lock (hold_lock becomes true)
// or one of the branches returns.
for (bool hold_lock = false; !hold_lock;) {
switch (args.phase) {
case OooPhase::COLLECTED:
// result already collected before wait
if (args.th != CURRENT)
LOG_ERROR_RETURN(EINVAL, -1, "context is not issued by current thread");
return args.ret;
case OooPhase::ISSUED:
args.th = photon::CURRENT;
args.phase = OooPhase::WAITING;
// no break: ISSUED continues into the WAITING case below
case OooPhase::WAITING:
{
if (m_mutex_r.try_lock() == 0) {
hold_lock = true;
break;
}
// m_mutex_r was not acquired: wait on m_wait, bounded by args.timeout
auto ret = m_wait.wait(args.phaselock, args.timeout);
// Check if collected
if (args.phase == OooPhase::COLLECTED &&
args.th == CURRENT) {
return args.ret;
}
if (ret == -1) {
// or just timed out
{
SCOPED_LOCK(m_mutex_map);
m_map.erase(args.tag);
m_cond_collected.notify_one();
}
LOG_ERROR_RETURN(ETIMEDOUT, -1, "waiting for completion timeout");
}
break;
}
default:
LOG_ERROR_RETURN(EINVAL, -1, "unexpected phase");
}
}
}

// Holding mutex_r
// My origin tag is o_tag
auto o_tag = args.tag;
DEFER(m_mutex_r.unlock());
for (;;) {
int ret = args.do_completion(&args);
// this do_completion may receive results for other threads.
// but still works because even if tag of this issue have a unique do_completion
// which make other threads never could receive its result
// the thread will waiting till it hold the lock and get it by itself
// Since thread may not know the result of an issue will receive by which thread
// User must make sure that the do_completion can at least receive the result of its own issue.
if (ret < 0) {
// set with nullptr means the thread is once issued but failed when wait_completion
m_map.erase(o_tag);
LOG_ERROR_RETURN(0, -1, "failed to do_completion()");
}

if (o_tag == args.tag) {
m_map.erase(o_tag);
break; // it's my result, let's break, and collect it
}
OutOfOrderContext* targ = nullptr;
unordered_map<uint64_t, OutOfOrderContext*>::iterator it;
{
// Map lookup and erase below are done under m_mutex_map.
SCOPED_LOCK(m_mutex_map);
DEFER(m_cond_collected.notify_one());
if (ret < 0) {
// set with nullptr means the thread is once issued but failed when wait_completion
m_map.erase(o_tag);
LOG_ERROR_RETURN(0, -1, "failed to do_completion()");
}

auto it = m_map.find(args.tag);
it = m_map.find(args.tag);

if (it == m_map.end()) {
// response tag never issued
m_map.erase(o_tag);
LOG_ERROR_RETURN(ENOENT, -2, "response's tag ` not found, response should be dropped", args.tag);
if (it == m_map.end()) {
// response tag never issued
m_map.erase(o_tag);
LOG_ERROR_RETURN(ENOENT, -2, "response's tag ` not found, response should be dropped", args.tag);
}
targ = it->second;
m_map.erase(it);
}

auto targ = it->second;
auto th = targ->th;
// collect with mutex_r
targ->ret = targ->do_collect(targ);

if (!th)
// issued but requesting thread just failed in completion when waiting
LOG_ERROR_RETURN(ENOENT, -2, "response recvd, but requesting thread is NULL!");

it->second->ret = targ->do_collect(targ);
it->second->collected = true;
thread_interrupt(th); // other threads' response, resume him
{
photon::thread *th;
{
// Read the owning thread and publish COLLECTED under the target's phaselock.
SCOPED_LOCK(targ->phaselock);
th = targ->th;
targ->phase = OooPhase::COLLECTED;
}
if (o_tag == args.tag) {
if (th != CURRENT) {
LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT));
}
return args.ret; // it's my result, let's break, and
// collect it
}
if (!th)
// issued but requesting thread just failed in completion when waiting
LOG_ERROR_RETURN(ENOENT, -2, "response recvd, but requesting thread is NULL!");
thread_interrupt(th, EINTR); // other threads' response, resume him
}
}
// only break can bring out the while-loop
// means my result has been completed,
// ready to collect
DEFER(thread_yield_to(nullptr));
return args.do_collect(&args);
}
int issue_wait(OutOfOrderContext& args)
{
Expand Down
34 changes: 28 additions & 6 deletions rpc/out-of-order-execution.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ collection of result. The first 2 parts are realized via callbacks.
#pragma once
#include <photon/common/callback.h>
#include <photon/common/timeout.h>
#include <atomic>
#include <photon/thread/thread.h>

namespace photon{

Expand All @@ -42,14 +42,20 @@ namespace rpc {

void delete_ooo_execution_engine(OutOfOrder_Execution_Engine* engine);

// Phase of an OutOfOrderContext, stored in its `phase` field.
// The numeric values are explicit in the contract: 0, 1, 2, 3 in this order.
enum class OooPhase : int {
    BEFORE_ISSUE = 0,  // initial value of a context's phase
    ISSUED,            // == 1
    WAITING,           // == 2
    COLLECTED,         // == 3
};
struct OutOfOrderContext
{
OutOfOrder_Execution_Engine* engine;

// a unique tag of the operation, which can be filled
// by the user (together with `flag_tag_valid` = true),
// by the `engine`, or by `do_completion`.
uint64_t tag;
uint64_t tag = 0;

// The `CallbackType` have an prototype of
// either `int (*)(void*, OutOfOrderContext*)`,
Expand Down Expand Up @@ -77,19 +83,35 @@ namespace rpc {
CallbackType do_collect;

// thread bound to this context
thread * th;
thread * th = nullptr;

// Timeout for wait
Timeout timeout;

// Context phase
photon::spinlock phaselock;
volatile OooPhase phase = OooPhase::BEFORE_ISSUE;

// return value of collection
int ret;
int ret = -1;

// whether or not the `tag` field is valid
bool flag_tag_valid = false;

// whether the context result is collected
volatile bool collected = false;
OutOfOrderContext() = default;
// Copy assignment: takes over every field of `rhs` listed below and returns
// *this. `phaselock` is not assigned, so this context keeps its own lock
// object.
// NOTE(review): `rhs.phase` is read here without taking `rhs.phaselock` —
// confirm callers only copy contexts that are not concurrently in use.
OutOfOrderContext& operator=(const OutOfOrderContext& rhs) {
    // identity of the operation
    this->engine = rhs.engine;
    this->tag = rhs.tag;
    this->flag_tag_valid = rhs.flag_tag_valid;

    // user-supplied callbacks
    this->do_issue = rhs.do_issue;
    this->do_completion = rhs.do_completion;
    this->do_collect = rhs.do_collect;

    // execution state
    this->th = rhs.th;
    this->timeout = rhs.timeout;
    this->phase = rhs.phase;
    this->ret = rhs.ret;

    return *this;
}
};


Expand Down

0 comments on commit 75de9cf

Please sign in to comment.