From 7d0f277ad53838e1748016d979e901480c11d388 Mon Sep 17 00:00:00 2001 From: DH Date: Thu, 31 Oct 2024 22:54:16 +0300 Subject: [PATCH] orbis-kernel: Add SharedAtomic utility Initial shared atomic implementation for Darwin --- orbis-kernel/CMakeLists.txt | 7 + orbis-kernel/include/orbis/error.hpp | 162 +++++++++++++++++ .../include/orbis/utils/SharedAtomic.hpp | 165 ++++++++++++++++++ orbis-kernel/include/orbis/utils/SharedCV.hpp | 33 +++- .../include/orbis/utils/SharedMutex.hpp | 11 +- orbis-kernel/src/evf.cpp | 6 +- orbis-kernel/src/umtx.cpp | 37 ++-- orbis-kernel/src/utils/SharedAtomic.cpp | 95 ++++++++++ orbis-kernel/src/utils/SharedCV.cpp | 126 ++++++++----- orbis-kernel/src/utils/SharedMutex.cpp | 33 ++-- 10 files changed, 578 insertions(+), 97 deletions(-) create mode 100644 orbis-kernel/include/orbis/utils/SharedAtomic.hpp create mode 100644 orbis-kernel/src/utils/SharedAtomic.cpp diff --git a/orbis-kernel/CMakeLists.txt b/orbis-kernel/CMakeLists.txt index 9fced4df..c86604d3 100644 --- a/orbis-kernel/CMakeLists.txt +++ b/orbis-kernel/CMakeLists.txt @@ -3,6 +3,7 @@ set(CMAKE_POSITION_INDEPENDENT_CODE on) add_library(obj.orbis-utils-ipc OBJECT src/utils/SharedMutex.cpp src/utils/SharedCV.cpp + src/utils/SharedAtomic.cpp ) add_library(obj.orbis-kernel OBJECT src/module.cpp @@ -89,6 +90,12 @@ target_include_directories(obj.orbis-utils-ipc ${CMAKE_CURRENT_SOURCE_DIR}/include/orbis ) +if(${CMAKE_SYSTEM_NAME} STREQUAL "Linux") + target_compile_definitions(obj.orbis-utils-ipc PUBLIC ORBIS_HAS_FUTEX) +elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") + target_compile_definitions(obj.orbis-utils-ipc PUBLIC ORBIS_HAS_ULOCK) +endif() + add_library(orbis-utils-ipc STATIC) add_library(orbis-kernel STATIC) add_library(orbis-kernel-shared SHARED) diff --git a/orbis-kernel/include/orbis/error.hpp b/orbis-kernel/include/orbis/error.hpp index c670a46d..ab78f823 100644 --- a/orbis-kernel/include/orbis/error.hpp +++ b/orbis-kernel/include/orbis/error.hpp @@ -2,3 +2,165 @@ #include "error/ErrorCode.hpp" // IWYU pragma: export #include "error/SysResult.hpp" // IWYU pragma: export +#include + +namespace orbis { +static orbis::ErrorCode toErrorCode(std::errc errc) { + if (errc == std::errc{}) { + return {}; + } + + switch (errc) { + case std::errc::address_family_not_supported: + return orbis::ErrorCode::AFNOSUPPORT; + case std::errc::address_in_use: + return orbis::ErrorCode::ADDRINUSE; + case std::errc::address_not_available: + return orbis::ErrorCode::ADDRNOTAVAIL; + case std::errc::already_connected: + return orbis::ErrorCode::ISCONN; + case std::errc::argument_out_of_domain: + return orbis::ErrorCode::DOM; + case std::errc::bad_address: + return orbis::ErrorCode::FAULT; + case std::errc::bad_file_descriptor: + return orbis::ErrorCode::BADF; + case std::errc::bad_message: + return orbis::ErrorCode::BADMSG; + case std::errc::broken_pipe: + return orbis::ErrorCode::PIPE; + case std::errc::connection_aborted: + return orbis::ErrorCode::CONNABORTED; + case std::errc::connection_already_in_progress: + return orbis::ErrorCode::ALREADY; + case std::errc::connection_refused: + return orbis::ErrorCode::CONNREFUSED; + case std::errc::connection_reset: + return orbis::ErrorCode::CONNRESET; + case std::errc::cross_device_link: + return orbis::ErrorCode::XDEV; + case std::errc::destination_address_required: + return orbis::ErrorCode::DESTADDRREQ; + case std::errc::device_or_resource_busy: + return orbis::ErrorCode::BUSY; + case std::errc::directory_not_empty: + return orbis::ErrorCode::NOTEMPTY; + case std::errc::executable_format_error: + return orbis::ErrorCode::NOEXEC; + case std::errc::file_exists: + return orbis::ErrorCode::EXIST; + case std::errc::file_too_large: + return orbis::ErrorCode::FBIG; + case std::errc::filename_too_long: + return orbis::ErrorCode::NAMETOOLONG; + case std::errc::function_not_supported: + return orbis::ErrorCode::NOSYS; + case std::errc::host_unreachable: + return orbis::ErrorCode::HOSTUNREACH; + case std::errc::identifier_removed: + return orbis::ErrorCode::IDRM; + case std::errc::illegal_byte_sequence: + return orbis::ErrorCode::ILSEQ; + case std::errc::inappropriate_io_control_operation: + return orbis::ErrorCode::NOTTY; + case std::errc::interrupted: + return orbis::ErrorCode::INTR; + case std::errc::invalid_argument: + return orbis::ErrorCode::INVAL; + case std::errc::invalid_seek: + return orbis::ErrorCode::SPIPE; + case std::errc::io_error: + return orbis::ErrorCode::IO; + case std::errc::is_a_directory: + return orbis::ErrorCode::ISDIR; + case std::errc::message_size: + return orbis::ErrorCode::MSGSIZE; + case std::errc::network_down: + return orbis::ErrorCode::NETDOWN; + case std::errc::network_reset: + return orbis::ErrorCode::NETRESET; + case std::errc::network_unreachable: + return orbis::ErrorCode::NETUNREACH; + case std::errc::no_buffer_space: + return orbis::ErrorCode::NOBUFS; + case std::errc::no_child_process: + return orbis::ErrorCode::CHILD; + case std::errc::no_link: + return orbis::ErrorCode::NOLINK; + case std::errc::no_lock_available: + return orbis::ErrorCode::NOLCK; + case std::errc::no_message: + return orbis::ErrorCode::NOMSG; + case std::errc::no_protocol_option: + return orbis::ErrorCode::NOPROTOOPT; + case std::errc::no_space_on_device: + return orbis::ErrorCode::NOSPC; + case std::errc::no_such_device_or_address: + return orbis::ErrorCode::NXIO; + case std::errc::no_such_device: + return orbis::ErrorCode::NODEV; + case std::errc::no_such_file_or_directory: + return orbis::ErrorCode::NOENT; + case std::errc::no_such_process: + return orbis::ErrorCode::SRCH; + case std::errc::not_a_directory: + return orbis::ErrorCode::NOTDIR; + case std::errc::not_a_socket: + return orbis::ErrorCode::NOTSOCK; + case std::errc::not_connected: + return orbis::ErrorCode::NOTCONN; + case std::errc::not_enough_memory: + return orbis::ErrorCode::NOMEM; + case std::errc::not_supported: + return orbis::ErrorCode::NOTSUP; + case std::errc::operation_canceled: + return orbis::ErrorCode::CANCELED; + case std::errc::operation_in_progress: + return orbis::ErrorCode::INPROGRESS; + case std::errc::operation_not_permitted: + return orbis::ErrorCode::PERM; + case std::errc::operation_would_block: + return orbis::ErrorCode::WOULDBLOCK; + case std::errc::permission_denied: + return orbis::ErrorCode::ACCES; + case std::errc::protocol_error: + return orbis::ErrorCode::PROTO; + case std::errc::protocol_not_supported: + return orbis::ErrorCode::PROTONOSUPPORT; + case std::errc::read_only_file_system: + return orbis::ErrorCode::ROFS; + case std::errc::resource_deadlock_would_occur: + return orbis::ErrorCode::DEADLK; + case std::errc::result_out_of_range: + return orbis::ErrorCode::RANGE; + case std::errc::text_file_busy: + return orbis::ErrorCode::TXTBSY; + case std::errc::timed_out: + return orbis::ErrorCode::TIMEDOUT; + case std::errc::too_many_files_open_in_system: + return orbis::ErrorCode::NFILE; + case std::errc::too_many_files_open: + return orbis::ErrorCode::MFILE; + case std::errc::too_many_links: + return orbis::ErrorCode::MLINK; + case std::errc::too_many_symbolic_link_levels: + return orbis::ErrorCode::LOOP; + case std::errc::value_too_large: + return orbis::ErrorCode::OVERFLOW; + case std::errc::wrong_protocol_type: + return orbis::ErrorCode::PROTOTYPE; + default: + return orbis::ErrorCode::FAULT; + } +} + +inline constexpr orbis::ErrorCode toErrorCode(const std::error_code &code) { + if (!code) { + return {}; + } + if (code.category() != std::generic_category()) { + return orbis::ErrorCode::DOOFUS; + } + return toErrorCode(static_cast(code.value())); +} +} // namespace orbis diff --git a/orbis-kernel/include/orbis/utils/SharedAtomic.hpp b/orbis-kernel/include/orbis/utils/SharedAtomic.hpp new file mode 100644 index 00000000..326c206c --- /dev/null +++ b/orbis-kernel/include/orbis/utils/SharedAtomic.hpp @@ -0,0 +1,165 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace orbis { +inline void yield() { std::this_thread::yield(); } +inline void relax() { +#if defined(__GNUC__) && (defined __i386__ || defined __x86_64__) + __builtin_ia32_pause(); +#else + yield(); +#endif +} + +static constexpr auto kRelaxSpinCount = 12; +static constexpr auto kSpinCount = 16; + +inline namespace utils { +bool try_spin_wait(auto &&pred) { + for (std::size_t i = 0; i < kSpinCount; ++i) { + if (pred()) { + return true; + } + + if (i < kRelaxSpinCount) { + relax(); + } else { + yield(); + } + } + + return false; +} + +bool spin_wait(auto &&pred, auto &&spinCond) { + if (try_spin_wait(pred)) { + return true; + } + + while (spinCond()) { + if (pred()) { + return true; + } + } + + return false; +} + +struct shared_atomic32 : std::atomic { + using atomic::atomic; + using atomic::operator=; + + template + [[nodiscard]] std::errc wait(std::uint32_t oldValue, + std::chrono::time_point timeout) { + if (try_spin_wait( + [&] { return load(std::memory_order::acquire) != oldValue; })) { + return {}; + } + + auto now = Clock::now(); + + if (timeout < now) { + return std::errc::timed_out; + } + + return wait_impl( + oldValue, + std::chrono::duration_cast(timeout - now)); + } + + [[nodiscard]] std::errc wait(std::uint32_t oldValue, + std::chrono::microseconds usec_timeout) { + return wait_impl(oldValue, usec_timeout); + } + + [[nodiscard]] std::errc wait(std::uint32_t oldValue) { + if (try_spin_wait( + [&] { return load(std::memory_order::acquire) != oldValue; })) { + return {}; + } + + return wait_impl(oldValue); + } + + auto wait(auto &fn) -> decltype(fn(std::declval())) { + while (true) { + std::uint32_t lastValue; + if (try_spin_wait([&] { + lastValue = load(std::memory_order::acquire); + return fn(lastValue); + })) { + return; + } + + while (wait_impl(lastValue) != std::errc{}) { + } + } + } + + int notify_one() const { return notify_n(1); } + int notify_all() const { return notify_n(std::numeric_limits::max()); } + + int notify_n(int count) const; + + // Atomic operation; returns old value, or pair of old value and return value + // (cancel op if evaluates to false) + template > + std::conditional_t, std::uint32_t, + std::pair> + fetch_op(F &&func) { + std::uint32_t _new; + std::uint32_t old = load(std::memory_order::relaxed); + while (true) { + _new = old; + if constexpr (std::is_void_v) { + std::invoke(std::forward(func), _new); + if (compare_exchange_strong(old, _new)) [[likely]] { + return old; + } + } else { + RT ret = std::invoke(std::forward(func), _new); + if (!ret || compare_exchange_strong(old, _new)) [[likely]] { + return {old, std::move(ret)}; + } + } + } + } + + // Atomic operation; returns function result value + template > + RT op(F &&func) { + std::uint32_t _new; + std::uint32_t old = load(std::memory_order::relaxed); + + while (true) { + _new = old; + if constexpr (std::is_void_v) { + std::invoke(std::forward(func), _new); + if (compare_exchange_strong(old, _new)) [[likely]] { + return; + } + } else { + RT result = std::invoke(std::forward(func), _new); + if (compare_exchange_strong(old, _new)) [[likely]] { + return result; + } + } + } + } + +private: + [[nodiscard]] std::errc wait_impl(std::uint32_t oldValue, + std::chrono::microseconds usec_timeout = + std::chrono::microseconds::max()); +}; +} // namespace utils +} // namespace orbis diff --git a/orbis-kernel/include/orbis/utils/SharedCV.hpp b/orbis-kernel/include/orbis/utils/SharedCV.hpp index 8de8bc1b..6f8152f2 100644 --- a/orbis-kernel/include/orbis/utils/SharedCV.hpp +++ b/orbis-kernel/include/orbis/utils/SharedCV.hpp @@ -1,8 +1,12 @@ #pragma once +#include "orbis/utils/SharedAtomic.hpp" +#include #include +#include #include #include +#include namespace orbis { inline namespace utils { @@ -11,16 +15,18 @@ class shared_cv final { enum : unsigned { c_waiter_mask = 0xffff, c_signal_mask = 0x7fff0000, +#ifdef ORBIS_HAS_FUTEX c_locked_mask = 0x80000000, +#endif c_signal_one = c_waiter_mask + 1, }; - std::atomic m_value{0}; + shared_atomic32 m_value{0}; protected: // Increment waiter count unsigned add_waiter() noexcept { - return atomic_op(m_value, [](unsigned &value) -> unsigned { + return m_value.op([](unsigned &value) -> unsigned { if ((value & c_signal_mask) == c_signal_mask || (value & c_waiter_mask) == c_waiter_mask) { // Signal or waiter overflow, return immediately @@ -34,8 +40,8 @@ class shared_cv final { } // Internal waiting function - int impl_wait(shared_mutex &mutex, unsigned _val, - std::uint64_t usec_timeout) noexcept; + std::errc impl_wait(shared_mutex &mutex, unsigned _val, + std::uint64_t usec_timeout) noexcept; // Try to notify up to _count threads void impl_wake(shared_mutex &mutex, int _count) noexcept; @@ -43,10 +49,25 @@ class shared_cv final { public: constexpr shared_cv() = default; - int wait(shared_mutex &mutex, std::uint64_t usec_timeout = -1) noexcept { + std::errc + wait(std::unique_lock &lock, + std::chrono::microseconds timeout = std::chrono::microseconds::max()) { + return wait(*lock.mutex(), timeout.count()); + } + + template + std::errc wait(std::unique_lock &lock, + std::chrono::duration timeout) { + return wait( + lock, + std::chrono::duration_cast(timeout).count()); + } + + std::errc wait(shared_mutex &mutex, + std::uint64_t usec_timeout = -1) noexcept { const unsigned _val = add_waiter(); if (!_val) { - return 0; + return {}; } mutex.unlock(); diff --git a/orbis-kernel/include/orbis/utils/SharedMutex.hpp b/orbis-kernel/include/orbis/utils/SharedMutex.hpp index 265ad37e..8beebae7 100644 --- a/orbis-kernel/include/orbis/utils/SharedMutex.hpp +++ b/orbis-kernel/include/orbis/utils/SharedMutex.hpp @@ -1,8 +1,9 @@ #pragma once #include -#include #include +#include +#include namespace orbis { inline namespace utils { @@ -16,11 +17,11 @@ class shared_mutex final { c_err = 1u << 31, }; - std::atomic m_value{}; + shared_atomic32 m_value{}; void impl_lock_shared(unsigned val); void impl_unlock_shared(unsigned old); - int impl_wait(); + std::errc impl_wait(); void impl_signal(); void impl_lock(unsigned val); void impl_unlock(unsigned old); @@ -99,10 +100,10 @@ class shared_mutex final { } // Check whether can immediately obtain an exclusive (writer) lock - bool is_free() const { return m_value.load() == 0; } + [[nodiscard]] bool is_free() const { return m_value.load() == 0; } // Check whether can immediately obtain a shared (reader) lock - bool is_lockable() const { return m_value.load() < c_one - 1; } + [[nodiscard]] bool is_lockable() const { return m_value.load() < c_one - 1; } private: // For CV diff --git a/orbis-kernel/src/evf.cpp b/orbis-kernel/src/evf.cpp index fc6b8bee..83cd5720 100644 --- a/orbis-kernel/src/evf.cpp +++ b/orbis-kernel/src/evf.cpp @@ -33,7 +33,7 @@ orbis::ErrorCode orbis::EventFlag::wait(Thread *thread, std::uint8_t waitMode, thread->evfIsCancelled = -1; std::unique_lock lock(queueMtx); - int result = 0; + orbis::ErrorCode result = {}; while (true) { if (isDeleted) { if (thread->evfIsCancelled == UINT64_MAX) @@ -78,10 +78,10 @@ orbis::ErrorCode orbis::EventFlag::wait(Thread *thread, std::uint8_t waitMode, waitingThreads.emplace_back(waitingThread); if (timeout) { - result = thread->sync_cv.wait(queueMtx, *timeout); + result = toErrorCode(thread->sync_cv.wait(queueMtx, *timeout)); update_timeout(); } else { - result = thread->sync_cv.wait(queueMtx); + result = toErrorCode(thread->sync_cv.wait(queueMtx)); } if (thread->evfIsCancelled == UINT64_MAX) { diff --git a/orbis-kernel/src/umtx.cpp b/orbis-kernel/src/umtx.cpp index cd0ab01b..bad5ccae 100644 --- a/orbis-kernel/src/umtx.cpp +++ b/orbis-kernel/src/umtx.cpp @@ -1,9 +1,8 @@ #include "umtx.hpp" +#include "error.hpp" #include "orbis/KernelContext.hpp" #include "orbis/thread.hpp" -#include "orbis/utils/AtomicOp.hpp" #include "orbis/utils/Logs.hpp" -#include "time.hpp" #include namespace orbis { @@ -41,7 +40,7 @@ uint UmtxChain::notify_n(const UmtxKey &key, sint count) { return 0; uint n = 0; - while (count > 0) { + while (count > 0) { it->second.thr = nullptr; it->second.cv.notify_all(mtx); it = erase(it); @@ -57,9 +56,7 @@ uint UmtxChain::notify_n(const UmtxKey &key, sint count) { return n; } -uint UmtxChain::notify_one(const UmtxKey &key) { - return notify_n(key, 1); -} +uint UmtxChain::notify_one(const UmtxKey &key) { return notify_n(key, 1); } uint UmtxChain::notify_all(const UmtxKey &key) { return notify_n(key, std::numeric_limits::max()); @@ -94,7 +91,7 @@ orbis::ErrorCode orbis::umtx_wait(Thread *thread, ptr addr, ulong id, if (val == id) { if (ut + 1 == 0) { while (true) { - result = ErrorCode{node->second.cv.wait(chain.mtx)}; + result = orbis::toErrorCode(node->second.cv.wait(chain.mtx)); if (result != ErrorCode{} || node->second.thr != thread) break; } @@ -102,7 +99,8 @@ orbis::ErrorCode orbis::umtx_wait(Thread *thread, ptr addr, ulong id, auto start = std::chrono::steady_clock::now(); std::uint64_t udiff = 0; while (true) { - result = ErrorCode{node->second.cv.wait(chain.mtx, ut - udiff)}; + result = + orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut - udiff)); if (node->second.thr != thread) break; udiff = std::chrono::duration_cast( @@ -193,7 +191,7 @@ static ErrorCode do_lock_normal(Thread *thread, ptr m, uint flags, auto [chain, key, lock] = g_context.getUmtxChain1(thread, flags, m); auto node = chain.enqueue(key, thread); if (m->owner.compare_exchange_strong(owner, owner | kUmutexContested)) { - error = ErrorCode{node->second.cv.wait(chain.mtx, ut)}; + error = orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut)); if (error == ErrorCode{} && node->second.thr == thread) { error = ErrorCode::TIMEDOUT; } @@ -323,7 +321,6 @@ orbis::ErrorCode orbis::umtx_cv_wait(Thread *thread, ptr cv, ORBIS_LOG_WARNING("umtx_cv_wait: CLOCK_ID", wflags, cv->clockid); // std::abort(); return ErrorCode::NOSYS; - } if ((wflags & kCvWaitAbsTime) != 0 && ut + 1) { ORBIS_LOG_WARNING("umtx_cv_wait: ABSTIME unimplemented", wflags); @@ -353,7 +350,7 @@ orbis::ErrorCode orbis::umtx_cv_wait(Thread *thread, ptr cv, if (result == ErrorCode{}) { if (ut + 1 == 0) { while (true) { - result = ErrorCode{node->second.cv.wait(chain.mtx, ut)}; + result = orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut)); if (result != ErrorCode{} || node->second.thr != thread) { break; } @@ -362,7 +359,8 @@ orbis::ErrorCode orbis::umtx_cv_wait(Thread *thread, ptr cv, auto start = std::chrono::steady_clock::now(); std::uint64_t udiff = 0; while (true) { - result = ErrorCode{node->second.cv.wait(chain.mtx, ut - udiff)}; + result = + orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut - udiff)); if (node->second.thr != thread) { break; } @@ -457,7 +455,7 @@ orbis::ErrorCode orbis::umtx_rw_rdlock(Thread *thread, ptr rwlock, if (ut + 1 == 0) { while (true) { - result = ErrorCode{node->second.cv.wait(chain.mtx, ut)}; + result = orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut)); if (result != ErrorCode{} || node->second.thr != thread) { break; } @@ -466,7 +464,8 @@ orbis::ErrorCode orbis::umtx_rw_rdlock(Thread *thread, ptr rwlock, auto start = std::chrono::steady_clock::now(); std::uint64_t udiff = 0; while (true) { - result = ErrorCode{node->second.cv.wait(chain.mtx, ut - udiff)}; + result = + orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut - udiff)); if (node->second.thr != thread) break; udiff = std::chrono::duration_cast( @@ -557,7 +556,7 @@ orbis::ErrorCode orbis::umtx_rw_wrlock(Thread *thread, ptr rwlock, if (ut + 1 == 0) { while (true) { - error = ErrorCode{node->second.cv.wait(chain.mtx, ut)}; + error = orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut)); if (error != ErrorCode{} || node->second.thr != thread) { break; } @@ -566,7 +565,8 @@ orbis::ErrorCode orbis::umtx_rw_wrlock(Thread *thread, ptr rwlock, auto start = std::chrono::steady_clock::now(); std::uint64_t udiff = 0; while (true) { - error = ErrorCode{node->second.cv.wait(chain.mtx, ut - udiff)}; + error = + orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut - udiff)); if (node->second.thr != thread) break; udiff = std::chrono::duration_cast( @@ -729,7 +729,7 @@ orbis::ErrorCode orbis::umtx_sem_wait(Thread *thread, ptr sem, if (!sem->count) { if (ut + 1 == 0) { while (true) { - result = ErrorCode{node->second.cv.wait(chain.mtx, ut)}; + result = orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut)); if (result != ErrorCode{} || node->second.thr != thread) break; } @@ -737,7 +737,8 @@ orbis::ErrorCode orbis::umtx_sem_wait(Thread *thread, ptr sem, auto start = std::chrono::steady_clock::now(); std::uint64_t udiff = 0; while (true) { - result = ErrorCode{node->second.cv.wait(chain.mtx, ut - udiff)}; + result = + orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut - udiff)); if (node->second.thr != thread) break; udiff = std::chrono::duration_cast( diff --git a/orbis-kernel/src/utils/SharedAtomic.cpp b/orbis-kernel/src/utils/SharedAtomic.cpp new file mode 100644 index 00000000..5bb218fd --- /dev/null +++ b/orbis-kernel/src/utils/SharedAtomic.cpp @@ -0,0 +1,95 @@ +#include "utils/SharedAtomic.hpp" +using namespace orbis; + +#ifdef ORBIS_HAS_FUTEX +#include + +std::errc shared_atomic32::wait_impl(std::uint32_t oldValue, + std::chrono::microseconds usec_timeout) { + auto usec_timeout_count = usec_timeout.count(); + struct timespec timeout{}; + timeout.tv_nsec = (usec_timeout_count % 1000'000) * 1000; + timeout.tv_sec = (usec_timeout_count / 1000'000); + + int result = syscall( + SYS_futex, this, FUTEX_WAIT, oldValue, + usec_timeout != std::chrono::microseconds::max() ? &timeout : nullptr, 0, + 0); + + if (result < 0) { + return static_cast(errno); + } + + return {}; +} + +int shared_atomic32::notify_n(int count) const { + return syscall(SYS_futex, this, FUTEX_WAKE, count); +} +#elif defined(ORBIS_HAS_ULOCK) +#include + +#define UL_COMPARE_AND_WAIT 1 +#define UL_UNFAIR_LOCK 2 +#define UL_COMPARE_AND_WAIT_SHARED 3 +#define UL_UNFAIR_LOCK64_SHARED 4 +#define UL_COMPARE_AND_WAIT64 5 +#define UL_COMPARE_AND_WAIT64_SHARED 6 + +#define ULF_WAKE_ALL 0x00000100 +#define ULF_WAKE_THREAD 0x00000200 +#define ULF_WAKE_ALLOW_NON_OWNER 0x00000400 + +#define ULF_WAIT_WORKQ_DATA_CONTENTION 0x00010000 +#define ULF_WAIT_CANCEL_POINT 0x00020000 +#define ULF_WAIT_ADAPTIVE_SPIN 0x00040000 + +#define ULF_NO_ERRNO 0x01000000 + +#define UL_OPCODE_MASK 0x000000FF +#define UL_FLAGS_MASK 0xFFFFFF00 +#define ULF_GENERIC_MASK 0xFFFF0000 + +extern int __ulock_wait(uint32_t operation, void *addr, uint64_t value, + uint32_t timeout); +extern int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value); + +std::errc shared_atomic32::wait_impl(std::uint32_t oldValue, + std::chrono::microseconds usec_timeout) { + int result = __ulock_wait(UL_COMPARE_AND_WAIT_SHARED, (void *)this, oldValue, + usec_timeout.count()); + + if (result < 0) { + return static_cast(errno); + } + + return {}; +} + +int shared_atomic32::notify_n(int count) const { + int result = 0; + uint32_t operation = UL_COMPARE_AND_WAIT_SHARED | ULF_NO_ERRNO; + if (count == 1) { + result = __ulock_wake(operation, (void *)this, 0); + } else if (count == std::numeric_limits::max()) { + result = __ulock_wake(ULF_WAKE_ALL | operation, (void *)this, 0); + } else { + for (int i = 0; i < count; ++i) { + auto ret = __ulock_wake(operation, (void *)this, 0); + if (ret != 0) { + if (result == 0) { + result = ret; + } + + break; + } + + result++; + } + } + + return result; +} +#else +#error Unimplemented atomic for this platform +#endif diff --git a/orbis-kernel/src/utils/SharedCV.cpp b/orbis-kernel/src/utils/SharedCV.cpp index a73bb22a..97ac34d1 100644 --- a/orbis-kernel/src/utils/SharedCV.cpp +++ b/orbis-kernel/src/utils/SharedCV.cpp @@ -1,35 +1,30 @@ #include "orbis/utils/SharedCV.hpp" -#include "orbis/utils/Logs.hpp" +#include + +#ifdef ORBIS_HAS_FUTEX #include #include #include +#endif namespace orbis::utils { -int shared_cv::impl_wait(shared_mutex &mutex, unsigned _val, - std::uint64_t usec_timeout) noexcept { +std::errc shared_cv::impl_wait(shared_mutex &mutex, unsigned _val, + std::uint64_t usec_timeout) noexcept { // Not supposed to fail if (!_val) { std::abort(); } - // Wait with timeout - struct timespec timeout {}; - timeout.tv_nsec = (usec_timeout % 1000'000) * 1000; - timeout.tv_sec = (usec_timeout / 1000'000); - - int result = 0; + std::errc result = {}; while (true) { - result = syscall(SYS_futex, &m_value, FUTEX_WAIT, _val, - usec_timeout + 1 ? &timeout : nullptr, 0, 0); - if (result < 0) { - result = errno; - } + result = m_value.wait(_val, std::chrono::microseconds(usec_timeout)); // Cleanup - const auto old = atomic_fetch_op(m_value, [&](unsigned &value) { + const auto old = m_value.fetch_op([&](unsigned &value) { // Remove waiter if no signals - if (!(value & ~c_waiter_mask) && result != EAGAIN) { + if (!(value & ~c_waiter_mask) && + result != std::errc::resource_unavailable_try_again) { value -= 1; } @@ -38,23 +33,32 @@ int shared_cv::impl_wait(shared_mutex &mutex, unsigned _val, value -= c_signal_one; } +#ifdef ORBIS_HAS_FUTEX if (value & c_locked_mask) { value -= c_locked_mask; } +#endif }); +#ifdef ORBIS_HAS_FUTEX // Lock is already acquired if (old & c_locked_mask) { - return 0; + return {}; } // Wait directly (waiter has been added) if (old & c_signal_mask) { return mutex.impl_wait(); } +#else + if (old & c_signal_mask) { + result = {}; + break; + } +#endif // Possibly spurious wakeup - if (result != EAGAIN) { + if (result != std::errc::resource_unavailable_try_again) { break; } @@ -66,27 +70,69 @@ int shared_cv::impl_wait(shared_mutex &mutex, unsigned _val, } void shared_cv::impl_wake(shared_mutex &mutex, int _count) noexcept { - unsigned _old = m_value.load(); - const bool is_one = _count == 1; +#ifdef ORBIS_HAS_FUTEX + while (true) { + unsigned _old = m_value.load(); + const bool is_one = _count == 1; + + // Enqueue _count waiters + _count = std::min(_count, _old & c_waiter_mask); + if (_count <= 0) + return; + + // Try to lock the mutex + const bool locked = mutex.lock_forced(_count); + + const int max_sig = m_value.op([&](unsigned &value) { + // Verify the number of waiters + int max_sig = std::min(_count, value & c_waiter_mask); + + // Add lock signal (mutex was immediately locked) + if (locked && max_sig) + value |= c_locked_mask; + else if (locked) + std::abort(); + + // Add normal signals + value += c_signal_one * max_sig; + + // Remove waiters + value -= max_sig; + _old = value; + return max_sig; + }); - // Enqueue _count waiters + if (max_sig < _count) { + // Fixup mutex + mutex.lock_forced(max_sig - _count); + _count = max_sig; + } + + if (_count) { + // Wake up one thread + requeue remaining waiters + unsigned awake_count = locked ? 1 : 0; + if (auto r = syscall(SYS_futex, &m_value, FUTEX_REQUEUE, awake_count, + _count - awake_count, &mutex, 0); + r < _count) { + // Keep awaking waiters + _count = is_one ? 1 : INT_MAX; + continue; + } + } + + break; + } +#else + unsigned _old = m_value.load(); _count = std::min(_count, _old & c_waiter_mask); if (_count <= 0) return; - // Try to lock the mutex - const bool locked = mutex.lock_forced(_count); + mutex.lock_forced(1); - const int max_sig = atomic_op(m_value, [&](unsigned &value) { - // Verify the number of waiters + const int wakeupWaiters = m_value.op([&](unsigned &value) { int max_sig = std::min(_count, value & c_waiter_mask); - // Add lock signal (mutex was immediately locked) - if (locked && max_sig) - value |= c_locked_mask; - else if (locked) - std::abort(); - // Add normal signals value += c_signal_one * max_sig; @@ -96,21 +142,11 @@ void shared_cv::impl_wake(shared_mutex &mutex, int _count) noexcept { return max_sig; }); - if (max_sig < _count) { - // Fixup mutex - mutex.lock_forced(max_sig - _count); - _count = max_sig; + if (wakeupWaiters > 0) { + m_value.notify_n(wakeupWaiters); } - if (_count) { - // Wake up one thread + requeue remaining waiters - unsigned awake_count = locked ? 1 : 0; - if (auto r = syscall(SYS_futex, &m_value, FUTEX_REQUEUE, awake_count, - _count - awake_count, &mutex, 0); - r < _count) { - // Keep awaking waiters - return impl_wake(mutex, is_one ? 1 : INT_MAX); - } - } + mutex.unlock(); +#endif } } // namespace orbis::utils diff --git a/orbis-kernel/src/utils/SharedMutex.cpp b/orbis-kernel/src/utils/SharedMutex.cpp index 0cd345bb..af88cde7 100644 --- a/orbis-kernel/src/utils/SharedMutex.cpp +++ b/orbis-kernel/src/utils/SharedMutex.cpp @@ -1,6 +1,5 @@ #include "utils/SharedMutex.hpp" #include "utils/Logs.hpp" -#include #include #include #include @@ -48,7 +47,7 @@ void shared_mutex::impl_lock_shared(unsigned val) { if ((old % c_sig) + c_one >= c_sig) std::abort(); // "shared_mutex overflow" - while (impl_wait() != 0) {} + while (impl_wait() != std::errc{}) {} lock_downgrade(); } void shared_mutex::impl_unlock_shared(unsigned old) { @@ -60,9 +59,9 @@ void shared_mutex::impl_unlock_shared(unsigned old) { impl_signal(); } } -int shared_mutex::impl_wait() { +std::errc shared_mutex::impl_wait() { while (true) { - const auto [old, ok] = atomic_fetch_op(m_value, [](unsigned &value) { + const auto [old, ok] = m_value.fetch_op([](unsigned &value) { if (value >= c_sig) { value -= c_sig; return true; @@ -75,19 +74,17 @@ int shared_mutex::impl_wait() { break; } - int result = syscall(SYS_futex, &m_value, FUTEX_WAIT, old, 0, 0, 0); - if (result < 0) { - result = errno; - } - if (result == EINTR) { - return EINTR; + auto result = m_value.wait(old); + if (result == std::errc::interrupted) { + return result; } } + return{}; } void shared_mutex::impl_signal() { m_value += c_sig; - syscall(SYS_futex, &m_value, FUTEX_WAKE, 1, 0, 0, 0); + m_value.notify_one(); } void shared_mutex::impl_lock(unsigned val) { if (val >= c_err) @@ -123,7 +120,7 @@ void shared_mutex::impl_lock(unsigned val) { if ((old % c_sig) + c_one >= c_sig) std::abort(); // "shared_mutex overflow" - while (impl_wait() != 0) {} + while (impl_wait() != std::errc{}) {} } void shared_mutex::impl_unlock(unsigned old) { if (old - c_one >= c_err) @@ -155,27 +152,23 @@ void shared_mutex::impl_lock_upgrade() { return; } - while (impl_wait() != 0) {} + while (impl_wait() != std::errc{}) {} } bool shared_mutex::lock_forced(int count) { if (count == 0) return false; if (count > 0) { // Lock - return atomic_op(m_value, [&](unsigned &v) { + return m_value.op([&](std::uint32_t &v) { if (v & c_sig) { v -= c_sig; v += c_one * count; return true; } - if (v == 0) { - v += c_one * count; - return true; - } - + bool firstLock = v == 0; v += c_one * count; - return false; + return firstLock; }); }