Skip to content

Commit

Permalink
fix io_uring cancel_wait
Browse files Browse the repository at this point in the history
  • Loading branch information
beef9999 committed Nov 20, 2023
1 parent fd5b957 commit 0a012f6
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.linux.arm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [ "main", "release/*" ]
pull_request:
branches: [ "release/*" ]
branches: [ "main", "release/*" ]

jobs:
centos8-gcc921-epoll-release:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.linux.x86.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [ "main", "release/*" ]
pull_request:
branches: [ "release/*" ]
branches: [ "main", "release/*" ]

jobs:
centos8-gcc921-epoll-release:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.macos.arm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [ "main", "release/*" ]
pull_request:
branches: [ "release/*" ]
branches: [ "main", "release/*" ]

jobs:
macOS-clang-debug:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [ "main", "release/*" ]
pull_request:
branches: [ "release/*" ]
branches: [ "main", "release/*" ]

jobs:
macOS-12-Monterey-debug:
Expand Down
2 changes: 1 addition & 1 deletion CMake/build-from-src.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function(build_from_src [dep])
URL_MD5 bad68bb6bd9908da75e2c8dedc536b29
BUILD_IN_SOURCE ON
CONFIGURE_COMMAND ./config -fPIC no-unit-test no-shared --openssldir=${BINARY_DIR} --prefix=${BINARY_DIR}
BUILD_COMMAND make depend -j ${NumCPU} && make -j ${NumCPU}
BUILD_COMMAND make depend -j 4 && make -j 4 # Not using ${NumCPU}: too many parallel jobs might fail
INSTALL_COMMAND make install
)
ExternalProject_Get_Property(openssl SOURCE_DIR)
Expand Down
102 changes: 43 additions & 59 deletions io/iouring-wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,21 @@ limitations under the License.

