Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use Timeout #332

Merged
merged 11 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 12 additions & 16 deletions common/expirecontainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,24 @@ limitations under the License.
*/

#include "expirecontainer.h"

#include <photon/thread/thread.h>

ExpireContainerBase::ExpireContainerBase(uint64_t expiration,
ExpireContainerBase::ExpireContainerBase(uint64_t lifespan,
uint64_t timer_cycle)
: _expiration(expiration),
: _lifespan(lifespan),
_timer(std::max(static_cast<uint64_t>(1000), timer_cycle),
{this, &ExpireContainerBase::expire}, true, 8UL * 1024 * 1024) {}

std::pair<ExpireContainerBase::iterator, bool> ExpireContainerBase::insert(
Item* item) {
auto ExpireContainerBase::insert(Item* item) -> std::pair<iterator, bool> {
return _set.emplace(item);
}

ExpireContainerBase::iterator ExpireContainerBase::__find_prelock(
const Item& key_item) {
auto ExpireContainerBase::__find_prelock(const Item& key_item) -> iterator {
auto it = _set.find((Item*)&key_item);
return it;
}

ExpireContainerBase::iterator ExpireContainerBase::find(const Item& key_item) {
auto ExpireContainerBase::find(const Item& key_item) -> iterator {
SCOPED_LOCK(_lock);
return __find_prelock(key_item);
}
Expand All @@ -54,7 +51,7 @@ uint64_t ExpireContainerBase::expire() {
({
SCOPED_LOCK(_lock);
_list.split_by_predicate([&](Item* x) {
bool ret = x->_timeout.expire() < photon::now;
bool ret = x->_timeout.expiration() < photon::now;
if (ret) _set.erase(x);
return ret;
});
Expand All @@ -77,9 +74,9 @@ bool ExpireContainerBase::keep_alive(const Item& x, bool insert_if_not_exists) {
return true;
}

ObjectCacheBase::Item* ObjectCacheBase::ref_acquire(const Item& key_item,
Delegate<void, void*> ctor,
uint64_t failure_cooldown) {
auto ObjectCacheBase::ref_acquire(const Item& key_item,
Delegate<void, void*> ctor,
uint64_t failure_cooldown) -> Item* {
Base::iterator holder;
Item* item = nullptr;
expire();
Expand All @@ -104,8 +101,8 @@ ObjectCacheBase::Item* ObjectCacheBase::ref_acquire(const Item& key_item,
} while (!item);
{
SCOPED_LOCK(item->_mtx);
if (!item->_obj && (item->_failure <=
photon::sat_sub(photon::now, failure_cooldown))) {
auto ts = photon::sat_sub(photon::now, failure_cooldown);
if (!item->_obj && item->_failure <= ts) {
ctor(item);
if (!item->_obj) item->_failure = photon::now;
}
Expand Down Expand Up @@ -150,8 +147,7 @@ int ObjectCacheBase::ref_release(ItemPtr item, bool recycle) {
}

// the argument `key` plays the roles of (type-erased) key
int ObjectCacheBase::release(const ObjectCacheBase::Item& key_item,
bool recycle) {
int ObjectCacheBase::release(const Item& key_item, bool recycle) {
auto item = ExpireContainerBase::TypedIterator<Item>(Base::find(key_item));
if (item == end()) return -1;
return ref_release(*item, recycle);
Expand Down
13 changes: 8 additions & 5 deletions common/expirecontainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ExpireContainerBase : public Object {
Item() : _timeout(0) {}

public:
Timeout _timeout;
photon::Timeout _timeout;
virtual ~Item() {}
virtual size_t key_hash() const = 0;
virtual bool key_equal(const Item* rhs) const = 0;
Expand Down Expand Up @@ -77,7 +77,7 @@ class ExpireContainerBase : public Object {
};

intrusive_list<Item> _list;
uint64_t _expiration;
uint64_t _lifespan;
photon::Timer _timer;
photon::spinlock _lock; // protect _list/_set operations

Expand All @@ -94,7 +94,7 @@ class ExpireContainerBase : public Object {
using Set = std::unordered_set<ItemPtr, ItemHash, ItemEqual>;
Set _set;

ExpireContainerBase(uint64_t expiration, uint64_t timer_cycle);
ExpireContainerBase(uint64_t lifespan, uint64_t timer_cycle);
~ExpireContainerBase() { clear(); }

using iterator = decltype(_set)::iterator;
Expand All @@ -116,15 +116,18 @@ class ExpireContainerBase : public Object {

void enqueue(Item* item) {
_list.pop(item);
item->_timeout.timeout(_expiration);
item->_timeout.timeout(_lifespan);
_list.push_back(item);
}

public:
void clear();
uint64_t expire();
size_t size() { return _set.size(); }
size_t expiration() { return _expiration; }
size_t lifespan() { return _lifespan; }

[[deprecated("use lifespan() instead")]]
size_t expiration() { return _lifespan; }
};

template <typename KeyType, typename... Ts>
Expand Down
2 changes: 1 addition & 1 deletion common/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ class RingChannel : public QueueType {
idler.fetch_add(1, std::memory_order_acq_rel);
DEFER(idler.fetch_sub(1, std::memory_order_acq_rel));
while (!pop(x)) {
if (yield_turn > 0 && photon::now < yield_timeout.expire()) {
if (yield_turn > 0 && photon::now < yield_timeout.expiration()) {
yield_turn--;
photon::thread_yield();
} else {
Expand Down
1 change: 1 addition & 0 deletions common/throttle.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <photon/thread/thread.h>
#include <photon/common/utility.h>

namespace photon {
class throttle {
Expand Down
78 changes: 33 additions & 45 deletions common/timeout.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,59 +16,47 @@ limitations under the License.

#pragma once
#include <cinttypes>
#include <photon/common/utility.h>

namespace photon
{
extern volatile uint64_t now;
}
namespace photon {

class Timeout
{
public:
// Timeout() { }
Timeout(uint64_t x) { timeout(x); }
uint64_t timeout(uint64_t x){ return m_expire = sat_add(photon::now, x); }
uint64_t timeout() const { return sat_sub(m_expire, photon::now); }
operator uint64_t() const { return timeout(); }
uint64_t timeout_us() const { return timeout(); }
uint64_t timeout_ms() const { return divide(timeout(), 1000); }
uint64_t timeout_MS() const { return divide(timeout(), 1024); } // fast approximation
uint64_t timeout_s() const { return divide(timeout(), 1000 * 1000); }
uint64_t timeout_S() const { return divide(timeout(), 1024 * 1024); } // fast approximation
uint64_t expire() const { return m_expire; }
uint64_t expire(uint64_t x) { return m_expire = x; }
extern volatile uint64_t now;

class Timeout {
protected:
uint64_t m_expire; // time of expiration, in us
uint64_t m_expiration = -1; // time of expiration, in us

// Saturating addition, no upward overflow
__attribute__((always_inline)) static
uint64_t sat_add(uint64_t x, uint64_t y)
{
#if defined(__x86_64__)
register uint64_t z asm ("rax");
asm("add %2, %1; sbb %0, %0; or %1, %0;" : "=r"(z), "+r"(x) : "r"(y) : "cc");
return z;
#elif defined(__aarch64__)
return (x + y < x) ? -1UL : x + y;
#endif
public:
Timeout() = default; // never timeout
Timeout(uint64_t x) { m_expiration = x ? sat_add(now, x) : 0; }
uint64_t timeout(uint64_t x) { return m_expiration = sat_add(now, x); }
uint64_t timeout() const { return sat_sub(m_expiration, now); }
operator uint64_t() const { return timeout(); }
bool expired() const { return (m_expiration == 0) || (m_expiration <= now); }
uint64_t timeout_us() const { return timeout(); }
uint64_t timeout_ms() const { return divide(timeout(), 1000); }
uint64_t timeout_MS() const { return divide(timeout(), 1024); } // fast approximation
uint64_t timeout_s() const { return divide(timeout(), 1000 * 1000); }
uint64_t timeout_S() const { return divide(timeout(), 1024 * 1024); } // fast approximation
uint64_t expiration() const { return m_expiration; }
uint64_t expiration(uint64_t x) { return m_expiration = x; }
Timeout& operator = (uint64_t x) { timeout(x); return *this; }
Timeout& operator = (const Timeout& rhs) = default;
bool operator < (const Timeout& rhs) const {
return m_expiration < rhs.m_expiration;
}

// Saturating subtract, no downward overflow
__attribute__((always_inline)) static
uint64_t sat_sub(uint64_t x, uint64_t y)
{
#if defined(__x86_64__)
register uint64_t z asm ("rax");
asm("xor %0, %0; subq %2, %1; cmovaeq %1, %0;" : "=r"(z), "+r"(x) ,"+r"(y) : : "cc");
return z;
#elif defined(__aarch64__)
return x > y ? x - y : 0;
#endif
Timeout& timeout_at_most(uint64_t x) {
x = sat_add(now, x);
if (x < m_expiration)
m_expiration = x;
return *this;
}

static uint64_t divide(uint64_t x, uint64_t divisor)
{
protected:
operator bool() const = delete;
static uint64_t divide(uint64_t x, uint64_t divisor) {
return (x + divisor / 2) / divisor;
}
};

}
20 changes: 20 additions & 0 deletions common/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,23 @@ int version_compare(std::string_view a, std::string_view b, int& result);
int kernel_version_compare(std::string_view dst, int& result);
void print_stacktrace();

namespace photon {

// Saturating addition, no upward overflow
__attribute__((always_inline)) inline
uint64_t sat_add(uint64_t x, uint64_t y) {
uint64_t z, c = __builtin_uaddl_overflow(x, y, (unsigned long*)&z);
return -c | z;
}

// Saturating subtract, no downward overflow
__attribute__((always_inline)) inline
uint64_t sat_sub(uint64_t x, uint64_t y) {
uint64_t z, c = __builtin_usubl_overflow(x, y, (unsigned long*)&z);
return c ? 0 : z;
}

}



4 changes: 2 additions & 2 deletions fs/httpfs/httpfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class HttpFile : public fs::VirtualReadOnlyFile {
net::DummyReaderWriter dummy;
ret = curl->GET(get_url().c_str(), &dummy, tmo.timeout());
if (ret < 200) {
if (photon::now >= tmo.expire()) {
if (photon::now >= tmo.expiration()) {
// set errno to ENOENT since stat should not ETIMEDOUT
LOG_ERROR_RETURN(ENOENT, -1, "Failed to update file stat");
}
Expand Down Expand Up @@ -214,7 +214,7 @@ class HttpFile : public fs::VirtualReadOnlyFile {
curl->set_header_container(&headers);
ret = curl->GET(get_url().c_str(), &writer, tmo.timeout());
if (ret < 200) {
if (photon::now > tmo.expire()) {
if (photon::now > tmo.expiration()) {
LOG_ERROR_RETURN(ETIMEDOUT, -1, "Failed to perform GET ", VALUE(url),
VALUE(offset));
}
Expand Down
10 changes: 5 additions & 5 deletions fs/httpfs/httpfs_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,19 +147,19 @@ class HttpFile_v2 : public fs::VirtualReadOnlyFile {
m_stat.st_size = len;
return 0;
}
void send_read_request(net::http::Client::Operation &op, off_t offset, size_t length, const Timeout &tmo) {
again:
void send_read_request(net::http::Client::Operation &op, off_t offset, size_t length, Timeout tmo) {
estring url;
url.appends(m_url, "?", m_url_param);
op.set_enable_proxy(m_fs->get_client()->has_proxy());
again:
op.req.reset(net::http::Verb::GET, url, op.enable_proxy);
op.req.headers.merge(m_common_header);
op.req.headers.range(offset, offset + length - 1);
op.req.headers.content_length(0);
op.timeout = tmo.timeout();
op.timeout = tmo;
m_fs->get_client()->call(&op);
if (op.status_code < 0) {
if (tmo.timeout() == 0) {
if (tmo.expired()) {
m_etimeout = true;
LOG_ERROR_RETURN(ENOENT, , "http timedout");
}
Expand Down Expand Up @@ -254,7 +254,7 @@ class HttpFile_v2 : public fs::VirtualReadOnlyFile {
}
};

IFile* HttpFs_v2::open(const char* pathname, int flags) {
inline IFile* HttpFs_v2::open(const char* pathname, int flags) {
if (!pathname) LOG_ERROR_RETURN(EINVAL, nullptr, "NULL is not allowed");
if (flags != O_RDONLY) return nullptr;

Expand Down
20 changes: 9 additions & 11 deletions io/epoll-ng.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,17 @@ class EventEngineEPollNG : public MasterEventEngine,
// in such condition, timeout_ms should be at least 1
// or it may call epoll_wait without any idle
timeout = (timeout && timeout < 1024) ? 1 : timeout / 1024;
timeout &= 0x7fffffff; // make sure less than INT32_MAX
while (epfd > 0) {
int ret = epoll_wait(epfd, events, LEN(events), timeout);
if (ret < 0) {
ERRNO err;
if (err.no == EINTR) continue;
usleep(1024L * cool_down_ms);
::usleep(1024L * cool_down_ms);
if (cool_down_ms > 16)
LOG_ERROR_RETURN(err.no, , "epoll_wait() failed ", err);
timeout = sat_sub(timeout, cool_down_ms);
if (cool_down_ms < 16) {
cool_down_ms *= 2;
continue;
}
LOG_ERROR_RETURN(err.no, , "epoll_wait() failed ", err);
cool_down_ms *= 2;
}
remains += ret;
return;
Expand Down Expand Up @@ -254,9 +253,8 @@ class EventEngineEPollNG : public MasterEventEngine,
}
}
virtual ssize_t wait_for_events(void** data, size_t count,
uint64_t timeout = -1) override {
int ret = get_vcpu()->master_event_engine->wait_for_fd_readable(
engine.epfd, timeout);
Timeout timeout) override {
int ret = ::photon::wait_for_fd_readable(engine.epfd, timeout);
if (ret < 0) {
return errno == ETIMEDOUT ? 0 : -1;
}
Expand All @@ -273,7 +271,7 @@ class EventEngineEPollNG : public MasterEventEngine,
}
return ptr - data;
}
virtual ssize_t wait_and_fire_events(uint64_t timeout = -1) override {
virtual ssize_t wait_and_fire_events(uint64_t timeout) override {
ssize_t n = 0;
wait_for_events(
timeout,
Expand All @@ -289,7 +287,7 @@ class EventEngineEPollNG : public MasterEventEngine,
}
virtual int cancel_wait() override { return eventfd_write(evfd, 1); }

int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override {
int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override {
Event waiter{fd, interests | ONE_SHOT, CURRENT};
Event event{fd, interests | ONE_SHOT, &waiter};
int ret = add_interest(event);
Expand Down
Loading
Loading