diff --git a/io/epoll.cpp b/io/epoll.cpp index 16f18c66..d6656185 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -271,6 +271,21 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine { if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest"); // if timeout is just simple 0, wait for a tiny little moment // so that events can be collect. + if (!timeout) { + ret = -1; + wait_for_events( + 0, + [&](void* data) __INLINE__ { + if ((thread*)data == CURRENT) { + ret = 0; + } else { + thread_interrupt((thread*)data, EOK); + } + }, + [&]() __INLINE__ { return true; }); + if (ret < 0) errno = ETIMEDOUT; + return ret; + } ret = thread_usleep(timeout ? timeout : 10); ERRNO err; if (ret == -1 && err.no == EOK) { diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 1272b3c3..8b63f066 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -77,21 +77,8 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine { return 0; } - int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override { - short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; - enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT); - int ret = thread_usleep(timeout ? timeout : 10); - ERRNO err; - if (ret == -1 && err.no == EOK) { - return 0; // event arrived - } - - // enqueue(fd, ev, EV_DELETE, 0, CURRENT, true); // immediately - errno = (ret == 0) ? ETIMEDOUT : err.no; - return -1; - } - - ssize_t wait_and_fire_events(uint64_t timeout = -1) override { + template + ssize_t do_wait_and_fire_events(uint64_t timeout, EVCB&& event_callback) { ssize_t nev = 0; struct timespec tm; tm.tv_sec = timeout / 1000 / 1000; @@ -106,8 +93,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine { nev += ret; for (int i = 0; i < ret; ++i) { if (_events[i].filter == EVFILT_USER) continue; - auto th = (thread*) _events[i].udata; - if (th) thread_interrupt(th, EOK); + event_callback(_events[i].udata); } if (ret == (int) LEN(_events)) { // there may be more events tm.tv_sec = tm.tv_nsec = 0; @@ -116,6 +102,40 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine { return nev; } + int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override { + short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; + enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT); + if (!timeout) { + int ret = -1; + do_wait_and_fire_events(0, [&](void* data) { + auto th = (thread*) _events[i].udata; + if (th == CURRENT) + ret = 0; + else if (th) + thread_interrupt(th, EOK); + + }); + if (ret < 0) errno = ETIMEDOUT; + return ret; + } + int ret = thread_usleep(timeout); + ERRNO err; + if (ret == -1 && err.no == EOK) { + return 0; // event arrived + } + + // enqueue(fd, ev, EV_DELETE, 0, CURRENT, true); // immediately + errno = (ret == 0) ? ETIMEDOUT : err.no; + return -1; + } + + ssize_t wait_and_fire_events(uint64_t timeout = -1) override { + return do_wait_and_fire_events(timeout, [&]{ + auto th = (thread*)_events[i].udata; + if (th) thread_interrupt(th, EOK); + }; + } + int cancel_wait() override { enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true); return 0;