Skip to content

Commit

Permalink
Fix/sockpool wait_for_fd 0.8 (#557)
Browse files Browse the repository at this point in the history
* FIX: wait_for_fd in epoll should atleast put into sleep for a tiny moment

* Also fix on kqueue

wait_for_fd reap events when wait_for_fd in 0 timeout

fix kevent immeidate enqueue

KQueue event submitt always use immediate

fix

Fix kqueue response for wait_for_fd with interests=0

Remove unused code

* Since now thread_usleep(0) can also bring errno of interruption, it should be working

* No more needs of  in kqueue

* No need to change epoll
  • Loading branch information
Coldwings authored Sep 18, 2024
1 parent e951a93 commit 812ae91
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
40 changes: 21 additions & 19 deletions io/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,14 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
close(_kq);
}

__attribute__((noinline))
static void debug_breakpoint() {
}

int enqueue(int fd, short event, uint16_t action, uint32_t event_flags, void* udata, bool immediate = false) {
// if (fd == _kq) debug_breakpoint();
// immediate = true;
// LOG_DEBUG(VALUE(_kq), VALUE(fd), VALUE(event), VALUE(action), VALUE(event_flags), VALUE(udata), VALUE(immediate));
assert(_n < LEN(_events));
auto entry = &_events[_n++];
EV_SET(entry, fd, event, action, event_flags, 0, udata);
if (immediate || _n == LEN(_events)) {
int ret = kevent(_kq, _events, _n, nullptr, 0, nullptr);
struct timespec tm{0, 0};
int ret = kevent(_kq, _events, _n, nullptr, 0, &tm);
if (ret < 0) {
// debug_breakpoint();
LOG_ERRNO_RETURN(0, -1, "failed to submit events with kevent()");
}
_n = 0;
Expand All @@ -95,16 +88,21 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
}

int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override {
if (unlikely(interests == 0))
return 0;
if (unlikely(interests == 0)) {
errno = ENOSYS;
return -1;
}
short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE;
enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT);
int ret = thread_usleep(timeout);
auto current = CURRENT;
int ret = enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current);
if (ret < 0) return ret;
ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
return 0; // event arrived
}

enqueue(fd, ev, EV_DELETE, 0, current, true);
errno = (ret == 0) ? ETIMEDOUT : err.no;
return -1;
}
Expand All @@ -117,26 +115,30 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res

again:
int ret = kevent(_kq, _events, _n, _events, LEN(_events), &tm);
if (ret < 0)
LOG_ERRNO_RETURN(0, -1, "failed to call kevent()");
if (ret < 0) LOG_ERRNO_RETURN(0, -1, "failed to call kevent()");

_n = 0;
nev += ret;
for (int i = 0; i < ret; ++i) {
if (_events[i].filter == EVFILT_USER) continue;
auto th = (thread*) _events[i].udata;
auto th = (thread*)_events[i].udata;
if (th) thread_interrupt(th, EOK);
}
if (ret == (int) LEN(_events)) { // there may be more events
if (ret == (int)LEN(_events)) { // there may be more events
tm.tv_sec = tm.tv_nsec = 0;
goto again;
}
return nev;
}

int cancel_wait() override {
enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true);
return 0;
// cannot call `enqueue` directly since it will be called from another vCPU.
// directly use kqueue to submit event, which is safe.
// as same as `enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true)`
struct kevent entry;
EV_SET(&entry, _kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, 0, nullptr);
struct timespec tm{0, 0};
return kevent(_kq, _events, _n, nullptr, 0, &tm);
}

// This vector is used to filter invalid add/rm_interest requests which may affect kevent's
Expand Down
9 changes: 5 additions & 4 deletions net/pooled_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ class TCPSocketPool : public ForwardSocketClient {
photon::Timer timer;

// all fd < 0 treated as socket not based on fd
// and always alive. Using such socket needs user
// and always reuseable. Using such socket needs user
// to check if connected socket is still usable.
bool stream_alive(int fd) {
// if there still have unread bytes in strema, it should be closed.
bool stream_reusable(int fd) {
return (fd < 0) || (wait_for_fd_readable(fd, 0) != 0);
}

Expand Down Expand Up @@ -196,7 +197,7 @@ class TCPSocketPool : public ForwardSocketClient {
if (!stream) {
stream = m_underlay->connect(remote, local);
if (!stream) return nullptr;
} else if (!stream_alive(stream->get_underlay_fd())) {
} else if (!stream_reusable(stream->get_underlay_fd())) {
delete stream;
goto again;
}
Expand Down Expand Up @@ -231,7 +232,7 @@ class TCPSocketPool : public ForwardSocketClient {
bool release(const EndPoint& ep, ISocketStream* stream) {
auto fd = stream->get_underlay_fd();
ERRNO err;
if (!stream_alive(fd)) return false;
if (!stream_reusable(fd)) return false;
auto node = new StreamListNode(ep, stream, fd, TTL_us);
push_into_pool(node);
errno = err.no;
Expand Down

0 comments on commit 812ae91

Please sign in to comment.