From 0a012f6bb33fab96b825f67eadd85b042fb6c302 Mon Sep 17 00:00:00 2001 From: Bob Chen Date: Sun, 19 Nov 2023 22:14:15 +0800 Subject: [PATCH] fix io_uring cancel_wait --- .github/workflows/ci.linux.arm.yml | 2 +- .github/workflows/ci.linux.x86.yml | 2 +- .github/workflows/ci.macos.arm.yml | 2 +- .github/workflows/ci.macos.yml | 2 +- CMake/build-from-src.cmake | 2 +- io/iouring-wrapper.cpp | 102 ++++++++++-------------- thread/test/CMakeLists.txt | 6 +- thread/test/test-multi-vcpu-locking.cpp | 93 +++++++++++++++++++++ 8 files changed, 146 insertions(+), 65 deletions(-) create mode 100644 thread/test/test-multi-vcpu-locking.cpp diff --git a/.github/workflows/ci.linux.arm.yml b/.github/workflows/ci.linux.arm.yml index 68b3656c..4b903cf8 100644 --- a/.github/workflows/ci.linux.arm.yml +++ b/.github/workflows/ci.linux.arm.yml @@ -4,7 +4,7 @@ on: push: branches: [ "main", "release/*" ] pull_request: - branches: [ "release/*" ] + branches: [ "main", "release/*" ] jobs: centos8-gcc921-epoll-release: diff --git a/.github/workflows/ci.linux.x86.yml b/.github/workflows/ci.linux.x86.yml index 974cf2ff..cf083d05 100644 --- a/.github/workflows/ci.linux.x86.yml +++ b/.github/workflows/ci.linux.x86.yml @@ -4,7 +4,7 @@ on: push: branches: [ "main", "release/*" ] pull_request: - branches: [ "release/*" ] + branches: [ "main", "release/*" ] jobs: centos8-gcc921-epoll-release: diff --git a/.github/workflows/ci.macos.arm.yml b/.github/workflows/ci.macos.arm.yml index 271ad498..c8e17718 100644 --- a/.github/workflows/ci.macos.arm.yml +++ b/.github/workflows/ci.macos.arm.yml @@ -4,7 +4,7 @@ on: push: branches: [ "main", "release/*" ] pull_request: - branches: [ "release/*" ] + branches: [ "main", "release/*" ] jobs: macOS-clang-debug: diff --git a/.github/workflows/ci.macos.yml b/.github/workflows/ci.macos.yml index 33ce1e58..5e75d552 100644 --- a/.github/workflows/ci.macos.yml +++ b/.github/workflows/ci.macos.yml @@ -4,7 +4,7 @@ on: push: branches: [ "main", "release/*" ] pull_request: - branches: [ "release/*" ] + branches: [ "main", "release/*" ] jobs: macOS-12-Monterey-debug: diff --git a/CMake/build-from-src.cmake b/CMake/build-from-src.cmake index ee9fcca7..16f455ea 100644 --- a/CMake/build-from-src.cmake +++ b/CMake/build-from-src.cmake @@ -83,7 +83,7 @@ function(build_from_src [dep]) URL_MD5 bad68bb6bd9908da75e2c8dedc536b29 BUILD_IN_SOURCE ON CONFIGURE_COMMAND ./config -fPIC no-unit-test no-shared --openssldir=${BINARY_DIR} --prefix=${BINARY_DIR} - BUILD_COMMAND make depend -j ${NumCPU} && make -j ${NumCPU} + BUILD_COMMAND make depend -j 4 && make -j 4 # Not using ${NumCPU}. Too may parallel might fail INSTALL_COMMAND make install ) ExternalProject_Get_Property(openssl SOURCE_DIR) diff --git a/io/iouring-wrapper.cpp b/io/iouring-wrapper.cpp index 2dfa4274..d2e179ca 100644 --- a/io/iouring-wrapper.cpp +++ b/io/iouring-wrapper.cpp @@ -40,8 +40,7 @@ limitations under the License. namespace photon { -constexpr static EventsMap> - evmap; +constexpr static EventsMap> evmap; class iouringEngine : public MasterEventEngine, public CascadingEventEngine { public: @@ -49,19 +48,13 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { ~iouringEngine() { LOG_INFO("Finish event engine: iouring ", VALUE(m_master)); - if (m_cancel_poller != nullptr) { - m_cancel_poller_running = false; - thread_interrupt(m_cancel_poller); - thread_join((join_handle*) m_cancel_poller); - } - if (m_cancel_fd >= 0) { - close(m_cancel_fd); - } - if (m_cascading_event_fd >= 0) { + if (m_eventfd >= 0 && !m_master) { if (io_uring_unregister_eventfd(m_ring) != 0) { LOG_ERROR("iouring: failed to unregister cascading event fd"); } - close(m_cascading_event_fd); + } + if (m_eventfd >= 0) { + close(m_eventfd); } if (m_ring != nullptr) { io_uring_queue_exit(m_ring); @@ -128,22 +121,25 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { } } + m_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + if (m_eventfd < 0) { + LOG_ERRNO_RETURN(0, -1, "iouring: failed to create eventfd"); + } + if (m_master) { - // Setup a cancel poller to watch on master engine - m_cancel_fd = eventfd(0, EFD_CLOEXEC); - if (m_cancel_fd < 0) { - LOG_ERRNO_RETURN(0, -1, "iouring: failed to create eventfd"); + // Setup a multishot poll on master engine to watch the cancel_wait + uint32_t poll_mask = evmap.translate_bitwisely(EVENT_READ); + auto sqe = _get_sqe(); + if (!sqe) return -1; + io_uring_prep_poll_multishot(sqe, m_eventfd, poll_mask); + io_uring_sqe_set_data(sqe, this); + ret = io_uring_submit(m_ring); + if (ret <= 0) { + LOG_ERROR_RETURN(0, -1, "iouring: fail to submit multishot poll, ", ERRNO(-ret)); } - m_cancel_poller = thread_create11(64 * 1024, &iouringEngine::run_cancel_poller, this); - thread_enable_join(m_cancel_poller); - } else { - // Register an event fd for cascading engine - m_cascading_event_fd = eventfd(0, EFD_CLOEXEC); - if (m_cascading_event_fd < 0) { - LOG_ERRNO_RETURN(0, -1, "iouring: failed to create cascading event fd"); - } - if (io_uring_register_eventfd(m_ring, m_cascading_event_fd) != 0) { + // Register cascading engine to eventfd + if (io_uring_register_eventfd(m_ring, m_eventfd) != 0) { LOG_ERRNO_RETURN(0, -1, "iouring: failed to register cascading event fd"); } } @@ -155,7 +151,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { * later in the `wait_and_fire_events`. * @param timeout Timeout in usec. It could be 0 (immediate cancel), and -1 (most efficient way, no linked SQE). * Note that the cancelling has no guarantee to succeed, it's just an attempt. - * @param ring_flags The lowest 8 bits is for sqe.flags, and the rest is reserved. + * @param ring_flags The lowest 8 bits is for sqe.flags. The rest is reserved. * @retval Non negative integers for success, -1 for failure. If failed with timeout, errno will * be set to ETIMEDOUT. If failed because of external interruption, errno will also be set accordingly. */ @@ -165,15 +161,15 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { if (sqe == nullptr) return -1; prep(sqe, args...); - sqe->flags |= (uint8_t) (ring_flags & 0xff); - return _async_io(sqe, timeout); + return _async_io(sqe, timeout, ring_flags); } - int32_t _async_io(io_uring_sqe* sqe, uint64_t timeout) { - ioCtx io_ctx{photon::CURRENT, -1, false, false}; + int32_t _async_io(io_uring_sqe* sqe, uint64_t timeout, uint32_t ring_flags) { + sqe->flags |= (uint8_t) (ring_flags & 0xff); + ioCtx io_ctx(false, false); io_uring_sqe_set_data(sqe, &io_ctx); - ioCtx timer_ctx{photon::CURRENT, -1, true, false}; + ioCtx timer_ctx(true, false); __kernel_timespec ts{}; if (timeout < std::numeric_limits::max()) { sqe->flags |= IOSQE_IO_LINK; @@ -202,7 +198,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { sqe = _get_sqe(); if (sqe == nullptr) return -1; - ioCtx cancel_ctx{CURRENT, -1, true, false}; + ioCtx cancel_ctx(true, false); io_uring_prep_cancel(sqe, &io_ctx, 0); io_uring_sqe_set_data(sqe, &cancel_ctx); photon::thread_sleep(-1); @@ -231,7 +227,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { bool one_shot = e.interests & ONE_SHOT; fdInterest fd_interest{e.fd, (uint32_t)evmap.translate_bitwisely(e.interests)}; - ioCtx io_ctx{CURRENT, -1, false, true}; + ioCtx io_ctx(false, true); eventCtx event_ctx{e, one_shot, io_ctx}; auto pair = m_event_contexts.insert({fd_interest, event_ctx}); if (!pair.second) { @@ -265,12 +261,12 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { ssize_t wait_for_events(void** data, size_t count, uint64_t timeout = -1) override { // Use master engine to wait for self event fd - int ret = get_vcpu()->master_event_engine->wait_for_fd_readable(m_cascading_event_fd, timeout); + int ret = get_vcpu()->master_event_engine->wait_for_fd_readable(m_eventfd, timeout); if (ret < 0) { return errno == ETIMEDOUT ? 0 : -1; } uint64_t value = 0; - if (eventfd_read(m_cascading_event_fd, &value)) { + if (eventfd_read(m_eventfd, &value)) { LOG_ERROR("iouring: error reading cascading event fd, `", ERRNO()); } @@ -334,6 +330,13 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { continue; } + if (ctx == (ioCtx*) this) { + // Triggered by cancel_wait + eventfd_t val; + eventfd_read(m_eventfd, &val); + continue; + } + if (cqe->flags & IORING_CQE_F_NOTIF) { // The cqe for notify, corresponding to IORING_CQE_F_MORE if (unlikely(cqe->res != 0)) @@ -367,7 +370,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { } int cancel_wait() override { - if (eventfd_write(m_cancel_fd, 1) != 0) { + if (eventfd_write(m_eventfd, 1) != 0) { LOG_ERRNO_RETURN(0, -1, "iouring: write eventfd failed"); } return 0; @@ -399,8 +402,9 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { private: struct ioCtx { - photon::thread* th_id; - int32_t res; + ioCtx(bool canceller, bool event) : is_canceller(canceller), is_event(event) {} + photon::thread* th_id = photon::CURRENT; + int32_t res = -1; bool is_canceller; bool is_event; }; @@ -501,23 +505,6 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { } } - void run_cancel_poller() { - while (m_cancel_poller_running) { - uint32_t poll_mask = evmap.translate_bitwisely(EVENT_READ); - int ret = async_io(&io_uring_prep_poll_add, -1, 0, m_cancel_fd, poll_mask); - if (ret < 0) { - if (errno == EINTR) { - break; - } - LOG_ERROR("iouring: poll eventfd failed, `", ERRNO()); - } - eventfd_t val; - if (eventfd_read(m_cancel_fd, &val) != 0) { - LOG_ERROR("iouring: read eventfd failed, `", ERRNO()); - } - } - } - static void usec_to_timespec(int64_t usec, __kernel_timespec* ts) { int64_t usec_rounded_to_sec = usec / 1000000L * 1000000L; long long nsec = (usec - usec_rounded_to_sec) * 1000L; @@ -533,11 +520,8 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { static const int REGISTER_FILES_SPARSE_FD = -1; static const int REGISTER_FILES_MAX_NUM = 10000; bool m_master; - int m_cascading_event_fd = -1; io_uring* m_ring = nullptr; - int m_cancel_fd = -1; - thread* m_cancel_poller = nullptr; - bool m_cancel_poller_running = true; + int m_eventfd = -1; std::unordered_map m_event_contexts; static int m_register_files_flag; static int m_cooperative_task_flag; diff --git a/thread/test/CMakeLists.txt b/thread/test/CMakeLists.txt index a1847f54..44634a93 100644 --- a/thread/test/CMakeLists.txt +++ b/thread/test/CMakeLists.txt @@ -30,4 +30,8 @@ add_test(NAME test-tls-order-photon COMMAND $ add_executable(test-lib-data test-lib-data.cpp) target_link_libraries(test-lib-data PRIVATE photon_shared) -add_test(NAME test-lib-data COMMAND $) \ No newline at end of file +add_test(NAME test-lib-data COMMAND $) + +add_executable(test-multi-vcpu-locking test-multi-vcpu-locking.cpp) +target_link_libraries(test-multi-vcpu-locking PRIVATE photon_static) +add_test(NAME test-multi-vcpu-locking COMMAND $) \ No newline at end of file diff --git a/thread/test/test-multi-vcpu-locking.cpp b/thread/test/test-multi-vcpu-locking.cpp new file mode 100644 index 00000000..86456812 --- /dev/null +++ b/thread/test/test-multi-vcpu-locking.cpp @@ -0,0 +1,93 @@ +/* +Copyright 2022 The Photon Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + +static constexpr int nMutexes = 4; +static constexpr int nWorkers = 4; +static constexpr int nThreads = 32; // Increase if necessary +static constexpr int testTimeSeconds = 60; + +photon_std::mutex mutexes[nMutexes]; +std::atomic acquisitionCounters[nMutexes]; +std::atomic acquisitionDurations[nMutexes]; +bool running = true; + +static void timedLock(photon_std::mutex& mutex, int index) { + long long durationMicros; + auto start = std::chrono::steady_clock::now(); + { + photon_std::lock_guard lock(mutex); + acquisitionCounters[index].fetch_add(1); + auto end = std::chrono::steady_clock::now(); + durationMicros = std::chrono::duration_cast(end - start).count(); + acquisitionDurations[index].fetch_add(durationMicros); + } + if (durationMicros > 1'000'000) { + LOG_ERROR("long acquisition. mutex `, duration: `ms", index, durationMicros / 1000); + std::abort(); + } +} + +static void myThread(int tid) { + LOG_INFO("thread ` starting", tid); + while (running) { + for (int i = 0; i < nMutexes; i++) { + timedLock(mutexes[i], i); + } + } +} + +TEST(multi_vcpu_locking, long_time_acquisition_should_abort) { + int ret = photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE); + GTEST_ASSERT_EQ(0, ret); + DEFER(photon::fini()); + + ret = photon_std::work_pool_init(nWorkers, photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE); + GTEST_ASSERT_EQ(0, ret); + DEFER(photon_std::work_pool_fini()); + + std::vector threads; + for (int i = 0; i < nThreads; i++) { + threads.emplace_back(myThread, i); + } + + for (int i = 0; i < testTimeSeconds; ++i) { + for (int j = 0; j < nMutexes; j++) { + auto count = acquisitionCounters[j].load(); + auto durationMs = acquisitionDurations[j].load() / 1000; + LOG_INFO("mutex `: total acquisitions: `, total wait time: `ms", j, count, durationMs); + } + photon_std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + running = false; + for (auto& th: threads) { + th.join(); + } +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file