Skip to content

Commit

Permalink
Support poll in windows (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius authored Nov 23, 2024
1 parent 16c8e4b commit 61f47c1
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 36 deletions.
3 changes: 0 additions & 3 deletions coroio/all.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
#include "poller.hpp"

#include "select.hpp"

#ifndef _WIN32
#include "poll.hpp"
#endif

#ifdef __linux__
#include "epoll.hpp"
Expand Down
51 changes: 43 additions & 8 deletions coroio/poll.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#ifndef _WIN32
#include "poll.hpp"

namespace {

#if defined(__APPLE__) || defined(__FreeBSD__)
#define POLLRDHUP POLLHUP
int ppoll(struct pollfd* fds, nfds_t nfds, const struct timespec* ts, const sigset_t* /*sigmask*/) {
int timeout = 0;
if (ts) {
Expand All @@ -14,13 +12,44 @@ int ppoll(struct pollfd* fds, nfds_t nfds, const struct timespec* ts, const sigs
// TODO: support sigmask
return poll(fds, nfds, timeout);
}
#elif defined(_WIN32)
int ppoll(struct pollfd* fds, int nfds, const struct timespec* ts, const void* /*sigmask*/) {
int timeout = 0;
if (ts) {
timeout = ts->tv_sec;
timeout += ts->tv_nsec / 1000000;
}
// TODO: support sigmask
int ret = WSAPoll(fds, nfds, timeout);
if (ret < 0) { process_errno(); }
return ret;
}
#endif

} // namespace

namespace NNet {

void TPoll::Poll() {
#ifdef _WIN32
TPoll::TPoll()
: DummySocket_({}, *this)
{
int fd = DummySocket_.Fd();
MaxFd_ = std::max<int>(MaxFd_, fd);
if (static_cast<int>(InEvents_.size()) <= MaxFd_) {
InEvents_.resize(MaxFd_+1, std::make_tuple(THandlePair{}, -1));
}
auto& [_, idx] = InEvents_[fd];
idx = Fds_.size();
Fds_.emplace_back(pollfd{});
pollfd& pev = Fds_[idx];
pev.fd = fd;
pev.events |= POLLIN;
}
#endif

void TPoll::Poll()
{
auto ts = GetTimeout();

if (static_cast<int>(InEvents_.size()) <= MaxFd_) {
Expand All @@ -45,10 +74,12 @@ void TPoll::Poll() {
pev.events |= POLLOUT;
ev.Write = ch.Handle;
}
#ifdef __linux__
if (ch.Type & TEvent::RHUP) {
pev.events |= POLLRDHUP;
ev.Write = ch.Handle;
}
#endif
} else if (idx != -1) {
if (ch.Type & TEvent::READ) {
Fds_[idx].events &= ~POLLIN;
Expand All @@ -58,10 +89,12 @@ void TPoll::Poll() {
Fds_[idx].events &= ~POLLOUT;
ev.Write = {};
}
#ifdef __linux__
if (ch.Type & TEvent::RHUP) {
Fds_[idx].events &= ~POLLRDHUP;
ev.RHup = {};
}
#endif
if (Fds_[idx].events == 0) {
std::swap(Fds_[idx], Fds_.back());
std::get<1>(InEvents_[Fds_[idx].fd]) = idx;
Expand All @@ -83,20 +116,21 @@ void TPoll::Poll() {
for (auto& pev : Fds_) {
auto [ev, _] = InEvents_[pev.fd];
if (pev.revents & POLLIN) {
ReadyEvents_.emplace_back(TEvent{pev.fd, TEvent::READ, ev.Read}); ev.Read = {};
ReadyEvents_.emplace_back(TEvent{(int)pev.fd, TEvent::READ, ev.Read}); ev.Read = {};
}
if (pev.revents & POLLOUT) {
ReadyEvents_.emplace_back(TEvent{pev.fd, TEvent::WRITE, ev.Write}); ev.Write = {};
ReadyEvents_.emplace_back(TEvent{(int)pev.fd, TEvent::WRITE, ev.Write}); ev.Write = {};
}
if (pev.revents & POLLHUP) {
if (ev.Read) {
ReadyEvents_.emplace_back(TEvent{pev.fd, TEvent::READ, ev.Read});
ReadyEvents_.emplace_back(TEvent{(int)pev.fd, TEvent::READ, ev.Read});
}
if (ev.Write) {
ReadyEvents_.emplace_back(TEvent{pev.fd, TEvent::WRITE, ev.Write});
ReadyEvents_.emplace_back(TEvent{(int)pev.fd, TEvent::WRITE, ev.Write});
}
pev.revents = pev.revents & ~POLLHUP;
}
#ifdef __linux__
if (pev.revents & POLLRDHUP) {
if (ev.Read) {
ReadyEvents_.emplace_back(TEvent{pev.fd, TEvent::READ, ev.Read});
Expand All @@ -108,10 +142,11 @@ void TPoll::Poll() {
ReadyEvents_.emplace_back(TEvent{pev.fd, TEvent::RHUP, ev.RHup});
}
}
#endif
}

ProcessTimers();
}

} // namespace NNet
#endif

7 changes: 7 additions & 0 deletions coroio/poll.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@ class TPoll: public TPollerBase {
using TSocket = NNet::TSocket;
using TFileHandle = NNet::TFileHandle;

#ifdef _WIN32
TPoll();
#endif

void Poll();

private:
std::vector<std::tuple<THandlePair,int>> InEvents_; // event + index in Fds
std::vector<pollfd> Fds_;
#ifdef _WIN32
TSocket DummySocket_; // for timeouts
#endif
};

} // namespace NNet
2 changes: 0 additions & 2 deletions examples/bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,9 @@ int main(int argc, char** argv) {
if (!strcmp(method, "select")) {
run_test<TSelect>(num_pipes, num_writes, num_active);
}
#ifndef _WIN32
else if (!strcmp(method, "poll")) {
run_test<TPoll>(num_pipes, num_writes, num_active);
}
#endif
#ifdef __linux__
else if (!strcmp(method, "epoll")) {
run_test<TEPoll>(num_pipes, num_writes, num_active);
Expand Down
3 changes: 0 additions & 3 deletions examples/echoclient.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#include "coroio/sockutils.hpp"
#include <coroio/all.hpp>

#include <signal.h>
Expand Down Expand Up @@ -76,11 +75,9 @@ int main(int argc, char** argv) {
if (method == "select") {
run<TSelect>(debug, address);
}
#ifndef _WIN32
else if (method == "poll") {
run<TPoll>(debug, address);
}
#endif
#ifdef __linux__
else if (method == "epoll") {
run<TEPoll>(debug, address);
Expand Down
5 changes: 0 additions & 5 deletions examples/echoserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
using NNet::TVoidTask;
using NNet::TAddress;
using NNet::TSelect;

#ifndef _WIN32
using NNet::TPoll;
#endif

#ifdef __linux__
using NNet::TEPoll;
Expand Down Expand Up @@ -90,11 +87,9 @@ int main(int argc, char** argv) {
if (method == "select") {
run<TSelect>(debug, address, buffer_size);
}
#ifndef _WIN32
else if (method == "poll") {
run<TPoll>(debug, address, buffer_size);
}
#endif
#ifdef __linux__
else if (method == "epoll") {
run<TEPoll>(debug, address, buffer_size);
Expand Down
2 changes: 0 additions & 2 deletions examples/resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,9 @@ int main(int argc, char** argv) {
if (method == "select") {
run<TSelect>(type);
}
#ifndef _WIN32
else if (method == "poll") {
run<TPoll>(type);
}
#endif
#ifdef __linux__
else if (method == "epoll") {
run<TEPoll>(type);
Expand Down
2 changes: 0 additions & 2 deletions examples/sslechoclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@ int main(int argc, char** argv) {
if (method == "select") {
run<TSelect>(debug, address);
}
#ifndef _WIN32
else if (method == "poll") {
run<TPoll>(debug, address);
}
#endif
#ifdef __linux__
else if (method == "epoll") {
run<TEPoll>(debug, address);
Expand Down
5 changes: 0 additions & 5 deletions examples/sslechoserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
using NNet::TVoidTask;
using NNet::TAddress;
using NNet::TSelect;

#ifndef _WIN32
using NNet::TPoll;
#endif

#ifdef __linux__
using NNet::TEPoll;
Expand Down Expand Up @@ -104,11 +101,9 @@ int main(int argc, char** argv) {
if (method == "select") {
run<TSelect>(debug, address, buffer_size);
}
#ifndef _WIN32
else if (method == "poll") {
run<TPoll>(debug, address, buffer_size);
}
#endif
#ifdef __linux__
else if (method == "epoll") {
run<TEPoll>(debug, address, buffer_size);
Expand Down
6 changes: 0 additions & 6 deletions tests/tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1202,15 +1202,9 @@ int main() {
my_unit_poller(test_futures_any_result),
my_unit_poller(test_futures_any_same_wakeup),
my_unit_poller(test_futures_all),
#ifdef _WIN32
my_unit_test(test_read_write_full_ssl, TSelect),
my_unit_test(test_resolver, TSelect),
my_unit_test(test_resolve_bad_name, TSelect),
#else
my_unit_test2(test_read_write_full_ssl, TSelect, TPoll),
my_unit_test2(test_resolver, TSelect, TPoll),
my_unit_test2(test_resolve_bad_name, TSelect, TPoll),
#endif
#ifdef __linux__
my_unit_test2(test_remote_disconnect, TPoll, TEPoll),
cmocka_unit_test(test_uring_create),
Expand Down

0 comments on commit 61f47c1

Please sign in to comment.