From 882db3f9f96d507708f5a57e9ef065b53d28ee08 Mon Sep 17 00:00:00 2001 From: "yuchen.cc" Date: Wed, 29 Nov 2023 17:49:41 +0800 Subject: [PATCH] io: support reset photon at fork In photon context, event engine based module need reset after fork, if exec will not be called after fork. This is implicitly done by pthread_atfork hook. Signed-off-by: yuchen.cc --- CMakeLists.txt | 1 + io/aio-wrapper.cpp | 40 ++++- io/epoll.cpp | 10 +- io/fstack-dpdk.cpp | 7 +- io/iouring-wrapper.cpp | 14 +- io/kqueue.cpp | 12 +- io/reset_handle.cpp | 52 +++++++ io/reset_handle.h | 32 ++++ io/signal.cpp | 45 ++++-- io/test/CMakeLists.txt | 7 + io/test/test-fork.cpp | 325 +++++++++++++++++++++++++++++++++++++++++ net/curl.cpp | 20 +++ photon.cpp | 9 +- 13 files changed, 552 insertions(+), 22 deletions(-) create mode 100644 io/reset_handle.cpp create mode 100644 io/reset_handle.h create mode 100644 io/test/test-fork.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 3d73dc91..9f86eb6e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -166,6 +166,7 @@ file(GLOB PHOTON_SRC fs/xfile.cpp fs/httpfs/*.cpp io/signal.cpp + io/reset_handle.cpp net/*.cpp net/http/*.cpp net/security-context/tls-stream.cpp diff --git a/io/aio-wrapper.cpp b/io/aio-wrapper.cpp index 2b7b9b0d..2ea2f2c2 100644 --- a/io/aio-wrapper.cpp +++ b/io/aio-wrapper.cpp @@ -34,6 +34,7 @@ limitations under the License. #include "fd-events.h" #include "../common/utility.h" #include "../common/alog.h" +#include "reset_handle.h" namespace photon { @@ -186,7 +187,6 @@ namespace photon static void* libaio_polling(void*) { - libaio_ctx->running = 1; DEFER(libaio_ctx->running = 0); while (libaio_ctx->running == 1) { @@ -342,6 +342,29 @@ namespace photon return rst; } + class AioResetHandle : public ResetHandle { + int reset() override { + if (!libaio_ctx) + return 0; + LOG_INFO("reset libaio by reset handle"); + close(libaio_ctx->evfd); + libaio_ctx->evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (libaio_ctx->evfd < 0) { + LOG_ERROR("failed to create eventfd ", ERRNO()); + exit(-1); + } + io_destroy(libaio_ctx->aio_ctx); + libaio_ctx->aio_ctx = {0}; + int ret = io_setup(IODEPTH, &libaio_ctx->aio_ctx); + if (ret < 0) { + LOG_ERROR("failed to create aio context by io_setup() ", ERRNO(), VALUE(ret)); + exit(-1); + } + thread_interrupt(libaio_ctx->polling_thread, ECANCELED); + return 0; + } + }; + static thread_local AioResetHandle *reset_handler = nullptr; int libaio_wrapper_init() { @@ -356,7 +379,7 @@ namespace photon int ret = io_setup(IODEPTH, &ctx->aio_ctx); if (ret < 0) { - LOG_ERROR("failed to create aio context by io_setup() ", ERRNO()); + LOG_ERROR("failed to create aio context by io_setup() ", ERRNO(), VALUE(ret)); close(ctx->evfd); return ret; } @@ -364,7 +387,11 @@ namespace photon ctx->polling_thread = thread_create(&libaio_polling, nullptr); assert(ctx->polling_thread); libaio_ctx = ctx.release(); - thread_yield_to(libaio_ctx->polling_thread); + libaio_ctx->running = 1; + if (reset_handler == nullptr) { + reset_handler = new AioResetHandle(); + } + LOG_DEBUG("libaio initialized"); return 0; } @@ -384,10 +411,9 @@ namespace photon io_destroy(libaio_ctx->aio_ctx); close(libaio_ctx->evfd); libaio_ctx->evfd = -1; - delete libaio_ctx; - libaio_ctx = nullptr; + safe_delete(libaio_ctx); + safe_delete(reset_handler); + LOG_DEBUG("libaio finished"); return 0; } } - - diff --git a/io/epoll.cpp b/io/epoll.cpp index d6340831..4100bd69 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -26,6 +26,7 @@ limitations under the License. #include #include #include "events_map.h" +#include "reset_handle.h" namespace photon { #ifndef EPOLLRDHUP @@ -48,7 +49,7 @@ struct InFlightEvent { void* error_data; }; -class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine { +class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine, public ResetHandle { public: int _evfd = -1; int _engine_fd = -1; @@ -71,6 +72,13 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine { epfd = evfd = -1; return 0; } + int reset() override { + if_close_fd(_engine_fd); // close original fd + if_close_fd(_evfd); + _inflight_events.clear(); // reset members + _events_remain = 0; + return init(); // re-init + } virtual ~EventEngineEPoll() override { LOG_INFO("Finish event engine: epoll"); if_close_fd(_engine_fd); diff --git a/io/fstack-dpdk.cpp b/io/fstack-dpdk.cpp index 8308b4db..96d53ee0 100644 --- a/io/fstack-dpdk.cpp +++ b/io/fstack-dpdk.cpp @@ -27,6 +27,7 @@ limitations under the License. #include "../thread/thread11.h" #include "../common/alog.h" #include "../net/basic_socket.h" +#include "reset_handle.h" #ifndef EVFILT_EXCEPT #define EVFILT_EXCEPT (-15) @@ -37,7 +38,7 @@ namespace photon { constexpr static EventsMap> evmap; -class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine { +class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine, public ResetHandle { public: struct InFlightEvent { uint32_t interests = 0; @@ -76,6 +77,10 @@ class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine { return 0; } + int reset() override { + assert(false); + } + ~FstackDpdkEngine() override { LOG_INFO("Finish f-stack dpdk engine"); // if (_n > 0) LOG_INFO(VALUE(_events[0].ident), VALUE(_events[0].filter), VALUE(_events[0].flags)); diff --git a/io/iouring-wrapper.cpp b/io/iouring-wrapper.cpp index 7137788c..d0e29059 100644 --- a/io/iouring-wrapper.cpp +++ b/io/iouring-wrapper.cpp @@ -30,6 +30,7 @@ limitations under the License. #include #include #include "events_map.h" +#include "reset_handle.h" #ifndef _GNU_SOURCE #define _GNU_SOURCE @@ -42,12 +43,22 @@ namespace photon { constexpr static EventsMap> evmap; -class iouringEngine : public MasterEventEngine, public CascadingEventEngine { +class iouringEngine : public MasterEventEngine, public CascadingEventEngine, public ResetHandle { public: explicit iouringEngine(bool master) : m_master(master) {} ~iouringEngine() { LOG_INFO("Finish event engine: iouring ", VALUE(m_master)); + fini(); + } + + int reset() override { + fini(); + m_event_contexts.clear(); + return init(); + } + + int fini() { if (m_eventfd >= 0 && !m_master) { if (io_uring_unregister_eventfd(m_ring) != 0) { LOG_ERROR("iouring: failed to unregister cascading event fd"); @@ -61,6 +72,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine { } delete m_ring; m_ring = nullptr; + return 0; } int init() { diff --git a/io/kqueue.cpp b/io/kqueue.cpp index c2b8121e..4e314585 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -21,13 +21,14 @@ limitations under the License. #include #include #include "events_map.h" +#include "reset_handle.h" namespace photon { constexpr static EventsMap> evmap; -class KQueue : public MasterEventEngine, public CascadingEventEngine { +class KQueue : public MasterEventEngine, public CascadingEventEngine, public ResetHandle { public: struct InFlightEvent { uint32_t interests = 0; @@ -55,6 +56,15 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine { return 0; } + int reset() override { + LOG_INFO("Reset event engine: kqueue"); + _kq = -1; // kqueue fd is not inherited from the parent process + _inflight_events.clear(); // reset members + _n = 0; + _tm = {0, 0}; + return init(); // re-init + } + ~KQueue() override { LOG_INFO("Finish event engine: kqueue"); // if (_n > 0) LOG_INFO(VALUE(_events[0].ident), VALUE(_events[0].filter), VALUE(_events[0].flags)); diff --git a/io/reset_handle.cpp b/io/reset_handle.cpp new file mode 100644 index 00000000..c560a611 --- /dev/null +++ b/io/reset_handle.cpp @@ -0,0 +1,52 @@ +/* +Copyright 2022 The Photon Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "reset_handle.h" +#include +#include +#include + +namespace photon { + +static thread_local ResetHandle *list = nullptr; + +ResetHandle::ResetHandle() { + LOG_DEBUG("push ", VALUE(this)); + if (list == nullptr) { + list = this; + } else { + list->insert_tail(this); + } +} + +ResetHandle::~ResetHandle() { + LOG_DEBUG("erase ", VALUE(this)); + auto nx = this->remove_from_list(); + if (this == list) + list = nx; +} + +void reset_all_handle() { + if (list == nullptr) + return; + LOG_INFO("reset all handle called"); + for (auto handler : *list) { + LOG_DEBUG("reset ", VALUE(handler)); + handler->reset(); + } +} + +} // namespace photon diff --git a/io/reset_handle.h b/io/reset_handle.h new file mode 100644 index 00000000..cbc92a6c --- /dev/null +++ b/io/reset_handle.h @@ -0,0 +1,32 @@ +/* +Copyright 2022 The Photon Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#pragma once +#include + +namespace photon { + +class ResetHandle : public intrusive_list_node { +public: + ResetHandle(); + virtual ~ResetHandle(); + + virtual int reset() = 0; +}; + +void reset_all_handle(); + +} // namespace photon diff --git a/io/signal.cpp b/io/signal.cpp index 77bf2cd8..63b1a751 100644 --- a/io/signal.cpp +++ b/io/signal.cpp @@ -26,6 +26,7 @@ limitations under the License. #include "fd-events.h" #include "../common/event-loop.h" #include "../common/alog.h" +#include "reset_handle.h" namespace photon { @@ -227,14 +228,35 @@ namespace photon #endif } - // should be invoked in child process after forked, to clear signal mask - static void fork_hook_child(void) - { - LOG_DEBUG("Fork hook"); - sigset_t sigset0; // can NOT use photon::clear_signal_mask(), - sigemptyset(&sigset0); // as memory may be shared with parent, when vfork()ed - sigprocmask(SIG_SETMASK, &sigset0, nullptr); - } + class SignalResetHandle : public ResetHandle { + int reset() override { + if (sgfd < 0) + return 0; + LOG_INFO("reset signalfd by reset handle"); + sigset_t sigset0; // can NOT use photon::clear_signal_mask(), + sigemptyset(&sigset0); // as memory may be shared with parent, when vfork()ed + sigprocmask(SIG_SETMASK, &sigset0, nullptr); + // reset sgfd + memset(sighandlers, 0, sizeof(sighandlers)); +#ifdef __APPLE__ + sgfd = kqueue(); // kqueue fd is not inherited from the parent process +#else + close(sgfd); + sigfillset(&sigset); + sgfd = signalfd(-1, &sigset, SFD_CLOEXEC | SFD_NONBLOCK); +#endif + if (sgfd == -1) { + LOG_ERROR("failed to create signalfd() or kqueue()"); + exit(1); + } + // interrupt event loop by ETIMEDOUT to replace sgfd + if (eloop) + thread_interrupt(eloop->loop_thread(), ETIMEDOUT); + + return 0; + } + }; + static SignalResetHandle *reset_handler = nullptr; int sync_signal_init() { @@ -262,9 +284,11 @@ namespace photon LOG_ERROR_RETURN(EFAULT, -1, "failed to thread_create() for signal handling"); } eloop->async_run(); - LOG_INFO("signalfd initialized"); thread_yield(); // give a chance let eloop to execute do_wait - pthread_atfork(nullptr, nullptr, &fork_hook_child); + if (reset_handler == nullptr) { + reset_handler = new SignalResetHandle(); + } + LOG_INFO("signalfd initialized"); return clear_signal_mask(); } @@ -298,6 +322,7 @@ namespace photon } } #endif + safe_delete(reset_handler); LOG_INFO("signalfd finished"); return clear_signal_mask(); } diff --git a/io/test/CMakeLists.txt b/io/test/CMakeLists.txt index 5a3ff5d6..de102709 100644 --- a/io/test/CMakeLists.txt +++ b/io/test/CMakeLists.txt @@ -8,6 +8,13 @@ add_executable(signalfdboom signalfdboom.cpp) target_link_libraries(signalfdboom PRIVATE photon_shared) add_test(NAME signalfdboom COMMAND $) +add_executable(test-fork test-fork.cpp) +target_link_libraries(test-fork PRIVATE photon_shared) +if (PHOTON_ENABLE_URING) + target_compile_definitions(test-fork PRIVATE PHOTON_URING=on) +endif() +add_test(NAME test-fork COMMAND $) + if (NOT APPLE) add_executable(test-syncio test-syncio.cpp) target_link_libraries(test-syncio PRIVATE photon_shared) diff --git a/io/test/test-fork.cpp b/io/test/test-fork.cpp new file mode 100644 index 00000000..1f8f8b8a --- /dev/null +++ b/io/test/test-fork.cpp @@ -0,0 +1,325 @@ +/* +Copyright 2022 The Photon Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +bool exit_flag = false; +bool exit_normal = false; + +void sigint_handler(int signal = SIGINT) { + LOG_INFO("signal ` received, pid `", signal, getpid()); + exit_flag = true; +} + +inline int check_process_exit_stat(int &statVal, int &pid) { + if (WIFEXITED(statVal)) { // child exit normally + LOG_INFO("process with pid ` finished with code `.", pid, WEXITSTATUS(statVal)); + return WEXITSTATUS(statVal); + } else { + if (WIFSIGNALED(statVal)) { // child terminated due to uncaptured signal + LOG_INFO("process with pid ` terminated due to uncaptured signal `.", pid, + WTERMSIG(statVal)); + } else if (WIFSTOPPED(statVal)) { // child terminated unexpectedly + LOG_INFO("process with pid ` terminated unexpectedly with signal `.", pid, + WSTOPSIG(statVal)); + } else { + LOG_INFO("process with pid ` terminated abnormally.", pid); + } + return -1; + } +} + +void wait_process_end(pid_t pid) { + if (pid > 0) { + int statVal; + if (waitpid(pid, &statVal, 0) > 0) { + check_process_exit_stat(statVal, pid); + } else { + /// EINTR + if (EINTR == errno) { + LOG_INFO("process with pid ` waitpid is interrupted.", pid); + } else { + LOG_INFO("process with pid ` waitpid exception, strerror: `.", pid, + strerror(errno)); + } + } + } +} + +void wait_process_end_no_hang(pid_t pid) { + if (pid > 0) { + int statVal; + int retry = 100; + again: + if (waitpid(pid, &statVal, WNOHANG) <= 0) { + if (retry--) { + photon::thread_usleep(50 * 1000); + goto again; + } else { + if (kill(pid, SIGKILL) == 0) { + LOG_WARN("force kill child process with pid `", pid); + } else { + LOG_ERROR("force kill child process with pid ` error, errno:`:`", pid, errno, + strerror(errno)); + } + wait_process_end(pid); + } + } else { + if (check_process_exit_stat(statVal, pid) == 0) { + exit_normal = true; + } + } + } +} + +int fork_child_process() { + pid_t pid = fork(); + if (pid < 0) { + LOG_ERRNO_RETURN(0, -1, "fork error"); + return -1; + } + + if (pid == 0) { + photon::block_all_signal(); + photon::sync_signal(SIGTERM, &sigint_handler); + + LOG_INFO("child hello, pid `", getpid()); + while (!exit_flag) { + photon::thread_usleep(200 * 1000); + } + photon::fini(); + LOG_INFO("child exited, pid `", getpid()); + exit(0); + } else { + LOG_INFO("parent hello, pid `", getpid()); + return pid; + } +} + +int fork_parent_process(uint64_t event_engine) { + pid_t m_pid = fork(); + if (m_pid < 0) { + LOG_ERRNO_RETURN(0, -1, "fork error"); + return -1; + } + + if (m_pid > 0) { + LOG_INFO("main hello, pid `", getpid()); + return m_pid; + } + photon::fini(); + photon::init(event_engine, photon::INIT_IO_LIBCURL); + + photon::block_all_signal(); + photon::sync_signal(SIGINT, &sigint_handler); + + LOG_INFO("parent hello, pid `", getpid()); + photon::thread_sleep(1); + auto pid = fork_child_process(); + photon::thread_sleep(1); + + int statVal; + if (waitpid(pid, &statVal, WNOHANG) == 0) { + if (kill(pid, SIGTERM) == 0) { + LOG_INFO("kill child process with pid `", pid); + } else { + ERRNO eno; + LOG_ERROR("kill child process with pid ` error, `", pid, eno); + } + wait_process_end_no_hang(pid); + } else { + check_process_exit_stat(statVal, pid); + LOG_ERROR("child process exit unexpected"); + } + + LOG_INFO("child process exit status `", exit_normal); + EXPECT_EQ(true, exit_normal); + + while (!exit_flag) { + photon::thread_usleep(200 * 1000); + } + LOG_INFO("parent exited, pid `", getpid()); + photon::fini(); + exit(exit_normal ? 0 : -1); +} + +TEST(ForkTest, Fork) { + photon::init(photon::INIT_EVENT_NONE, photon::INIT_IO_NONE); + DEFER(photon::fini()); + exit_flag = false; + exit_normal = false; +#if defined(__linux__) + auto pid = fork_parent_process(photon::INIT_EVENT_EPOLL | photon::INIT_EVENT_SIGNAL); +#else // macOS, FreeBSD ... + auto pid = fork_parent_process(photon::INIT_EVENT_DEFAULT); +#endif + photon::thread_sleep(5); + + int statVal; + if (waitpid(pid, &statVal, WNOHANG) == 0) { + if (kill(pid, SIGINT) == 0) { + LOG_INFO("kill parent process with pid `", pid); + } else { + ERRNO eno; + LOG_ERROR("kill parent process with pid ` error, `", pid, eno); + } + wait_process_end_no_hang(pid); + } else { + check_process_exit_stat(statVal, pid); + LOG_ERROR("parent process exit unexpected"); + } + + LOG_INFO("parent process exit status `", exit_normal); + EXPECT_EQ(true, exit_normal); +} + +TEST(ForkTest, ForkInThread) { + photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_LIBCURL); + DEFER(photon::fini()); + + int ret = -1; + std::thread th([&]() { + pid_t pid = fork(); + ASSERT_GE(pid, 0); + + if (pid == 0) { + LOG_INFO("child hello, pid `", getpid()); + exit(0); + } else { + LOG_INFO("parent hello, pid `", getpid()); + int statVal; + waitpid(pid, &statVal, 0); + ret = check_process_exit_stat(statVal, pid); + } + }); + th.join(); + EXPECT_EQ(0, ret); +} + +TEST(ForkTest, PopenInThread) { + photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_LIBCURL); + DEFER(photon::fini()); + + photon::semaphore sem(0); + auto cmd = "du -s \"/tmp\""; + ssize_t size = -1; + std::thread([&] { + auto f = popen(cmd, "r"); + EXPECT_NE(nullptr, f); + DEFER(fclose(f)); + fscanf(f, "%lu", &size); + sem.signal(1); + LOG_INFO("popen done"); + }).detach(); + sem.wait(1); + EXPECT_NE(-1, size); + LOG_INFO(VALUE(size)); +} + +#if defined(__linux__) && defined(PHOTON_URING) +TEST(ForkTest, Iouring) { + photon::init(photon::INIT_EVENT_NONE, photon::INIT_IO_NONE); + DEFER(photon::fini()); + exit_flag = false; + exit_normal = false; + auto pid = fork_parent_process(photon::INIT_EVENT_IOURING | photon::INIT_EVENT_SIGNAL); + + photon::thread_sleep(5); + + int statVal; + if (waitpid(pid, &statVal, WNOHANG) == 0) { + if (kill(pid, SIGINT) == 0) { + LOG_INFO("kill parent process with pid `", pid); + } else { + ERRNO eno; + LOG_ERROR("kill parent process with pid ` error, `", pid, eno); + } + wait_process_end_no_hang(pid); + } else { + check_process_exit_stat(statVal, pid); + LOG_ERROR("parent process exit unexpected"); + } + + LOG_INFO("parent process exit status `", exit_normal); + EXPECT_EQ(true, exit_normal); +} +#endif + +#if defined(__linux__) +TEST(ForkTest, LIBAIO) { + photon::init(photon::INIT_EVENT_EPOLL, photon::INIT_IO_LIBAIO); + DEFER(photon::fini()); + + std::unique_ptr fs( + photon::fs::new_localfs_adaptor("/tmp/", photon::fs::ioengine_libaio)); + std::unique_ptr lf( + fs->open("test_local_fs_fork_parent", O_RDWR | O_CREAT, 0755)); + void* buf = nullptr; + ::posix_memalign(&buf, 4096, 4096); + DEFER(free(buf)); + int ret = lf->pwrite(buf, 4096, 0); + EXPECT_EQ(ret, 4096); + + ret = -1; + pid_t pid = fork(); + ASSERT_GE(pid, 0); + + if (pid == 0) { + std::unique_ptr fs( + photon::fs::new_localfs_adaptor("/tmp/", photon::fs::ioengine_libaio)); + std::unique_ptr lf( + fs->open("test_local_fs_fork", O_RDWR | O_CREAT, 0755)); + void* buf = nullptr; + ::posix_memalign(&buf, 4096, 4096); + DEFER(free(buf)); + auto ret = lf->pwrite(buf, 4096, 0); + EXPECT_EQ(ret, 4096); + ret = lf->close(); + photon::fini(); + exit(ret); + } else { + int statVal; + waitpid(pid, &statVal, 0); + ret = check_process_exit_stat(statVal, pid); + } + EXPECT_EQ(0, ret); + + ret = lf->pwrite(buf, 4096, 0); + EXPECT_EQ(ret, 4096); + ret = lf->close(); + EXPECT_EQ(0, ret); +} +#endif + +int main(int argc, char **argv) { + set_log_output_level(0); + + ::testing::InitGoogleTest(&argc, argv); + auto ret = RUN_ALL_TESTS(); + if (ret) LOG_ERROR_RETURN(0, ret, VALUE(ret)); +} diff --git a/net/curl.cpp b/net/curl.cpp index f2eeb33f..6906f43d 100644 --- a/net/curl.cpp +++ b/net/curl.cpp @@ -28,6 +28,7 @@ limitations under the License. #include #include #include +#include "../io/reset_handle.h" namespace photon { namespace net { @@ -159,6 +160,8 @@ class cURLLoop : public Object { void stop() { loop->stop(); } + photon::thread* loop_thread() { return loop->loop_thread(); } + protected: EventLoop* loop; int cnt; @@ -210,6 +213,17 @@ void __OpenSSLGlobalInit(); // curl_global_cleanup(); // } +class CurlResetHandle : public ResetHandle { + int reset() override { + LOG_INFO("reset libcurl by reset handle"); + // interrupt g_loop by ETIMEDOUT to replace g_poller + if (cctx.g_loop) + thread_interrupt(cctx.g_loop->loop_thread(), ETIMEDOUT); + return 0; + } +}; +static thread_local CurlResetHandle *reset_handler = nullptr; + int libcurl_init(long flags, long pipelining, long maxconn) { if (cctx.g_loop == nullptr) { __OpenSSLGlobalInit(); @@ -240,6 +254,10 @@ int libcurl_init(long flags, long pipelining, long maxconn) { libcurl_set_pipelining(pipelining); libcurl_set_maxconnects(maxconn); + if (reset_handler == nullptr) { + reset_handler = new CurlResetHandle(); + } + LOG_INFO("libcurl initialized"); } return 0; @@ -256,6 +274,8 @@ void libcurl_fini() { if (ret != CURLM_OK) LOG_ERROR("libcurl-multi cleanup error: ", curl_multi_strerror(ret)); cctx.g_libcurl_multi = nullptr; + safe_delete(reset_handler); + LOG_INFO("libcurl finished"); } std::string url_escape(const char* str) { diff --git a/photon.cpp b/photon.cpp index cafc770c..8780969b 100644 --- a/photon.cpp +++ b/photon.cpp @@ -22,6 +22,7 @@ limitations under the License. #ifdef ENABLE_FSTACK_DPDK #include "io/fstack-dpdk.h" #endif +#include "io/reset_handle.h" #include "net/curl.h" #include "net/socket.h" #include "fs/exportfs.h" @@ -31,6 +32,7 @@ namespace photon { using namespace fs; using namespace net; +static bool reset_handle_registed = false; static thread_local uint64_t g_event_engine = 0, g_io_engine = 0; #define INIT_IO(name, prefix) if (INIT_IO_##name & io_engine) { if (prefix##_init() < 0) return -1; } @@ -74,6 +76,11 @@ int init(uint64_t event_engine, uint64_t io_engine) { #endif g_event_engine = event_engine; g_io_engine = io_engine; + if (!reset_handle_registed) { + pthread_atfork(nullptr, nullptr, &reset_all_handle); + LOG_DEBUG("reset_all_handle registed ", VALUE(getpid())); + reset_handle_registed = true; + } return 0; } @@ -96,4 +103,4 @@ int fini() { return 0; } -} \ No newline at end of file +}