Skip to content

Commit

Permalink
karm-sys: Added epoll async I/O backend.
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Jan 24, 2025
1 parent 5d0f65e commit a3da0fc
Show file tree
Hide file tree
Showing 24 changed files with 333 additions and 153 deletions.
2 changes: 1 addition & 1 deletion meta/plugins/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
git+https://github.com/cute-engineering/cutekit.git@0.7.13
git+https://github.com/cute-engineering/cutekit.git@0.7.14
Markdown~=3.7
123 changes: 123 additions & 0 deletions src/impls/impl-posix/epoll/async.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#include <impl-posix/fd.h>
#include <impl-posix/utils.h>
#include <karm-async/promise.h>
#include <karm-base/map.h>
#include <karm-sys/_embed.h>
#include <karm-sys/async.h>
#include <karm-sys/time.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <unistd.h>

namespace Karm::Sys::_Embed {

struct EpollSched : public Sys::Sched {
int _epollFd;
usize _id = 0;
Map<usize, Async::Promise<>> _promises;

EpollSched(int epollFd)
: _epollFd(epollFd) {}

~EpollSched() { close(_epollFd); }

Async::Task<> waitFor(epoll_event ev, int fd) {
usize id = _id++;
auto promise = Async::Promise<>();
auto future = promise.future();

ev.data.u64 = id;
if (::epoll_ctl(_epollFd, EPOLL_CTL_ADD, fd, &ev) < 0)
panic("epoll_ctl");

_promises.put(id, std::move(promise));
return Async::makeTask(future);
}

Async::Task<usize> readAsync(Rc<Fd> fd, MutBytes buf) override {
co_trya$(waitFor({.events = EPOLLIN | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->read(buf)));
}

Async::Task<usize> writeAsync(Rc<Fd> fd, Bytes buf) override {
co_trya$(waitFor({.events = EPOLLOUT | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->write(buf)));
}

Async::Task<usize> flushAsync(Rc<Fd> fd) override {
co_trya$(waitFor({.events = EPOLLOUT | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->flush()));
}

Async::Task<_Accepted> acceptAsync(Rc<Fd> fd) override {
co_trya$(waitFor({.events = EPOLLIN | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->accept()));
}

Async::Task<_Sent> sendAsync(Rc<Fd> fd, Bytes buf, Slice<Handle> handles, SocketAddr addr) override {
co_trya$(waitFor({.events = EPOLLOUT | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->send(buf, handles, addr)));
}

Async::Task<_Received> recvAsync(Rc<Fd> fd, MutBytes buf, MutSlice<Handle> hnds) override {
co_trya$(waitFor({.events = EPOLLIN | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->recv(buf, hnds)));
}

Async::Task<> sleepAsync(Instant until) override {
Instant instant = Sys::instant();
Duration delta = Duration::zero();
if (instant < until)
delta = until - instant;

int timeFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if (timeFd < 0)
co_return Posix::fromLastErrno();
Defer defer{[&] {
close(timeFd);
}};

itimerspec spec{};
spec.it_value = Posix::toTimespec(delta);
if (timerfd_settime(timeFd, 0, &spec, nullptr) < 0)
co_return Posix::fromLastErrno();

co_trya$(waitFor({.events = EPOLLIN, .data = {}}, timeFd));

co_return Ok();
}

Res<> wait(Instant until) override {
epoll_event ev;
auto instant = Sys::instant();
Duration delta = Duration::zero();
if (instant < until)
delta = until - instant;
int timeout = until.isEndOfTime() ? -1 : delta.toMSecs();

int n = ::epoll_wait(_epollFd, &ev, 1, timeout);

if (n < 0)
return Posix::fromLastErrno();

if (n == 0)
return Ok();

usize id = ev.data.u64;
auto promise = _promises.take(id);
promise.resolve(Ok());
return Ok();
}
};

Sched& globalSched() {
static EpollSched sched = [] {
int fd = ::epoll_create1(0);
if (fd < 0)
panic("epoll_create1");
return EpollSched(fd);
}();
return sched;
}

} // namespace Karm::Sys::_Embed
19 changes: 19 additions & 0 deletions src/impls/impl-posix/epoll/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"$schema": "https://schemas.cute.engineering/stable/cutekit.manifest.component.v1",
"id": "impl-posix.epoll",
"type": "lib",
"props": {
"cpp-excluded": true
},
"enableIf": {
"sys": [
"linux"
],
"async": [
"epoll"
]
},
"provides": [
"karm-sys-async-impl"
]
}
10 changes: 5 additions & 5 deletions src/impls/impl-posix/kevent/async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ struct DarwinSched :
close(_kqueue);
}

struct timespec _computeTimeout(TimeStamp until) {
TimeStamp now = Sys::now();
TimeSpan delta = TimeSpan::zero();
struct timespec _computeTimeout(Instant until) {
Instant now = Sys::now();
Duration delta = Duration::zero();
if (now < until)
delta = until - now;
return Posix::toTimespec(delta);
Expand Down Expand Up @@ -138,7 +138,7 @@ struct DarwinSched :
co_return Ok(co_try$(fd->recv(buf, hnds)));
}

Async::Task<> sleepAsync(TimeStamp until) override {
Async::Task<> sleepAsync(Instant until) override {
struct timespec ts = _computeTimeout(until);

co_trya$(waitFor({
Expand All @@ -153,7 +153,7 @@ struct DarwinSched :
co_return Ok();
}

Res<> wait(TimeStamp until) override {
Res<> wait(Instant until) override {
struct kevent64_s ev;
struct timespec ts = _computeTimeout(until);

Expand Down
18 changes: 12 additions & 6 deletions src/impls/impl-posix/sys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,18 +301,24 @@ Res<Rc<Fd>> listenIpc(Mime::Url url) {

// MARK: Time ------------------------------------------------------------------

TimeSpan fromTimeSpec(struct timespec const& ts) {
Duration fromTimeSpec(struct timespec const& ts) {
auto usecs = (u64)ts.tv_sec * 1000000 + (u64)ts.tv_nsec / 1000;
return TimeSpan::fromUSecs(usecs);
return Duration::fromUSecs(usecs);
}

TimeStamp now() {
SystemTime now() {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return TimeStamp::epoch() + fromTimeSpec(ts);
return SystemTime::epoch() + fromTimeSpec(ts);
}

TimeSpan uptime() {
Instant instant() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return Instant::epoch() + fromTimeSpec(ts);
}

Duration uptime() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return fromTimeSpec(ts);
Expand Down Expand Up @@ -422,7 +428,7 @@ Res<> populate(Vec<UserInfo>& infos) {

// MARK: Process Managment -----------------------------------------------------

Res<> sleep(TimeSpan span) {
Res<> sleep(Duration span) {
struct timespec ts;
ts.tv_sec = span.toSecs();
ts.tv_nsec = (span.toUSecs() % 1000000) * 1000;
Expand Down
18 changes: 9 additions & 9 deletions src/impls/impl-posix/uring/async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace Karm::Sys::_Embed {

struct __kernel_timespec toKernelTimespec(TimeStamp ts) {
struct __kernel_timespec toKernelTimespec(Instant ts) {
struct __kernel_timespec kts;
if (ts.isEndOfTime()) {
kts.tv_sec = LONG_MAX;
Expand All @@ -25,7 +25,7 @@ struct __kernel_timespec toKernelTimespec(TimeStamp ts) {
return kts;
}

struct __kernel_timespec toKernelTimespec(TimeSpan ts) {
struct __kernel_timespec toKernelTimespec(Duration ts) {
struct __kernel_timespec kts;
if (ts.isInfinite()) {
kts.tv_sec = LONG_MAX;
Expand Down Expand Up @@ -282,19 +282,19 @@ struct UringSched : public Sys::Sched {
return Async::makeTask(job->future());
}

Async::Task<> sleepAsync(TimeStamp until) override {
Async::Task<> sleepAsync(Instant until) override {
struct Job : public _Job {
TimeStamp _until;
Instant _until;
Async::Promise<> _promise;

struct __kernel_timespec _ts{};

Job(TimeStamp until)
Job(Instant until)
: _until(until) {}

void submit(io_uring_sqe* sqe) override {
_ts = toKernelTimespec(_until);
io_uring_prep_timeout(sqe, &_ts, 0, IORING_TIMEOUT_ABS | IORING_TIMEOUT_REALTIME);
io_uring_prep_timeout(sqe, &_ts, 0, IORING_TIMEOUT_ABS);
}

void complete(io_uring_cqe* cqe) override {
Expand All @@ -314,12 +314,12 @@ struct UringSched : public Sys::Sched {
return Async::makeTask(job->future());
}

Res<> wait(TimeStamp until) override {
Res<> wait(Instant until) override {
// HACK: io_uring_wait_cqes doesn't support absolute timeout
// so we have to do it ourselves
TimeStamp now = Sys::now();
Instant now = Sys::instant();

TimeSpan delta = TimeSpan::zero();
Duration delta = Duration::zero();
if (now < until)
delta = until - now;

Expand Down
4 changes: 4 additions & 0 deletions src/impls/impl-posix/uring/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
"enableIf": {
"sys": [
"linux"
],
"async": [
null,
"uring"
]
},
"requires": [
Expand Down
10 changes: 5 additions & 5 deletions src/impls/impl-posix/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,13 @@ Sys::Stat fromStat(struct stat const& buf) {
type = Sys::Type::DIR;
stat.type = type;
stat.size = (usize)buf.st_size;
stat.accessTime = TimeStamp::epoch() + TimeSpan::fromSecs(buf.st_atime);
stat.modifyTime = TimeStamp::epoch() + TimeSpan::fromSecs(buf.st_mtime);
stat.changeTime = TimeStamp::epoch() + TimeSpan::fromSecs(buf.st_ctime);
stat.accessTime = SystemTime::epoch() + Duration::fromSecs(buf.st_atime);
stat.modifyTime = SystemTime::epoch() + Duration::fromSecs(buf.st_mtime);
stat.changeTime = SystemTime::epoch() + Duration::fromSecs(buf.st_ctime);
return stat;
}

struct timespec toTimespec(TimeStamp ts) {
struct timespec toTimespec(SystemTime ts) {
struct timespec pts;
if (ts.isEndOfTime()) {
pts.tv_sec = Limits<long>::MAX;
Expand All @@ -238,7 +238,7 @@ struct timespec toTimespec(TimeStamp ts) {
return pts;
}

struct timespec toTimespec(TimeSpan ts) {
struct timespec toTimespec(Duration ts) {
struct timespec pts;
if (ts.isInfinite()) {
pts.tv_sec = Limits<long>::MAX;
Expand Down
4 changes: 2 additions & 2 deletions src/impls/impl-posix/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ Sys::SocketAddr fromSockAddr(struct sockaddr_in sockaddr);

Sys::Stat fromStat(struct stat const& buf);

struct timespec toTimespec(TimeStamp ts);
struct timespec toTimespec(SystemTime ts);

struct timespec toTimespec(TimeSpan ts);
struct timespec toTimespec(Duration ts);

enum struct RepoType {
CUTEKIT,
Expand Down
6 changes: 3 additions & 3 deletions src/impls/impl-sdl/ui.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ struct SdlHost :
g.pop();
}

Res<> wait(TimeStamp ts) override {
Res<> wait(Instant ts) override {
// HACK: Since we don't have a lot of control onto how SDL wait for
// events we can't integrate it properly with our event loop
// To remedi this we will just cap how long we wait, this way
Expand All @@ -524,8 +524,8 @@ struct SdlHost :
// NOTE: A better option would be to have SDL in a separeted thread
// and do the communication over an inter-thread channel but
// but this would require to make the Framework thread safe
auto delay = TimeSpan::fromMSecs((usize)(FRAME_TIME * 1000));
auto cappedWait = min(ts, Sys::now() + delay);
auto delay = Duration::fromMSecs((usize)(FRAME_TIME * 1000));
auto cappedWait = min(ts, Sys::instant() + delay);
try$(Sys::globalSched().wait(cappedWait));

SDL_Event e{};
Expand Down
4 changes: 2 additions & 2 deletions src/impls/impl-wasm/sys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ struct JSConsole : public Sys::Fd {
}
};

TimeStamp now() {
SystemTime now() {
auto span = embedGetTimeStamp();
return TimeStamp::epoch() + TimeSpan::fromMSecs(span);
return SystemTime::epoch() + Duration::fromMSecs(span);
}

Res<Rc<Sys::Fd>> createIn() {
Expand Down
Loading

0 comments on commit a3da0fc

Please sign in to comment.