diff --git a/rpc/out-of-order-execution.cpp b/rpc/out-of-order-execution.cpp index 7fdc30e7..4c7c9d90 100644 --- a/rpc/out-of-order-execution.cpp +++ b/rpc/out-of-order-execution.cpp @@ -28,8 +28,8 @@ namespace rpc { { public: unordered_map m_map; - condition_variable m_cond_collected; - mutex m_mutex_w, m_mutex_r; + condition_variable m_cond_collected, m_wait; + mutex m_mutex_w, m_mutex_r, m_mutex_map; uint64_t m_issuing = 0; uint64_t m_tag = 0; bool m_running = true; @@ -60,9 +60,9 @@ namespace rpc { } int issue_operation(OutOfOrderContext& args) //firing issue { + SCOPED_LOCK(m_mutex_w); m_issuing ++; DEFER(m_issuing --); - SCOPED_LOCK(m_mutex_w); if (!m_running) LOG_ERROR_RETURN(ESHUTDOWN, -1, "engine is been shuting down"); if (!args.flag_tag_valid) @@ -71,108 +71,158 @@ namespace rpc { args.tag = ++m_tag; // auto increase if it is not user defined tag } args.th = CURRENT; - args.collected = false; + { + SCOPED_LOCK(args.phaselock); + args.phase = OooPhase::BEFORE_ISSUE; + } args.ret = 0; - auto ret = m_map.insert({args.tag, &args}); //the return value is {iter, bool} - if (!ret.second) // means insert failed because of key already exists { - auto tag = args.tag; - auto th = (ret.first == m_map.end()) ? nullptr : ret.first->second->th; - LOG_ERROR("failed to insert record into unordered hash map", - VALUE(tag), VALUE(CURRENT), VALUE(th)); - if (args.flag_tag_valid) // user set tag, need to tell user it is a failure - LOG_ERROR_RETURN(EINVAL, -1, "the tag in argument is NOT valid"); - goto again; + SCOPED_LOCK(m_mutex_map); + auto ret = m_map.insert({args.tag, &args}); //the return value is {iter, bool} + if (!ret.second) // means insert failed because of key already exists + { + auto tag = args.tag; + auto th = (ret.first == m_map.end()) ? 
nullptr : ret.first->second->th; + LOG_ERROR("failed to insert record into unordered hash map", + VALUE(tag), VALUE(CURRENT), VALUE(th)); + if (args.flag_tag_valid) // user set tag, need to tell user it is a failure + LOG_ERROR_RETURN(EINVAL, -1, "the tag in argument is NOT valid"); + goto again; + } } int ret2 = args.do_issue(&args); if (ret2 < 0) { + SCOPED_LOCK(m_mutex_map); m_map.erase(args.tag); + m_cond_collected.notify_one(); LOG_ERROR_RETURN(0, -1, "failed to do_issue()"); } + { + SCOPED_LOCK(args.phaselock); + args.phase = OooPhase::ISSUED; + } return 0; } + + static void wait_check(void* args) { + OutOfOrderContext& ctx = *(OutOfOrderContext*)args; + ctx.phase = OooPhase::WAITING; + ctx.phaselock.unlock(); + }; int wait_completion(OutOfOrderContext& args) //recieving work { - // lock with param 1 means allow entry without lock - // when interuptted - int lockret = m_mutex_r.lock(args.timeout); - ERRNO err; - DEFER(if (lockret == 0) m_mutex_r.unlock()); - - // when wait_completion returned, - // always have tag removed from the map - // notify the waiting function (like shutdown()) - DEFER(m_cond_collected.notify_one()); - - if (lockret < 0 && err.no == ETIMEDOUT) { - // Timed out so return as failure - m_map.erase(args.tag); - LOG_ERROR_RETURN(ETIMEDOUT, -1, "timeout wait for completion"); - } - - auto o_tag = args.tag; { - auto o_it = m_map.find(o_tag); - if (o_it == m_map.end()) { - LOG_ERROR_RETURN(EINVAL, -1, "issue of ` not found", VALUE(args.tag)); - } - if (o_it->second->th != CURRENT) - { - LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT)); - } - if (args.collected) { - // my completion has been done - // just collect it, clear the trace, - // then return result - m_map.erase(o_it); - return args.ret; + // check if context issued + SCOPED_LOCK(m_mutex_map); + if (m_map.find(args.tag) == m_map.end()) { + LOG_ERROR_RETURN(EINVAL, -1, + "context not found in map"); } } - //Hold the lock, but 
not get the result. - while (true) + DEFER(m_wait.notify_one()); { - int ret = args.do_completion(&args); //this do_completion may recieve results for other threads. + SCOPED_LOCK(args.phaselock); + if (args.phase == OooPhase::BEFORE_ISSUE) + LOG_ERROR_RETURN(EINVAL, -1, "context not issued"); + if (args.phase == OooPhase::WAITING) + LOG_ERROR_RETURN(EINVAL, -1, "context already in waiting"); + for (bool hold_lock = false; !hold_lock;) { + switch (args.phase) { + case OooPhase::COLLECTED: + // result already collected before wait + if (args.th != CURRENT) + LOG_ERROR_RETURN(EINVAL, -1, "context is not issued by current thread"); + return args.ret; + case OooPhase::ISSUED: + args.th = photon::CURRENT; + args.phase = OooPhase::WAITING; + case OooPhase::WAITING: + { + if (m_mutex_r.try_lock() == 0) { + hold_lock = true; + break; + } + auto ret = m_wait.wait(args.phaselock, args.timeout); + // Check if collected + if (args.phase == OooPhase::COLLECTED && + args.th == CURRENT) { + return args.ret; + } + if (ret == -1) { + // or just timed out + { + SCOPED_LOCK(m_mutex_map); + m_map.erase(args.tag); + m_cond_collected.notify_one(); + } + LOG_ERROR_RETURN(ETIMEDOUT, -1, "waiting for completion timeout"); + } + break; + } + default: + LOG_ERROR_RETURN(EINVAL, -1, "unexpected phase"); + } + } + } + + // Holding mutex_r + // My origin tag is o_tag + auto o_tag = args.tag; + DEFER(m_mutex_r.unlock()); + for (;;) { + int ret = args.do_completion(&args); + //this do_completion may receive results for other threads. // but still works because even if tag of this issue have a unique do_completion // which make other threads never could recieve it's result // the thread will waiting till it hold the lock and get it by itself // Since thread may not know the result of an issue will recieve by which thread // User must make sure that the do_completion can atleast recieve the result of it's own issue.
- if (ret < 0) { - // set with nullptr means the thread is once issued but failed when wait_completion - m_map.erase(o_tag); - LOG_ERROR_RETURN(0, -1, "failed to do_completion()"); - } - - if (o_tag == args.tag) { - m_map.erase(o_tag); - break; // it's my result, let's break, and collect it - } + OutOfOrderContext* targ = nullptr; + unordered_map::iterator it; + { + SCOPED_LOCK(m_mutex_map); + DEFER(m_cond_collected.notify_one()); + if (ret < 0) { + // set with nullptr means the thread is once issued but failed when wait_completion + m_map.erase(o_tag); + LOG_ERROR_RETURN(0, -1, "failed to do_completion()"); + } - auto it = m_map.find(args.tag); + it = m_map.find(args.tag); - if (it == m_map.end()) { - // response tag never issued - m_map.erase(o_tag); - LOG_ERROR_RETURN(ENOENT, -2, "response's tag ` not found, response should be dropped", args.tag); + if (it == m_map.end()) { + // response tag never issued + m_map.erase(o_tag); + LOG_ERROR_RETURN(ENOENT, -2, "response's tag ` not found, response should be dropped", args.tag); + } + targ = it->second; + m_map.erase(it); } - auto targ = it->second; - auto th = targ->th; + // collect with mutex_r + targ->ret = targ->do_collect(targ); - if (!th) - // issued but requesting thread just failed in completion when waiting - LOG_ERROR_RETURN(ENOENT, -2, "response recvd, but requesting thread is NULL!"); - - it->second->ret = targ->do_collect(targ); - it->second->collected = true; - thread_interrupt(th); // other threads' response, resume him + { + photon::thread *th; + { + SCOPED_LOCK(targ->phaselock); + th = targ->th; + targ->phase = OooPhase::COLLECTED; + } + if (o_tag == args.tag) { + if (th != CURRENT) { + LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT)); + } + return args.ret; // it's my result, let's break, and + // collect it + } + if (!th) + // issued but requesting thread just failed in completion when waiting + LOG_ERROR_RETURN(ENOENT, -2, "response recvd, 
but requesting thread is NULL!"); + thread_interrupt(th, EINTR); // other threads' response, resume him + } } - // only break can bring out the while-loop - // means my result has been completed, - // ready to collect - DEFER(thread_yield_to(nullptr)); - return args.do_collect(&args); } int issue_wait(OutOfOrderContext& args) { diff --git a/rpc/out-of-order-execution.h b/rpc/out-of-order-execution.h index 52bdafb8..5eca9779 100644 --- a/rpc/out-of-order-execution.h +++ b/rpc/out-of-order-execution.h @@ -27,7 +27,7 @@ collection of result. The first 2 parts are realized via callbacks. #pragma once #include #include -#include +#include namespace photon{ @@ -42,6 +42,12 @@ namespace rpc { void delete_ooo_execution_engine(OutOfOrder_Execution_Engine* engine); + enum class OooPhase : int { + BEFORE_ISSUE = 0, + ISSUED = 1, + WAITING = 2, + COLLECTED = 3 + }; struct OutOfOrderContext { OutOfOrder_Execution_Engine* engine; @@ -49,7 +55,7 @@ namespace rpc { // an unique tag of the opeartion, which can be filled // by user (together with `flag_tag_valid` = true), // by the `engine`, or by `do_completion`. 
- uint64_t tag; + uint64_t tag = 0; // The `CallbackType` have an prototype of // either `int (*)(void*, OutOfOrderContext*)`, @@ -77,19 +83,35 @@ namespace rpc { CallbackType do_collect; // thread that binding with this argument - thread * th; + thread * th = nullptr; // Timeout for wait Timeout timeout; + // Context phase + photon::spinlock phaselock; + volatile OooPhase phase = OooPhase::BEFORE_ISSUE; + // return value of collection - int ret; + int ret = -1; // whether or not the `tag` field is valid bool flag_tag_valid = false; - // whether the context result is collected - volatile bool collected = false; + OutOfOrderContext() = default; + OutOfOrderContext& operator=(const OutOfOrderContext& rhs) { + engine = rhs.engine; + tag = rhs.tag; + do_issue = rhs.do_issue; + do_completion = rhs.do_completion; + do_collect = rhs.do_collect; + th = rhs.th; + timeout = rhs.timeout; + flag_tag_valid = rhs.flag_tag_valid; + phase = rhs.phase; + ret = rhs.ret; + return *this; + } };