diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 616f1372..ff905609 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -72,21 +72,14 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res close(_kq); } - __attribute__((noinline)) - static void debug_breakpoint() { - } - int enqueue(int fd, short event, uint16_t action, uint32_t event_flags, void* udata, bool immediate = false) { - // if (fd == _kq) debug_breakpoint(); - // immediate = true; - // LOG_DEBUG(VALUE(_kq), VALUE(fd), VALUE(event), VALUE(action), VALUE(event_flags), VALUE(udata), VALUE(immediate)); assert(_n < LEN(_events)); auto entry = &_events[_n++]; EV_SET(entry, fd, event, action, event_flags, 0, udata); if (immediate || _n == LEN(_events)) { - int ret = kevent(_kq, _events, _n, nullptr, 0, nullptr); + struct timespec tm{0, 0}; + int ret = kevent(_kq, _events, _n, nullptr, 0, &tm); if (ret < 0) { - // debug_breakpoint(); LOG_ERRNO_RETURN(0, -1, "failed to submit events with kevent()"); } _n = 0; @@ -95,16 +88,21 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res } int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { - if (unlikely(interests == 0)) - return 0; + if (unlikely(interests == 0)) { + errno = ENOSYS; + return -1; + } short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; - enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT); - int ret = thread_usleep(timeout); + auto current = CURRENT; + int ret = enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current); + if (ret < 0) return ret; + ret = thread_usleep(timeout); ERRNO err; if (ret == -1 && err.no == EOK) { return 0; // event arrived } + enqueue(fd, ev, EV_DELETE, 0, current, true); errno = (ret == 0) ? ETIMEDOUT : err.no; return -1; } @@ -117,17 +115,16 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res again: int ret = kevent(_kq, _events, _n, _events, LEN(_events), &tm); - if (ret < 0) - LOG_ERRNO_RETURN(0, -1, "failed to call kevent()"); + if (ret < 0) LOG_ERRNO_RETURN(0, -1, "failed to call kevent()"); _n = 0; nev += ret; for (int i = 0; i < ret; ++i) { if (_events[i].filter == EVFILT_USER) continue; - auto th = (thread*) _events[i].udata; + auto th = (thread*)_events[i].udata; if (th) thread_interrupt(th, EOK); } - if (ret == (int) LEN(_events)) { // there may be more events + if (ret == (int)LEN(_events)) { // there may be more events tm.tv_sec = tm.tv_nsec = 0; goto again; } @@ -135,8 +132,13 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res } int cancel_wait() override { - enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true); - return 0; + // cannot call `enqueue` directly since it will be called from another vCPU. + // directly use kqueue to submit event, which is safe. + // as same as `enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true)` + struct kevent entry; + EV_SET(&entry, _kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, 0, nullptr); + struct timespec tm{0, 0}; + return kevent(_kq, _events, _n, nullptr, 0, &tm); } // This vector is used to filter invalid add/rm_interest requests which may affect kevent's diff --git a/net/pooled_socket.cpp b/net/pooled_socket.cpp index 541c44d9..668798b7 100644 --- a/net/pooled_socket.cpp +++ b/net/pooled_socket.cpp @@ -121,9 +121,10 @@ class TCPSocketPool : public ForwardSocketClient { photon::Timer timer; // all fd < 0 treated as socket not based on fd - // and always alive. Using such socket needs user + // and always reuseable. Using such socket needs user // to check if connected socket is still usable. - bool stream_alive(int fd) { + // if there still have unread bytes in strema, it should be closed. + bool stream_reusable(int fd) { return (fd < 0) || (wait_for_fd_readable(fd, 0) != 0); } @@ -196,7 +197,7 @@ class TCPSocketPool : public ForwardSocketClient { if (!stream) { stream = m_underlay->connect(remote, local); if (!stream) return nullptr; - } else if (!stream_alive(stream->get_underlay_fd())) { + } else if (!stream_reusable(stream->get_underlay_fd())) { delete stream; goto again; } @@ -231,7 +232,7 @@ class TCPSocketPool : public ForwardSocketClient { bool release(const EndPoint& ep, ISocketStream* stream) { auto fd = stream->get_underlay_fd(); ERRNO err; - if (!stream_alive(fd)) return false; + if (!stream_reusable(fd)) return false; auto node = new StreamListNode(ep, stream, fd, TTL_us); push_into_pool(node); errno = err.no;