diff --git a/common/lockfree_queue.h b/common/lockfree_queue.h
index 9739d26f..383f03fc 100644
--- a/common/lockfree_queue.h
+++ b/common/lockfree_queue.h
@@ -82,7 +82,7 @@ struct ThreadPause : PauseBase {
 };
 
 namespace photon {
-void thread_yield();
+int thread_yield();
 }
 struct PhotonPause : PauseBase {
     inline static __attribute__((always_inline)) void pause() {
diff --git a/thread/thread.cpp b/thread/thread.cpp
index 695481b1..67fd3e53 100644
--- a/thread/thread.cpp
+++ b/thread/thread.cpp
@@ -1162,41 +1162,44 @@ R"(
         return (states) th->state;
     }
 
-    void thread_yield()
+    int thread_yield()
     {
-        assert(!AtomicRunQ().single());
-        auto sw = AtomicRunQ().goto_next();
+        RunQ rq;
         if_update_now();
+        rq.current->error_number = 0;
+        auto sw = AtomicRunQ(rq).goto_next();
         switch_context(sw.from, sw.to);
+        return rq.current->error_number;
     }
 
-    void thread_yield_fast() {
-        assert(!AtomicRunQ().single());
+    inline void thread_yield_fast() {
         auto sw = AtomicRunQ().goto_next();
         switch_context(sw.from, sw.to);
     }
 
-    void thread_yield_to(thread* th) {
+    int thread_yield_to(thread* th) {
         if (unlikely(th == nullptr)) { // yield to any thread
             return thread_yield();
         }
         RunQ rq;
         if (unlikely(th == rq.current)) { // yield to current should just update time
             if_update_now();
-            return;
+            return 0;
         } else if (unlikely(th->vcpu != rq.current->vcpu)) {
-            LOG_ERROR_RETURN(EINVAL, , "target thread ` must be run by the same vcpu as CURRENT!", th);
+            LOG_ERROR_RETURN(EINVAL, -1, "target thread ` must be run by the same vcpu as CURRENT!", th);
         } else if (unlikely(th->state == states::STANDBY)) {
             while (th->state == states::STANDBY)
                 resume_threads();
             assert(th->state == states::READY);
         } else if (unlikely(th->state != states::READY)) {
-            LOG_ERROR_RETURN(EINVAL, , "target thread ` must be READY!", th);
+            LOG_ERROR_RETURN(EINVAL, -1, "target thread ` must be READY!", th);
         }
         auto sw = AtomicRunQ(rq).try_goto(th);
         if_update_now();
+        rq.current->error_number = 0;
         switch_context(sw.from, sw.to);
+        return rq.current->error_number;
     }
 
     __attribute__((always_inline)) inline
@@ -1216,13 +1219,16 @@ R"(
         sw.from->get_vcpu()->sleepq.push(sw.from);
         return sw;
     }
-
+    inline int yield_as_sleep() {
+        int ret = thread_yield();
+        if (unlikely(ret)) { errno = ret; return -1; }
+        return 0;
+    }
     // returns 0 if slept well (at lease `useconds`), -1 otherwise
     static int thread_usleep(Timeout timeout, thread_list* waitq) {
         if (unlikely(timeout.expired())) {
-            thread_yield();
-            return 0;
+            return yield_as_sleep();
         }
 
         auto r = prepare_usleep(timeout, waitq);
@@ -1292,7 +1298,7 @@ R"(
         if (unlikely(!rq.current))
             LOG_ERROR_RETURN(ENOSYS, -1, "Photon not initialized in this thread");
         if (unlikely(timeout.expired()))
-            return thread_yield(), 0;
+            return yield_as_sleep();
         if (unlikely(rq.current->is_shutting_down()))
             return do_shutdown_usleep(timeout, rq);
         return do_thread_usleep(timeout, rq);
@@ -1319,9 +1325,16 @@ R"(
     {
         if (unlikely(!th))
             LOG_ERROR_RETURN(EINVAL, , "invalid parameter");
-        if (unlikely(th->state != states::SLEEPING)) return;
+        auto state = th->state;
+        if (unlikely(state != states::SLEEPING)) {
+    out:    // may have thread_yield()-ed
+            if (state == states::READY && th->error_number == 0)
+                th->error_number = error_number;
+            return;
+        }
         SCOPED_LOCK(th->lock);
-        if (unlikely(th->state != states::SLEEPING)) return;
+        state = th->state;
+        if (unlikely(state != states::SLEEPING)) goto out;
         prelocked_thread_interrupt(th, error_number);
     }
 
@@ -1550,9 +1563,9 @@ R"(
     int mutex::lock(Timeout timeout) {
         if (try_lock() == 0) return 0;
         for (auto re = retries; re; --re) {
-            thread_yield();
-            if (try_lock() == 0)
-                return 0;
+            int ret = thread_yield();
+            if (unlikely(ret)) { errno = ret; return -1; }
+            if (try_lock() == 0) return 0;
         }
         splock.lock();
         if (try_lock() == 0) {
diff --git a/thread/thread.h b/thread/thread.h
index 6406cb78..93b78d6e 100644
--- a/thread/thread.h
+++ b/thread/thread.h
@@ -78,10 +78,12 @@ namespace photon
     void thread_join(join_handle* jh);
 
     // switching to other threads (without going into sleep queue)
-    void thread_yield();
+    // returns error_number if interrupted during the yield
+    int thread_yield();
 
     // switching to a specific thread, which must be RUNNING
-    void thread_yield_to(thread* th);
+    // returns error_number if interrupted during the yield
+    int thread_yield_to(thread* th);
 
     // suspend CURRENT thread for specified time duration, and switch
     // control to other threads, resuming possible sleepers.
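
For context on how callers are expected to consume the new return value, here is a minimal caller-side sketch (not part of this patch) that mirrors the mutex::lock retry loop above: it yields in a loop and abandons the wait as soon as thread_yield() reports an interruption. The spin_until helper, its max_retries parameter, the ETIMEDOUT fallback, and the photon/thread/thread.h include path are assumptions for illustration only.

```cpp
// Hypothetical helper (not part of Photon): a cooperative busy-wait that
// honors interrupts delivered while this thread is merely yielding.
#include <cerrno>
#include <photon/thread/thread.h>  // assumed install path of the header patched above

template <typename Pred>
int spin_until(Pred ready, int max_retries = 100) {
    for (int i = 0; i < max_retries; ++i) {
        if (ready()) return 0;
        int err = photon::thread_yield();    // 0, or error_number if interrupted
        if (err) { errno = err; return -1; } // stop spinning and surface the interrupt
    }
    errno = ETIMEDOUT;  // illustrative policy for an exhausted retry budget
    return -1;
}
```

Propagating the interrupt through the return value is what lets thread_interrupt() reach a thread that is yielding (READY rather than SLEEPING); the thread_interrupt() changes in thread.cpp above record the error_number for exactly that case.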