namespace photon {

constexpr static EventsMap<EVUnderlay<POLLIN | POLLRDHUP, POLLOUT, POLLERR>>
evmap;
constexpr static EventsMap<EVUnderlay<POLLIN | POLLRDHUP, POLLOUT, POLLERR>> evmap;

class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
public:
explicit iouringEngine(bool master) : m_master(master) {}

~iouringEngine() {
LOG_INFO("Finish event engine: iouring ", VALUE(m_master));
if (m_cancel_poller != nullptr) {
m_cancel_poller_running = false;
thread_interrupt(m_cancel_poller);
thread_join((join_handle*) m_cancel_poller);
}
if (m_cancel_fd >= 0) {
close(m_cancel_fd);
}
if (m_cascading_event_fd >= 0) {
if (m_eventfd >= 0 && !m_master) {
if (io_uring_unregister_eventfd(m_ring) != 0) {
LOG_ERROR("iouring: failed to unregister cascading event fd");
}
close(m_cascading_event_fd);
}
if (m_eventfd >= 0) {
close(m_eventfd);
}
if (m_ring != nullptr) {
io_uring_queue_exit(m_ring);
Expand Down Expand Up @@ -128,22 +121,25 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
}
}

m_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (m_eventfd < 0) {
LOG_ERRNO_RETURN(0, -1, "iouring: failed to create eventfd");
}

if (m_master) {
// Setup a cancel poller to watch on master engine
m_cancel_fd = eventfd(0, EFD_CLOEXEC);
if (m_cancel_fd < 0) {
LOG_ERRNO_RETURN(0, -1, "iouring: failed to create eventfd");
// Setup a multishot poll on master engine to watch the cancel_wait
uint32_t poll_mask = evmap.translate_bitwisely(EVENT_READ);
auto sqe = _get_sqe();
if (!sqe) return -1;
io_uring_prep_poll_multishot(sqe, m_eventfd, poll_mask);
io_uring_sqe_set_data(sqe, this);
ret = io_uring_submit(m_ring);
if (ret <= 0) {
LOG_ERROR_RETURN(0, -1, "iouring: fail to submit multishot poll, ", ERRNO(-ret));
}
m_cancel_poller = thread_create11(64 * 1024, &iouringEngine::run_cancel_poller, this);
thread_enable_join(m_cancel_poller);

} else {
// Register an event fd for cascading engine
m_cascading_event_fd = eventfd(0, EFD_CLOEXEC);
if (m_cascading_event_fd < 0) {
LOG_ERRNO_RETURN(0, -1, "iouring: failed to create cascading event fd");
}
if (io_uring_register_eventfd(m_ring, m_cascading_event_fd) != 0) {
// Register cascading engine to eventfd
if (io_uring_register_eventfd(m_ring, m_eventfd) != 0) {
LOG_ERRNO_RETURN(0, -1, "iouring: failed to register cascading event fd");
}
}
Expand All @@ -155,7 +151,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
* later in the `wait_and_fire_events`.
* @param timeout Timeout in usec. It could be 0 (immediate cancel), and -1 (most efficient way, no linked SQE).
* Note that the cancelling has no guarantee to succeed, it's just an attempt.
* @param ring_flags The lowest 8 bits is for sqe.flags, and the rest is reserved.
* @param ring_flags The lowest 8 bits are for sqe.flags. The remaining bits are reserved.
* @retval Non negative integers for success, -1 for failure. If failed with timeout, errno will
* be set to ETIMEDOUT. If failed because of external interruption, errno will also be set accordingly.
*/
Expand All @@ -165,15 +161,15 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
if (sqe == nullptr)
return -1;
prep(sqe, args...);
sqe->flags |= (uint8_t) (ring_flags & 0xff);
return _async_io(sqe, timeout);
return _async_io(sqe, timeout, ring_flags);
}

int32_t _async_io(io_uring_sqe* sqe, uint64_t timeout) {
ioCtx io_ctx{photon::CURRENT, -1, false, false};
int32_t _async_io(io_uring_sqe* sqe, uint64_t timeout, uint32_t ring_flags) {
sqe->flags |= (uint8_t) (ring_flags & 0xff);
ioCtx io_ctx(false, false);
io_uring_sqe_set_data(sqe, &io_ctx);

ioCtx timer_ctx{photon::CURRENT, -1, true, false};
ioCtx timer_ctx(true, false);
__kernel_timespec ts{};
if (timeout < std::numeric_limits<int64_t>::max()) {
sqe->flags |= IOSQE_IO_LINK;
Expand Down Expand Up @@ -202,7 +198,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
sqe = _get_sqe();
if (sqe == nullptr)
return -1;
ioCtx cancel_ctx{CURRENT, -1, true, false};
ioCtx cancel_ctx(true, false);
io_uring_prep_cancel(sqe, &io_ctx, 0);
io_uring_sqe_set_data(sqe, &cancel_ctx);
photon::thread_sleep(-1);
Expand Down Expand Up @@ -231,7 +227,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {

bool one_shot = e.interests & ONE_SHOT;
fdInterest fd_interest{e.fd, (uint32_t)evmap.translate_bitwisely(e.interests)};
ioCtx io_ctx{CURRENT, -1, false, true};
ioCtx io_ctx(false, true);
eventCtx event_ctx{e, one_shot, io_ctx};
auto pair = m_event_contexts.insert({fd_interest, event_ctx});
if (!pair.second) {
Expand Down Expand Up @@ -265,12 +261,12 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {

ssize_t wait_for_events(void** data, size_t count, uint64_t timeout = -1) override {
// Use master engine to wait for self event fd
int ret = get_vcpu()->master_event_engine->wait_for_fd_readable(m_cascading_event_fd, timeout);
int ret = get_vcpu()->master_event_engine->wait_for_fd_readable(m_eventfd, timeout);
if (ret < 0) {
return errno == ETIMEDOUT ? 0 : -1;
}
uint64_t value = 0;
if (eventfd_read(m_cascading_event_fd, &value)) {
if (eventfd_read(m_eventfd, &value)) {
LOG_ERROR("iouring: error reading cascading event fd, `", ERRNO());
}

Expand Down Expand Up @@ -334,6 +330,13 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
continue;
}

if (ctx == (ioCtx*) this) {
// Triggered by cancel_wait
eventfd_t val;
eventfd_read(m_eventfd, &val);
continue;
}

if (cqe->flags & IORING_CQE_F_NOTIF) {
// The cqe for notify, corresponding to IORING_CQE_F_MORE
if (unlikely(cqe->res != 0))
Expand Down Expand Up @@ -367,7 +370,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
}

int cancel_wait() override {
if (eventfd_write(m_cancel_fd, 1) != 0) {
if (eventfd_write(m_eventfd, 1) != 0) {
LOG_ERRNO_RETURN(0, -1, "iouring: write eventfd failed");
}
return 0;
Expand Down Expand Up @@ -399,8 +402,9 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {

private:
struct ioCtx {
photon::thread* th_id;
int32_t res;
ioCtx(bool canceller, bool event) : is_canceller(canceller), is_event(event) {}
photon::thread* th_id = photon::CURRENT;
int32_t res = -1;
bool is_canceller;
bool is_event;
};
Expand Down Expand Up @@ -501,23 +505,6 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
}
}

void run_cancel_poller() {
while (m_cancel_poller_running) {
uint32_t poll_mask = evmap.translate_bitwisely(EVENT_READ);
int ret = async_io(&io_uring_prep_poll_add, -1, 0, m_cancel_fd, poll_mask);
if (ret < 0) {
if (errno == EINTR) {
break;
}
LOG_ERROR("iouring: poll eventfd failed, `", ERRNO());
}
eventfd_t val;
if (eventfd_read(m_cancel_fd, &val) != 0) {
LOG_ERROR("iouring: read eventfd failed, `", ERRNO());
}
}
}

static void usec_to_timespec(int64_t usec, __kernel_timespec* ts) {
int64_t usec_rounded_to_sec = usec / 1000000L * 1000000L;
long long nsec = (usec - usec_rounded_to_sec) * 1000L;
Expand All @@ -533,11 +520,8 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
static const int REGISTER_FILES_SPARSE_FD = -1;
static const int REGISTER_FILES_MAX_NUM = 10000;
bool m_master;
int m_cascading_event_fd = -1;
io_uring* m_ring = nullptr;
int m_cancel_fd = -1;
thread* m_cancel_poller = nullptr;
bool m_cancel_poller_running = true;
int m_eventfd = -1;
std::unordered_map<fdInterest, eventCtx, fdInterestHasher> m_event_contexts;
static int m_register_files_flag;
static int m_cooperative_task_flag;
Expand Down
6 changes: 5 additions & 1 deletion thread/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ add_test(NAME test-tls-order-photon COMMAND $<TARGET_FILE:test-tls-order-photon>

add_executable(test-lib-data test-lib-data.cpp)
target_link_libraries(test-lib-data PRIVATE photon_shared)
add_test(NAME test-lib-data COMMAND $<TARGET_FILE:test-lib-data>)
add_test(NAME test-lib-data COMMAND $<TARGET_FILE:test-lib-data>)

add_executable(test-multi-vcpu-locking test-multi-vcpu-locking.cpp)
target_link_libraries(test-multi-vcpu-locking PRIVATE photon_static)
add_test(NAME test-multi-vcpu-locking COMMAND $<TARGET_FILE:test-multi-vcpu-locking>)
93 changes: 93 additions & 0 deletions thread/test/test-multi-vcpu-locking.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2022 The Photon Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <atomic>
#include <chrono>
#include <vector>
#include <gtest/gtest.h>
#include <photon/photon.h>
#include <photon/thread/std-compat.h>
#include <photon/common/alog.h>
#include <photon/common/utility.h>

// Test dimensions.
static constexpr int nMutexes = 4;          // number of mutexes every thread cycles through
static constexpr int nWorkers = 4;          // vCPU count passed to photon_std::work_pool_init
static constexpr int nThreads = 32;         // Increase if necessary
static constexpr int testTimeSeconds = 60;  // number of one-second reporting rounds

// State shared by all test threads.
photon_std::mutex mutexes[nMutexes];
// Per-mutex number of successful acquisitions.
std::atomic<int64_t> acquisitionCounters[nMutexes];
// Per-mutex accumulated time spent waiting for the lock, in microseconds.
std::atomic<int64_t> acquisitionDurations[nMutexes];
// Stop flag: cleared by the test body and polled by every thread running myThread.
// Atomic so that the write and the concurrent reads do not form a data race
// (the threads presumably run on other work-pool vCPUs -- confirm against std-compat).
std::atomic<bool> running{true};

// Locks `mutex`, bumps the acquisition counter for slot `index`, and adds the
// time spent waiting for the lock (in microseconds) to that slot's duration
// total. Aborts the process if a single acquisition took longer than one second.
static void timedLock(photon_std::mutex& mutex, int index) {
    using Clock = std::chrono::steady_clock;

    const auto requestedAt = Clock::now();
    long long waitedMicros = 0;
    {
        photon_std::lock_guard<photon_std::mutex> guard(mutex);
        acquisitionCounters[index].fetch_add(1);
        const auto acquiredAt = Clock::now();
        waitedMicros =
            std::chrono::duration_cast<std::chrono::microseconds>(acquiredAt - requestedAt).count();
        acquisitionDurations[index].fetch_add(waitedMicros);
    }  // lock released here, before the (possibly fatal) check below

    const bool withinBudget = waitedMicros <= 1'000'000;
    if (withinBudget) {
        return;
    }
    LOG_ERROR("long acquisition. mutex `, duration: `ms", index, waitedMicros / 1000);
    std::abort();
}

// Thread body: until `running` is cleared, repeatedly takes every mutex in
// order via timedLock. `tid` is used only for the start-up log line.
static void myThread(int tid) {
    LOG_INFO("thread ` starting", tid);
    for (;;) {
        if (!running) {
            return;
        }
        int slot = 0;
        for (auto& mtx : mutexes) {
            timedLock(mtx, slot);
            ++slot;
        }
    }
}

// Starts nThreads threads that contend on the shared mutexes, logs the
// per-mutex statistics once per second for testTimeSeconds rounds, then clears
// `running` and joins every thread. An over-long acquisition aborts the
// process from inside timedLock.
TEST(multi_vcpu_locking, long_time_acquisition_should_abort) {
    int rc = photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE);
    GTEST_ASSERT_EQ(0, rc);
    DEFER(photon::fini());

    rc = photon_std::work_pool_init(nWorkers, photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE);
    GTEST_ASSERT_EQ(0, rc);
    DEFER(photon_std::work_pool_fini());

    // Spawn the contending threads.
    std::vector<photon_std::thread> workers;
    for (int tid = 0; tid != nThreads; ++tid) {
        workers.emplace_back(myThread, tid);
    }

    // Report progress once per second while the threads run.
    for (int second = 0; second != testTimeSeconds; ++second) {
        for (int slot = 0; slot != nMutexes; ++slot) {
            auto acquisitions = acquisitionCounters[slot].load();
            auto waitedMs = acquisitionDurations[slot].load() / 1000;
            LOG_INFO("mutex `: total acquisitions: `, total wait time: `ms", slot, acquisitions, waitedMs);
        }
        photon_std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    // Ask the threads to stop and wait for all of them.
    running = false;
    for (auto& worker : workers) {
        worker.join();
    }
}

// Test-binary entry point: hands the command line to GoogleTest and returns
// the result of running all registered tests as the exit status.
int main(int argc, char** argv) {
    ::testing::InitGoogleTest(&argc, argv);
    const int exitStatus = RUN_ALL_TESTS();
    return exitStatus;
}

0 comments on commit 0a012f6

Please sign in to comment.