diff --git a/CMakeLists.txt b/CMakeLists.txt index 3d73dc91..9f86eb6e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -166,6 +166,7 @@ file(GLOB PHOTON_SRC fs/xfile.cpp fs/httpfs/*.cpp io/signal.cpp + io/reset_handle.cpp net/*.cpp net/http/*.cpp net/security-context/tls-stream.cpp diff --git a/examples/rpc/server.h b/examples/rpc/server.h index ff6ce94e..a7348957 100644 --- a/examples/rpc/server.h +++ b/examples/rpc/server.h @@ -59,7 +59,7 @@ struct ExampleServer { // Serve provides handler for socket server int serve(photon::net::ISocketStream* stream) { - return skeleton->serve(stream, false); + return skeleton->serve(stream); } void term() { diff --git a/io/aio-wrapper.cpp b/io/aio-wrapper.cpp index 2b7b9b0d..2ea2f2c2 100644 --- a/io/aio-wrapper.cpp +++ b/io/aio-wrapper.cpp @@ -34,6 +34,7 @@ limitations under the License. #include "fd-events.h" #include "../common/utility.h" #include "../common/alog.h" +#include "reset_handle.h" namespace photon { @@ -186,7 +187,6 @@ namespace photon static void* libaio_polling(void*) { - libaio_ctx->running = 1; DEFER(libaio_ctx->running = 0); while (libaio_ctx->running == 1) { @@ -342,6 +342,29 @@ namespace photon return rst; } + class AioResetHandle : public ResetHandle { + int reset() override { + if (!libaio_ctx) + return 0; + LOG_INFO("reset libaio by reset handle"); + close(libaio_ctx->evfd); + libaio_ctx->evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (libaio_ctx->evfd < 0) { + LOG_ERROR("failed to create eventfd ", ERRNO()); + exit(-1); + } + io_destroy(libaio_ctx->aio_ctx); + libaio_ctx->aio_ctx = {0}; + int ret = io_setup(IODEPTH, &libaio_ctx->aio_ctx); + if (ret < 0) { + LOG_ERROR("failed to create aio context by io_setup() ", ERRNO(), VALUE(ret)); + exit(-1); + } + thread_interrupt(libaio_ctx->polling_thread, ECANCELED); + return 0; + } + }; + static thread_local AioResetHandle *reset_handler = nullptr; int libaio_wrapper_init() { @@ -356,7 +379,7 @@ namespace photon int ret = io_setup(IODEPTH, &ctx->aio_ctx); if (ret < 0) { - LOG_ERROR("failed to create aio context by io_setup() ", ERRNO()); + LOG_ERROR("failed to create aio context by io_setup() ", ERRNO(), VALUE(ret)); close(ctx->evfd); return ret; } @@ -364,7 +387,11 @@ namespace photon ctx->polling_thread = thread_create(&libaio_polling, nullptr); assert(ctx->polling_thread); libaio_ctx = ctx.release(); - thread_yield_to(libaio_ctx->polling_thread); + libaio_ctx->running = 1; + if (reset_handler == nullptr) { + reset_handler = new AioResetHandle(); + } + LOG_DEBUG("libaio initialized"); return 0; } @@ -384,10 +411,9 @@ namespace photon io_destroy(libaio_ctx->aio_ctx); close(libaio_ctx->evfd); libaio_ctx->evfd = -1; - delete libaio_ctx; - libaio_ctx = nullptr; + safe_delete(libaio_ctx); + safe_delete(reset_handler); + LOG_DEBUG("libaio finished"); return 0; } } - - diff --git a/io/epoll.cpp b/io/epoll.cpp index d6340831..4100bd69 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -26,6 +26,7 @@ limitations under the License. #include #include #include "events_map.h" +#include "reset_handle.h" namespace photon { #ifndef EPOLLRDHUP @@ -48,7 +49,7 @@ struct InFlightEvent { void* error_data; }; -class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine { +class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine, public ResetHandle { public: int _evfd = -1; int _engine_fd = -1; @@ -71,6 +72,13 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine { epfd = evfd = -1; return 0; } + int reset() override { + if_close_fd(_engine_fd); // close original fd + if_close_fd(_evfd); + _inflight_events.clear(); // reset members + _events_remain = 0; + return init(); // re-init + } virtual ~EventEngineEPoll() override { LOG_INFO("Finish event engine: epoll"); if_close_fd(_engine_fd); diff --git a/io/fstack-dpdk.cpp b/io/fstack-dpdk.cpp index 8308b4db..96d53ee0 100644 --- a/io/fstack-dpdk.cpp +++ b/io/fstack-dpdk.cpp @@ -27,6 +27,7 @@ limitations under the License. #include "../thread/thread11.h" #include "../common/alog.h" #include "../net/basic_socket.h" +#include "reset_handle.h" #ifndef EVFILT_EXCEPT #define EVFILT_EXCEPT (-15) @@ -37,7 +38,7 @@ namespace photon { constexpr static EventsMap> evmap; -class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine { +class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine, public ResetHandle { public: struct InFlightEvent { uint32_t interests = 0; @@ -76,6 +77,10 @@ class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine { return 0; } + int reset() override { + assert(false); + } + ~FstackDpdkEngine() override { LOG_INFO("Finish f-stack dpdk engine"); // if (_n > 0) LOG_INFO(VALUE(_events[0].ident), VALUE(_events[0].filter), VALUE(_events[0].flags)); diff --git a/io/iouring-wrapper.cpp b/io/iouring-wrapper.cpp index 7137788c..d0e29059 100644 --- a/io/iouring-wrapper.cpp +++ b/io/iouring-wrapper.cpp @@ -30,6 +30,7 @@ limitations under the License. #include #include #include "events_map.h" +#include "reset_handle.h" #ifndef _GNU_SOURCE #define _GNU_SOURCE @@ -42,12 +43,22 @@ namespace photon { constexpr static EventsMap> evmap; -class iouringEngine : public MasterEventEngine, public CascadingEventEngine { +class iouringEngine : public MasterEventEngine, public CascadingEventEngine, public ResetHandle { public: explicit iouringEngine(bool master) : m_master(master) {} ~iouringEngine() { LOG_INFO("Finish event engine: iouring ", VALUE(m_master)); + fini(); + } + + int reset() override { + fini(); + m_event_contexts.clear(); + return init(); + } + + int fini() { if (m_eventfd >= 0 && !m_master) { if (io_uring_unregister_eventfd(m_ring) != 0) { LOG_ERROR("iouring: failed to unregister cascading event fd"); @@ -61,6 +72,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { } delete m_ring; m_ring = nullptr; + return 0; } int init() { diff --git a/io/kqueue.cpp b/io/kqueue.cpp index c2b8121e..4e314585 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -21,13 +21,14 @@ limitations under the License. #include #include #include "events_map.h" +#include "reset_handle.h" namespace photon { constexpr static EventsMap> evmap; -class KQueue : public MasterEventEngine, public CascadingEventEngine { +class KQueue : public MasterEventEngine, public CascadingEventEngine, public ResetHandle { public: struct InFlightEvent { uint32_t interests = 0; @@ -55,6 +56,15 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine { return 0; } + int reset() override { + LOG_INFO("Reset event engine: kqueue"); + _kq = -1; // kqueue fd is not inherited from the parent process + _inflight_events.clear(); // reset members + _n = 0; + _tm = {0, 0}; + return init(); // re-init + } + ~KQueue() override { LOG_INFO("Finish event engine: kqueue"); // if (_n > 0) LOG_INFO(VALUE(_events[0].ident), VALUE(_events[0].filter), VALUE(_events[0].flags)); diff --git a/io/reset_handle.cpp b/io/reset_handle.cpp new file mode 100644 index 00000000..c560a611 --- /dev/null +++ b/io/reset_handle.cpp @@ -0,0 +1,52 @@ +/* +Copyright 2022 The Photon Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "reset_handle.h" +#include +#include +#include + +namespace photon { + +static thread_local ResetHandle *list = nullptr; + +ResetHandle::ResetHandle() { + LOG_DEBUG("push ", VALUE(this)); + if (list == nullptr) { + list = this; + } else { + list->insert_tail(this); + } +} + +ResetHandle::~ResetHandle() { + LOG_DEBUG("erase ", VALUE(this)); + auto nx = this->remove_from_list(); + if (this == list) + list = nx; +} + +void reset_all_handle() { + if (list == nullptr) + return; + LOG_INFO("reset all handle called"); + for (auto handler : *list) { + LOG_DEBUG("reset ", VALUE(handler)); + handler->reset(); + } +} + +} // namespace photon diff --git a/io/reset_handle.h b/io/reset_handle.h new file mode 100644 index 00000000..cbc92a6c --- /dev/null +++ b/io/reset_handle.h @@ -0,0 +1,32 @@ +/* +Copyright 2022 The Photon Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#pragma once +#include + +namespace photon { + +class ResetHandle : public intrusive_list_node { +public: + ResetHandle(); + virtual ~ResetHandle(); + + virtual int reset() = 0; +}; + +void reset_all_handle(); + +} // namespace photon diff --git a/io/signal.cpp b/io/signal.cpp index 77bf2cd8..63b1a751 100644 --- a/io/signal.cpp +++ b/io/signal.cpp @@ -26,6 +26,7 @@ limitations under the License. #include "fd-events.h" #include "../common/event-loop.h" #include "../common/alog.h" +#include "reset_handle.h" namespace photon { @@ -227,14 +228,35 @@ namespace photon #endif } - // should be invoked in child process after forked, to clear signal mask - static void fork_hook_child(void) - { - LOG_DEBUG("Fork hook"); - sigset_t sigset0; // can NOT use photon::clear_signal_mask(), - sigemptyset(&sigset0); // as memory may be shared with parent, when vfork()ed - sigprocmask(SIG_SETMASK, &sigset0, nullptr); - } + class SignalResetHandle : public ResetHandle { + int reset() override { + if (sgfd < 0) + return 0; + LOG_INFO("reset signalfd by reset handle"); + sigset_t sigset0; // can NOT use photon::clear_signal_mask(), + sigemptyset(&sigset0); // as memory may be shared with parent, when vfork()ed + sigprocmask(SIG_SETMASK, &sigset0, nullptr); + // reset sgfd + memset(sighandlers, 0, sizeof(sighandlers)); +#ifdef __APPLE__ + sgfd = kqueue(); // kqueue fd is not inherited from the parent process +#else + close(sgfd); + sigfillset(&sigset); + sgfd = signalfd(-1, &sigset, SFD_CLOEXEC | SFD_NONBLOCK); +#endif + if (sgfd == -1) { + LOG_ERROR("failed to create signalfd() or kqueue()"); + exit(1); + } + // interrupt event loop by ETIMEDOUT to replace sgfd + if (eloop) + thread_interrupt(eloop->loop_thread(), ETIMEDOUT); + + return 0; + } + }; + static SignalResetHandle *reset_handler = nullptr; int sync_signal_init() { @@ -262,9 +284,11 @@ namespace photon LOG_ERROR_RETURN(EFAULT, -1, "failed to thread_create() for signal handling"); } eloop->async_run(); - LOG_INFO("signalfd initialized"); thread_yield(); // give a chance let eloop to execute do_wait - pthread_atfork(nullptr, nullptr, &fork_hook_child); + if (reset_handler == nullptr) { + reset_handler = new SignalResetHandle(); + } + LOG_INFO("signalfd initialized"); return clear_signal_mask(); } @@ -298,6 +322,7 @@ namespace photon } } #endif + safe_delete(reset_handler); LOG_INFO("signalfd finished"); return clear_signal_mask(); } diff --git a/io/test/CMakeLists.txt b/io/test/CMakeLists.txt index 5a3ff5d6..de102709 100644 --- a/io/test/CMakeLists.txt +++ b/io/test/CMakeLists.txt @@ -8,6 +8,13 @@ add_executable(signalfdboom signalfdboom.cpp) target_link_libraries(signalfdboom PRIVATE photon_shared) add_test(NAME signalfdboom COMMAND $) +add_executable(test-fork test-fork.cpp) +target_link_libraries(test-fork PRIVATE photon_shared) +if (PHOTON_ENABLE_URING) + target_compile_definitions(test-fork PRIVATE PHOTON_URING=on) +endif() +add_test(NAME test-fork COMMAND $) + if (NOT APPLE) add_executable(test-syncio test-syncio.cpp) target_link_libraries(test-syncio PRIVATE photon_shared) diff --git a/io/test/test-fork.cpp b/io/test/test-fork.cpp new file mode 100644 index 00000000..1f8f8b8a --- /dev/null +++ b/io/test/test-fork.cpp @@ -0,0 +1,325 @@ +/* +Copyright 2022 The Photon Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +bool exit_flag = false; +bool exit_normal = false; + +void sigint_handler(int signal = SIGINT) { + LOG_INFO("signal ` received, pid `", signal, getpid()); + exit_flag = true; +} + +inline int check_process_exit_stat(int &statVal, int &pid) { + if (WIFEXITED(statVal)) { // child exit normally + LOG_INFO("process with pid ` finished with code `.", pid, WEXITSTATUS(statVal)); + return WEXITSTATUS(statVal); + } else { + if (WIFSIGNALED(statVal)) { // child terminated due to uncaptured signal + LOG_INFO("process with pid ` terminated due to uncaptured signal `.", pid, + WTERMSIG(statVal)); + } else if (WIFSTOPPED(statVal)) { // child terminated unexpectedly + LOG_INFO("process with pid ` terminated unexpectedly with signal `.", pid, + WSTOPSIG(statVal)); + } else { + LOG_INFO("process with pid ` terminated abnormally.", pid); + } + return -1; + } +} + +void wait_process_end(pid_t pid) { + if (pid > 0) { + int statVal; + if (waitpid(pid, &statVal, 0) > 0) { + check_process_exit_stat(statVal, pid); + } else { + /// EINTR + if (EINTR == errno) { + LOG_INFO("process with pid ` waitpid is interrupted.", pid); + } else { + LOG_INFO("process with pid ` waitpid exception, strerror: `.", pid, + strerror(errno)); + } + } + } +} + +void wait_process_end_no_hang(pid_t pid) { + if (pid > 0) { + int statVal; + int retry = 100; + again: + if (waitpid(pid, &statVal, WNOHANG) <= 0) { + if (retry--) { + photon::thread_usleep(50 * 1000); + goto again; + } else { + if (kill(pid, SIGKILL) == 0) { + LOG_WARN("force kill child process with pid `", pid); + } else { + LOG_ERROR("force kill child process with pid ` error, errno:`:`", pid, errno, + strerror(errno)); + } + wait_process_end(pid); + } + } else { + if (check_process_exit_stat(statVal, pid) == 0) { + exit_normal = true; + } + } + } +} + +int fork_child_process() { + pid_t pid = fork(); + if (pid < 0) { + LOG_ERRNO_RETURN(0, -1, "fork error"); + return -1; + } + + if (pid == 0) { + photon::block_all_signal(); + photon::sync_signal(SIGTERM, &sigint_handler); + + LOG_INFO("child hello, pid `", getpid()); + while (!exit_flag) { + photon::thread_usleep(200 * 1000); + } + photon::fini(); + LOG_INFO("child exited, pid `", getpid()); + exit(0); + } else { + LOG_INFO("parent hello, pid `", getpid()); + return pid; + } +} + +int fork_parent_process(uint64_t event_engine) { + pid_t m_pid = fork(); + if (m_pid < 0) { + LOG_ERRNO_RETURN(0, -1, "fork error"); + return -1; + } + + if (m_pid > 0) { + LOG_INFO("main hello, pid `", getpid()); + return m_pid; + } + photon::fini(); + photon::init(event_engine, photon::INIT_IO_LIBCURL); + + photon::block_all_signal(); + photon::sync_signal(SIGINT, &sigint_handler); + + LOG_INFO("parent hello, pid `", getpid()); + photon::thread_sleep(1); + auto pid = fork_child_process(); + photon::thread_sleep(1); + + int statVal; + if (waitpid(pid, &statVal, WNOHANG) == 0) { + if (kill(pid, SIGTERM) == 0) { + LOG_INFO("kill child process with pid `", pid); + } else { + ERRNO eno; + LOG_ERROR("kill child process with pid ` error, `", pid, eno); + } + wait_process_end_no_hang(pid); + } else { + check_process_exit_stat(statVal, pid); + LOG_ERROR("child process exit unexpected"); + } + + LOG_INFO("child process exit status `", exit_normal); + EXPECT_EQ(true, exit_normal); + + while (!exit_flag) { + photon::thread_usleep(200 * 1000); + } + LOG_INFO("parent exited, pid `", getpid()); + photon::fini(); + exit(exit_normal ? 0 : -1); +} + +TEST(ForkTest, Fork) { + photon::init(photon::INIT_EVENT_NONE, photon::INIT_IO_NONE); + DEFER(photon::fini()); + exit_flag = false; + exit_normal = false; +#if defined(__linux__) + auto pid = fork_parent_process(photon::INIT_EVENT_EPOLL | photon::INIT_EVENT_SIGNAL); +#else // macOS, FreeBSD ... + auto pid = fork_parent_process(photon::INIT_EVENT_DEFAULT); +#endif + photon::thread_sleep(5); + + int statVal; + if (waitpid(pid, &statVal, WNOHANG) == 0) { + if (kill(pid, SIGINT) == 0) { + LOG_INFO("kill parent process with pid `", pid); + } else { + ERRNO eno; + LOG_ERROR("kill parent process with pid ` error, `", pid, eno); + } + wait_process_end_no_hang(pid); + } else { + check_process_exit_stat(statVal, pid); + LOG_ERROR("parent process exit unexpected"); + } + + LOG_INFO("parent process exit status `", exit_normal); + EXPECT_EQ(true, exit_normal); +} + +TEST(ForkTest, ForkInThread) { + photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_LIBCURL); + DEFER(photon::fini()); + + int ret = -1; + std::thread th([&]() { + pid_t pid = fork(); + ASSERT_GE(pid, 0); + + if (pid == 0) { + LOG_INFO("child hello, pid `", getpid()); + exit(0); + } else { + LOG_INFO("parent hello, pid `", getpid()); + int statVal; + waitpid(pid, &statVal, 0); + ret = check_process_exit_stat(statVal, pid); + } + }); + th.join(); + EXPECT_EQ(0, ret); +} + +TEST(ForkTest, PopenInThread) { + photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_LIBCURL); + DEFER(photon::fini()); + + photon::semaphore sem(0); + auto cmd = "du -s \"/tmp\""; + ssize_t size = -1; + std::thread([&] { + auto f = popen(cmd, "r"); + EXPECT_NE(nullptr, f); + DEFER(fclose(f)); + fscanf(f, "%lu", &size); + sem.signal(1); + LOG_INFO("popen done"); + }).detach(); + sem.wait(1); + EXPECT_NE(-1, size); + LOG_INFO(VALUE(size)); +} + +#if defined(__linux__) && defined(PHOTON_URING) +TEST(ForkTest, Iouring) { + photon::init(photon::INIT_EVENT_NONE, photon::INIT_IO_NONE); + DEFER(photon::fini()); + exit_flag = false; + exit_normal = false; + auto pid = fork_parent_process(photon::INIT_EVENT_IOURING | photon::INIT_EVENT_SIGNAL); + + photon::thread_sleep(5); + + int statVal; + if (waitpid(pid, &statVal, WNOHANG) == 0) { + if (kill(pid, SIGINT) == 0) { + LOG_INFO("kill parent process with pid `", pid); + } else { + ERRNO eno; + LOG_ERROR("kill parent process with pid ` error, `", pid, eno); + } + wait_process_end_no_hang(pid); + } else { + check_process_exit_stat(statVal, pid); + LOG_ERROR("parent process exit unexpected"); + } + + LOG_INFO("parent process exit status `", exit_normal); + EXPECT_EQ(true, exit_normal); +} +#endif + +#if defined(__linux__) +TEST(ForkTest, LIBAIO) { + photon::init(photon::INIT_EVENT_EPOLL, photon::INIT_IO_LIBAIO); + DEFER(photon::fini()); + + std::unique_ptr fs( + photon::fs::new_localfs_adaptor("/tmp/", photon::fs::ioengine_libaio)); + std::unique_ptr lf( + fs->open("test_local_fs_fork_parent", O_RDWR | O_CREAT, 0755)); + void* buf = nullptr; + ::posix_memalign(&buf, 4096, 4096); + DEFER(free(buf)); + int ret = lf->pwrite(buf, 4096, 0); + EXPECT_EQ(ret, 4096); + + ret = -1; + pid_t pid = fork(); + ASSERT_GE(pid, 0); + + if (pid == 0) { + std::unique_ptr fs( + photon::fs::new_localfs_adaptor("/tmp/", photon::fs::ioengine_libaio)); + std::unique_ptr lf( + fs->open("test_local_fs_fork", O_RDWR | O_CREAT, 0755)); + void* buf = nullptr; + ::posix_memalign(&buf, 4096, 4096); + DEFER(free(buf)); + auto ret = lf->pwrite(buf, 4096, 0); + EXPECT_EQ(ret, 4096); + ret = lf->close(); + photon::fini(); + exit(ret); + } else { + int statVal; + waitpid(pid, &statVal, 0); + ret = check_process_exit_stat(statVal, pid); + } + EXPECT_EQ(0, ret); + + ret = lf->pwrite(buf, 4096, 0); + EXPECT_EQ(ret, 4096); + ret = lf->close(); + EXPECT_EQ(0, ret); +} +#endif + +int main(int argc, char **argv) { + set_log_output_level(0); + + ::testing::InitGoogleTest(&argc, argv); + auto ret = RUN_ALL_TESTS(); + if (ret) LOG_ERROR_RETURN(0, ret, VALUE(ret)); +} diff --git a/net/basic_socket.cpp b/net/basic_socket.cpp index bd54f8fd..8446b8dd 100644 --- a/net/basic_socket.cpp +++ b/net/basic_socket.cpp @@ -97,7 +97,7 @@ int connect(int fd, const struct sockaddr *addr, socklen_t addrlen, if (ret < 0) return -1; socklen_t n = sizeof(err); ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &n); - if (ret < 0) return -1; + if (unlikely(ret < 0)) return -1; if (err) { errno = err; return -1; diff --git a/net/curl.cpp b/net/curl.cpp index f2eeb33f..6906f43d 100644 --- a/net/curl.cpp +++ b/net/curl.cpp @@ -28,6 +28,7 @@ limitations under the License. #include #include #include +#include "../io/reset_handle.h" namespace photon { namespace net { @@ -159,6 +160,8 @@ class cURLLoop : public Object { void stop() { loop->stop(); } + photon::thread* loop_thread() { return loop->loop_thread(); } + protected: EventLoop* loop; int cnt; @@ -210,6 +213,17 @@ void __OpenSSLGlobalInit(); // curl_global_cleanup(); // } +class CurlResetHandle : public ResetHandle { + int reset() override { + LOG_INFO("reset libcurl by reset handle"); + // interrupt g_loop by ETIMEDOUT to replace g_poller + if (cctx.g_loop) + thread_interrupt(cctx.g_loop->loop_thread(), ETIMEDOUT); + return 0; + } +}; +static thread_local CurlResetHandle *reset_handler = nullptr; + int libcurl_init(long flags, long pipelining, long maxconn) { if (cctx.g_loop == nullptr) { __OpenSSLGlobalInit(); @@ -240,6 +254,10 @@ int libcurl_init(long flags, long pipelining, long maxconn) { libcurl_set_pipelining(pipelining); libcurl_set_maxconnects(maxconn); + if (reset_handler == nullptr) { + reset_handler = new CurlResetHandle(); + } + LOG_INFO("libcurl initialized"); } return 0; @@ -256,6 +274,8 @@ void libcurl_fini() { if (ret != CURLM_OK) LOG_ERROR("libcurl-multi cleanup error: ", curl_multi_strerror(ret)); cctx.g_libcurl_multi = nullptr; + safe_delete(reset_handler); + LOG_INFO("libcurl finished"); } std::string url_escape(const char* str) { diff --git a/net/socket.h b/net/socket.h index 75592a66..c0914ef8 100644 --- a/net/socket.h +++ b/net/socket.h @@ -255,6 +255,7 @@ namespace net { using Handler = Callback; virtual ISocketServer* set_handler(Handler handler) = 0; virtual int start_loop(bool block = false) = 0; + // Close the listening fd. It's the user's responsibility to close the active connections. virtual void terminate() = 0; }; diff --git a/photon.cpp b/photon.cpp index cafc770c..8780969b 100644 --- a/photon.cpp +++ b/photon.cpp @@ -22,6 +22,7 @@ limitations under the License. #ifdef ENABLE_FSTACK_DPDK #include "io/fstack-dpdk.h" #endif +#include "io/reset_handle.h" #include "net/curl.h" #include "net/socket.h" #include "fs/exportfs.h" @@ -31,6 +32,7 @@ namespace photon { using namespace fs; using namespace net; +static bool reset_handle_registed = false; static thread_local uint64_t g_event_engine = 0, g_io_engine = 0; #define INIT_IO(name, prefix) if (INIT_IO_##name & io_engine) { if (prefix##_init() < 0) return -1; } @@ -74,6 +76,11 @@ int init(uint64_t event_engine, uint64_t io_engine) { #endif g_event_engine = event_engine; g_io_engine = io_engine; + if (!reset_handle_registed) { + pthread_atfork(nullptr, nullptr, &reset_all_handle); + LOG_DEBUG("reset_all_handle registed ", VALUE(getpid())); + reset_handle_registed = true; + } return 0; } @@ -96,4 +103,4 @@ int fini() { return 0; } -} \ No newline at end of file +} diff --git a/rpc/rpc.cpp b/rpc/rpc.cpp index 8082674f..d55a778e 100644 --- a/rpc/rpc.cpp +++ b/rpc/rpc.cpp @@ -222,7 +222,7 @@ namespace rpc { bool got_it; int* stream_serv_count; photon::condition_variable *stream_cv; - std::shared_ptr w_lock; + photon::mutex* w_lock; Context(SkeletonImpl* sk, IStream* s) : request(sk->m_allocator), stream(s), sk(sk) { } @@ -289,6 +289,7 @@ namespace rpc { } int response_sender(iovector* resp) { + assert(w_lock); Header h; h.size = (uint32_t)resp->sum(); h.function = header.function; @@ -297,13 +298,11 @@ namespace rpc { resp->push_front(&h, sizeof(h)); if (stream == nullptr) LOG_ERRNO_RETURN(0, -1, "socket closed "); - if (w_lock) { - w_lock->lock(); - } + + w_lock->lock(); ssize_t ret = stream->writev(resp->iovec(), resp->iovcnt()); - if (w_lock) { - w_lock->unlock(); - } + w_lock->unlock(); + if (ret < (ssize_t)(sizeof(h) + h.size)) { stream->shutdown(ShutdownHow::ReadWrite); LOG_ERRNO_RETURN(0, -1, "failed to send rpc response to stream ", stream); @@ -314,43 +313,38 @@ namespace rpc { condition_variable m_cond_served; struct ThreadLink : public intrusive_list_node { - photon::thread* thread = nullptr; + photon::thread* thread = photon::CURRENT; }; - intrusive_list m_list; + intrusive_list m_list; // Stores the thread ID of every stream uint64_t m_serving_count = 0; - bool m_concurrent; bool m_running = true; photon::ThreadPoolBase *m_thread_pool; - virtual int serve(IStream* stream, bool ownership) override + virtual int serve(IStream* stream) override { - if (!m_running) - LOG_ERROR_RETURN(ENOTSUP, -1, "the skeleton has closed"); + if (unlikely(!m_running)) + return -1; ThreadLink node; m_list.push_back(&node); DEFER(m_list.erase(&node)); - DEFER(if (ownership) delete stream;); // stream serve refcount int stream_serv_count = 0; + photon::mutex w_lock; photon::condition_variable stream_cv; - // once serve goint to exit, stream may destruct + // once serve exit, stream will destruct // make sure all requests relies on this stream are finished DEFER({ while (stream_serv_count > 0) stream_cv.wait_no_lock(); }); if (stream_accept_notify) stream_accept_notify(stream); DEFER(if (stream_close_notify) stream_close_notify(stream)); - auto w_lock = m_concurrent ? std::make_shared() : nullptr; - while(m_running) - { + + while(likely(m_running)) { Context context(this, stream); context.stream_serv_count = &stream_serv_count; context.stream_cv = &stream_cv; - context.w_lock = w_lock; - node.thread = CURRENT; + context.w_lock = &w_lock; int ret = context.read_request(); - ERRNO err; - node.thread = nullptr; if (ret < 0) { // should only shutdown read, for other threads // might still writing @@ -363,16 +357,11 @@ namespace rpc { } } - if (!m_concurrent) { - context.serve_request(); - } else { - context.got_it = false; - m_thread_pool->thread_create(&async_serve, &context); - // async_serve will be start, add refcount here - stream_serv_count ++; - while(!context.got_it) - thread_yield_to(nullptr); - } + context.got_it = false; + m_thread_pool->thread_create(&async_serve, &context); + stream_serv_count ++; + while(!context.got_it) + thread_yield(); } return 0; } @@ -381,48 +370,45 @@ namespace rpc { auto ctx = (Context*)args_; Context context(std::move(*ctx)); ctx->got_it = true; - thread_yield_to(nullptr); + thread_yield(); context.serve_request(); // serve done, here reduce refcount (*ctx->stream_serv_count) --; ctx->stream_cv->notify_all(); return nullptr; } - virtual int shutdown_no_wait() override { - photon::thread_create11(&SkeletonImpl::shutdown, this); + virtual int shutdown(bool no_more_requests) override { + m_running = !no_more_requests; + while (m_list) { + auto th = m_list.front()->thread; + thread_enable_join(th); + if (no_more_requests) { + thread_interrupt(th); + } + // Wait all streams destructed. Their attached RPC requests are finished as well. + thread_join((join_handle*) th); + } return 0; } - virtual int shutdown() override - { + int shutdown_no_wait() override { m_running = false; - for (const auto& x: m_list) - if (x->thread) - thread_interrupt(x->thread); - // it should confirm that all threads are finished - // or m_list may not destruct correctly - while (m_serving_count > 0) { - // means shutdown called by rpc serve, should return to give chance to shutdown - if ((m_serving_count == 1) && (m_list.front()->thread == nullptr)) - return 0; - m_cond_served.wait_no_lock(); + for (auto* each: m_list) { + thread_interrupt(each->thread); } - while (!m_list.empty()) - thread_usleep(1000); return 0; } virtual ~SkeletonImpl() { - shutdown(); + shutdown(true); photon::delete_thread_pool(m_thread_pool); } - explicit SkeletonImpl(bool concurrent = true, uint32_t pool_size = 128) - : m_concurrent(concurrent), + explicit SkeletonImpl(uint32_t pool_size = 128) : m_thread_pool(photon::new_thread_pool(pool_size)) { m_thread_pool->enable_autoscale(); } }; - Skeleton* new_skeleton(bool concurrent, uint32_t pool_size) + Skeleton* new_skeleton(uint32_t pool_size) { - return new SkeletonImpl(concurrent, pool_size); + return new SkeletonImpl(pool_size); } class StubPoolImpl : public StubPool { diff --git a/rpc/rpc.h b/rpc/rpc.h index 0b2b19fa..eb195342 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -161,16 +161,25 @@ namespace rpc virtual int set_close_notify(Notifier notifier) = 0; // can be invoked concurrently by multiple threads - // if have the ownership of stream, `serve` will delete it before exit - virtual int serve(IStream* stream, bool ownership_stream = false) = 0; + virtual int serve(IStream* stream) = 0; + + __attribute__((deprecated)) + int serve(IStream* stream, bool /*ownership_stream*/) { + return serve(stream); + } // set the allocator to allocate memory for recving responses // the default allocator is defined in iovector.h/cpp virtual void set_allocator(IOAlloc allocation) = 0; - virtual int shutdown_no_wait() = 0; + /** + * @brief Shutdown the rpc server from outside. + * @warning DO NOT invoke this function within the RPC request. + * You should create a thread to invoke it, or just use shutdown_no_wait. + */ + virtual int shutdown(bool no_more_requests = true) = 0; - virtual int shutdown() = 0; + virtual int shutdown_no_wait() = 0; template int register_service(ServerClass* obj) @@ -238,7 +247,12 @@ namespace rpc extern "C" StubPool* new_uds_stub_pool(const char* path, uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout); - extern "C" Skeleton* new_skeleton(bool concurrent = true, uint32_t pool_size = 128); + extern "C" Skeleton* new_skeleton(uint32_t pool_size = 128); + + __attribute__((deprecated)) + inline Skeleton* new_skeleton(bool /*concurrent*/, uint32_t pool_size = 128) { + return new_skeleton(pool_size); + } struct __example__operation1__ // defination of operator { diff --git a/rpc/test/test-rpc-message.cpp b/rpc/test/test-rpc-message.cpp index 6597e0cd..ef188830 100644 --- a/rpc/test/test-rpc-message.cpp +++ b/rpc/test/test-rpc-message.cpp @@ -156,7 +156,7 @@ class TestRPCServer { } int serve(photon::net::ISocketStream* stream) { - return skeleton->serve(stream, false); + return skeleton->serve(stream); } int run() { diff --git a/rpc/test/test.cpp b/rpc/test/test.cpp index 3b06f18f..29f85bda 100644 --- a/rpc/test/test.cpp +++ b/rpc/test/test.cpp @@ -16,16 +16,30 @@ limitations under the License. #include "../../rpc/rpc.cpp" #include +#include #include #include #include #include #include #include +#include +#include "../../test/ci-tools.h" + using namespace std; using namespace photon; using namespace rpc; +class RpcTest : public testing::Test { +public: + void SetUp() override { + GTEST_ASSERT_EQ(0, photon::init(ci_ev_engine, photon::INIT_IO_NONE)); + } + void TearDown() override { + photon::fini(); + } +}; + std::string S = "1234567890"; struct Args { @@ -38,15 +52,10 @@ struct Args } void verify() { - LOG_DEBUG(VALUE(a)); EXPECT_EQ(a, 123); - LOG_DEBUG(VALUE(b)); EXPECT_EQ(b, 123); - LOG_DEBUG(VALUE(c)); EXPECT_EQ(c, 123); - LOG_DEBUG(VALUE(d)); EXPECT_EQ(d, 123); - LOG_DEBUG(VALUE(s)); EXPECT_EQ(s, S); } uint64_t serialize(iovector& iov) @@ -142,13 +151,13 @@ int server_exit_function(void* instance, iovector* request, rpc::Skeleton::Respo bool skeleton_exited; photon::condition_variable skeleton_exit; -rpc::Skeleton* g_sk; + void* rpc_skeleton(void* args) { - skeleton_exited = false; auto s = (IStream*)args; auto sk = new_skeleton(); - g_sk = sk; + DEFER(delete sk); + sk->add_function(FID, rpc::Skeleton::Function((void*)123, &server_function)); sk->add_function(-1, rpc::Skeleton::Function(sk, &server_exit_function)); sk->serve(s); @@ -171,7 +180,7 @@ void do_call(StubImpl& stub, uint64_t function) EXPECT_EQ(memcmp(STR, resp_iov.iov.back().iov_base, LEN(STR)), 0); } -TEST(rpc, call) +TEST_F(RpcTest, call) { unique_ptr ds( new_duplex_memory_stream(16) ); thread_create(&rpc_skeleton, ds->endpoint_a); @@ -207,13 +216,14 @@ void* do_concurrent_call_shut(void* arg) return nullptr; } -TEST(rpc, concurrent) +TEST_F(RpcTest, concurrent) { // log_output_level = 1; LOG_INFO("Creating 1,000 threads, each doing 1,000 RPC calls"); // ds will be destruct just after function returned // but server will not // therefore, it will cause assert when destruction + skeleton_exited = false; unique_ptr ds( new_duplex_memory_stream(16) ); thread_create(&rpc_skeleton, ds->endpoint_a); @@ -232,7 +242,6 @@ TEST(rpc, concurrent) ds->close(); if (!skeleton_exited) skeleton_exit.wait_no_lock(); - log_output_level = 0; } void do_call_timeout(StubImpl& stub, uint64_t function) @@ -280,10 +289,9 @@ int server_function_timeout(void* instance, iovector* request, rpc::Skeleton::Re void* rpc_skeleton_timeout(void* args) { - skeleton_exited = false; auto s = (IStream*)args; auto sk = new_skeleton(); - g_sk = sk; + DEFER(delete sk); sk->add_function(FID, rpc::Skeleton::Function((void*)123, &server_function_timeout)); sk->add_function(-1, rpc::Skeleton::Function(sk, &server_exit_function)); sk->serve(s); @@ -293,13 +301,14 @@ void* rpc_skeleton_timeout(void* args) return nullptr; } -TEST(rpc, timeout) { +TEST_F(RpcTest, timeout) { LOG_INFO("Creating 1,000 threads, each doing 1,000 RPC calls"); // ds will be destruct just after function returned // but server will not // therefore, it will cause assert when destruction unique_ptr ds( new_duplex_memory_stream(655360) ); + skeleton_exited = false; thread_create(&rpc_skeleton_timeout, ds->endpoint_a); LOG_DEBUG("asdf1"); @@ -320,9 +329,159 @@ TEST(rpc, timeout) { log_output_level = 0; } +class RpcServer { +public: + RpcServer(Skeleton* skeleton, net::ISocketServer* socket) : m_socket(socket), m_skeleton(skeleton) { + m_skeleton->register_service(this); + m_socket->set_handler({this, &RpcServer::serve}); + } + struct Operation { + const static uint32_t IID = 0x1; + const static uint32_t FID = 0x2; + struct Request : public photon::rpc::Message { + int code = 0; + PROCESS_FIELDS(code); + }; + struct Response : public photon::rpc::Message { + int code = 0; + PROCESS_FIELDS(code); + }; + }; + int do_rpc_service(Operation::Request* req, Operation::Response* resp, IOVector* iov, IStream* stream) { + resp->code = req->code; + return 0; + } + int serve(photon::net::ISocketStream* stream) { + return m_skeleton->serve(stream); + } + int run() { + m_socket->setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); + if (m_socket->bind(9527, net::IPAddr::V6Any()) != 0) + LOG_ERRNO_RETURN(0, -1, "bind failed"); + if (m_socket->listen() != 0) + LOG_ERRNO_RETURN(0, -1, "listen failed"); + return m_socket->start_loop(false); + } + net::ISocketServer* m_socket; + Skeleton* m_skeleton; +}; + +static int do_call_2(Stub* stub) { + RpcServer::Operation::Request req; + RpcServer::Operation::Response resp; + return stub->call(req, resp); +} + +TEST_F(RpcTest, shutdown) { + auto socket_server = photon::net::new_tcp_socket_server_ipv6(); + GTEST_ASSERT_NE(nullptr, socket_server); + DEFER(delete socket_server); + auto sk = photon::rpc::new_skeleton(); + GTEST_ASSERT_NE(nullptr, sk); + DEFER(delete sk); + + RpcServer rpc_server(sk, socket_server); + GTEST_ASSERT_EQ(0, rpc_server.run()); + + auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + DEFER(delete pool); + + photon::net::EndPoint ep(net::IPAddr::V4Loopback(), 9527); + auto stub = pool->get_stub(ep, false); + ASSERT_NE(nullptr, stub); + DEFER(pool->put_stub(ep, true)); + + photon::thread_create11([&]{ + photon::thread_sleep(1); + sk->shutdown(); + delete sk; + sk = nullptr; + }); + + auto start = std::chrono::steady_clock::now(); + while (true) { + int ret = do_call_2(stub); + if (ret < 0) { + GTEST_ASSERT_EQ(ECONNRESET, errno); + break; + } + } + auto end = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + GTEST_ASSERT_GT(duration, 900); + GTEST_ASSERT_LT(duration, 1100); +} + +TEST_F(RpcTest, passive_shutdown) { + auto socket_server = photon::net::new_tcp_socket_server_ipv6(); + GTEST_ASSERT_NE(nullptr, socket_server); + DEFER(delete socket_server); + auto sk = photon::rpc::new_skeleton(); + GTEST_ASSERT_NE(nullptr, sk); + DEFER(delete sk); + + RpcServer rpc_server(sk, socket_server); + GTEST_ASSERT_EQ(0, rpc_server.run()); + + photon::net::EndPoint ep(net::IPAddr::V4Loopback(), 9527); + + photon::thread_create11([&]{ + // Should always succeed in 3 seconds + auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + DEFER(delete pool); + auto stub = pool->get_stub(ep, false); + if (!stub) abort(); + DEFER(pool->put_stub(ep, true)); + for (int i = 0 ; i < 30; ++i) { + int ret = do_call_2(stub); + if (ret < 0) { + LOG_ERROR(VALUE(ret)); + abort(); + } + photon::thread_usleep(100'000); + } + }); + + photon::thread_create11([&]{ + photon::thread_sleep(2); + // Should get connection refused after 2 seconds. Because socket closed listen fd at 1 second. + auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + DEFER(delete pool); + auto stub = pool->get_stub(ep, false); + if (stub) { + LOG_ERROR("should not get stub"); + abort(); + } + if (errno != ECONNREFUSED) { + LOG_ERROR(ERRNO()); + abort(); + } + }); + + auto start = std::chrono::steady_clock::now(); + + photon::thread_sleep(1); + socket_server->terminate(); + delete socket_server; + socket_server = nullptr; + + LOG_INFO("begin passive shutdown"); + sk->shutdown(false); + LOG_INFO("end passive shutdown"); + delete sk; + sk = nullptr; + + auto end = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + + // The passive shutdown took 3 seconds, until client closed the connection + GTEST_ASSERT_GT(duration, 2900); + GTEST_ASSERT_LT(duration, 3200); +} + int main(int argc, char** arg) { - ::photon::vcpu_init(); + ci_parse_env(); ::testing::InitGoogleTest(&argc, arg); return RUN_ALL_TESTS(); } diff --git a/thread/thread.cpp b/thread/thread.cpp index 1a6d3fe6..f9f1a67e 100644 --- a/thread/thread.cpp +++ b/thread/thread.cpp @@ -72,8 +72,6 @@ inline int posix_memalign(void** memptr, size_t alignment, size_t size) { by target vcpu in resume_thread(), when its runq becomes empty; */ -#define SCOPED_MEMBER_LOCK(x) SCOPED_LOCK(&(x)->lock, ((bool)x) * 2) - // Define assembly section header for clang and gcc #if defined(__APPLE__) #define DEF_ASM_FUNC(name) ".text\n" \ @@ -1199,7 +1197,8 @@ R"( __attribute__((always_inline)) inline Switch prepare_usleep(uint64_t useconds, thread_list* waitq, RunQ rq = {}) { - SCOPED_MEMBER_LOCK(waitq); + spinlock* waitq_lock = waitq ? &waitq->lock : nullptr; + SCOPED_LOCK(waitq_lock, ((bool) waitq) * 2); SCOPED_LOCK(rq.current->lock); assert(!AtomicRunQ(rq).single()); auto sw = AtomicRunQ(rq).remove_current(states::SLEEPING); @@ -1545,12 +1544,12 @@ R"( auto splock = (spinlock*)s_; splock->unlock(); } - int mutex::lock(uint64_t timeout) - { - for (int tries = 0; tries < MaxTries; ++tries) { + int mutex::lock(uint64_t timeout) { + if (try_lock() == 0) return 0; + for (auto re = retries; re; --re) { + thread_yield(); if (try_lock() == 0) return 0; - thread_yield(); } splock.lock(); if (try_lock() == 0) { diff --git a/thread/thread.h b/thread/thread.h index 69f64dce..9bb35347 100644 --- a/thread/thread.h +++ b/thread/thread.h @@ -226,8 +226,9 @@ namespace photon class mutex : protected waitq { public: - int lock(uint64_t timeout = -1); // threads are guaranteed to get the lock - int try_lock(); // in FIFO order, when there's contention + mutex(uint16_t max_retries = 100) : retries(max_retries) { } + int lock(uint64_t timeout = -1); + int try_lock(); void unlock(); ~mutex() { @@ -235,13 +236,20 @@ namespace photon } protected: - static constexpr const int MaxTries = 100; std::atomic owner{nullptr}; + uint16_t retries; spinlock splock; }; + class seq_mutex : protected mutex { + public: + // threads are guaranteed to get the lock in sequental order (FIFO) + seq_mutex() : mutex(0) { } + }; + class recursive_mutex : protected mutex { public: + using mutex::mutex; int lock(uint64_t timeout = -1); int try_lock(); void unlock();