Skip to content

Commit

Permalink
io: support reset photon at fork
Browse files Browse the repository at this point in the history
In photon context, event engine based module need reset after
fork, if exec will not be called after fork.
This is implicitly done by pthread_atfork hook.

Signed-off-by: yuchen.cc <yuchen.cc@alibaba-inc.com>
  • Loading branch information
yuchen0cc committed Nov 30, 2023
1 parent c25141b commit 3090204
Show file tree
Hide file tree
Showing 12 changed files with 528 additions and 14 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ file(GLOB PHOTON_SRC
fs/xfile.cpp
fs/httpfs/*.cpp
io/signal.cpp
io/resettable_ee.cpp
net/*.cpp
net/http/*.cpp
net/security-context/tls-stream.cpp
Expand Down
36 changes: 29 additions & 7 deletions io/aio-wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ namespace photon

static void* libaio_polling(void*)
{
libaio_ctx->running = 1;
DEFER(libaio_ctx->running = 0);
while (libaio_ctx->running == 1)
{
Expand Down Expand Up @@ -342,6 +341,26 @@ namespace photon
return rst;
}

static thread_local bool registed = false;
void fork_hook_libaio() {
if (!registed) return;
LOG_INFO("reset libaio at fork");
close(libaio_ctx->evfd);
libaio_ctx->evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (libaio_ctx->evfd < 0) {
LOG_ERROR("failed to create eventfd ", ERRNO());
exit(-1);
}
io_destroy(libaio_ctx->aio_ctx);
libaio_ctx->aio_ctx = {0};
int ret = io_setup(IODEPTH, &libaio_ctx->aio_ctx);
if (ret < 0)
{
LOG_ERROR("failed to create aio context by io_setup() ", ERRNO(), VALUE(ret));
exit(-1);
}
thread_interrupt(libaio_ctx->polling_thread, ECANCELED);
}

int libaio_wrapper_init()
{
Expand All @@ -356,15 +375,20 @@ namespace photon
int ret = io_setup(IODEPTH, &ctx->aio_ctx);
if (ret < 0)
{
LOG_ERROR("failed to create aio context by io_setup() ", ERRNO());
LOG_ERROR("failed to create aio context by io_setup() ", ERRNO(), VALUE(ret));
close(ctx->evfd);
return ret;
}

ctx->polling_thread = thread_create(&libaio_polling, nullptr);
assert(ctx->polling_thread);
libaio_ctx = ctx.release();
thread_yield_to(libaio_ctx->polling_thread);
libaio_ctx->running = 1;
if (!registed) {
pthread_atfork(nullptr, nullptr, &fork_hook_libaio);
registed = true;
}
LOG_DEBUG("libaio initialized");
return 0;
}

Expand All @@ -384,10 +408,8 @@ namespace photon
io_destroy(libaio_ctx->aio_ctx);
close(libaio_ctx->evfd);
libaio_ctx->evfd = -1;
delete libaio_ctx;
libaio_ctx = nullptr;
safe_delete(libaio_ctx);
LOG_DEBUG("libaio finished");
return 0;
}
}


10 changes: 9 additions & 1 deletion io/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ limitations under the License.
#include <photon/thread/thread.h>
#include <photon/io/fd-events.h>
#include "events_map.h"
#include "resettable_ee.h"

namespace photon {
#ifndef EPOLLRDHUP
Expand All @@ -48,7 +49,7 @@ struct InFlightEvent {
void* error_data;
};

class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine {
class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine, public ResettableEventEngine {
public:
int _evfd = -1;
int _engine_fd = -1;
Expand All @@ -71,6 +72,13 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine {
epfd = evfd = -1;
return 0;
}
int reset() override {
if_close_fd(_engine_fd); // close original fd
if_close_fd(_evfd);
_inflight_events.clear(); // reset members
_events_remain = 0;
return init(); // re-init
}
virtual ~EventEngineEPoll() override {
LOG_INFO("Finish event engine: epoll");
if_close_fd(_engine_fd);
Expand Down
7 changes: 6 additions & 1 deletion io/fstack-dpdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ limitations under the License.
#include "../thread/thread11.h"
#include "../common/alog.h"
#include "../net/basic_socket.h"
#include "resettable_ee.h"

#ifndef EVFILT_EXCEPT
#define EVFILT_EXCEPT (-15)
Expand All @@ -37,7 +38,7 @@ namespace photon {
constexpr static EventsMap<EVUnderlay<EVFILT_READ, EVFILT_WRITE, EVFILT_EXCEPT>>
evmap;

class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine {
class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine, public ResettableEventEngine {
public:
struct InFlightEvent {
uint32_t interests = 0;
Expand Down Expand Up @@ -76,6 +77,10 @@ class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine {
return 0;
}

int reset() override {
assert(false);
}

~FstackDpdkEngine() override {
LOG_INFO("Finish f-stack dpdk engine");
// if (_n > 0) LOG_INFO(VALUE(_events[0].ident), VALUE(_events[0].filter), VALUE(_events[0].flags));
Expand Down
14 changes: 13 additions & 1 deletion io/iouring-wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ limitations under the License.
#include <photon/thread/thread11.h>
#include <photon/io/fd-events.h>
#include "events_map.h"
#include "resettable_ee.h"

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
Expand All @@ -42,12 +43,22 @@ namespace photon {

constexpr static EventsMap<EVUnderlay<POLLIN | POLLRDHUP, POLLOUT, POLLERR>> evmap;

class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
class iouringEngine : public MasterEventEngine, public CascadingEventEngine, public ResettableEventEngine {
public:
explicit iouringEngine(bool master) : m_master(master) {}

~iouringEngine() {
LOG_INFO("Finish event engine: iouring ", VALUE(m_master));
fini();
}

int reset() override {
fini();
m_event_contexts.clear();
return init();
}

int fini() {
if (m_eventfd >= 0 && !m_master) {
if (io_uring_unregister_eventfd(m_ring) != 0) {
LOG_ERROR("iouring: failed to unregister cascading event fd");
Expand All @@ -61,6 +72,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
}
delete m_ring;
m_ring = nullptr;
return 0;
}

int init() {
Expand Down
12 changes: 11 additions & 1 deletion io/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ limitations under the License.
#include <sys/event.h>
#include <photon/common/alog.h>
#include "events_map.h"
#include "resettable_ee.h"

namespace photon {

constexpr static EventsMap<EVUnderlay<EVFILT_READ, EVFILT_WRITE, EVFILT_EXCEPT>>
evmap;

class KQueue : public MasterEventEngine, public CascadingEventEngine {
class KQueue : public MasterEventEngine, public CascadingEventEngine, public ResettableEventEngine {
public:
struct InFlightEvent {
uint32_t interests = 0;
Expand Down Expand Up @@ -55,6 +56,15 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine {
return 0;
}

int reset() override {
LOG_INFO("Reset event engine: kqueue");
_kq = -1; // kqueue fd is not inherited from the parent process
_inflight_events.clear(); // reset members
_n = 0;
_tm = {0, 0};
return init(); // re-init
}

~KQueue() override {
LOG_INFO("Finish event engine: kqueue");
// if (_n > 0) LOG_INFO(VALUE(_events[0].ident), VALUE(_events[0].filter), VALUE(_events[0].flags));
Expand Down
56 changes: 56 additions & 0 deletions io/resettable_ee.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2022 The Photon Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "resettable_ee.h"
#include <photon/common/alog.h>
#include <photon/thread/std-compat.h>
#include <pthread.h>

namespace photon {

static thread_local bool registed = false;

static intrusive_list<ResettableEventEngine> &ree_list() {
static photon::thread_local_ptr<intrusive_list<ResettableEventEngine>,
ResettableEventEngine *> list(nullptr);
return *list;
}

void fork_hook_event_engine() {
if (!registed || ree_list().empty())
return;
LOG_INFO("reset event engine at fork");
for (auto ree : ree_list()) {
LOG_DEBUG("reset event engine ", VALUE(ree));
ree->reset();
}
}

ResettableEventEngine::ResettableEventEngine() {
if (!registed) {
pthread_atfork(nullptr, nullptr, &fork_hook_event_engine);
registed = true;
}
LOG_DEBUG("push ", VALUE(this));
ree_list().push_back(this);
}

ResettableEventEngine::~ResettableEventEngine() {
LOG_DEBUG("erase ", VALUE(this));
ree_list().erase(this);
}

} // namespace photon
30 changes: 30 additions & 0 deletions io/resettable_ee.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
Copyright 2022 The Photon Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once
#include <photon/thread/list.h>

namespace photon {

class ResettableEventEngine : public intrusive_list_node<ResettableEventEngine> {
public:
ResettableEventEngine();
virtual ~ResettableEventEngine();

virtual int reset() = 0;
};

} // namespace photon
26 changes: 23 additions & 3 deletions io/signal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,30 @@ namespace photon
#endif
}

static thread_local bool registed = false;
// should be invoked in child process after forked, to clear signal mask
static void fork_hook_child(void)
static void fork_hook_signalfd(void)
{
LOG_DEBUG("Fork hook");
if (!registed || sgfd < 0)
return;
LOG_INFO("reset signalfd at fork");
sigset_t sigset0; // can NOT use photon::clear_signal_mask(),
sigemptyset(&sigset0); // as memory may be shared with parent, when vfork()ed
sigprocmask(SIG_SETMASK, &sigset0, nullptr);
// reset sgfd
memset(sighandlers, 0, sizeof(sighandlers));
#ifdef __APPLE__
sgfd = kqueue(); // kqueue fd is not inherited from the parent process
#else
close(sgfd);
sigfillset(&sigset);
sgfd = signalfd(-1, &sigset, SFD_CLOEXEC | SFD_NONBLOCK);
#endif
if (sgfd == -1)
LOG_ERROR("failed to create signalfd() or kqueue()");
// interrupt event loop by ETIMEDOUT to replace sgfd
if (eloop)
thread_interrupt(eloop->loop_thread(), ETIMEDOUT);
}

int sync_signal_init()
Expand Down Expand Up @@ -264,7 +281,10 @@ namespace photon
eloop->async_run();
LOG_INFO("signalfd initialized");
thread_yield(); // give a chance let eloop to execute do_wait
pthread_atfork(nullptr, nullptr, &fork_hook_child);
if (!registed) {
pthread_atfork(nullptr, nullptr, &fork_hook_signalfd);
registed = true;
}
return clear_signal_mask();
}

Expand Down
7 changes: 7 additions & 0 deletions io/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ add_executable(signalfdboom signalfdboom.cpp)
target_link_libraries(signalfdboom PRIVATE photon_shared)
add_test(NAME signalfdboom COMMAND $<TARGET_FILE:signalfdboom>)

add_executable(test-fork test-fork.cpp)
target_link_libraries(test-fork PRIVATE photon_shared)
if (PHOTON_ENABLE_URING)
target_compile_definitions(test-fork PRIVATE PHOTON_URING=on)
endif()
add_test(NAME test-fork COMMAND $<TARGET_FILE:test-fork>)

if (NOT APPLE)
add_executable(test-syncio test-syncio.cpp)
target_link_libraries(test-syncio PRIVATE photon_shared)
Expand Down
Loading

0 comments on commit 3090204

Please sign in to comment.