From 1e28e85ac24714f03cc0f47f50610f93b56e783d Mon Sep 17 00:00:00 2001 From: Bob Chen Date: Fri, 24 Nov 2023 09:55:03 +0800 Subject: [PATCH] Fix cascading engine submit; Support running specified event engine in CI test (#273) --- .github/workflows/ci.linux.x86.yml | 1 + CMakeLists.txt | 4 +- io/fd-events.h | 1 + io/iouring-wrapper.cpp | 16 +- io/test/CMakeLists.txt | 2 +- io/test/test-iouring.cpp | 201 +++++++++++++++--------- test/ci-tools.cpp | 26 +++ test/ci-tools.h | 7 + thread/test/test-multi-vcpu-locking.cpp | 4 +- 9 files changed, 178 insertions(+), 84 deletions(-) create mode 100644 test/ci-tools.cpp create mode 100644 test/ci-tools.h diff --git a/.github/workflows/ci.linux.x86.yml b/.github/workflows/ci.linux.x86.yml index fce7d236..d19e377b 100644 --- a/.github/workflows/ci.linux.x86.yml +++ b/.github/workflows/ci.linux.x86.yml @@ -117,4 +117,5 @@ jobs: run: | cd build ulimit -l unlimited + export PHOTON_CI_EV_ENGINE=io_uring ctest --timeout 3600 -V diff --git a/CMakeLists.txt b/CMakeLists.txt index ed48b296..3d73dc91 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -329,8 +329,10 @@ endif () if (PHOTON_BUILD_TESTING) include(CTest) + add_library(ci-tools STATIC test/ci-tools.cpp) + include_directories(photon_static ${GFLAGS_INCLUDE_DIRS} ${GOOGLETEST_INCLUDE_DIRS}) - link_libraries(${GFLAGS_LIBRARIES} ${GOOGLETEST_LIBRARIES}) + link_libraries(${GFLAGS_LIBRARIES} ${GOOGLETEST_LIBRARIES} ci-tools) add_subdirectory(examples) add_subdirectory(common/checksum/test) diff --git a/io/fd-events.h b/io/fd-events.h index fb65f8a1..0bcd9986 100644 --- a/io/fd-events.h +++ b/io/fd-events.h @@ -111,6 +111,7 @@ class CascadingEventEngine { /** * @brief Wait for events, returns number of the arrived events, and their associated `data` + * @note This call will not return until timeout, if there had been no events. * @param[out] data * @return -1 for error, positive integer for the number of events, 0 for no events and should run it again * @warning Do NOT block vcpu diff --git a/io/iouring-wrapper.cpp b/io/iouring-wrapper.cpp index d2e179ca..7137788c 100644 --- a/io/iouring-wrapper.cpp +++ b/io/iouring-wrapper.cpp @@ -117,11 +117,11 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { } ret = io_uring_register_files(m_ring, entries, REGISTER_FILES_MAX_NUM); if (ret != 0) { - LOG_ERROR_RETURN(EPERM, -1, "iouring: unable to register files, ", ERRNO(-ret)); + LOG_ERROR_RETURN(-ret, -1, "iouring: unable to register files, ", ERRNO(-ret)); } } - m_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + m_eventfd = eventfd(0, EFD_CLOEXEC); if (m_eventfd < 0) { LOG_ERRNO_RETURN(0, -1, "iouring: failed to create eventfd"); } @@ -135,7 +135,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { io_uring_sqe_set_data(sqe, this); ret = io_uring_submit(m_ring); if (ret <= 0) { - LOG_ERROR_RETURN(0, -1, "iouring: fail to submit multishot poll, ", ERRNO(-ret)); + LOG_ERROR_RETURN(-ret, -1, "iouring: fail to submit multishot poll, ", ERRNO(-ret)); } } else { // Register cascading engine to eventfd @@ -240,6 +240,10 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { io_uring_prep_poll_multishot(sqe, fd_interest.fd, fd_interest.interest); } io_uring_sqe_set_data(sqe, &pair.first->second.io_ctx); + int ret = io_uring_submit(m_ring); + if (ret < 0) { + LOG_ERROR_RETURN(-ret, -1, "iouring: fail to submit when adding interest, ", ERRNO(-ret)); + } return 0; } @@ -256,6 +260,10 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { io_uring_prep_poll_remove(sqe, (__u64) &iter->second.io_ctx); io_uring_sqe_set_data(sqe, nullptr); + int ret = io_uring_submit(m_ring); + if (ret < 0) { + LOG_ERROR_RETURN(-ret, -1, "iouring: fail to submit when removing interest, ", ERRNO(-ret)); + } return 0; } @@ -287,7 +295,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { LOG_ERROR_RETURN(0, -1, "iouring: multi-shot poll got POLLERR"); } if (!ctx->is_event) { - LOG_ERROR_RETURN(0, -1, "iouring: only cascading engine need to handle event. Must be a bug...") + LOG_ERROR_RETURN(0, -1, "iouring: cascading engine only needs to handle event. Must be a bug...") } eventCtx* event_ctx = container_of(ctx, eventCtx, io_ctx); fdInterest fd_interest{event_ctx->event.fd, (uint32_t)evmap.translate_bitwisely(event_ctx->event.interests)}; diff --git a/io/test/CMakeLists.txt b/io/test/CMakeLists.txt index eea05c43..5a3ff5d6 100644 --- a/io/test/CMakeLists.txt +++ b/io/test/CMakeLists.txt @@ -14,6 +14,6 @@ target_link_libraries(test-syncio PRIVATE photon_shared) add_test(NAME test-syncio COMMAND $) add_executable(test-iouring test-iouring.cpp) -target_link_libraries(test-iouring PRIVATE photon_shared) +target_link_libraries(test-iouring PRIVATE photon_static) add_test(NAME test-iouring COMMAND $) endif () \ No newline at end of file diff --git a/io/test/test-iouring.cpp b/io/test/test-iouring.cpp index 72f181ea..464bb088 100644 --- a/io/test/test-iouring.cpp +++ b/io/test/test-iouring.cpp @@ -33,7 +33,7 @@ limitations under the License. #include #include #include - +#include "../../test/ci-tools.h" using namespace photon; @@ -339,183 +339,238 @@ TEST(perf, DISABLED_read) { /* Event Engine Tests */ -photon::CascadingEventEngine* new_cascading_engine(bool iouring = false) { - // return photon::new_iouring_cascading_engine(); - return photon::new_epoll_cascading_engine(); -} +class event_engine : public testing::Test { +protected: + void SetUp() override { + GTEST_ASSERT_EQ(0, photon::init(ci_ev_engine, photon::INIT_IO_NONE)); +#ifdef PHOTON_URING + engine = (ci_ev_engine == photon::INIT_EVENT_EPOLL) ? photon::new_epoll_cascading_engine() + : photon::new_iouring_cascading_engine(); +#else + engine = photon::new_default_cascading_engine(); +#endif + } + void TearDown() override { + delete engine; + photon::fini(); + }; -TEST(event_engine, master) { + photon::CascadingEventEngine* engine = nullptr; +}; + +TEST_F(event_engine, master) { int fd[2]; pipe(fd); char buf[1]; + photon::semaphore sem; auto f = [&] { - LOG_INFO("sleep 2s"); - photon::thread_sleep(2); - LOG_INFO("start write"); + sem.wait(1); write(fd[1], buf, 1); }; - photon::thread* sub = photon::thread_create11(&decltype(f)::operator(), &f); + photon::thread* sub = photon::thread_create11(f); photon::thread_enable_join(sub); - LOG_INFO("wait 3s at most"); - ASSERT_EQ(photon::wait_for_fd_readable(fd[0], 3000000), 0); + sem.signal(1); + LOG_INFO("wait 1s at most"); + ASSERT_EQ(0, photon::wait_for_fd_readable(fd[0], 1000000)); photon::thread_join((photon::join_handle*) sub); } -TEST(event_engine, master_timeout) { +TEST_F(event_engine, master_timeout) { int fd[2]; pipe(fd); char buf[1]; auto f = [&] { LOG_INFO("sleep 2s"); photon::thread_sleep(2); - LOG_INFO("start write"); write(fd[1], buf, 1); }; - photon::thread* sub = photon::thread_create11(&decltype(f)::operator(), &f); + photon::thread* sub = photon::thread_create11(f); photon::thread_enable_join(sub); LOG_INFO("wait 1s at most"); - ASSERT_EQ(photon::wait_for_fd_readable(fd[0], 1000000), -1); - ASSERT_EQ(errno, ETIMEDOUT); + ASSERT_EQ(-1, photon::wait_for_fd_readable(fd[0], 1000000)); + ASSERT_EQ(ETIMEDOUT, errno); photon::thread_join((photon::join_handle*) sub); } -TEST(event_engine, master_interrupted) { +TEST_F(event_engine, master_interrupted) { int fd[2]; pipe(fd); + photon::semaphore sem; photon::thread* main = photon::CURRENT; auto f = [&] { - LOG_INFO("sleep 2s"); - photon::thread_sleep(2); + sem.wait(1); LOG_INFO("start interrupt main"); - photon::thread_interrupt(main, EPERM); + photon::thread_interrupt(main); }; - photon::thread* sub = photon::thread_create11(&decltype(f)::operator(), &f); + photon::thread* sub = photon::thread_create11(f); photon::thread_enable_join(sub); - LOG_INFO("wait 3s at most"); - ASSERT_EQ(photon::wait_for_fd_readable(fd[0], 3000000), -1); - ASSERT_EQ(errno, EPERM); + LOG_INFO("wait 1s at most"); + sem.signal(1); + ASSERT_EQ(-1, photon::wait_for_fd_readable(fd[0], 1000000)); + ASSERT_EQ(EINTR, errno); photon::thread_join((photon::join_handle*) sub); } -TEST(event_engine, master_interrupted_after_io) { +TEST_F(event_engine, master_interrupted_after_io) { int fd[2]; pipe(fd); char buf[1]; + photon::semaphore sem; photon::thread* main = photon::CURRENT; auto f = [&] { - LOG_INFO("sleep 2s"); - photon::thread_sleep(2); - LOG_INFO("start write"); + sem.wait(1); write(fd[1], buf, 1); LOG_INFO("start interrupt main"); - photon::thread_interrupt(main, EPERM); + photon::thread_interrupt(main); }; - photon::thread* sub = photon::thread_create11(&decltype(f)::operator(), &f); + photon::thread* sub = photon::thread_create11(f); photon::thread_enable_join(sub); - LOG_INFO("wait 3s at most"); - ASSERT_EQ(photon::wait_for_fd_readable(fd[0], 3000000), -1); - ASSERT_EQ(errno, EPERM); + LOG_INFO("wait 1s at most"); + sem.signal(1); + ASSERT_EQ(-1, photon::wait_for_fd_readable(fd[0], 1000000)); + ASSERT_EQ(EINTR, errno); photon::thread_join((photon::join_handle*) sub); } -TEST(event_engine, cascading) { +TEST_F(event_engine, cascading_add) { int fd1[2]; int fd2[2]; pipe(fd1); pipe(fd2); char buf[1]; + photon::semaphore sem; auto f = [&] { - photon::thread_sleep(2); - LOG_INFO("write pipe"); + sem.wait(1); write(fd1[1], buf, 1); write(fd2[1], buf, 1); }; - photon::thread* sub = photon::thread_create11(&decltype(f)::operator(), &f); + photon::thread* sub = photon::thread_create11(f); photon::thread_enable_join(sub); - auto engine = new_cascading_engine(); - DEFER(delete engine); engine->add_interest({fd1[0], photon::EVENT_READ, (void*) 0x1111}); engine->add_interest({fd2[0], photon::EVENT_READ, (void*) 0x2222}); + sem.signal(1); + void* data[5] = {}; ssize_t num_events = engine->wait_for_events(data, 5, -1UL); - ASSERT_EQ(num_events, 2); + ASSERT_EQ(2, num_events); + + // data order is not ensured bool b1 = data[0] == (void*) 0x1111 && data[1] == (void*) 0x2222; bool b2 = data[0] == (void*) 0x2222 && data[1] == (void*) 0x1111; ASSERT_EQ(b1 || b2, true); - engine->rm_interest({fd1[0], photon::EVENT_READ, (void*) 0x1111}); - engine->rm_interest({fd2[0], photon::EVENT_READ, (void*) 0x2222}); - photon::thread_join((photon::join_handle*) sub); } -TEST(event_engine, cascading_timeout) { +TEST_F(event_engine, cascading_timeout) { int fd1[2]; - int fd2[2]; pipe(fd1); - pipe(fd2); char buf[1]; auto f = [&] { photon::thread_sleep(2); write(fd1[1], buf, 1); - write(fd2[1], buf, 1); }; - photon::thread* sub = photon::thread_create11(&decltype(f)::operator(), &f); + photon::thread* sub = photon::thread_create11(f); photon::thread_enable_join(sub); - auto engine = new_cascading_engine(); - DEFER(delete engine); engine->add_interest({fd1[0], photon::EVENT_READ, (void*) 0x1111}); - engine->add_interest({fd2[0], photon::EVENT_READ, (void*) 0x2222}); void* data[5] = {}; ssize_t num_events = engine->wait_for_events(data, 5, 1000000); ASSERT_EQ(0, num_events); - engine->rm_interest({fd1[0], photon::EVENT_READ, (void*) 0x1111}); - engine->rm_interest({fd2[0], photon::EVENT_READ, (void*) 0x2222}); - photon::thread_join((photon::join_handle*) sub); } -TEST(event_engine, cascading_remove) { +TEST_F(event_engine, cascading_remove) { int fd1[2]; int fd2[2]; pipe(fd1); pipe(fd2); char buf[1]; - auto engine = new_cascading_engine(); - DEFER(delete engine); - auto f = [&] { - photon::thread_sleep(1); + photon::semaphore sem; + + auto sub = photon::thread_create11([&] { + sem.wait(1); + // 3. Remove one engine->rm_interest({fd1[0], photon::EVENT_READ, (void*) 0x1111}); - photon::thread_sleep(1); - LOG_INFO("start write fd"); + // 4. Write both write(fd1[1], buf, 1); write(fd2[1], buf, 1); - }; - photon::thread* sub = photon::thread_create11(&decltype(f)::operator(), &f); + }); photon::thread_enable_join(sub); + // 1. Add both engine->add_interest({fd1[0], photon::EVENT_READ, (void*) 0x1111}); engine->add_interest({fd2[0], photon::EVENT_READ, (void*) 0x2222}); + sem.signal(1); + + // 2. Wait both void* data[5] = {}; ssize_t num_events = engine->wait_for_events(data, 5, -1UL); - ASSERT_EQ(num_events, 1); + + // 5. Should get only one + ASSERT_EQ(1, num_events); ASSERT_EQ(data[0], (void*) 0x2222); + photon::thread_join((photon::join_handle*) sub); + sub = photon::thread_create11([&] { + // 7. Remove one + engine->rm_interest({fd2[0], photon::EVENT_READ, (void*) 0x2222}); + }); + photon::thread_enable_join(sub); - engine->rm_interest({fd2[0], photon::EVENT_READ, (void*) 0x2222}); + // 6. Wait one + num_events = engine->wait_for_events(data, 5, 1000000); + ASSERT_EQ(0, num_events); photon::thread_join((photon::join_handle*) sub); } -TEST(event_engine, cascading_one_shot) { +TEST_F(event_engine, cascading_remove_inplace) { + int fd1[2]; + pipe(fd1); + char buf[1]; + + engine->add_interest({fd1[0], photon::EVENT_READ, (void*) 0x1111}); + engine->rm_interest({fd1[0], photon::EVENT_READ, (void*) 0x2222}); + + write(fd1[1], buf, 1); + + void* data[5] = {}; + ssize_t num_events = engine->wait_for_events(data, 5, 1000000); + ASSERT_EQ(0, num_events); +} + +TEST_F(event_engine, cascading_interrupt) { + int fd1[2]; + pipe(fd1); + photon::thread* main = photon::CURRENT; + photon::semaphore sem; + + engine->add_interest({fd1[0], photon::EVENT_READ, (void*) 0x1111}); + + auto th = photon::thread_create11([&] { + sem.wait(1); + photon::thread_interrupt(main); + }); + photon::thread_enable_join(th); + + sem.signal(1); + + void* data[5] = {}; + ssize_t num_events = engine->wait_for_events(data, 5, 1000000); + ASSERT_EQ(-1, num_events); + ASSERT_EQ(EINTR, errno); + photon::thread_join((photon::join_handle*) th); +} + +TEST_F(event_engine, cascading_one_shot) { int fd1[2]; int fd2[2]; pipe(fd1); @@ -532,11 +587,9 @@ TEST(event_engine, cascading_one_shot) { write(fd2[1], buf, 1); }; - photon::thread* sub = photon::thread_create11(&decltype(f)::operator(), &f); + photon::thread* sub = photon::thread_create11(f); photon::thread_enable_join(sub); - auto engine = new_cascading_engine(); - DEFER(delete engine); engine->add_interest({fd1[0], photon::EVENT_READ | photon::ONE_SHOT, (void*) 0x1111}); engine->add_interest({fd2[0], photon::EVENT_READ, (void*) 0x2222}); @@ -558,7 +611,6 @@ TEST(event_engine, cascading_one_shot) { LOG_INFO("wait non events"); num_events = engine->wait_for_events(data, 5, 2000000); ASSERT_EQ(num_events, 0); - ASSERT_EQ(errno, ETIMEDOUT); photon::thread_join((photon::join_handle*) sub); } @@ -566,14 +618,9 @@ TEST(event_engine, cascading_one_shot) { int main(int argc, char** arg) { srand(time(nullptr)); set_log_output_level(ALOG_INFO); - testing::InitGoogleTest(&argc, arg); testing::FLAGS_gtest_break_on_failure = true; gflags::ParseCommandLineFlags(&argc, &arg, true); - - if (photon::init()) - return -1; - DEFER(photon::fini()); - + ci_parse_env(); return RUN_ALL_TESTS(); } diff --git a/test/ci-tools.cpp b/test/ci-tools.cpp new file mode 100644 index 00000000..265b4ac3 --- /dev/null +++ b/test/ci-tools.cpp @@ -0,0 +1,26 @@ +#include "ci-tools.h" +#include +#include +#include "../photon.h" + +#if __linux__ +uint64_t ci_ev_engine = photon::INIT_EVENT_EPOLL; +#else +uint64_t ci_ev_engine = photon::INIT_EVENT_KQUEUE; +#endif + +uint64_t ci_ev_engine_with_signal = ci_ev_engine | photon::INIT_EVENT_SIGNAL; + +void ci_parse_env() { + const char* ev_engine = getenv("PHOTON_CI_EV_ENGINE"); + if (!ev_engine) + return; + if (strcmp(ev_engine, "epoll") == 0) { + ci_ev_engine = photon::INIT_EVENT_EPOLL; + } else if (strcmp(ev_engine, "io_uring") == 0) { + ci_ev_engine = photon::INIT_EVENT_IOURING; + } else if (strcmp(ev_engine, "kqueue") == 0) { + ci_ev_engine = photon::INIT_EVENT_KQUEUE; + } + ci_ev_engine_with_signal = ci_ev_engine | photon::INIT_EVENT_SIGNAL; +} \ No newline at end of file diff --git a/test/ci-tools.h b/test/ci-tools.h new file mode 100644 index 00000000..f0cbedd8 --- /dev/null +++ b/test/ci-tools.h @@ -0,0 +1,7 @@ +#pragma once +#include + +extern uint64_t ci_ev_engine; +extern uint64_t ci_ev_engine_with_signal; + +void ci_parse_env(); \ No newline at end of file diff --git a/thread/test/test-multi-vcpu-locking.cpp b/thread/test/test-multi-vcpu-locking.cpp index 86456812..aefafe0c 100644 --- a/thread/test/test-multi-vcpu-locking.cpp +++ b/thread/test/test-multi-vcpu-locking.cpp @@ -22,6 +22,7 @@ limitations under the License. #include #include #include +#include "../../test/ci-tools.h" static constexpr int nMutexes = 4; static constexpr int nWorkers = 4; @@ -59,7 +60,7 @@ static void myThread(int tid) { } TEST(multi_vcpu_locking, long_time_acquisition_should_abort) { - int ret = photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE); + int ret = photon::init(ci_ev_engine, photon::INIT_IO_NONE); GTEST_ASSERT_EQ(0, ret); DEFER(photon::fini()); @@ -89,5 +90,6 @@ TEST(multi_vcpu_locking, long_time_acquisition_should_abort) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); + ci_parse_env(); return RUN_ALL_TESTS(); } \ No newline at end of file