diff --git a/common/expirecontainer.cpp b/common/expirecontainer.cpp index 6bf68df9..4e9e3a0b 100644 --- a/common/expirecontainer.cpp +++ b/common/expirecontainer.cpp @@ -15,27 +15,24 @@ limitations under the License. */ #include "expirecontainer.h" - #include -ExpireContainerBase::ExpireContainerBase(uint64_t expiration, +ExpireContainerBase::ExpireContainerBase(uint64_t lifespan, uint64_t timer_cycle) - : _expiration(expiration), + : _lifespan(lifespan), _timer(std::max(static_cast(1000), timer_cycle), {this, &ExpireContainerBase::expire}, true, 8UL * 1024 * 1024) {} -std::pair ExpireContainerBase::insert( - Item* item) { +auto ExpireContainerBase::insert(Item* item) -> std::pair { return _set.emplace(item); } -ExpireContainerBase::iterator ExpireContainerBase::__find_prelock( - const Item& key_item) { +auto ExpireContainerBase::__find_prelock(const Item& key_item) -> iterator { auto it = _set.find((Item*)&key_item); return it; } -ExpireContainerBase::iterator ExpireContainerBase::find(const Item& key_item) { +auto ExpireContainerBase::find(const Item& key_item) -> iterator { SCOPED_LOCK(_lock); return __find_prelock(key_item); } @@ -54,7 +51,7 @@ uint64_t ExpireContainerBase::expire() { ({ SCOPED_LOCK(_lock); _list.split_by_predicate([&](Item* x) { - bool ret = x->_timeout.expire() < photon::now; + bool ret = x->_timeout.expiration() < photon::now; if (ret) _set.erase(x); return ret; }); @@ -77,9 +74,9 @@ bool ExpireContainerBase::keep_alive(const Item& x, bool insert_if_not_exists) { return true; } -ObjectCacheBase::Item* ObjectCacheBase::ref_acquire(const Item& key_item, - Delegate ctor, - uint64_t failure_cooldown) { +auto ObjectCacheBase::ref_acquire(const Item& key_item, + Delegate ctor, + uint64_t failure_cooldown) -> Item* { Base::iterator holder; Item* item = nullptr; expire(); @@ -104,8 +101,8 @@ ObjectCacheBase::Item* ObjectCacheBase::ref_acquire(const Item& key_item, } while (!item); { SCOPED_LOCK(item->_mtx); - if (!item->_obj && (item->_failure <= - 
photon::sat_sub(photon::now, failure_cooldown))) { + auto ts = photon::sat_sub(photon::now, failure_cooldown); + if (!item->_obj && item->_failure <= ts) { ctor(item); if (!item->_obj) item->_failure = photon::now; } @@ -150,8 +147,7 @@ int ObjectCacheBase::ref_release(ItemPtr item, bool recycle) { } // the argument `key` plays the roles of (type-erased) key -int ObjectCacheBase::release(const ObjectCacheBase::Item& key_item, - bool recycle) { +int ObjectCacheBase::release(const Item& key_item, bool recycle) { auto item = ExpireContainerBase::TypedIterator(Base::find(key_item)); if (item == end()) return -1; return ref_release(*item, recycle); diff --git a/common/expirecontainer.h b/common/expirecontainer.h index c34b01c9..e807a539 100644 --- a/common/expirecontainer.h +++ b/common/expirecontainer.h @@ -41,7 +41,7 @@ class ExpireContainerBase : public Object { Item() : _timeout(0) {} public: - Timeout _timeout; + photon::Timeout _timeout; virtual ~Item() {} virtual size_t key_hash() const = 0; virtual bool key_equal(const Item* rhs) const = 0; @@ -77,7 +77,7 @@ class ExpireContainerBase : public Object { }; intrusive_list _list; - uint64_t _expiration; + uint64_t _lifespan; photon::Timer _timer; photon::spinlock _lock; // protect _list/_set operations @@ -94,7 +94,7 @@ class ExpireContainerBase : public Object { using Set = std::unordered_set; Set _set; - ExpireContainerBase(uint64_t expiration, uint64_t timer_cycle); + ExpireContainerBase(uint64_t lifespan, uint64_t timer_cycle); ~ExpireContainerBase() { clear(); } using iterator = decltype(_set)::iterator; @@ -116,7 +116,7 @@ class ExpireContainerBase : public Object { void enqueue(Item* item) { _list.pop(item); - item->_timeout.timeout(_expiration); + item->_timeout.timeout(_lifespan); _list.push_back(item); } @@ -124,7 +124,10 @@ class ExpireContainerBase : public Object { void clear(); uint64_t expire(); size_t size() { return _set.size(); } - size_t expiration() { return _expiration; } + size_t lifespan() { 
return _lifespan; } + + [[deprecated("use lifespan() instead")]] + size_t expiration() { return _lifespan; } }; template diff --git a/common/lockfree_queue.h b/common/lockfree_queue.h index 8bdcaac3..ea288396 100644 --- a/common/lockfree_queue.h +++ b/common/lockfree_queue.h @@ -587,7 +587,7 @@ class RingChannel : public QueueType { idler.fetch_add(1, std::memory_order_acq_rel); DEFER(idler.fetch_sub(1, std::memory_order_acq_rel)); while (!pop(x)) { - if (yield_turn > 0 && photon::now < yield_timeout.expire()) { + if (yield_turn > 0 && photon::now < yield_timeout.expiration()) { yield_turn--; photon::thread_yield(); } else { diff --git a/common/throttle.h b/common/throttle.h index 9e8fd646..7de49114 100644 --- a/common/throttle.h +++ b/common/throttle.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace photon { class throttle { diff --git a/common/timeout.h b/common/timeout.h index b9f8b49c..d825fd31 100644 --- a/common/timeout.h +++ b/common/timeout.h @@ -16,59 +16,47 @@ limitations under the License. 
#pragma once #include +#include -namespace photon -{ - extern volatile uint64_t now; -} +namespace photon { -class Timeout -{ -public: - // Timeout() { } - Timeout(uint64_t x) { timeout(x); } - uint64_t timeout(uint64_t x){ return m_expire = sat_add(photon::now, x); } - uint64_t timeout() const { return sat_sub(m_expire, photon::now); } - operator uint64_t() const { return timeout(); } - uint64_t timeout_us() const { return timeout(); } - uint64_t timeout_ms() const { return divide(timeout(), 1000); } - uint64_t timeout_MS() const { return divide(timeout(), 1024); } // fast approximation - uint64_t timeout_s() const { return divide(timeout(), 1000 * 1000); } - uint64_t timeout_S() const { return divide(timeout(), 1024 * 1024); } // fast approximation - uint64_t expire() const { return m_expire; } - uint64_t expire(uint64_t x) { return m_expire = x; } +extern volatile uint64_t now; +class Timeout { protected: - uint64_t m_expire; // time of expiration, in us + uint64_t m_expiration = -1; // time of expiration, in us - // Saturating addition, no upward overflow - __attribute__((always_inline)) static - uint64_t sat_add(uint64_t x, uint64_t y) - { -#if defined(__x86_64__) - register uint64_t z asm ("rax"); - asm("add %2, %1; sbb %0, %0; or %1, %0;" : "=r"(z), "+r"(x) : "r"(y) : "cc"); - return z; -#elif defined(__aarch64__) - return (x + y < x) ? -1UL : x + y; -#endif +public: + Timeout() = default; // never timeout + Timeout(uint64_t x) { m_expiration = x ? 
sat_add(now, x) : 0; } + uint64_t timeout(uint64_t x) { return m_expiration = sat_add(now, x); } + uint64_t timeout() const { return sat_sub(m_expiration, now); } + operator uint64_t() const { return timeout(); } + bool expired() const { return (m_expiration == 0) || (m_expiration <= now); } + uint64_t timeout_us() const { return timeout(); } + uint64_t timeout_ms() const { return divide(timeout(), 1000); } + uint64_t timeout_MS() const { return divide(timeout(), 1024); } // fast approximation + uint64_t timeout_s() const { return divide(timeout(), 1000 * 1000); } + uint64_t timeout_S() const { return divide(timeout(), 1024 * 1024); } // fast approximation + uint64_t expiration() const { return m_expiration; } + uint64_t expiration(uint64_t x) { return m_expiration = x; } + Timeout& operator = (uint64_t x) { timeout(x); return *this; } + Timeout& operator = (const Timeout& rhs) = default; + bool operator < (const Timeout& rhs) const { + return m_expiration < rhs.m_expiration; } - - // Saturating subtract, no downward overflow - __attribute__((always_inline)) static - uint64_t sat_sub(uint64_t x, uint64_t y) - { -#if defined(__x86_64__) - register uint64_t z asm ("rax"); - asm("xor %0, %0; subq %2, %1; cmovaeq %1, %0;" : "=r"(z), "+r"(x) ,"+r"(y) : : "cc"); - return z; -#elif defined(__aarch64__) - return x > y ? 
x - y : 0; -#endif + Timeout& timeout_at_most(uint64_t x) { + x = sat_add(now, x); + if (x < m_expiration) + m_expiration = x; + return *this; } - static uint64_t divide(uint64_t x, uint64_t divisor) - { +protected: + operator bool() const = delete; + static uint64_t divide(uint64_t x, uint64_t divisor) { return (x + divisor / 2) / divisor; } }; + +} diff --git a/common/utility.h b/common/utility.h index 3363d102..29e85762 100644 --- a/common/utility.h +++ b/common/utility.h @@ -290,3 +290,23 @@ int version_compare(std::string_view a, std::string_view b, int& result); int kernel_version_compare(std::string_view dst, int& result); void print_stacktrace(); +namespace photon { + +// Saturating addition, no upward overflow +__attribute__((always_inline)) inline +uint64_t sat_add(uint64_t x, uint64_t y) { + uint64_t z, c = __builtin_uaddl_overflow(x, y, (unsigned long*)&z); + return -c | z; +} + +// Saturating subtract, no downward overflow +__attribute__((always_inline)) inline +uint64_t sat_sub(uint64_t x, uint64_t y) { + uint64_t z, c = __builtin_usubl_overflow(x, y, (unsigned long*)&z); + return c ? 
0 : z; +} + +} + + + diff --git a/fs/httpfs/httpfs.cpp b/fs/httpfs/httpfs.cpp index 511332d3..16dc380e 100644 --- a/fs/httpfs/httpfs.cpp +++ b/fs/httpfs/httpfs.cpp @@ -145,7 +145,7 @@ class HttpFile : public fs::VirtualReadOnlyFile { net::DummyReaderWriter dummy; ret = curl->GET(get_url().c_str(), &dummy, tmo.timeout()); if (ret < 200) { - if (photon::now >= tmo.expire()) { + if (photon::now >= tmo.expiration()) { // set errno to ENOENT since stat should not ETIMEDOUT LOG_ERROR_RETURN(ENOENT, -1, "Failed to update file stat"); } @@ -214,7 +214,7 @@ class HttpFile : public fs::VirtualReadOnlyFile { curl->set_header_container(&headers); ret = curl->GET(get_url().c_str(), &writer, tmo.timeout()); if (ret < 200) { - if (photon::now > tmo.expire()) { + if (photon::now > tmo.expiration()) { LOG_ERROR_RETURN(ETIMEDOUT, -1, "Failed to perform GET ", VALUE(url), VALUE(offset)); } diff --git a/fs/httpfs/httpfs_v2.cpp b/fs/httpfs/httpfs_v2.cpp index e12f2a8c..a21ae154 100644 --- a/fs/httpfs/httpfs_v2.cpp +++ b/fs/httpfs/httpfs_v2.cpp @@ -147,19 +147,19 @@ class HttpFile_v2 : public fs::VirtualReadOnlyFile { m_stat.st_size = len; return 0; } - void send_read_request(net::http::Client::Operation &op, off_t offset, size_t length, const Timeout &tmo) { - again: + void send_read_request(net::http::Client::Operation &op, off_t offset, size_t length, Timeout tmo) { estring url; url.appends(m_url, "?", m_url_param); op.set_enable_proxy(m_fs->get_client()->has_proxy()); + again: op.req.reset(net::http::Verb::GET, url, op.enable_proxy); op.req.headers.merge(m_common_header); op.req.headers.range(offset, offset + length - 1); op.req.headers.content_length(0); - op.timeout = tmo.timeout(); + op.timeout = tmo; m_fs->get_client()->call(&op); if (op.status_code < 0) { - if (tmo.timeout() == 0) { + if (tmo.expired()) { m_etimeout = true; LOG_ERROR_RETURN(ENOENT, , "http timedout"); } @@ -254,7 +254,7 @@ class HttpFile_v2 : public fs::VirtualReadOnlyFile { } }; -IFile* HttpFs_v2::open(const 
char* pathname, int flags) { +inline IFile* HttpFs_v2::open(const char* pathname, int flags) { if (!pathname) LOG_ERROR_RETURN(EINVAL, nullptr, "NULL is not allowed"); if (flags != O_RDONLY) return nullptr; diff --git a/io/epoll-ng.cpp b/io/epoll-ng.cpp index 13367fff..b536e647 100644 --- a/io/epoll-ng.cpp +++ b/io/epoll-ng.cpp @@ -106,18 +106,17 @@ class EventEngineEPollNG : public MasterEventEngine, // in such condition, timeout_ms should be at least 1 // or it may call epoll_wait without any idle timeout = (timeout && timeout < 1024) ? 1 : timeout / 1024; + timeout &= 0x7fffffff; // make sure less than INT32_MAX while (epfd > 0) { int ret = epoll_wait(epfd, events, LEN(events), timeout); if (ret < 0) { ERRNO err; if (err.no == EINTR) continue; - usleep(1024L * cool_down_ms); + ::usleep(1024L * cool_down_ms); + if (cool_down_ms > 16) + LOG_ERROR_RETURN(err.no, , "epoll_wait() failed ", err); timeout = sat_sub(timeout, cool_down_ms); - if (cool_down_ms < 16) { - cool_down_ms *= 2; - continue; - } - LOG_ERROR_RETURN(err.no, , "epoll_wait() failed ", err); + cool_down_ms *= 2; } remains += ret; return; @@ -254,9 +253,8 @@ class EventEngineEPollNG : public MasterEventEngine, } } virtual ssize_t wait_for_events(void** data, size_t count, - uint64_t timeout = -1) override { - int ret = get_vcpu()->master_event_engine->wait_for_fd_readable( - engine.epfd, timeout); + Timeout timeout) override { + int ret = ::photon::wait_for_fd_readable(engine.epfd, timeout); if (ret < 0) { return errno == ETIMEDOUT ? 
0 : -1; } @@ -273,7 +271,7 @@ class EventEngineEPollNG : public MasterEventEngine, } return ptr - data; } - virtual ssize_t wait_and_fire_events(uint64_t timeout = -1) override { + virtual ssize_t wait_and_fire_events(uint64_t timeout) override { ssize_t n = 0; wait_for_events( timeout, @@ -289,7 +287,7 @@ class EventEngineEPollNG : public MasterEventEngine, } virtual int cancel_wait() override { return eventfd_write(evfd, 1); } - int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override { + int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { Event waiter{fd, interests | ONE_SHOT, CURRENT}; Event event{fd, interests | ONE_SHOT, &waiter}; int ret = add_interest(event); diff --git a/io/epoll.cpp b/io/epoll.cpp index 4100bd69..60ca9037 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -175,18 +175,17 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine, // in such condition, timeout_ms should be at least 1 // or it may call epoll_wait without any idle timeout = (timeout && timeout < 1024) ? 
1 : timeout / 1024; + timeout &= 0x7fffffff; // make sure less than INT32_MAX while (_engine_fd > 0) { - int ret = epoll_wait(_engine_fd, _events, LEN(_events), timeout); + int ret = ::epoll_wait(_engine_fd, _events, LEN(_events), timeout); if (ret < 0) { ERRNO err; if (err.no == EINTR) continue; - usleep(1024L * cool_down_ms); + ::usleep(1024L * cool_down_ms); + if (cool_down_ms > 16) + LOG_ERROR_RETURN(err.no, -1, "epoll_wait() failed ", err); timeout = sat_sub(timeout, cool_down_ms); - if (cool_down_ms < 16) { - cool_down_ms *= 2; - continue; - } - LOG_ERROR_RETURN(err.no, -1, "epoll_wait() failed ", err); + cool_down_ms *= 2; } return _events_remain = ret; } @@ -240,9 +239,8 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine, } } virtual ssize_t wait_for_events(void** data, size_t count, - uint64_t timeout = -1) override { - int ret = get_vcpu()->master_event_engine->wait_for_fd_readable( - _engine_fd, timeout); + Timeout timeout) override { + int ret = ::photon::wait_for_fd_readable(_engine_fd, timeout); if (ret < 0) { return errno == ETIMEDOUT ? 
0 : -1; } @@ -259,7 +257,7 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine, } return ptr - data; } - virtual ssize_t wait_and_fire_events(uint64_t timeout = -1) override { + virtual ssize_t wait_and_fire_events(uint64_t timeout) override { ssize_t n = 0; wait_for_events( timeout, @@ -273,7 +271,7 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine, } virtual int cancel_wait() override { return eventfd_write(_evfd, 1); } - int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override { + int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { Event event{fd, interests | ONE_SHOT, CURRENT}; int ret = add_interest(event); if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest"); diff --git a/io/fd-events.h b/io/fd-events.h index 20e6a91a..b06d38b0 100644 --- a/io/fd-events.h +++ b/io/fd-events.h @@ -18,6 +18,7 @@ limitations under the License. #include #include #include +#include namespace photon { @@ -44,17 +45,17 @@ class MasterEventEngine { * @return 0 for success, which means event arrived in time * -1 for failure, could be timeout or interrupted by another thread */ - virtual int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) = 0; + virtual int wait_for_fd(int fd, uint32_t interests, Timeout timeout) = 0; - int wait_for_fd_readable(int fd, uint64_t timeout = -1) { + int wait_for_fd_readable(int fd, Timeout timeout = {}) { return wait_for_fd(fd, EVENT_READ, timeout); } - int wait_for_fd_writable(int fd, uint64_t timeout = -1) { + int wait_for_fd_writable(int fd, Timeout timeout = {}) { return wait_for_fd(fd, EVENT_WRITE, timeout); } - int wait_for_fd_error(int fd, uint64_t timeout = -1) { + int wait_for_fd_error(int fd, Timeout timeout = {}) { return wait_for_fd(fd, EVENT_ERROR, timeout); } @@ -66,20 +67,20 @@ class MasterEventEngine { * @warning Do NOT invoke photon::usleep() or photon::sleep() in this function, because their * implementations also rely 
on this function. */ - virtual ssize_t wait_and_fire_events(uint64_t timeout = -1) = 0; + virtual ssize_t wait_and_fire_events(uint64_t timeout) = 0; virtual int cancel_wait() = 0; }; -inline int wait_for_fd_readable(int fd, uint64_t timeout = -1) { +inline int wait_for_fd_readable(int fd, Timeout timeout = {}) { return get_vcpu()->master_event_engine->wait_for_fd_readable(fd, timeout); } -inline int wait_for_fd_writable(int fd, uint64_t timeout = -1) { +inline int wait_for_fd_writable(int fd, Timeout timeout = {}) { return get_vcpu()->master_event_engine->wait_for_fd_writable(fd, timeout); } -inline int wait_for_fd_error(int fd, uint64_t timeout = -1) { +inline int wait_for_fd_error(int fd, Timeout timeout = {}) { return get_vcpu()->master_event_engine->wait_for_fd_error(fd, timeout); } @@ -116,7 +117,7 @@ class CascadingEventEngine { * @return -1 for error, positive integer for the number of events, 0 for no events and should run it again * @warning Do NOT block vcpu */ - virtual ssize_t wait_for_events(void** data, size_t count, uint64_t timeout = -1) = 0; + virtual ssize_t wait_for_events(void** data, size_t count, Timeout timeout = {}) = 0; }; template inline diff --git a/io/fstack-dpdk.cpp b/io/fstack-dpdk.cpp index 96d53ee0..f1002acc 100644 --- a/io/fstack-dpdk.cpp +++ b/io/fstack-dpdk.cpp @@ -103,7 +103,7 @@ class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine, return 0; } - int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override { + int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { short ev = (interests == EVENT_READ) ? 
EVFILT_READ : EVFILT_WRITE; enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT); int ret = thread_usleep(timeout); @@ -117,7 +117,7 @@ class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine, return -1; } - ssize_t wait_and_fire_events(uint64_t timeout = -1) override { + ssize_t wait_and_fire_events(uint64_t timeout) override { ssize_t nev = 0; struct timespec tm; tm.tv_sec = timeout / 1000 / 1000; @@ -190,8 +190,8 @@ class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine, } ssize_t wait_for_events(void** data, - size_t count, uint64_t timeout = -1) override { - int ret = get_vcpu()->master_event_engine->wait_for_fd_readable(_kq, timeout); + size_t count, Timeout timeout) override { + int ret = ::photon::wait_for_fd_readable(_kq, timeout); if (ret < 0) return errno == ETIMEDOUT ? 0 : -1; if (count > LEN(_events)) count = LEN(_events); @@ -244,7 +244,7 @@ int fstack_socket(int domain, int type, int protocol) { // linux_sockaddr is required by f-stack api, and has the same layout to sockaddr static_assert(sizeof(linux_sockaddr) == sizeof(sockaddr)); -int fstack_connect(int sockfd, const struct sockaddr* addr, socklen_t addrlen, uint64_t timeout) { +int fstack_connect(int sockfd, const struct sockaddr* addr, socklen_t addrlen, Timeout timeout) { int err = 0; while (true) { int ret = ff_connect(sockfd, (linux_sockaddr*) addr, addrlen); @@ -279,7 +279,7 @@ int fstack_bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen) { return ff_bind(sockfd, (linux_sockaddr*) addr, addrlen); } -int fstack_accept(int sockfd, struct sockaddr* addr, socklen_t* addrlen, uint64_t timeout) { +int fstack_accept(int sockfd, struct sockaddr* addr, socklen_t* addrlen, Timeout timeout) { return net::doio(LAMBDA(ff_accept(sockfd, (linux_sockaddr*) addr, addrlen)), LAMBDA_TIMEOUT(g_engine->wait_for_fd_readable(sockfd, timeout))); } @@ -292,22 +292,22 @@ int fstack_shutdown(int sockfd, int how) { return ff_shutdown(sockfd, how); } -ssize_t 
fstack_send(int sockfd, const void* buf, size_t count, int flags, uint64_t timeout) { +ssize_t fstack_send(int sockfd, const void* buf, size_t count, int flags, Timeout timeout) { return net::doio(LAMBDA(ff_send(sockfd, buf, count, flags)), LAMBDA_TIMEOUT(g_engine->wait_for_fd_writable(sockfd, timeout))); } -ssize_t fstack_sendmsg(int sockfd, const struct msghdr* message, int flags, uint64_t timeout) { +ssize_t fstack_sendmsg(int sockfd, const struct msghdr* message, int flags, Timeout timeout) { return net::doio(LAMBDA(ff_sendmsg(sockfd, message, flags)), LAMBDA_TIMEOUT(g_engine->wait_for_fd_writable(sockfd, timeout))); } -ssize_t fstack_recv(int sockfd, void* buf, size_t count, int flags, uint64_t timeout) { +ssize_t fstack_recv(int sockfd, void* buf, size_t count, int flags, Timeout timeout) { return net::doio(LAMBDA(ff_recv(sockfd, buf, count, flags)), LAMBDA_TIMEOUT(g_engine->wait_for_fd_readable(sockfd, timeout))); } -ssize_t fstack_recvmsg(int sockfd, struct msghdr* message, int flags, uint64_t timeout) { +ssize_t fstack_recvmsg(int sockfd, struct msghdr* message, int flags, Timeout timeout) { return net::doio(LAMBDA(ff_recvmsg(sockfd, message, flags)), LAMBDA_TIMEOUT(g_engine->wait_for_fd_readable(sockfd, timeout))); } diff --git a/io/fstack-dpdk.h b/io/fstack-dpdk.h index e0416b97..27fd1f62 100644 --- a/io/fstack-dpdk.h +++ b/io/fstack-dpdk.h @@ -28,25 +28,25 @@ int fstack_dpdk_fini(); int fstack_socket(int domain, int type, int protocol); -int fstack_connect(int sockfd, const struct sockaddr* addr, socklen_t addrlen, uint64_t timeout); +int fstack_connect(int sockfd, const struct sockaddr* addr, socklen_t addrlen, Timeout timeout = {}); int fstack_listen(int sockfd, int backlog); int fstack_bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen); -int fstack_accept(int sockfd, struct sockaddr* addr, socklen_t* addrlen, uint64_t timeout); +int fstack_accept(int sockfd, struct sockaddr* addr, socklen_t* addrlen, Timeout timeout = {}); int 
fstack_close(int fd); int fstack_shutdown(int sockfd, int how); -ssize_t fstack_send(int sockfd, const void* buf, size_t count, int flags, uint64_t timeout); +ssize_t fstack_send(int sockfd, const void* buf, size_t count, int flags, Timeout timeout = {}); -ssize_t fstack_sendmsg(int sockfd, const struct msghdr* message, int flags, uint64_t timeout); +ssize_t fstack_sendmsg(int sockfd, const struct msghdr* message, int flags, Timeout timeout = {}); -ssize_t fstack_recv(int sockfd, void* buf, size_t count, int flags, uint64_t timeout); +ssize_t fstack_recv(int sockfd, void* buf, size_t count, int flags, Timeout timeout = {}); -ssize_t fstack_recvmsg(int sockfd, struct msghdr* message, int flags, uint64_t timeout); +ssize_t fstack_recvmsg(int sockfd, struct msghdr* message, int flags, Timeout timeout = {}); int fstack_setsockopt(int socket, int level, int option_name, const void* option_value, socklen_t option_len); diff --git a/io/iouring-wrapper.cpp b/io/iouring-wrapper.cpp index ab3aadd1..bc58a8ba 100644 --- a/io/iouring-wrapper.cpp +++ b/io/iouring-wrapper.cpp @@ -172,7 +172,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub * be set to ETIMEDOUT. If failed because of external interruption, errno will also be set accordingly. */ template - int32_t async_io(Prep prep, uint64_t timeout, uint32_t ring_flags, Args... args) { + int32_t async_io(Prep prep, Timeout timeout, uint32_t ring_flags, Args... 
args) { auto* sqe = _get_sqe(); if (sqe == nullptr) return -1; @@ -180,16 +180,17 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub return _async_io(sqe, timeout, ring_flags); } - int32_t _async_io(io_uring_sqe* sqe, uint64_t timeout, uint32_t ring_flags) { + int32_t _async_io(io_uring_sqe* sqe, Timeout timeout, uint32_t ring_flags) { sqe->flags |= (uint8_t) (ring_flags & 0xff); ioCtx io_ctx(false, false); io_uring_sqe_set_data(sqe, &io_ctx); ioCtx timer_ctx(true, false); - __kernel_timespec ts{}; - if (timeout < std::numeric_limits::max()) { + __kernel_timespec ts; + auto usec = timeout.timeout_us(); + if (usec < std::numeric_limits::max()) { sqe->flags |= IOSQE_IO_LINK; - usec_to_timespec(timeout, &ts); + ts = usec_to_timespec(usec); sqe = _get_sqe(); if (sqe == nullptr) return -1; @@ -223,7 +224,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub } } - int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override { + int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { unsigned poll_mask = evmap.translate_bitwisely(interests); // The io_uring_prep_poll_add's return value is the same as poll(2)'s revents. int ret = async_io(&io_uring_prep_poll_add, timeout, 0, fd, poll_mask); @@ -283,9 +284,9 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub return 0; } - ssize_t wait_for_events(void** data, size_t count, uint64_t timeout = -1) override { + ssize_t wait_for_events(void** data, size_t count, Timeout timeout) override { // Use master engine to wait for self event fd - int ret = get_vcpu()->master_event_engine->wait_for_fd_readable(m_eventfd, timeout); + int ret = ::photon::wait_for_fd_readable(m_eventfd, timeout); if (ret < 0) { return errno == ETIMEDOUT ? 
0 : -1; } @@ -329,21 +330,20 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub return num; } - ssize_t wait_and_fire_events(uint64_t timeout = -1) override { + ssize_t wait_and_fire_events(uint64_t timeout) override { // Prepare own timeout - __kernel_timespec ts{}; if (timeout > std::numeric_limits::max()) { timeout = std::numeric_limits::max(); } - usec_to_timespec(timeout, &ts); - io_uring_cqe* cqe = nullptr; - if (m_submit_wait_func(m_ring, &ts, &cqe) != 0) { + auto ts = usec_to_timespec(timeout); + if (m_submit_wait_func(m_ring, &ts) != 0) { return -1; } uint32_t head = 0; unsigned i = 0; + io_uring_cqe* cqe; io_uring_for_each_cqe(m_ring, head, cqe) { i++; auto ctx = (ioCtx*) io_uring_cqe_get_data(cqe); @@ -461,7 +461,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub return sqe; } - static int submit_wait_by_timer(io_uring* ring, __kernel_timespec* ts, io_uring_cqe** cqe) { + static int submit_wait_by_timer(io_uring* ring, __kernel_timespec* ts) { io_uring_sqe* sqe = io_uring_get_sqe(ring); if (!sqe) { LOG_ERROR_RETURN(EBUSY, -1, "iouring: submission queue is full"); @@ -477,16 +477,17 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub return 0; } - static int submit_wait_by_api(io_uring* ring, __kernel_timespec* ts, io_uring_cqe** cqe) { + static int submit_wait_by_api(io_uring* ring, __kernel_timespec* ts) { // Batch submit all SQEs - int ret = io_uring_submit_and_wait_timeout(ring, cqe, 1, ts, nullptr); + io_uring_cqe* cqe; + int ret = io_uring_submit_and_wait_timeout(ring, &cqe, 1, ts, nullptr); if (ret < 0 && ret != -ETIME) { LOG_ERRNO_RETURN(0, -1, "iouring: failed to submit io"); } return 0; } - using SubmitWaitFunc = int (*)(io_uring* ring, __kernel_timespec* ts, io_uring_cqe** cqe); + using SubmitWaitFunc = int (*)(io_uring* ring, __kernel_timespec* ts); static SubmitWaitFunc m_submit_wait_func; static void set_submit_wait_function() { @@ -527,11 
+528,10 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub } } - static void usec_to_timespec(int64_t usec, __kernel_timespec* ts) { - int64_t usec_rounded_to_sec = usec / 1000000L * 1000000L; - long long nsec = (usec - usec_rounded_to_sec) * 1000L; - ts->tv_sec = usec_rounded_to_sec / 1000000L; - ts->tv_nsec = nsec; + __kernel_timespec usec_to_timespec(int64_t usec) { + int64_t sec = usec / 1000000L; + long long nsec = (usec % 1000000L) * 1000L; + return {sec, nsec}; } static constexpr const uint32_t REQUIRED_FEATURES[] = { @@ -563,67 +563,67 @@ inline size_t do_async_io(Ts...xs) { return mee->async_io(xs...); } -ssize_t iouring_pread(int fd, void* buf, size_t count, off_t offset, uint64_t flags, uint64_t timeout) { +ssize_t iouring_pread(int fd, void* buf, size_t count, off_t offset, uint64_t flags, Timeout timeout) { uint32_t ring_flags = flags >> 32; return do_async_io(&io_uring_prep_read, timeout, ring_flags, fd, buf, count, offset); } -ssize_t iouring_pwrite(int fd, const void* buf, size_t count, off_t offset, uint64_t flags, uint64_t timeout) { +ssize_t iouring_pwrite(int fd, const void* buf, size_t count, off_t offset, uint64_t flags, Timeout timeout) { uint32_t ring_flags = flags >> 32; return do_async_io(&io_uring_prep_write, timeout, ring_flags, fd, buf, count, offset); } -ssize_t iouring_preadv(int fd, const iovec* iov, int iovcnt, off_t offset, uint64_t flags, uint64_t timeout) { +ssize_t iouring_preadv(int fd, const iovec* iov, int iovcnt, off_t offset, uint64_t flags, Timeout timeout) { uint32_t ring_flags = flags >> 32; return do_async_io(&io_uring_prep_readv, timeout, ring_flags, fd, iov, iovcnt, offset); } -ssize_t iouring_pwritev(int fd, const iovec* iov, int iovcnt, off_t offset, uint64_t flags, uint64_t timeout) { +ssize_t iouring_pwritev(int fd, const iovec* iov, int iovcnt, off_t offset, uint64_t flags, Timeout timeout) { uint32_t ring_flags = flags >> 32; return do_async_io(&io_uring_prep_writev, timeout, 
ring_flags, fd, iov, iovcnt, offset); } -ssize_t iouring_send(int fd, const void* buf, size_t len, uint64_t flags, uint64_t timeout) { +ssize_t iouring_send(int fd, const void* buf, size_t len, uint64_t flags, Timeout timeout) { uint32_t io_flags = flags & 0xffffffff; uint32_t ring_flags = flags >> 32; return do_async_io(&io_uring_prep_send, timeout, ring_flags, fd, buf, len, io_flags); } -ssize_t iouring_send_zc(int fd, const void* buf, size_t len, uint64_t flags, uint64_t timeout) { +ssize_t iouring_send_zc(int fd, const void* buf, size_t len, uint64_t flags, Timeout timeout) { uint32_t io_flags = flags & 0xffffffff; uint32_t ring_flags = flags >> 32; return do_async_io(&io_uring_prep_send_zc, timeout, ring_flags, fd, buf, len, io_flags, 0); } -ssize_t iouring_sendmsg(int fd, const msghdr* msg, uint64_t flags, uint64_t timeout) { +ssize_t iouring_sendmsg(int fd, const msghdr* msg, uint64_t flags, Timeout timeout) { uint32_t io_flags = flags & 0xffffffff; uint32_t ring_flags = flags >> 32; return do_async_io(&io_uring_prep_sendmsg, timeout, ring_flags, fd, msg, io_flags); } -ssize_t iouring_sendmsg_zc(int fd, const msghdr* msg, uint64_t flags, uint64_t timeout) { +ssize_t iouring_sendmsg_zc(int fd, const msghdr* msg, uint64_t flags, Timeout timeout) { uint32_t io_flags = flags & 0xffffffff; uint32_t ring_flags = flags >> 32; return do_async_io(&io_uring_prep_sendmsg_zc, timeout, ring_flags, fd, msg, io_flags); } -ssize_t iouring_recv(int fd, void* buf, size_t len, uint64_t flags, uint64_t timeout) { +ssize_t iouring_recv(int fd, void* buf, size_t len, uint64_t flags, Timeout timeout) { uint32_t io_flags = flags & 0xffffffff; uint32_t ring_flags = flags >> 32; return do_async_io(&io_uring_prep_recv, timeout, ring_flags, fd, buf, len, io_flags); } -ssize_t iouring_recvmsg(int fd, msghdr* msg, uint64_t flags, uint64_t timeout) { +ssize_t iouring_recvmsg(int fd, msghdr* msg, uint64_t flags, Timeout timeout) { uint32_t io_flags = flags & 0xffffffff; uint32_t ring_flags 
= flags >> 32; return do_async_io(&io_uring_prep_recvmsg, timeout, ring_flags, fd, msg, io_flags); } -int iouring_connect(int fd, const sockaddr* addr, socklen_t addrlen, uint64_t timeout) { +int iouring_connect(int fd, const sockaddr* addr, socklen_t addrlen, Timeout timeout) { return do_async_io(&io_uring_prep_connect, timeout, 0, fd, addr, addrlen); } -int iouring_accept(int fd, sockaddr* addr, socklen_t* addrlen, uint64_t timeout) { +int iouring_accept(int fd, sockaddr* addr, socklen_t* addrlen, Timeout timeout) { return do_async_io(&io_uring_prep_accept, timeout, 0, fd, addr, addrlen, 0); } diff --git a/io/iouring-wrapper.h b/io/iouring-wrapper.h index 433e5641..9b4287b6 100644 --- a/io/iouring-wrapper.h +++ b/io/iouring-wrapper.h @@ -22,34 +22,35 @@ limitations under the License. #include #include #include +#include namespace photon { static const uint64_t IouringFixedFileFlag = 1UL << 32; -ssize_t iouring_pread(int fd, void* buf, size_t count, off_t offset, uint64_t flags = 0, uint64_t timeout = -1); +ssize_t iouring_pread(int fd, void* buf, size_t count, off_t offset, uint64_t flags = 0, Timeout timeout = {}); -ssize_t iouring_pwrite(int fd, const void* buf, size_t count, off_t offset, uint64_t flags = 0, uint64_t timeout = -1); +ssize_t iouring_pwrite(int fd, const void* buf, size_t count, off_t offset, uint64_t flags = 0, Timeout timeout = {}); -ssize_t iouring_preadv(int fd, const iovec* iov, int iovcnt, off_t offset, uint64_t flags = 0, uint64_t timeout = -1); +ssize_t iouring_preadv(int fd, const iovec* iov, int iovcnt, off_t offset, uint64_t flags = 0, Timeout timeout = {}); -ssize_t iouring_pwritev(int fd, const iovec* iov, int iovcnt, off_t offset, uint64_t flags = 0, uint64_t timeout = -1); +ssize_t iouring_pwritev(int fd, const iovec* iov, int iovcnt, off_t offset, uint64_t flags = 0, Timeout timeout = {}); -ssize_t iouring_send(int fd, const void* buf, size_t len, uint64_t flags = 0, uint64_t timeout = -1); +ssize_t iouring_send(int fd, const 
void* buf, size_t len, uint64_t flags = 0, Timeout timeout = {}); -ssize_t iouring_send_zc(int fd, const void* buf, size_t len, uint64_t flags = 0, uint64_t timeout = -1); +ssize_t iouring_send_zc(int fd, const void* buf, size_t len, uint64_t flags = 0, Timeout timeout = {}); -ssize_t iouring_sendmsg(int fd, const msghdr* msg, uint64_t flags = 0, uint64_t timeout = -1); +ssize_t iouring_sendmsg(int fd, const msghdr* msg, uint64_t flags = 0, Timeout timeout = {}); -ssize_t iouring_sendmsg_zc(int fd, const msghdr* msg, uint64_t flags = 0, uint64_t timeout = -1); +ssize_t iouring_sendmsg_zc(int fd, const msghdr* msg, uint64_t flags = 0, Timeout timeout = {}); -ssize_t iouring_recv(int fd, void* buf, size_t len, uint64_t flags = 0, uint64_t timeout = -1); +ssize_t iouring_recv(int fd, void* buf, size_t len, uint64_t flags = 0, Timeout timeout = {}); -ssize_t iouring_recvmsg(int fd, msghdr* msg, uint64_t flags = 0, uint64_t timeout = -1); +ssize_t iouring_recvmsg(int fd, msghdr* msg, uint64_t flags = 0, Timeout timeout = {}); -int iouring_connect(int fd, const sockaddr* addr, socklen_t addrlen, uint64_t timeout = -1); +int iouring_connect(int fd, const sockaddr* addr, socklen_t addrlen, Timeout timeout = {}); -int iouring_accept(int fd, sockaddr* addr, socklen_t* addrlen, uint64_t timeout = -1); +int iouring_accept(int fd, sockaddr* addr, socklen_t* addrlen, Timeout timeout = {}); int iouring_fsync(int fd); @@ -69,27 +70,27 @@ int iouring_unregister_files(int fd); struct iouring { - static ssize_t pread(int fd, void *buf, size_t count, off_t offset, uint64_t timeout = -1) + static ssize_t pread(int fd, void *buf, size_t count, off_t offset, Timeout timeout = {}) { return iouring_pread(fd, buf, count, offset, 0, timeout); } - static ssize_t preadv(int fd, const struct iovec *iov, int iovcnt, off_t offset, uint64_t timeout = -1) + static ssize_t preadv(int fd, const struct iovec *iov, int iovcnt, off_t offset, Timeout timeout = {}) { return iouring_preadv(fd, iov, iovcnt, 
offset, 0, timeout); } - static ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset, uint64_t timeout = -1) + static ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset, Timeout timeout = {}) { return iouring_pwrite(fd, buf, count, offset, 0, timeout); } - static ssize_t pwritev(int fd, const struct iovec *iov, int iovcnt, off_t offset, uint64_t timeout = -1) + static ssize_t pwritev(int fd, const struct iovec *iov, int iovcnt, off_t offset, Timeout timeout = {}) { return iouring_pwritev(fd, iov, iovcnt, offset, 0, timeout); } - static int fsync(int fd, uint64_t timeout = -1) + static int fsync(int fd, Timeout timeout = {}) { return iouring_fsync(fd); } - static int fdatasync(int fd, uint64_t timeout = -1) + static int fdatasync(int fd, Timeout timeout = {}) { return iouring_fdatasync(fd); } diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 4e314585..10b1b205 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -39,7 +39,6 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res struct kevent _events[32]; int _kq = -1; uint32_t _n = 0; // # of events to submit - struct timespec _tm = {0, 0}; // used for poll int init() { if (_kq >= 0) @@ -49,6 +48,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res if (_kq < 0) LOG_ERRNO_RETURN(0, -1, "failed to create kqueue()"); + LOG_DEBUG("kqueue_fd = ", _kq); if (enqueue(_kq, EVFILT_USER, EV_ADD | EV_CLEAR, 0, nullptr, true) < 0) { DEFER({ close(_kq); _kq = -1; }); LOG_ERRNO_RETURN(0, -1, "failed to setup self-wakeup EVFILT_USER event by kevent()"); @@ -61,7 +61,6 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res _kq = -1; // kqueue fd is not inherited from the parent process _inflight_events.clear(); // reset members _n = 0; - _tm = {0, 0}; return init(); // re-init } @@ -73,21 +72,29 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res close(_kq); } + 
__attribute__((noinline)) + static void debug_breakpoint() { + } + int enqueue(int fd, short event, uint16_t action, uint32_t event_flags, void* udata, bool immediate = false) { - // LOG_INFO("enqueue _kq: `, fd: `, event: `, action: `", _kq, fd, event, action); + // if (fd == _kq) debug_breakpoint(); + // immediate = true; + // LOG_DEBUG(VALUE(_kq), VALUE(fd), VALUE(event), VALUE(action), VALUE(event_flags), VALUE(udata), VALUE(immediate)); assert(_n < LEN(_events)); auto entry = &_events[_n++]; EV_SET(entry, fd, event, action, event_flags, 0, udata); if (immediate || _n == LEN(_events)) { int ret = kevent(_kq, _events, _n, nullptr, 0, nullptr); - if (ret < 0) + if (ret < 0) { + // debug_breakpoint(); LOG_ERRNO_RETURN(0, -1, "failed to submit events with kevent()"); + } _n = 0; } return 0; } - int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override { + int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT); int ret = thread_usleep(timeout); @@ -96,12 +103,11 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res return 0; // event arrived } - // enqueue(fd, ev, EV_DELETE, 0, CURRENT, true); // immediately errno = (ret == 0) ? ETIMEDOUT : err.no; return -1; } - ssize_t wait_and_fire_events(uint64_t timeout = -1) override { + ssize_t wait_and_fire_events(uint64_t timeout) override { ssize_t nev = 0; struct timespec tm; tm.tv_sec = timeout / 1000 / 1000; @@ -174,11 +180,12 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res } ssize_t wait_for_events(void** data, - size_t count, uint64_t timeout = -1) override { - int ret = get_vcpu()->master_event_engine->wait_for_fd_readable(_kq, timeout); + size_t count, Timeout timeout) override { + int ret = ::photon::wait_for_fd_readable(_kq, timeout); if (ret < 0) return errno == ETIMEDOUT ? 
0 : -1; if (count > LEN(_events)) count = LEN(_events); + static const struct timespec _tm = {0, 0}; ret = kevent(_kq, _events, _n, _events, count, &_tm); if (ret < 0) LOG_ERRNO_RETURN(0, -1, "failed to call kevent()"); @@ -193,7 +200,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res }; __attribute__((noinline)) -KQueue* new_kqueue_engine() { +static KQueue* new_kqueue_engine() { LOG_INFO("Init event engine: kqueue"); return NewObj()->init(); } diff --git a/net/basic_socket.cpp b/net/basic_socket.cpp index 5b5e8ac3..409d05e0 100644 --- a/net/basic_socket.cpp +++ b/net/basic_socket.cpp @@ -81,7 +81,7 @@ int socket(int domain, int type, int protocol) { #endif } int connect(int fd, const struct sockaddr *addr, socklen_t addrlen, - uint64_t timeout) { + Timeout timeout) { int err = 0; while (true) { int ret = ::connect(fd, addr, addrlen); @@ -110,7 +110,7 @@ int connect(int fd, const struct sockaddr *addr, socklen_t addrlen, } int accept(int fd, struct sockaddr *addr, socklen_t *addrlen, - uint64_t timeout) { + Timeout timeout) { #ifdef __APPLE__ auto ret = (int)doio(LAMBDA(::accept(fd, addr, addrlen)), LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout))); @@ -125,11 +125,11 @@ int accept(int fd, struct sockaddr *addr, socklen_t *addrlen, LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout))); #endif } -ssize_t read(int fd, void *buf, size_t count, uint64_t timeout) { +ssize_t read(int fd, void *buf, size_t count, Timeout timeout) { return doio(LAMBDA(::read(fd, buf, count)), LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout))); } -ssize_t readv(int fd, const struct iovec *iov, int iovcnt, uint64_t timeout) { +ssize_t readv(int fd, const struct iovec *iov, int iovcnt, Timeout timeout) { if (iovcnt <= 0) { errno = EINVAL; return -1; @@ -139,7 +139,7 @@ ssize_t readv(int fd, const struct iovec *iov, int iovcnt, uint64_t timeout) { LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout))); } ssize_t sendfile(int out_fd, 
int in_fd, off_t *offset, size_t count, - uint64_t timeout) { + Timeout timeout) { #ifdef __APPLE__ off_t len = count; ssize_t ret = @@ -152,7 +152,7 @@ ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count, #endif } -ssize_t sendmsg_zerocopy(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, uint64_t timeout) { +ssize_t sendmsg_zerocopy(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, Timeout timeout) { msghdr msg = {}; msg.msg_iov = iov; msg.msg_iovlen = iovcnt; @@ -162,77 +162,77 @@ ssize_t sendmsg_zerocopy(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, ui return ret; } -ssize_t read_n(int fd, void *buf, size_t count, uint64_t timeout) { +ssize_t read_n(int fd, void *buf, size_t count, Timeout timeout) { return doio_n(buf, count, LAMBDA_TIMEOUT(read(fd, buf, count, timeout))); } ssize_t sendfile_n(int out_fd, int in_fd, off_t *offset, size_t count, - uint64_t timeout) { + Timeout timeout) { void* buf_unused = nullptr; return doio_n(buf_unused, count, LAMBDA_TIMEOUT(sendfile(out_fd, in_fd, offset, count, timeout))); } -ssize_t readv_n(int fd, struct iovec *iov, int iovcnt, uint64_t timeout) { +ssize_t readv_n(int fd, struct iovec *iov, int iovcnt, Timeout timeout) { iovector_view v(iov, iovcnt); return doiov_n(v, LAMBDA_TIMEOUT(readv(fd, v.iov, v.iovcnt, timeout))); } -ssize_t zerocopy_n(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, uint64_t timeout) { +ssize_t zerocopy_n(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, Timeout timeout) { iovector_view v(iov, iovcnt); return doiov_n(v, LAMBDA_TIMEOUT(sendmsg_zerocopy(fd, v.iov, v.iovcnt, num_calls, timeout))); } -ssize_t send(int fd, const void *buf, size_t count, int flags, uint64_t timeout) { +ssize_t send(int fd, const void *buf, size_t count, int flags, Timeout timeout) { return doio(LAMBDA(::send(fd, buf, count, flags)), LAMBDA_TIMEOUT(photon::wait_for_fd_writable(fd, timeout))); } -ssize_t sendmsg(int fd, const struct msghdr* msg, int flags, uint64_t timeout) { +ssize_t 
sendmsg(int fd, const struct msghdr* msg, int flags, Timeout timeout) { return doio(LAMBDA(::sendmsg(fd, msg, flags)), LAMBDA_TIMEOUT(photon::wait_for_fd_writable(fd, timeout))); } -ssize_t recv(int fd, void* buf, size_t count, int flags, uint64_t timeout) { +ssize_t recv(int fd, void* buf, size_t count, int flags, Timeout timeout) { return doio(LAMBDA(::recv(fd, buf, count, flags)), LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout))); } -ssize_t recvmsg(int fd, struct msghdr* msg, int flags, uint64_t timeout) { +ssize_t recvmsg(int fd, struct msghdr* msg, int flags, Timeout timeout) { return doio(LAMBDA(::recvmsg(fd, msg, flags)), LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout))); } -ssize_t sendv(int fd, const struct iovec *iov, int iovcnt, int flag, uint64_t timeout) { +ssize_t sendv(int fd, const struct iovec *iov, int iovcnt, int flag, Timeout timeout) { msghdr msg = {}; msg.msg_iov = (struct iovec*)iov; msg.msg_iovlen = iovcnt; return doio(LAMBDA(::sendmsg(fd, &msg, flag | MSG_NOSIGNAL)), LAMBDA_TIMEOUT(photon::wait_for_fd_writable(fd, timeout))); } -ssize_t send_n(int fd, const void *buf, size_t count, int flag, uint64_t timeout) { +ssize_t send_n(int fd, const void *buf, size_t count, int flag, Timeout timeout) { return doio_n((void *&)buf, count, LAMBDA_TIMEOUT(send(fd, (const void*)buf, (size_t)count, flag, timeout))); } -ssize_t sendv_n(int fd, struct iovec *iov, int iovcnt, int flag, uint64_t timeout) { +ssize_t sendv_n(int fd, struct iovec *iov, int iovcnt, int flag, Timeout timeout) { iovector_view v(iov, iovcnt); return doiov_n(v, LAMBDA_TIMEOUT(sendv(fd, (struct iovec*)v.iov, (int)v.iovcnt, flag, timeout))); } -ssize_t write(int fd, const void *buf, size_t count, uint64_t timeout) { +ssize_t write(int fd, const void *buf, size_t count, Timeout timeout) { return send(fd, buf, count, MSG_NOSIGNAL, timeout); } -ssize_t writev(int fd, const struct iovec *iov, int iovcnt, uint64_t timeout) { +ssize_t writev(int fd, const struct iovec 
*iov, int iovcnt, Timeout timeout) { return sendv(fd, iov, iovcnt, 0, timeout); } -ssize_t write_n(int fd, const void *buf, size_t count, uint64_t timeout) { +ssize_t write_n(int fd, const void *buf, size_t count, Timeout timeout) { return send_n(fd, buf, count, 0, timeout); } -ssize_t writev_n(int fd, struct iovec *iov, int iovcnt, uint64_t timeout) { +ssize_t writev_n(int fd, struct iovec *iov, int iovcnt, Timeout timeout) { return sendv_n(fd, iov, iovcnt, 0, timeout); } ssize_t sendfile_fallback(ISocketStream* out_stream, - int in_fd, off_t offset, size_t count, uint64_t timeout) { + int in_fd, off_t offset, size_t count, Timeout timeout) { char buf[64 * 1024]; void* ptr_unused = nullptr; auto func = [&]() -> ssize_t { @@ -352,7 +352,7 @@ static ssize_t recv_errqueue(int fd, uint32_t &ret_counter) { return 0; } -static int64_t read_counter(int fd, uint64_t timeout) { +static int64_t read_counter(int fd, Timeout timeout) { uint32_t counter = 0; auto ret = doio(LAMBDA(recv_errqueue(fd, counter)), LAMBDA_TIMEOUT(photon::wait_for_fd_error(fd, timeout))); @@ -365,7 +365,7 @@ inline bool is_counter_less_than(uint32_t left, uint32_t right) { return (left < right) || (left > right && left > mid && right - left < mid); } -ssize_t zerocopy_confirm(int fd, uint32_t num_calls, uint64_t timeout) { +ssize_t zerocopy_confirm(int fd, uint32_t num_calls, Timeout timeout) { auto func = LAMBDA_TIMEOUT(read_counter(fd, timeout)); uint32_t counter = 0; do { diff --git a/net/basic_socket.h b/net/basic_socket.h index 30f8e7f3..74423862 100644 --- a/net/basic_socket.h +++ b/net/basic_socket.h @@ -28,64 +28,59 @@ namespace net { int socket(int domain, int type, int protocol); int connect(int fd, const struct sockaddr *addr, socklen_t addrlen, - uint64_t timeout = -1); + Timeout timeout = {}); int accept(int fd, struct sockaddr *addr, socklen_t *addrlen, - uint64_t timeout = -1); + Timeout timeout = {}); -ssize_t send(int fd, const void* buf, size_t len, int flags, uint64_t timeout = 
-1); -ssize_t sendmsg(int fd, const struct msghdr* msg, int flags, uint64_t timeout = -1); -ssize_t recv(int fd, void* buf, size_t count, int flags, uint64_t timeout = -1); -ssize_t recvmsg(int fd, struct msghdr* msg, int flags, uint64_t timeout = -1); +ssize_t send(int fd, const void* buf, size_t len, int flags, Timeout timeout = {}); +ssize_t sendmsg(int fd, const struct msghdr* msg, int flags, Timeout timeout = {}); +ssize_t recv(int fd, void* buf, size_t count, int flags, Timeout timeout = {}); +ssize_t recvmsg(int fd, struct msghdr* msg, int flags, Timeout timeout = {}); -ssize_t read(int fd, void *buf, size_t count, uint64_t timeout = -1); +ssize_t read(int fd, void *buf, size_t count, Timeout timeout = {}); ssize_t readv(int fd, const struct iovec *iov, int iovcnt, - uint64_t timeout = -1); + Timeout timeout = {}); -ssize_t write(int fd, const void *buf, size_t count, uint64_t timeout = -1); +ssize_t write(int fd, const void *buf, size_t count, Timeout timeout = {}); ssize_t writev(int fd, const struct iovec *iov, int iovcnt, - uint64_t timeout = -1); + Timeout timeout = {}); ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count, - uint64_t timeout = -1); + Timeout timeout = {}); -ssize_t read_n(int fd, void *buf, size_t count, uint64_t timeout = -1); +ssize_t read_n(int fd, void *buf, size_t count, Timeout timeout = {}); -ssize_t write_n(int fd, const void *buf, size_t count, uint64_t timeout = -1); +ssize_t write_n(int fd, const void *buf, size_t count, Timeout timeout = {}); -ssize_t readv_n(int fd, struct iovec *iov, int iovcnt, uint64_t timeout = -1); +ssize_t readv_n(int fd, struct iovec *iov, int iovcnt, Timeout timeout = {}); -ssize_t writev_n(int fd, struct iovec *iov, int iovcnt, uint64_t timeout = -1); +ssize_t writev_n(int fd, struct iovec *iov, int iovcnt, Timeout timeout = {}); ssize_t sendfile_n(int out_fd, int in_fd, off_t *offset, size_t count, - uint64_t timeout = -1); + Timeout timeout = {}); class ISocketStream; -ssize_t 
sendfile_fallback(ISocketStream* out_stream, int in_fd, off_t offset, size_t count, uint64_t timeout = -1); +ssize_t sendfile_fallback(ISocketStream* out_stream, int in_fd, off_t offset, size_t count, Timeout timeout = {}); -ssize_t zerocopy_n(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, uint64_t timeout = -1); +ssize_t zerocopy_n(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, Timeout timeout = {}); -ssize_t zerocopy_confirm(int fd, uint32_t num_calls, uint64_t timeout = -1); +ssize_t zerocopy_confirm(int fd, uint32_t num_calls, Timeout timeout = {}); -ssize_t sendv(int fd, const struct iovec *iov, int iovcnt, int flag, uint64_t timeout =-1); +ssize_t sendv(int fd, const struct iovec *iov, int iovcnt, int flag, Timeout timeout = {}); -ssize_t send_n(int fd, const void *buf, size_t count, int flag, uint64_t timeout =-1); +ssize_t send_n(int fd, const void *buf, size_t count, int flag, Timeout timeout = {}); -ssize_t sendv_n(int fd, struct iovec *iov, int iovcnt, int flag, uint64_t timeout =-1); +ssize_t sendv_n(int fd, struct iovec *iov, int iovcnt, int flag, Timeout timeout = {}); int set_socket_nonblocking(int fd); int set_fd_nonblocking(int fd); #define LAMBDA(expr) [&]() __INLINE__ { return expr; } -#define LAMBDA_TIMEOUT(expr) \ - [&]() __INLINE__ { \ - Timeout __tmo(timeout); \ - DEFER(timeout = __tmo.timeout()); \ - return expr; \ - } +#define LAMBDA_TIMEOUT(expr) LAMBDA(expr) template __FORCE_INLINE__ int doio(IOCB iocb, WAIT waitcb) { diff --git a/net/http/client.cpp b/net/http/client.cpp index 7c9cf547..22b0945f 100644 --- a/net/http/client.cpp +++ b/net/http/client.cpp @@ -66,9 +66,8 @@ class PooledDialer { }; ISocketStream* PooledDialer::dial(std::string_view host, uint16_t port, bool secure, uint64_t timeout) { - LOG_DEBUG("Dial to ` `", host, port); - std::string strhost(host); - auto ipaddr = resolver->resolve(strhost.c_str()); + LOG_DEBUG("Dialing to `:`", host, port); + auto ipaddr = resolver->resolve(host); if (ipaddr.undefined()) { 
LOG_ERROR_RETURN(ENOENT, nullptr, "DNS resolve failed, name = `", host) } @@ -91,7 +90,7 @@ ISocketStream* PooledDialer::dial(std::string_view host, uint16_t port, bool sec if (ipaddr.undefined()) LOG_DEBUG("No connectable resolve result"); // When failed, remove resolved result from dns cache so that following retries can try // different ips. - resolver->discard_cache(strhost.c_str(), ipaddr); + resolver->discard_cache(host, ipaddr); return nullptr; } diff --git a/net/http/test/client_tls_test.cpp b/net/http/test/client_tls_test.cpp index fce96908..92d16c43 100644 --- a/net/http/test/client_tls_test.cpp +++ b/net/http/test/client_tls_test.cpp @@ -34,6 +34,7 @@ int idiot_handler(void*, net::http::Request &req, net::http::Response &resp, std std::string str; auto r = req.headers.range(); auto cl = r.second - r.first + 1; + LOG_DEBUG("content_range: `-` (`)", r.first, r.second, cl); if (cl > 4096) { LOG_ERROR_RETURN(0, -1, "RetType failed test"); } @@ -54,7 +55,11 @@ TEST(client_tls, basic) { DEFER(delete tcpserver); tcpserver->timeout(1000UL*1000); tcpserver->setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); - tcpserver->bind(19876, net::IPAddr("127.0.0.1")); + auto addr = net::IPAddr::localhost(); + int r = tcpserver->bind(19876, addr); + if (r != 0) + LOG_ERRNO_RETURN(0, , "failed to bind to `:", addr, 19876); + LOG_DEBUG("bind to `:", addr, " : 19876"); tcpserver->listen(); auto server = net::http::new_http_server(); @@ -75,10 +80,11 @@ TEST(client_tls, basic) { char buf[4096]; auto ret = op->resp.read(buf, 4096); EXPECT_EQ(exp_len, ret); - EXPECT_EQ(true, "test" == op->resp.headers["Test_Handle"]); + EXPECT_EQ("test", op->resp.headers["Test_Handle"]); } int main(int argc, char** arg) { + LOG_DEBUG("Begin test"); if (photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE)) return -1; DEFER(photon::fini()); diff --git a/net/kernel_socket.cpp b/net/kernel_socket.cpp index ada1c05a..614ddcff 100644 --- a/net/kernel_socket.cpp +++ b/net/kernel_socket.cpp @@ -88,26 
+88,26 @@ class KernelSocketStream : public SocketStreamBase { close(); } ssize_t read(void* buf, size_t count) override { - uint64_t timeout = m_timeout; + Timeout timeout(m_timeout); auto cb = LAMBDA_TIMEOUT(do_recv(fd, buf, count, 0, timeout)); return net::doio_n(buf, count, cb); } ssize_t readv(const iovec* iov, int iovcnt) override { SmartCloneIOV<8> clone(iov, iovcnt); iovector_view view(clone.ptr, iovcnt); - uint64_t timeout = m_timeout; + Timeout timeout(m_timeout); auto cb = LAMBDA_TIMEOUT(do_recvmsg(fd, tmp_msg_hdr(view), 0, timeout)); return net::doiov_n(view, cb); } ssize_t write(const void* buf, size_t count) override { - uint64_t timeout = m_timeout; + Timeout timeout(m_timeout); auto cb = LAMBDA_TIMEOUT(do_send(fd, buf, count, MSG_NOSIGNAL, timeout)); return net::doio_n((void*&) buf, count, cb); } ssize_t writev(const iovec* iov, int iovcnt) override { SmartCloneIOV<8> clone(iov, iovcnt); iovector_view view(clone.ptr, iovcnt); - uint64_t timeout = m_timeout; + Timeout timeout(m_timeout); auto cb = LAMBDA_TIMEOUT(do_sendmsg(fd, tmp_msg_hdr(view), MSG_NOSIGNAL, timeout)); return net::doiov_n(view, cb); } @@ -389,11 +389,10 @@ class KernelSocketServer : public SocketServerBase { int m_socket_family; bool m_autoremove; bool m_nonblocking; - - Handler m_handler; - photon::thread* workth = nullptr; bool m_block = false; bool waiting = false; + Handler m_handler; + photon::thread* workth = nullptr; int m_listen_fd = -1; virtual KernelSocketStream* create_stream(int fd) { @@ -468,9 +467,10 @@ class ZeroCopySocketStream : public KernelSocketStream { } ssize_t do_sendmsg(int sockfd, const struct msghdr* message, int flags, uint64_t timeout) override { - ssize_t n = photon::net::sendmsg(sockfd, message, flags | ZEROCOPY_FLAG, timeout); + Timeout tmo(timeout); + ssize_t n = photon::net::sendmsg(sockfd, message, flags | ZEROCOPY_FLAG, tmo); m_num_calls++; - auto ret = zerocopy_confirm(sockfd, m_num_calls - 1, timeout); + auto ret = zerocopy_confirm(sockfd, 
m_num_calls - 1, tmo); if (ret < 0) return ret; return n; diff --git a/net/pooled_socket.cpp b/net/pooled_socket.cpp index 667e40fd..7de8726c 100644 --- a/net/pooled_socket.cpp +++ b/net/pooled_socket.cpp @@ -105,10 +105,10 @@ struct StreamListNode : public intrusive_list_node { EndPoint key; std::unique_ptr stream; int fd; - Timeout expire; + Timeout timeout; - StreamListNode(EndPoint key, ISocketStream* stream, int fd, uint64_t expire) - : key(key), stream(stream), fd(fd), expire(expire) { + StreamListNode(EndPoint key, ISocketStream* stream, int fd, uint64_t TTL_us) + : key(key), stream(stream), fd(fd), timeout(TTL_us) { } }; @@ -117,7 +117,7 @@ class TCPSocketPool : public ForwardSocketClient { CascadingEventEngine* ev; photon::thread* collector; std::unordered_map> fdmap; - uint64_t expiration; + uint64_t TTL_us; photon::Timer timer; // all fd < 0 treated as socket not based on fd @@ -162,12 +162,12 @@ class TCPSocketPool : public ForwardSocketClient { } public: - TCPSocketPool(ISocketClient* client, uint64_t expiration, + TCPSocketPool(ISocketClient* client, uint64_t TTL_us, bool client_ownership = false) : ForwardSocketClient(client, client_ownership), ev(photon::new_default_cascading_engine()), - expiration(expiration), - timer(expiration, {this, &TCPSocketPool::evict}) { + TTL_us(TTL_us), + timer(TTL_us, {this, &TCPSocketPool::evict}) { collector = (photon::thread*)photon::thread_enable_join( photon::thread_create11(&TCPSocketPool::collect, this)); } @@ -205,18 +205,18 @@ class TCPSocketPool : public ForwardSocketClient { uint64_t evict() { // remove empty entry in fdmap intrusive_list freelist; - uint64_t near_expire = expiration; + uint64_t near_expire = TTL_us + now; for (auto it = fdmap.begin(); it != fdmap.end();) { auto& list = it->second; - while (!list.empty() && - list.front()->expire.expire() < photon::now) { + uint64_t exp; + while (!list.empty() && now >= + (exp = list.front()->timeout.expiration())) { freelist.push_back(list.pop_front()); } - 
if (it->second.empty()) { + if (list.empty()) { it = fdmap.erase(it); } else { - near_expire = - std::min(near_expire, it->second.front()->expire.timeout()); + near_expire = std::min(near_expire, exp); it++; } } @@ -224,13 +224,14 @@ class TCPSocketPool : public ForwardSocketClient { rm_watch(node); } freelist.delete_all(); - return near_expire; + assert(near_expire > now); + return sat_sub(near_expire, now); } bool release(EndPoint ep, ISocketStream* stream) { auto fd = stream->get_underlay_fd(); if (!stream_alive(fd)) return false; - auto node = new StreamListNode(ep, stream, fd, expiration); + auto node = new StreamListNode(ep, stream, fd, TTL_us); push_into_pool(node); return true; } @@ -256,8 +257,8 @@ PooledTCPSocketStream::~PooledTCPSocketStream() { } } -extern "C" ISocketClient* new_tcp_socket_pool(ISocketClient* client, uint64_t expire, bool client_ownership) { - return new TCPSocketPool(client, expire, client_ownership); +extern "C" ISocketClient* new_tcp_socket_pool(ISocketClient* client, uint64_t TTL_us, bool client_ownership) { + return new TCPSocketPool(client, TTL_us, client_ownership); } } diff --git a/net/security-context/test/test.cpp b/net/security-context/test/test.cpp index b9413e29..a1e72397 100644 --- a/net/security-context/test/test.cpp +++ b/net/security-context/test/test.cpp @@ -233,7 +233,9 @@ int s_handler(void*, net::ISocketStream* stream) { void s_client_test(net::ISocketStream* stream) { char buf[] = "Hello"; + LOG_DEBUG("befor write"); auto ret = stream->write(buf, 6); + LOG_DEBUG("after write ret=", ret); EXPECT_EQ(6, ret); char b[4096]; size_t rx = 0; @@ -291,6 +293,9 @@ TEST(cs, uds) { } TEST(Socket, nested) { +#ifdef __APPLE__ + LOG_INFO("skip this case in MacOS"); +#endif #ifdef __linux___ ASSERT_GE(net::et_poller_init(), 0); DEFER(net::et_poller_fini()); diff --git a/net/socket.h b/net/socket.h index 93b45283..8f18cec1 100644 --- a/net/socket.h +++ b/net/socket.h @@ -86,28 +86,28 @@ namespace net { } // Check if it's actually 
an IPv4 address mapped in IPV6 bool is_ipv4() const { - if (ntohl(addr._in_addr_field[2]) != 0x0000ffff) { - return false; - } - if (addr._in_addr_field[0] != 0 || addr._in_addr_field[1] != 0) { - return false; - } - return true; + return addr._in_addr_field[0] == 0 && + addr._in_addr_field[1] == 0 && + addr._in_addr_field[2] == htonl(0x0000ffff); } // We regard the default IPv4 0.0.0.0 as undefined bool undefined() const { - return *this == V4Any(); + return mem_equal(V4Any()); } // Should ONLY be used for IPv4 address uint32_t to_nl() const { + assert(is_ipv4()); return addr._in_addr_field[3]; } bool is_loopback() const { - return is_ipv4() ? (*this == V4Loopback()) : (*this == V6Loopback()); + return is_ipv4() ? mem_equal(V4Loopback()) : mem_equal(V6Loopback()); + } + bool is_localhost() const { + return is_loopback(); } bool is_broadcast() const { // IPv6 does not support broadcast - return is_ipv4() && (*this == V4Broadcast()); + return is_ipv4() && mem_equal(V4Broadcast()); } bool is_link_local() const { if (is_ipv4()) { @@ -117,12 +117,11 @@ namespace net { } } bool operator==(const IPAddr& rhs) const { - return memcmp(this, &rhs, sizeof(rhs)) == 0; + return mem_equal(rhs) || (is_localhost() && rhs.is_localhost()); } bool operator!=(const IPAddr& rhs) const { return !(*this == rhs); } - public: static IPAddr V6None() { return IPAddr(htonl(0xffffffff), htonl(0xffffffff), htonl(0xffffffff), htonl(0xffffffff)); } @@ -131,7 +130,11 @@ namespace net { static IPAddr V4Broadcast() { return IPAddr(htonl(INADDR_BROADCAST)); } static IPAddr V4Any() { return IPAddr(htonl(INADDR_ANY)); } static IPAddr V4Loopback() { return IPAddr(htonl(INADDR_LOOPBACK)); } + static IPAddr localhost() { return V4Loopback(); } private: + bool mem_equal(const IPAddr& rhs) const { + return memcmp(this, &rhs, sizeof(rhs)) == 0; + } void map_v4(in_addr addr_) { map_v4(addr_.s_addr); } diff --git a/net/test/test-client.cpp b/net/test/test-client.cpp index 72b227c3..6052a924 100644 --- 
a/net/test/test-client.cpp +++ b/net/test/test-client.cpp @@ -43,7 +43,7 @@ int main(int argc, char** argv) { Timeout tmo(timeout_sec * 1000 * 1000); uint64_t cnt = 0; LOG_INFO(tmo.timeout()); - while (photon::now < tmo.expire()) { + while (photon::now < tmo.expiration()) { auto ret = tls->send(buff, 4096); if (ret < 0) LOG_ERROR_RETURN(0, -1, "Failed to send"); cnt += ret; diff --git a/net/test/test.cpp b/net/test/test.cpp index ad182a06..8843ff5e 100644 --- a/net/test/test.cpp +++ b/net/test/test.cpp @@ -697,13 +697,14 @@ TEST(utils, gethostbyname) { net::IPAddr localhost("127.0.0.1"); net::IPAddr addr; net::gethostbyname("localhost", &addr); - EXPECT_EQ(localhost.to_nl(), addr.to_nl()); + LOG_DEBUG(VALUE(localhost), VALUE(addr)); + EXPECT_EQ(localhost, addr); std::vector addrs; net::gethostbyname("localhost", addrs); - EXPECT_GT((int)addrs.size(), 0); - EXPECT_EQ(localhost.to_nl(), addrs[0].to_nl()); + EXPECT_GT(addrs.size(), 0); + EXPECT_EQ(localhost, addrs[0]); net::IPAddr host = net::gethostbypeer("localhost"); - EXPECT_EQ(localhost.to_nl(), host.to_nl()); + EXPECT_EQ(localhost, host); for (auto &x : addrs) { LOG_INFO(VALUE(x)); } diff --git a/net/utils.cpp b/net/utils.cpp index ad9209d8..24b33cdf 100644 --- a/net/utils.cpp +++ b/net/utils.cpp @@ -27,6 +27,7 @@ limitations under the License. 
#include #include +#include #include #include #include @@ -62,7 +63,7 @@ IPAddr gethostbypeer(IPAddr remote) { return s_local.to_endpoint().addr; } -IPAddr gethostbypeer(const char *domain) { +IPAddr gethostbypeer(std::string_view domain) { // get self ip by remote domain instead of ip IPAddr remote; auto ret = gethostbyname(domain, &remote); @@ -71,36 +72,38 @@ IPAddr gethostbypeer(const char *domain) { return gethostbypeer(remote); } -int _gethostbyname(const char* name, Delegate append_op) { - assert(name); - int idx = 0; +int _gethostbyname(std::string_view name, Delegate append_op) { + if (name.empty()) return -1; addrinfo* result = nullptr; addrinfo hints = {}; hints.ai_socktype = SOCK_STREAM; hints.ai_family = AF_UNSPEC; - int ret = getaddrinfo(name, nullptr, &hints, &result); + std::string _name(name); + int ret = getaddrinfo(_name.c_str(), nullptr, &hints, &result); if (ret != 0) { LOG_ERROR_RETURN(0, -1, "Fail to getaddrinfo: `", gai_strerror(ret)); } assert(result); + int cnt = 0; for (auto* cur = result; cur != nullptr; cur = cur->ai_next) { + IPAddr addr; if (cur->ai_family == AF_INET6) { auto sock_addr = (sockaddr_in6*) cur->ai_addr; - if (append_op(IPAddr(sock_addr->sin6_addr)) < 0) { - break; - } - idx++; + addr = IPAddr(sock_addr->sin6_addr); } else if (cur->ai_family == AF_INET) { auto sock_addr = (sockaddr_in*) cur->ai_addr; - if (append_op(IPAddr(sock_addr->sin_addr)) < 0) { - break; - } - idx++; + addr = IPAddr(sock_addr->sin_addr); + } else { + LOG_DEBUG("skip unsupported address family ", cur->ai_family); + continue; } + // LOG_DEBUG(VALUE(addr)); + if (append_op(addr) < 0) break; + cnt++; } freeaddrinfo(result); - return idx; + return cnt; } struct xlator { @@ -267,7 +270,7 @@ class DefaultResolver : public Resolver { dnscache_.clear(); } - IPAddr resolve(const char *host) override { + IPAddr resolve(std::string_view host) override { auto ctr = [&]() -> IPAddrList* { auto addrs = new IPAddrList(); photon::semaphore sem; @@ -284,6 +287,10 @@ 
class DefaultResolver : public Resolver { if ((uint64_t)time_elapsed <= resolve_timeout_) { addrs->push_back(std::move(ret)); sem.signal(1); + } else { + LOG_ERROR("resolve timeout"); + while(!ret.empty()) + delete ret.pop_front(); } }).detach(); sem.wait(1, resolve_timeout_); @@ -296,9 +303,11 @@ class DefaultResolver : public Resolver { return ret->addr; } - void resolve(const char *host, Delegate func) override { func(resolve(host)); } + void resolve(std::string_view host, Delegate func) override { + func(resolve(host)); + } - void discard_cache(const char *host, IPAddr ip) override { + void discard_cache(std::string_view host, IPAddr ip) override { auto ipaddr = dnscache_.borrow(host); if (ip.undefined() || ipaddr->empty()) ipaddr.recycle(true); else { diff --git a/net/utils.h b/net/utils.h index bf61c3b0..c80c6d92 100644 --- a/net/utils.h +++ b/net/utils.h @@ -53,11 +53,11 @@ IPAddr gethostbypeer(IPAddr remote); * @param name target hostname when detecting * @return `IPAddr` of this host */ -IPAddr gethostbypeer(const char* domain); +IPAddr gethostbypeer(std::string_view name); // Callback returns -1 means break -int _gethostbyname(const char* name, Callback append_op); +int _gethostbyname(std::string_view name, Callback append_op); // inline implemention for compatible @@ -69,7 +69,7 @@ int _gethostbyname(const char* name, Callback append_op); * @param name Host name to resolve * @return first resolved address. */ -inline IPAddr gethostbyname(const char* name) { +inline IPAddr gethostbyname(std::string_view name) { IPAddr ret; auto cb = [&](IPAddr addr) { ret = addr; @@ -90,10 +90,11 @@ inline IPAddr gethostbyname(const char* name) { * @param bufsize size of `buf`, takes `sizeof(IPAddr)` as unit * @return sum of resolved address number. -1 means error. 
result will be filled into `buf` */ -inline int gethostbyname(const char* name, IPAddr* buf, int bufsize = 1) { - int i = 0; +inline int gethostbyname(std::string_view name, IPAddr* buf, size_t bufsize = 1) { + size_t i = 0; auto cb = [&](IPAddr addr) { - if (i < bufsize) buf[i++] = addr; + if (i >= bufsize) return -1; + buf[i++] = addr; return 0; }; return _gethostbyname(name, cb); @@ -109,7 +110,7 @@ inline int gethostbyname(const char* name, IPAddr* buf, int bufsize = 1) { * @param ret `std::vector` reference to get results * @return sum of resolved address number. -1 means error. */ -inline int gethostbyname(const char* name, std::vector& ret) { +inline int gethostbyname(std::string_view name, std::vector& ret) { ret.clear(); auto cb = [&](IPAddr addr) { ret.push_back(addr); @@ -129,7 +130,7 @@ inline int gethostbyname(const char* name, std::vector& ret) { * @param ret `std::vector` reference to get results * @return sum of resolved address number. */ -inline int gethostbyname_nb(const char* name, std::vector& ret) { +inline int gethostbyname_nb(std::string_view name, std::vector& ret) { photon::semaphore sem(0); int r = 0; ret.clear(); @@ -155,9 +156,9 @@ class Resolver : public Object { public: // When failed, return an Undefined IPAddr // Normally dns servers return multiple ips in random order, choosing the first one should suffice. 
- virtual IPAddr resolve(const char* host) = 0; - virtual void resolve(const char* host, Delegate func) = 0; - virtual void discard_cache(const char* host, IPAddr ip = IPAddr()) = 0; // discard current cache of ip + virtual IPAddr resolve(std::string_view host) = 0; + virtual void resolve(std::string_view host, Delegate func) = 0; + virtual void discard_cache(std::string_view host, IPAddr ip = IPAddr()) = 0; // discard current cache of ip }; /** diff --git a/rpc/rpc.cpp b/rpc/rpc.cpp index d55a778e..0727b8b5 100644 --- a/rpc/rpc.cpp +++ b/rpc/rpc.cpp @@ -65,7 +65,7 @@ namespace rpc { int do_send(OutOfOrderContext* args_) { auto args = (OooArgs*)args_; - if (args->timeout.expire() < photon::now) { + if (args->timeout.expiration() < photon::now) { LOG_ERROR_RETURN(ETIMEDOUT, -1, "Request timedout before send"); } auto size = args->request->sum(); @@ -92,7 +92,7 @@ namespace rpc { { auto args = (OooArgs*)args_; m_header.magic = 0; - if (args->timeout.expire() < photon::now) { + if (args->timeout.expiration() < photon::now) { // m_stream->shutdown(ShutdownHow::ReadWrite); LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timeout before read header "); } @@ -154,7 +154,7 @@ namespace rpc { int do_call(FunctionID function, iovector* request, iovector* response, uint64_t timeout) override { scoped_rwlock rl(m_rwlock, photon::RLOCK); Timeout tmo(timeout); - if (tmo.expire() < photon::now) { + if (tmo.expiration() < photon::now) { LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timed out before rpc start", VALUE(timeout), VALUE(tmo.timeout())); } int ret = 0; diff --git a/thread/thread.cpp b/thread/thread.cpp index 9ddcd274..442e8e9b 100644 --- a/thread/thread.cpp +++ b/thread/thread.cpp @@ -98,7 +98,7 @@ namespace photon std::atomic_bool notify{false}; __attribute__((noinline)) - int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override { + int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { return -1; } @@ -113,7 +113,7 @@ namespace photon } __attribute__((noinline)) - 
ssize_t wait_and_fire_events(uint64_t timeout = -1) override { + ssize_t wait_and_fire_events(uint64_t timeout) override { DEFER(notify.store(false, std::memory_order_release)); if (!timeout) return 0; timeout = min(timeout, 1000 * 100UL); @@ -1195,7 +1195,7 @@ R"( } __attribute__((always_inline)) inline - Switch prepare_usleep(uint64_t useconds, thread_list* waitq, RunQ rq = {}) + Switch prepare_usleep(Timeout timeout, thread_list* waitq, RunQ rq = {}) { spinlock* waitq_lock = waitq ? &waitq->lock : nullptr; SCOPED_LOCK(waitq_lock, ((bool) waitq) * 2); @@ -1207,92 +1207,90 @@ R"( sw.from->waitq = waitq; } if_update_now(true); - sw.from->ts_wakeup = sat_add(now, useconds); + sw.from->ts_wakeup = timeout.expiration(); sw.from->get_vcpu()->sleepq.push(sw.from); return sw; } // returns 0 if slept well (at lease `useconds`), -1 otherwise - static int thread_usleep(uint64_t useconds, thread_list* waitq) + static int thread_usleep(Timeout timeout, thread_list* waitq) { - if (unlikely(useconds == 0)) { + if (unlikely(timeout.expired())) { thread_yield(); return 0; } - auto r = prepare_usleep(useconds, waitq); + auto r = prepare_usleep(timeout, waitq); switch_context(r.from, r.to); assert(r.from->waitq == nullptr); return r.from->set_error_number(); } typedef void (*defer_func)(void*); - static int thread_usleep_defer(uint64_t useconds, + static int thread_usleep_defer(Timeout timeout, thread_list* waitq, defer_func defer, void* defer_arg) { - auto r = prepare_usleep(useconds, waitq); + auto r = prepare_usleep(timeout, waitq); switch_context_defer(r.from, r.to, defer, defer_arg); assert(r.from->waitq == nullptr); return r.from->set_error_number(); } __attribute__((noinline)) - static int do_thread_usleep_defer(uint64_t useconds, + static int do_thread_usleep_defer(Timeout timeout, defer_func defer, void* defer_arg, RunQ rq) { - auto r = prepare_usleep(useconds, nullptr, rq); + auto r = prepare_usleep(timeout, nullptr, rq); switch_context_defer(r.from, r.to, defer, 
defer_arg); assert(r.from->waitq == nullptr); return r.from->set_error_number(); } - static int do_shutdown_usleep_defer(uint64_t useconds, + static int do_shutdown_usleep_defer(Timeout timeout, defer_func defer, void* defer_arg, RunQ rq) { - if (likely(useconds > 10*1000)) - useconds = 10*1000; - int ret = do_thread_usleep_defer(useconds, defer, defer_arg, rq); + timeout.timeout_at_most(10 * 1000); + int ret = do_thread_usleep_defer(timeout, defer, defer_arg, rq); if (ret >= 0) errno = EPERM; return -1; } #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wstrict-aliasing" - int thread_usleep_defer(uint64_t useconds, defer_func defer, void* defer_arg) { + int thread_usleep_defer(Timeout timeout, defer_func defer, void* defer_arg) { RunQ rq; if (unlikely(!rq.current)) LOG_ERROR_RETURN(ENOSYS, -1, "Photon not initialized in this thread"); if (unlikely(AtomicRunQ(rq).defer_to_new_thread())) { thread_create((thread_entry&)defer, defer_arg); - return thread_usleep(useconds); + return thread_usleep(timeout); } if (unlikely(rq.current->is_shutting_down())) - return do_shutdown_usleep_defer(useconds, defer, defer_arg, rq); - return do_thread_usleep_defer(useconds, defer, defer_arg, rq); + return do_shutdown_usleep_defer(timeout, defer, defer_arg, rq); + return do_thread_usleep_defer(timeout, defer, defer_arg, rq); } #pragma GCC diagnostic pop __attribute__((noinline)) - static int do_thread_usleep(uint64_t useconds, RunQ rq) { - auto r = prepare_usleep(useconds, nullptr, rq); + static int do_thread_usleep(Timeout timeout, RunQ rq) { + auto r = prepare_usleep(timeout, nullptr, rq); switch_context(r.from, r.to); assert(r.from->waitq == nullptr); return r.from->set_error_number(); } - static int do_shutdown_usleep(uint64_t useconds, RunQ rq) { - if (likely(useconds > 10*1000)) - useconds = 10*1000; - int ret = do_thread_usleep(useconds, rq); + static int do_shutdown_usleep(Timeout timeout, RunQ rq) { + timeout.timeout_at_most(10 * 1000); + int ret = 
do_thread_usleep(timeout, rq); if (ret >= 0) errno = EPERM; return -1; } - int thread_usleep(uint64_t useconds) { + int thread_usleep(Timeout timeout) { RunQ rq; if (unlikely(!rq.current)) LOG_ERROR_RETURN(ENOSYS, -1, "Photon not initialized in this thread"); - if (unlikely(!useconds)) + if (unlikely(timeout.expired())) return thread_yield(), 0; if (unlikely(rq.current->is_shutting_down())) - return do_shutdown_usleep(useconds, rq); - return do_thread_usleep(useconds, rq); + return do_shutdown_usleep(timeout, rq); + return do_thread_usleep(timeout, rq); } static void prelocked_thread_interrupt(thread* th, int error_number) @@ -1485,14 +1483,14 @@ R"( } return (*perrno == ECANCELED) ? 0 : -1; } - int waitq::wait(uint64_t timeout) + int waitq::wait(Timeout timeout) { static_assert(sizeof(q) == sizeof(thread_list), "..."); auto lst = (thread_list*)&q; int ret = thread_usleep(timeout, lst); return waitq_translate_errno(ret); } - int waitq::wait_defer(uint64_t timeout, void(*defer)(void*), void* arg) { + int waitq::wait_defer(Timeout timeout, void(*defer)(void*), void* arg) { static_assert(sizeof(q) == sizeof(thread_list), "..."); auto lst = (thread_list*)&q; int ret = thread_usleep_defer(timeout, lst, defer, arg); @@ -1544,7 +1542,7 @@ R"( auto splock = (spinlock*)s_; splock->unlock(); } - int mutex::lock(uint64_t timeout) { + int mutex::lock(Timeout timeout) { if (try_lock() == 0) return 0; for (auto re = retries; re; --re) { thread_yield(); @@ -1557,7 +1555,7 @@ R"( return 0; } - if (timeout == 0) { + if (timeout.expired()) { errno = ETIMEDOUT; splock.unlock(); return -1; @@ -1602,7 +1600,7 @@ R"( do_mutex_unlock(this); } - int recursive_mutex::lock(uint64_t timeout) { + int recursive_mutex::lock(Timeout timeout) { if (owner == CURRENT || mutex::lock(timeout) == 0) { recursive_count++; return 0; @@ -1637,7 +1635,7 @@ R"( int spinlock_lock(void* arg) { return ((spinlock*)arg)->lock(); } - static int cvar_do_wait(thread_list* q, void* m, uint64_t timeout, 
int(*lock)(void*), void(*unlock)(void*)) { + static int cvar_do_wait(thread_list* q, void* m, Timeout timeout, int(*lock)(void*), void(*unlock)(void*)) { assert(m); if (!m) LOG_ERROR_RETURN(EINVAL, -1, "there must be a lock"); @@ -1653,23 +1651,22 @@ R"( return waitq_translate_errno(ret); } - int condition_variable::wait(mutex* m, uint64_t timeout) + int condition_variable::wait(mutex* m, Timeout timeout) { return cvar_do_wait((thread_list*)&q, m, timeout, mutex_lock, mutex_unlock); } - int condition_variable::wait(spinlock* m, uint64_t timeout) + int condition_variable::wait(spinlock* m, Timeout timeout) { return cvar_do_wait((thread_list*)&q, m, timeout, spinlock_lock, spinlock_unlock); } - int semaphore::wait(uint64_t count, uint64_t timeout) + int semaphore::wait(uint64_t count, Timeout timeout) { if (count == 0) return 0; splock.lock(); CURRENT->semaphore_count = count; - Timeout tmo(timeout); int ret = 0; while (!try_substract(count)) { - ret = waitq::wait_defer(tmo.timeout(), spinlock_unlock, &splock); + ret = waitq::wait_defer(timeout, spinlock_unlock, &splock); splock.lock(); if (ret < 0 && errno == ETIMEDOUT) { CURRENT->semaphore_count = 0; @@ -1709,7 +1706,7 @@ R"( return true; } } - int rwlock::lock(int mode, uint64_t timeout) + int rwlock::lock(int mode, Timeout timeout) { if (mode != RLOCK && mode != WLOCK) LOG_ERROR_RETURN(EINVAL, -1, "mode unknow"); diff --git a/thread/thread.h b/thread/thread.h index 9bb35347..0e99fc64 100644 --- a/thread/thread.h +++ b/thread/thread.h @@ -21,6 +21,7 @@ limitations under the License. #include #include #include +#include #ifndef __aarch64__ #include #endif @@ -76,12 +77,18 @@ namespace photon // suspend CURRENT thread for specified time duration, and switch // control to other threads, resuming possible sleepers. // Return 0 if timeout, return -1 if interrupted, and errno is set by the interrupt invoker. 
- int thread_usleep(uint64_t useconds); + int thread_usleep(Timeout timeout); + + inline int thread_usleep(uint64_t useconds) { + return thread_usleep(Timeout(useconds)); + } + // thread_usleep_defer sets a callback, and will execute callback in another photon thread // after this photon thread fall in sleep. The defer function should NEVER fall into sleep! typedef void (*defer_func)(void*); - int thread_usleep_defer(uint64_t useconds, defer_func defer, void* defer_arg=nullptr); + int thread_usleep_defer(Timeout timeout, defer_func defer, void* defer_arg=nullptr); + inline int thread_sleep(uint64_t seconds) { const uint64_t max_seconds = ((uint64_t)-1) / 1000 / 1000; @@ -203,8 +210,8 @@ namespace photon class waitq { protected: - int wait(uint64_t timeout = -1); - int wait_defer(uint64_t timeout, void(*defer)(void*), void* arg); + int wait(Timeout timeout = {}); + int wait_defer(Timeout timeout, void(*defer)(void*), void* arg); void resume(thread* th, int error_number = ECANCELED); // `th` must be waiting in this waitq! 
int resume_all(int error_number = ECANCELED); thread* resume_one(int error_number = ECANCELED); @@ -227,7 +234,7 @@ namespace photon { public: mutex(uint16_t max_retries = 100) : retries(max_retries) { } - int lock(uint64_t timeout = -1); + int lock(Timeout timeout = {}); int try_lock(); void unlock(); ~mutex() @@ -250,7 +257,7 @@ namespace photon class recursive_mutex : protected mutex { public: using mutex::mutex; - int lock(uint64_t timeout = -1); + int lock(Timeout timeout = {}); int try_lock(); void unlock(); protected: @@ -337,21 +344,21 @@ namespace photon class condition_variable : protected waitq { public: - int wait(mutex* m, uint64_t timeout = -1); - int wait(mutex& m, uint64_t timeout = -1) + int wait(mutex* m, Timeout timeout = {}); + int wait(mutex& m, Timeout timeout = {}) { return wait(&m, timeout); } - int wait(spinlock* m, uint64_t timeout = -1); - int wait(spinlock& m, uint64_t timeout = -1) + int wait(spinlock* m, Timeout timeout = {}); + int wait(spinlock& m, Timeout timeout = {}) { return wait(&m, timeout); } - int wait(scoped_lock& lock, uint64_t timeout = -1) + int wait(scoped_lock& lock, Timeout timeout = {}) { return wait(lock.m_mutex, timeout); } - int wait_no_lock(uint64_t timeout = -1) + int wait_no_lock(Timeout timeout = {}) { return waitq::wait(timeout); } @@ -365,7 +372,7 @@ namespace photon { public: explicit semaphore(uint64_t count = 0) : m_count(count) { } - int wait(uint64_t count, uint64_t timeout = -1); + int wait(uint64_t count, Timeout timeout = {}); int signal(uint64_t count) { if (count == 0) return 0; @@ -393,7 +400,7 @@ namespace photon class rwlock { public: - int lock(int mode, uint64_t timeout = -1); + int lock(int mode, Timeout timeout = {}); int unlock(); protected: int64_t state = 0; @@ -456,36 +463,6 @@ namespace photon &default_photon_thread_stack_alloc, nullptr}, Delegate photon_thread_dealloc = { &default_photon_thread_stack_dealloc, nullptr}); - - // Saturating addition, primarily for timeout caculation - 
__attribute__((always_inline)) inline uint64_t sat_add(uint64_t x, - uint64_t y) { -#if defined(__x86_64__) - register uint64_t z asm("rax"); - asm("add %2, %1; sbb %0, %0; or %1, %0;" - : "=r"(z), "+r"(x) - : "r"(y) - : "cc"); - return z; -#elif defined(__aarch64__) - return (x + y < x) ? -1UL : x + y; -#endif - } - - // Saturating subtract, primarily for timeout caculation - __attribute__((always_inline)) inline uint64_t sat_sub(uint64_t x, - uint64_t y) { -#if defined(__x86_64__) - register uint64_t z asm("rax"); - asm("xor %0, %0; subq %2, %1; cmovaeq %1, %0;" - : "=r"(z), "+r"(x), "+r"(y) - : - : "cc"); - return z; -#elif defined(__aarch64__) - return x > y ? x - y : 0; -#endif - } }; /*