Skip to content

Commit

Permalink
io: support reset photon at fork
Browse files Browse the repository at this point in the history
In photon context, event engine based module need reset after
fork, if exec will not be called after fork.
This is implicitly done by pthread_atfork hook.

Signed-off-by: yuchen.cc <yuchen.cc@alibaba-inc.com>
  • Loading branch information
yuchen0cc committed Dec 4, 2023
1 parent f97ac8f commit a24cd48
Show file tree
Hide file tree
Showing 13 changed files with 555 additions and 22 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ file(GLOB PHOTON_SRC
fs/xfile.cpp
fs/httpfs/*.cpp
io/signal.cpp
io/reset_handle.cpp
net/*.cpp
net/http/*.cpp
net/security-context/tls-stream.cpp
Expand Down
40 changes: 33 additions & 7 deletions io/aio-wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ limitations under the License.
#include "fd-events.h"
#include "../common/utility.h"
#include "../common/alog.h"
#include "reset_handle.h"

namespace photon
{
Expand Down Expand Up @@ -186,7 +187,6 @@ namespace photon

static void* libaio_polling(void*)
{
libaio_ctx->running = 1;
DEFER(libaio_ctx->running = 0);
while (libaio_ctx->running == 1)
{
Expand Down Expand Up @@ -342,6 +342,29 @@ namespace photon
return rst;
}

class AioResetHandle : public ResetHandle {
int reset() override {
if (!libaio_ctx)
return 0;
LOG_INFO("reset libaio by reset handle");
close(libaio_ctx->evfd);
libaio_ctx->evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (libaio_ctx->evfd < 0) {
LOG_ERROR("failed to create eventfd ", ERRNO());
exit(-1);
}
io_destroy(libaio_ctx->aio_ctx);
libaio_ctx->aio_ctx = {0};
int ret = io_setup(IODEPTH, &libaio_ctx->aio_ctx);
if (ret < 0) {
LOG_ERROR("failed to create aio context by io_setup() ", ERRNO(), VALUE(ret));
exit(-1);
}
thread_interrupt(libaio_ctx->polling_thread, ECANCELED);
return 0;
}
};
static thread_local AioResetHandle *reset_handler = nullptr;

int libaio_wrapper_init()
{
Expand All @@ -356,15 +379,19 @@ namespace photon
int ret = io_setup(IODEPTH, &ctx->aio_ctx);
if (ret < 0)
{
LOG_ERROR("failed to create aio context by io_setup() ", ERRNO());
LOG_ERROR("failed to create aio context by io_setup() ", ERRNO(), VALUE(ret));
close(ctx->evfd);
return ret;
}

ctx->polling_thread = thread_create(&libaio_polling, nullptr);
assert(ctx->polling_thread);
libaio_ctx = ctx.release();
thread_yield_to(libaio_ctx->polling_thread);
libaio_ctx->running = 1;
if (reset_handler == nullptr) {
reset_handler = new AioResetHandle();
}
LOG_DEBUG("libaio initialized");
return 0;
}

Expand All @@ -384,10 +411,9 @@ namespace photon
io_destroy(libaio_ctx->aio_ctx);
close(libaio_ctx->evfd);
libaio_ctx->evfd = -1;
delete libaio_ctx;
libaio_ctx = nullptr;
safe_delete(libaio_ctx);
safe_delete(reset_handler);
LOG_DEBUG("libaio finished");
return 0;
}
}


10 changes: 9 additions & 1 deletion io/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ limitations under the License.
#include <photon/thread/thread.h>
#include <photon/io/fd-events.h>
#include "events_map.h"
#include "reset_handle.h"

namespace photon {
#ifndef EPOLLRDHUP
Expand All @@ -48,7 +49,7 @@ struct InFlightEvent {
void* error_data;
};

class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine {
class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine, public ResetHandle {
public:
int _evfd = -1;
int _engine_fd = -1;
Expand All @@ -71,6 +72,13 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine {
epfd = evfd = -1;
return 0;
}
int reset() override {
if_close_fd(_engine_fd); // close original fd
if_close_fd(_evfd);
_inflight_events.clear(); // reset members
_events_remain = 0;
return init(); // re-init
}
virtual ~EventEngineEPoll() override {
LOG_INFO("Finish event engine: epoll");
if_close_fd(_engine_fd);
Expand Down
7 changes: 6 additions & 1 deletion io/fstack-dpdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ limitations under the License.
#include "../thread/thread11.h"
#include "../common/alog.h"
#include "../net/basic_socket.h"
#include "reset_handle.h"

#ifndef EVFILT_EXCEPT
#define EVFILT_EXCEPT (-15)
Expand All @@ -37,7 +38,7 @@ namespace photon {
constexpr static EventsMap<EVUnderlay<EVFILT_READ, EVFILT_WRITE, EVFILT_EXCEPT>>
evmap;

class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine {
class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine, public ResetHandle {
public:
struct InFlightEvent {
uint32_t interests = 0;
Expand Down Expand Up @@ -76,6 +77,10 @@ class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine {
return 0;
}

int reset() override {
assert(false);
}

~FstackDpdkEngine() override {
LOG_INFO("Finish f-stack dpdk engine");
// if (_n > 0) LOG_INFO(VALUE(_events[0].ident), VALUE(_events[0].filter), VALUE(_events[0].flags));
Expand Down
14 changes: 13 additions & 1 deletion io/iouring-wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ limitations under the License.
#include <photon/thread/thread11.h>
#include <photon/io/fd-events.h>
#include "events_map.h"
#include "reset_handle.h"

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
Expand All @@ -42,12 +43,22 @@ namespace photon {

constexpr static EventsMap<EVUnderlay<POLLIN | POLLRDHUP, POLLOUT, POLLERR>> evmap;

class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
class iouringEngine : public MasterEventEngine, public CascadingEventEngine, public ResetHandle {
public:
explicit iouringEngine(bool master) : m_master(master) {}

~iouringEngine() {
LOG_INFO("Finish event engine: iouring ", VALUE(m_master));
fini();
}

int reset() override {
fini();
m_event_contexts.clear();
return init();
}

int fini() {
if (m_eventfd >= 0 && !m_master) {
if (io_uring_unregister_eventfd(m_ring) != 0) {
LOG_ERROR("iouring: failed to unregister cascading event fd");
Expand All @@ -61,6 +72,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine {
}
delete m_ring;
m_ring = nullptr;
return 0;
}

int init() {
Expand Down
12 changes: 11 additions & 1 deletion io/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ limitations under the License.
#include <sys/event.h>
#include <photon/common/alog.h>
#include "events_map.h"
#include "reset_handle.h"

namespace photon {

constexpr static EventsMap<EVUnderlay<EVFILT_READ, EVFILT_WRITE, EVFILT_EXCEPT>>
evmap;

class KQueue : public MasterEventEngine, public CascadingEventEngine {
class KQueue : public MasterEventEngine, public CascadingEventEngine, public ResetHandle {
public:
struct InFlightEvent {
uint32_t interests = 0;
Expand Down Expand Up @@ -55,6 +56,15 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine {
return 0;
}

int reset() override {
LOG_INFO("Reset event engine: kqueue");
_kq = -1; // kqueue fd is not inherited from the parent process
_inflight_events.clear(); // reset members
_n = 0;
_tm = {0, 0};
return init(); // re-init
}

~KQueue() override {
LOG_INFO("Finish event engine: kqueue");
// if (_n > 0) LOG_INFO(VALUE(_events[0].ident), VALUE(_events[0].filter), VALUE(_events[0].flags));
Expand Down
55 changes: 55 additions & 0 deletions io/reset_handle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2022 The Photon Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "reset_handle.h"
#include <photon/common/alog.h>
#include <photon/thread/std-compat.h>
#include <pthread.h>

namespace photon {

static ResetHandle *&ree_list() {
static thread_local ResetHandle *list = nullptr;
return list;
}

ResetHandle::ResetHandle() {
LOG_DEBUG("push ", VALUE(this));
if (ree_list() == nullptr) {
ree_list() = this;
} else {
ree_list()->insert_tail(this);
}
}

ResetHandle::~ResetHandle() {
LOG_DEBUG("erase ", VALUE(this));
auto nx = this->remove_from_list();
if (this == ree_list())
ree_list() = nx;
}

void reset_all_handle() {
if (ree_list() == nullptr)
return;
LOG_INFO("reset all handle called");
for (auto ree : *ree_list()) {
LOG_DEBUG("reset ", VALUE(ree));
ree->reset();
}
}

} // namespace photon
32 changes: 32 additions & 0 deletions io/reset_handle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright 2022 The Photon Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once
#include <photon/thread/list.h>

namespace photon {

class ResetHandle : public intrusive_list_node<ResetHandle> {
public:
ResetHandle();
virtual ~ResetHandle();

virtual int reset() = 0;
};

void reset_all_handle();

} // namespace photon
45 changes: 35 additions & 10 deletions io/signal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ limitations under the License.
#include "fd-events.h"
#include "../common/event-loop.h"
#include "../common/alog.h"
#include "reset_handle.h"

namespace photon
{
Expand Down Expand Up @@ -227,14 +228,35 @@ namespace photon
#endif
}

// should be invoked in child process after forked, to clear signal mask
static void fork_hook_child(void)
{
LOG_DEBUG("Fork hook");
sigset_t sigset0; // can NOT use photon::clear_signal_mask(),
sigemptyset(&sigset0); // as memory may be shared with parent, when vfork()ed
sigprocmask(SIG_SETMASK, &sigset0, nullptr);
}
class SignalResetHandle : public ResetHandle {
int reset() override {
if (sgfd < 0)
return 0;
LOG_INFO("reset signalfd by reset handle");
sigset_t sigset0; // can NOT use photon::clear_signal_mask(),
sigemptyset(&sigset0); // as memory may be shared with parent, when vfork()ed
sigprocmask(SIG_SETMASK, &sigset0, nullptr);
// reset sgfd
memset(sighandlers, 0, sizeof(sighandlers));
#ifdef __APPLE__
sgfd = kqueue(); // kqueue fd is not inherited from the parent process
#else
close(sgfd);
sigfillset(&sigset);
sgfd = signalfd(-1, &sigset, SFD_CLOEXEC | SFD_NONBLOCK);
#endif
if (sgfd == -1) {
LOG_ERROR("failed to create signalfd() or kqueue()");
exit(1);
}
// interrupt event loop by ETIMEDOUT to replace sgfd
if (eloop)
thread_interrupt(eloop->loop_thread(), ETIMEDOUT);

return 0;
}
};
static SignalResetHandle *reset_handler = nullptr;

int sync_signal_init()
{
Expand Down Expand Up @@ -262,9 +284,11 @@ namespace photon
LOG_ERROR_RETURN(EFAULT, -1, "failed to thread_create() for signal handling");
}
eloop->async_run();
LOG_INFO("signalfd initialized");
thread_yield(); // give a chance let eloop to execute do_wait
pthread_atfork(nullptr, nullptr, &fork_hook_child);
if (reset_handler == nullptr) {
reset_handler = new SignalResetHandle();
}
LOG_INFO("signalfd initialized");
return clear_signal_mask();
}

Expand Down Expand Up @@ -298,6 +322,7 @@ namespace photon
}
}
#endif
safe_delete(reset_handler);
LOG_INFO("signalfd finished");
return clear_signal_mask();
}
Expand Down
7 changes: 7 additions & 0 deletions io/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ add_executable(signalfdboom signalfdboom.cpp)
target_link_libraries(signalfdboom PRIVATE photon_shared)
add_test(NAME signalfdboom COMMAND $<TARGET_FILE:signalfdboom>)

add_executable(test-fork test-fork.cpp)
target_link_libraries(test-fork PRIVATE photon_shared)
if (PHOTON_ENABLE_URING)
target_compile_definitions(test-fork PRIVATE PHOTON_URING=on)
endif()
add_test(NAME test-fork COMMAND $<TARGET_FILE:test-fork>)

if (NOT APPLE)
add_executable(test-syncio test-syncio.cpp)
target_link_libraries(test-syncio PRIVATE photon_shared)
Expand Down
Loading

0 comments on commit a24cd48

Please sign in to comment.