Skip to content

Commit

Permalink
use Timeout (#332)
Browse files Browse the repository at this point in the history
use Timeout for thread, io engines and basic socket
  • Loading branch information
lihuiba committed Jan 23, 2024
1 parent a21d8f7 commit ac2fd2d
Show file tree
Hide file tree
Showing 31 changed files with 408 additions and 401 deletions.
28 changes: 12 additions & 16 deletions common/expirecontainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,24 @@ limitations under the License.
*/

#include "expirecontainer.h"

#include <photon/thread/thread.h>

ExpireContainerBase::ExpireContainerBase(uint64_t expiration,
ExpireContainerBase::ExpireContainerBase(uint64_t lifespan,
uint64_t timer_cycle)
: _expiration(expiration),
: _lifespan(lifespan),
_timer(std::max(static_cast<uint64_t>(1000), timer_cycle),
{this, &ExpireContainerBase::expire}, true, 8UL * 1024 * 1024) {}

std::pair<ExpireContainerBase::iterator, bool> ExpireContainerBase::insert(
Item* item) {
auto ExpireContainerBase::insert(Item* item) -> std::pair<iterator, bool> {
return _set.emplace(item);
}

ExpireContainerBase::iterator ExpireContainerBase::__find_prelock(
const Item& key_item) {
auto ExpireContainerBase::__find_prelock(const Item& key_item) -> iterator {
auto it = _set.find((Item*)&key_item);
return it;
}

ExpireContainerBase::iterator ExpireContainerBase::find(const Item& key_item) {
auto ExpireContainerBase::find(const Item& key_item) -> iterator {
SCOPED_LOCK(_lock);
return __find_prelock(key_item);
}
Expand All @@ -54,7 +51,7 @@ uint64_t ExpireContainerBase::expire() {
({
SCOPED_LOCK(_lock);
_list.split_by_predicate([&](Item* x) {
bool ret = x->_timeout.expire() < photon::now;
bool ret = x->_timeout.expiration() < photon::now;
if (ret) _set.erase(x);
return ret;
});
Expand All @@ -77,9 +74,9 @@ bool ExpireContainerBase::keep_alive(const Item& x, bool insert_if_not_exists) {
return true;
}

ObjectCacheBase::Item* ObjectCacheBase::ref_acquire(const Item& key_item,
Delegate<void, void*> ctor,
uint64_t failure_cooldown) {
auto ObjectCacheBase::ref_acquire(const Item& key_item,
Delegate<void, void*> ctor,
uint64_t failure_cooldown) -> Item* {
Base::iterator holder;
Item* item = nullptr;
expire();
Expand All @@ -104,8 +101,8 @@ ObjectCacheBase::Item* ObjectCacheBase::ref_acquire(const Item& key_item,
} while (!item);
{
SCOPED_LOCK(item->_mtx);
if (!item->_obj && (item->_failure <=
photon::sat_sub(photon::now, failure_cooldown))) {
auto ts = photon::sat_sub(photon::now, failure_cooldown);
if (!item->_obj && item->_failure <= ts) {
ctor(item);
if (!item->_obj) item->_failure = photon::now;
}
Expand Down Expand Up @@ -150,8 +147,7 @@ int ObjectCacheBase::ref_release(ItemPtr item, bool recycle) {
}

// the argument `key` plays the roles of (type-erased) key
int ObjectCacheBase::release(const ObjectCacheBase::Item& key_item,
bool recycle) {
int ObjectCacheBase::release(const Item& key_item, bool recycle) {
auto item = ExpireContainerBase::TypedIterator<Item>(Base::find(key_item));
if (item == end()) return -1;
return ref_release(*item, recycle);
Expand Down
13 changes: 8 additions & 5 deletions common/expirecontainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ExpireContainerBase : public Object {
Item() : _timeout(0) {}

public:
Timeout _timeout;
photon::Timeout _timeout;
virtual ~Item() {}
virtual size_t key_hash() const = 0;
virtual bool key_equal(const Item* rhs) const = 0;
Expand Down Expand Up @@ -77,7 +77,7 @@ class ExpireContainerBase : public Object {
};

intrusive_list<Item> _list;
uint64_t _expiration;
uint64_t _lifespan;
photon::Timer _timer;
photon::spinlock _lock; // protect _list/_set operations

Expand All @@ -94,7 +94,7 @@ class ExpireContainerBase : public Object {
using Set = std::unordered_set<ItemPtr, ItemHash, ItemEqual>;
Set _set;

ExpireContainerBase(uint64_t expiration, uint64_t timer_cycle);
ExpireContainerBase(uint64_t lifespan, uint64_t timer_cycle);
~ExpireContainerBase() { clear(); }

using iterator = decltype(_set)::iterator;
Expand All @@ -116,15 +116,18 @@ class ExpireContainerBase : public Object {

void enqueue(Item* item) {
_list.pop(item);
item->_timeout.timeout(_expiration);
item->_timeout.timeout(_lifespan);
_list.push_back(item);
}

public:
void clear();
uint64_t expire();
size_t size() { return _set.size(); }
size_t expiration() { return _expiration; }
size_t lifespan() { return _lifespan; }

[[deprecated("use lifespan() instead")]]
size_t expiration() { return _lifespan; }
};

template <typename KeyType, typename... Ts>
Expand Down
2 changes: 1 addition & 1 deletion common/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ class RingChannel : public QueueType {
idler.fetch_add(1, std::memory_order_acq_rel);
DEFER(idler.fetch_sub(1, std::memory_order_acq_rel));
while (!pop(x)) {
if (yield_turn > 0 && photon::now < yield_timeout.expire()) {
if (yield_turn > 0 && photon::now < yield_timeout.expiration()) {
yield_turn--;
photon::thread_yield();
} else {
Expand Down
1 change: 1 addition & 0 deletions common/throttle.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <photon/thread/thread.h>
#include <photon/common/utility.h>

namespace photon {
class throttle {
Expand Down
78 changes: 33 additions & 45 deletions common/timeout.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,59 +16,47 @@ limitations under the License.

#pragma once
#include <cinttypes>
#include <photon/common/utility.h>

namespace photon
{
extern volatile uint64_t now;
}
namespace photon {

class Timeout
{
public:
// Timeout() { }
Timeout(uint64_t x) { timeout(x); }
uint64_t timeout(uint64_t x){ return m_expire = sat_add(photon::now, x); }
uint64_t timeout() const { return sat_sub(m_expire, photon::now); }
operator uint64_t() const { return timeout(); }
uint64_t timeout_us() const { return timeout(); }
uint64_t timeout_ms() const { return divide(timeout(), 1000); }
uint64_t timeout_MS() const { return divide(timeout(), 1024); } // fast approximation
uint64_t timeout_s() const { return divide(timeout(), 1000 * 1000); }
uint64_t timeout_S() const { return divide(timeout(), 1024 * 1024); } // fast approximation
uint64_t expire() const { return m_expire; }
uint64_t expire(uint64_t x) { return m_expire = x; }
extern volatile uint64_t now;

class Timeout {
protected:
uint64_t m_expire; // time of expiration, in us
uint64_t m_expiration = -1; // time of expiration, in us

// Saturating addition, no upward overflow
__attribute__((always_inline)) static
uint64_t sat_add(uint64_t x, uint64_t y)
{
#if defined(__x86_64__)
register uint64_t z asm ("rax");
asm("add %2, %1; sbb %0, %0; or %1, %0;" : "=r"(z), "+r"(x) : "r"(y) : "cc");
return z;
#elif defined(__aarch64__)
return (x + y < x) ? -1UL : x + y;
#endif
public:
Timeout() = default; // never timeout
Timeout(uint64_t x) { m_expiration = x ? sat_add(now, x) : 0; }
uint64_t timeout(uint64_t x) { return m_expiration = sat_add(now, x); }
uint64_t timeout() const { return sat_sub(m_expiration, now); }
operator uint64_t() const { return timeout(); }
bool expired() const { return (m_expiration == 0) || (m_expiration <= now); }
uint64_t timeout_us() const { return timeout(); }
uint64_t timeout_ms() const { return divide(timeout(), 1000); }
uint64_t timeout_MS() const { return divide(timeout(), 1024); } // fast approximation
uint64_t timeout_s() const { return divide(timeout(), 1000 * 1000); }
uint64_t timeout_S() const { return divide(timeout(), 1024 * 1024); } // fast approximation
uint64_t expiration() const { return m_expiration; }
uint64_t expiration(uint64_t x) { return m_expiration = x; }
Timeout& operator = (uint64_t x) { timeout(x); return *this; }
Timeout& operator = (const Timeout& rhs) = default;
bool operator < (const Timeout& rhs) const {
return m_expiration < rhs.m_expiration;
}

// Saturating subtract, no downward overflow
__attribute__((always_inline)) static
uint64_t sat_sub(uint64_t x, uint64_t y)
{
#if defined(__x86_64__)
register uint64_t z asm ("rax");
asm("xor %0, %0; subq %2, %1; cmovaeq %1, %0;" : "=r"(z), "+r"(x) ,"+r"(y) : : "cc");
return z;
#elif defined(__aarch64__)
return x > y ? x - y : 0;
#endif
Timeout& timeout_at_most(uint64_t x) {
x = sat_add(now, x);
if (x < m_expiration)
m_expiration = x;
return *this;
}

static uint64_t divide(uint64_t x, uint64_t divisor)
{
protected:
operator bool() const = delete;
static uint64_t divide(uint64_t x, uint64_t divisor) {
return (x + divisor / 2) / divisor;
}
};

}
20 changes: 20 additions & 0 deletions common/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,23 @@ int version_compare(std::string_view a, std::string_view b, int& result);
int kernel_version_compare(std::string_view dst, int& result);
void print_stacktrace();

namespace photon {

// Saturating addition, no upward overflow
__attribute__((always_inline)) inline
uint64_t sat_add(uint64_t x, uint64_t y) {
uint64_t z, c = __builtin_uaddl_overflow(x, y, (unsigned long*)&z);
return -c | z;
}

// Saturating subtract, no downward overflow
__attribute__((always_inline)) inline
uint64_t sat_sub(uint64_t x, uint64_t y) {
uint64_t z, c = __builtin_usubl_overflow(x, y, (unsigned long*)&z);
return c ? 0 : z;
}

}



4 changes: 2 additions & 2 deletions fs/httpfs/httpfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class HttpFile : public fs::VirtualReadOnlyFile {
net::DummyReaderWriter dummy;
ret = curl->GET(get_url().c_str(), &dummy, tmo.timeout());
if (ret < 200) {
if (photon::now >= tmo.expire()) {
if (photon::now >= tmo.expiration()) {
// set errno to ENOENT since stat should not ETIMEDOUT
LOG_ERROR_RETURN(ENOENT, -1, "Failed to update file stat");
}
Expand Down Expand Up @@ -214,7 +214,7 @@ class HttpFile : public fs::VirtualReadOnlyFile {
curl->set_header_container(&headers);
ret = curl->GET(get_url().c_str(), &writer, tmo.timeout());
if (ret < 200) {
if (photon::now > tmo.expire()) {
if (photon::now > tmo.expiration()) {
LOG_ERROR_RETURN(ETIMEDOUT, -1, "Failed to perform GET ", VALUE(url),
VALUE(offset));
}
Expand Down
10 changes: 5 additions & 5 deletions fs/httpfs/httpfs_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,19 +147,19 @@ class HttpFile_v2 : public fs::VirtualReadOnlyFile {
m_stat.st_size = len;
return 0;
}
void send_read_request(net::http::Client::Operation &op, off_t offset, size_t length, const Timeout &tmo) {
again:
void send_read_request(net::http::Client::Operation &op, off_t offset, size_t length, Timeout tmo) {
estring url;
url.appends(m_url, "?", m_url_param);
op.set_enable_proxy(m_fs->get_client()->has_proxy());
again:
op.req.reset(net::http::Verb::GET, url, op.enable_proxy);
op.req.headers.merge(m_common_header);
op.req.headers.range(offset, offset + length - 1);
op.req.headers.content_length(0);
op.timeout = tmo.timeout();
op.timeout = tmo;
m_fs->get_client()->call(&op);
if (op.status_code < 0) {
if (tmo.timeout() == 0) {
if (tmo.expired()) {
m_etimeout = true;
LOG_ERROR_RETURN(ENOENT, , "http timedout");
}
Expand Down Expand Up @@ -254,7 +254,7 @@ class HttpFile_v2 : public fs::VirtualReadOnlyFile {
}
};

IFile* HttpFs_v2::open(const char* pathname, int flags) {
inline IFile* HttpFs_v2::open(const char* pathname, int flags) {
if (!pathname) LOG_ERROR_RETURN(EINVAL, nullptr, "NULL is not allowed");
if (flags != O_RDONLY) return nullptr;

Expand Down
20 changes: 9 additions & 11 deletions io/epoll-ng.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,17 @@ class EventEngineEPollNG : public MasterEventEngine,
// in such condition, timeout_ms should be at least 1
// or it may call epoll_wait without any idle
timeout = (timeout && timeout < 1024) ? 1 : timeout / 1024;
timeout &= 0x7fffffff; // make sure less than INT32_MAX
while (epfd > 0) {
int ret = epoll_wait(epfd, events, LEN(events), timeout);
if (ret < 0) {
ERRNO err;
if (err.no == EINTR) continue;
usleep(1024L * cool_down_ms);
::usleep(1024L * cool_down_ms);
if (cool_down_ms > 16)
LOG_ERROR_RETURN(err.no, , "epoll_wait() failed ", err);
timeout = sat_sub(timeout, cool_down_ms);
if (cool_down_ms < 16) {
cool_down_ms *= 2;
continue;
}
LOG_ERROR_RETURN(err.no, , "epoll_wait() failed ", err);
cool_down_ms *= 2;
}
remains += ret;
return;
Expand Down Expand Up @@ -254,9 +253,8 @@ class EventEngineEPollNG : public MasterEventEngine,
}
}
virtual ssize_t wait_for_events(void** data, size_t count,
uint64_t timeout = -1) override {
int ret = get_vcpu()->master_event_engine->wait_for_fd_readable(
engine.epfd, timeout);
Timeout timeout) override {
int ret = ::photon::wait_for_fd_readable(engine.epfd, timeout);
if (ret < 0) {
return errno == ETIMEDOUT ? 0 : -1;
}
Expand All @@ -273,7 +271,7 @@ class EventEngineEPollNG : public MasterEventEngine,
}
return ptr - data;
}
virtual ssize_t wait_and_fire_events(uint64_t timeout = -1) override {
virtual ssize_t wait_and_fire_events(uint64_t timeout) override {
ssize_t n = 0;
wait_for_events(
timeout,
Expand All @@ -289,7 +287,7 @@ class EventEngineEPollNG : public MasterEventEngine,
}
virtual int cancel_wait() override { return eventfd_write(evfd, 1); }

int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override {
int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override {
Event waiter{fd, interests | ONE_SHOT, CURRENT};
Event event{fd, interests | ONE_SHOT, &waiter};
int ret = add_interest(event);
Expand Down
Loading

0 comments on commit ac2fd2d

Please sign in to comment.