From f275c048ec0f0d639c311c06f6dc54270a0275b8 Mon Sep 17 00:00:00 2001 From: Coldwings Date: Mon, 9 Sep 2024 15:57:11 +0800 Subject: [PATCH 1/5] FIX: wait_for_fd in epoll should at least be put to sleep for a tiny moment --- io/epoll.cpp | 4 +++- net/pooled_socket.cpp | 9 +++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/io/epoll.cpp b/io/epoll.cpp index a45ec09f..794fd066 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -301,7 +301,9 @@ ok: entry.interests |= eint; return rm_interest({fd, EVENT_RWE| ONE_SHOT, 0}); // remove fd from epoll int ret = add_interest({fd, interest | ONE_SHOT, CURRENT}); if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest"); - ret = thread_usleep(timeout); + // if timeout is just simple 0, wait for a tiny little moment + // so that events can be collect. + ret = thread_usleep(timeout.timeout() ? timeout : Timeout(10)); ERRNO err; if (ret == -1 && err.no == EOK) { return 0; // Event arrived diff --git a/net/pooled_socket.cpp b/net/pooled_socket.cpp index 541c44d9..668798b7 100644 --- a/net/pooled_socket.cpp +++ b/net/pooled_socket.cpp @@ -121,9 +121,10 @@ class TCPSocketPool : public ForwardSocketClient { photon::Timer timer; // all fd < 0 treated as socket not based on fd - // and always alive. Using such socket needs user + // and always reusable. Using such socket needs user // to check if connected socket is still usable. - bool stream_alive(int fd) { + // if there are still unread bytes in the stream, it should be closed. 
+ bool stream_reusable(int fd) { return (fd < 0) || (wait_for_fd_readable(fd, 0) != 0); } @@ -196,7 +197,7 @@ class TCPSocketPool : public ForwardSocketClient { if (!stream) { stream = m_underlay->connect(remote, local); if (!stream) return nullptr; - } else if (!stream_alive(stream->get_underlay_fd())) { + } else if (!stream_reusable(stream->get_underlay_fd())) { delete stream; goto again; } @@ -231,7 +232,7 @@ class TCPSocketPool : public ForwardSocketClient { bool release(const EndPoint& ep, ISocketStream* stream) { auto fd = stream->get_underlay_fd(); ERRNO err; - if (!stream_alive(fd)) return false; + if (!stream_reusable(fd)) return false; auto node = new StreamListNode(ep, stream, fd, TTL_us); push_into_pool(node); errno = err.no; From 7706ac72408d11703d8bdc2291c329a41d103ae7 Mon Sep 17 00:00:00 2001 From: Coldwings Date: Mon, 9 Sep 2024 16:49:16 +0800 Subject: [PATCH 2/5] Also fix on kqueue wait_for_fd reap events when wait_for_fd in 0 timeout fix kevent immediate enqueue KQueue event submit always use immediate fix Fix kqueue response for wait_for_fd with interests=0 Remove unused code --- io/epoll.cpp | 23 +++++++++++++++-- io/kqueue.cpp | 69 ++++++++++++++++++++++++++++++++------------------- 2 files changed, 64 insertions(+), 28 deletions(-) diff --git a/io/epoll.cpp b/io/epoll.cpp index 794fd066..ec8a3fa7 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -299,11 +299,30 @@ ok: entry.interests |= eint; LOG_ERROR_RETURN(EINVAL, -1, "can not wait for multiple interests"); if (unlikely(interest == 0)) return rm_interest({fd, EVENT_RWE| ONE_SHOT, 0}); // remove fd from epoll - int ret = add_interest({fd, interest | ONE_SHOT, CURRENT}); + thread* current = CURRENT; + int ret = add_interest({fd, interest | ONE_SHOT, current}); if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest"); // if timeout is just simple 0, wait for a tiny little moment // so that events can be collect. - ret = thread_usleep(timeout.timeout() ? 
timeout : Timeout(10)); + if (timeout.expired()) { + ret = -1; + wait_for_events( + 0, + [current, &ret](void* data) __INLINE__ { + if ((thread*)data == current) { + ret = 0; + } else { + thread_interrupt((thread*)data, EOK); + } + }, + [&]() __INLINE__ { return true; }); + if (ret < 0) { + rm_interest({fd, interest, 0}); + errno = ETIMEDOUT; + } + return ret; + } + ret = thread_usleep(timeout); ERRNO err; if (ret == -1 && err.no == EOK) { return 0; // Event arrived diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 616f1372..90b60659 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -72,21 +72,14 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res close(_kq); } - __attribute__((noinline)) - static void debug_breakpoint() { - } - int enqueue(int fd, short event, uint16_t action, uint32_t event_flags, void* udata, bool immediate = false) { - // if (fd == _kq) debug_breakpoint(); - // immediate = true; - // LOG_DEBUG(VALUE(_kq), VALUE(fd), VALUE(event), VALUE(action), VALUE(event_flags), VALUE(udata), VALUE(immediate)); assert(_n < LEN(_events)); auto entry = &_events[_n++]; EV_SET(entry, fd, event, action, event_flags, 0, udata); if (immediate || _n == LEN(_events)) { - int ret = kevent(_kq, _events, _n, nullptr, 0, nullptr); + struct timespec tm{0, 0}; + int ret = kevent(_kq, _events, _n, nullptr, 0, &tm); if (ret < 0) { - // debug_breakpoint(); LOG_ERRNO_RETURN(0, -1, "failed to submit events with kevent()"); } _n = 0; @@ -94,22 +87,8 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res return 0; } - int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { - if (unlikely(interests == 0)) - return 0; - short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; - enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT); - int ret = thread_usleep(timeout); - ERRNO err; - if (ret == -1 && err.no == EOK) { - return 0; // event arrived - } - - errno = (ret == 0) ? 
ETIMEDOUT : err.no; - return -1; - } - - ssize_t wait_and_fire_events(uint64_t timeout) override { + template + ssize_t do_wait_and_fire_events(uint64_t timeout, EVCB&& event_callback) { ssize_t nev = 0; struct timespec tm; tm.tv_sec = timeout / 1000 / 1000; @@ -125,7 +104,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res for (int i = 0; i < ret; ++i) { if (_events[i].filter == EVFILT_USER) continue; auto th = (thread*) _events[i].udata; - if (th) thread_interrupt(th, EOK); + if (th) event_callback(th); } if (ret == (int) LEN(_events)) { // there may be more events tm.tv_sec = tm.tv_nsec = 0; @@ -134,6 +113,44 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res return nev; } + int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { + if (unlikely(interests == 0)) { + errno = ENOSYS; + return -1; + } + short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; + auto current = CURRENT; + int ret = enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current); + if (ret < 0) return ret; + if (timeout.expired()) { + ret = -1; + do_wait_and_fire_events(0, [current, &ret](thread* th) { + if (th == current) + ret = 0; + else + thread_interrupt(th, EOK); + }); + if (ret <0) { + enqueue(fd, ev, EV_DELETE, 0, current, true); + errno = ETIMEDOUT; + } + return ret; + } + ret = thread_usleep(timeout); + ERRNO err; + if (ret == -1 && err.no == EOK) { + return 0; // event arrived + } + + enqueue(fd, ev, EV_DELETE, 0, current, true); + errno = (ret == 0) ? 
ETIMEDOUT : err.no; + return -1; + } + + ssize_t wait_and_fire_events(uint64_t timeout) override { + return do_wait_and_fire_events(timeout, [](thread *th) { thread_interrupt(th, EOK); }); + } + int cancel_wait() override { enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true); return 0; From f81596457d50c8b16563de758b2a6ef3295bd47b Mon Sep 17 00:00:00 2001 From: Coldwings Date: Wed, 18 Sep 2024 15:14:28 +0800 Subject: [PATCH 3/5] Since now thread_usleep(0) can also bring errno of interruption, it should be working --- io/epoll.cpp | 20 -------------------- io/kqueue.cpp | 23 +++++++---------------- 2 files changed, 7 insertions(+), 36 deletions(-) diff --git a/io/epoll.cpp b/io/epoll.cpp index ec8a3fa7..e3ffdedb 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -302,26 +302,6 @@ ok: entry.interests |= eint; thread* current = CURRENT; int ret = add_interest({fd, interest | ONE_SHOT, current}); if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest"); - // if timeout is just simple 0, wait for a tiny little moment - // so that events can be collect. 
- if (timeout.expired()) { - ret = -1; - wait_for_events( - 0, - [current, &ret](void* data) __INLINE__ { - if ((thread*)data == current) { - ret = 0; - } else { - thread_interrupt((thread*)data, EOK); - } - }, - [&]() __INLINE__ { return true; }); - if (ret < 0) { - rm_interest({fd, interest, 0}); - errno = ETIMEDOUT; - } - return ret; - } ret = thread_usleep(timeout); ERRNO err; if (ret == -1 && err.no == EOK) { diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 90b60659..8f103946 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -122,20 +122,6 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res auto current = CURRENT; int ret = enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current); if (ret < 0) return ret; - if (timeout.expired()) { - ret = -1; - do_wait_and_fire_events(0, [current, &ret](thread* th) { - if (th == current) - ret = 0; - else - thread_interrupt(th, EOK); - }); - if (ret <0) { - enqueue(fd, ev, EV_DELETE, 0, current, true); - errno = ETIMEDOUT; - } - return ret; - } ret = thread_usleep(timeout); ERRNO err; if (ret == -1 && err.no == EOK) { @@ -152,8 +138,13 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res } int cancel_wait() override { - enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true); - return 0; + // cannot call `enqueue` directly since it will be called from another vCPU. + // directly use kqueue to submit event, which is safe. 
+ // equivalent to `enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true)`, but submits only the stack-local `entry` and never touches the shared `_events`/`_n` buffer + struct kevent entry; + EV_SET(&entry, _kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, 0, nullptr); + struct timespec tm{0, 0}; + return kevent(_kq, &entry, 1, nullptr, 0, &tm); } // This vector is used to filter invalid add/rm_interest requests which may affect kevent's From 547367a0315b79f7c1eae2415aa91dd5c49f4d1c Mon Sep 17 00:00:00 2001 From: Coldwings Date: Wed, 18 Sep 2024 16:37:08 +0800 Subject: [PATCH 4/5] No more needs of in kqueue --- io/kqueue.cpp | 48 +++++++++++++++++++++--------------------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 8f103946..ff905609 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -87,32 +87,6 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res return 0; } - template - ssize_t do_wait_and_fire_events(uint64_t timeout, EVCB&& event_callback) { - ssize_t nev = 0; - struct timespec tm; - tm.tv_sec = timeout / 1000 / 1000; - tm.tv_nsec = (timeout % (1000 * 1000)) * 1000; - - again: - int ret = kevent(_kq, _events, _n, _events, LEN(_events), &tm); - if (ret < 0) - LOG_ERRNO_RETURN(0, -1, "failed to call kevent()"); - - _n = 0; - nev += ret; - for (int i = 0; i < ret; ++i) { - if (_events[i].filter == EVFILT_USER) continue; - auto th = (thread*) _events[i].udata; - if (th) event_callback(th); - } - if (ret == (int) LEN(_events)) { // there may be more events - tm.tv_sec = tm.tv_nsec = 0; - goto again; - } - return nev; - } - int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { if (unlikely(interests == 0)) { errno = ENOSYS; @@ -134,7 +108,27 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res } ssize_t wait_and_fire_events(uint64_t timeout) override { - return do_wait_and_fire_events(timeout, [](thread *th) { thread_interrupt(th, EOK); }); + ssize_t nev = 0; + struct timespec tm; + tm.tv_sec = timeout / 1000 / 
1000; + tm.tv_nsec = (timeout % (1000 * 1000)) * 1000; + + again: + int ret = kevent(_kq, _events, _n, _events, LEN(_events), &tm); + if (ret < 0) LOG_ERRNO_RETURN(0, -1, "failed to call kevent()"); + + _n = 0; + nev += ret; + for (int i = 0; i < ret; ++i) { + if (_events[i].filter == EVFILT_USER) continue; + auto th = (thread*)_events[i].udata; + if (th) thread_interrupt(th, EOK); + } + if (ret == (int)LEN(_events)) { // there may be more events + tm.tv_sec = tm.tv_nsec = 0; + goto again; + } + return nev; } int cancel_wait() override { From 564d2b5ed0310f682422d6c8f1d96691a01bcf57 Mon Sep 17 00:00:00 2001 From: Coldwings Date: Wed, 18 Sep 2024 16:47:48 +0800 Subject: [PATCH 5/5] No need to change epoll --- io/epoll.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/io/epoll.cpp b/io/epoll.cpp index e3ffdedb..a45ec09f 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -299,8 +299,7 @@ ok: entry.interests |= eint; LOG_ERROR_RETURN(EINVAL, -1, "can not wait for multiple interests"); if (unlikely(interest == 0)) return rm_interest({fd, EVENT_RWE| ONE_SHOT, 0}); // remove fd from epoll - thread* current = CURRENT; - int ret = add_interest({fd, interest | ONE_SHOT, current}); + int ret = add_interest({fd, interest | ONE_SHOT, CURRENT}); if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest"); ret = thread_usleep(timeout); ERRNO err;