Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interruptible throttle; DNS Resolver support filter #495

Merged
merged 1 commit into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/alog.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,11 @@ struct LogBuilder {
ALOG_TEMP, __VA_ARGS__); \
errno = _err_bak; \
}
#ifndef DISABLE_AUDIT
#define LOG_AUDIT(...) (__LOG__((), default_audit_logger, ALOG_AUDIT, __VA_ARGS__))
#else
#define LOG_AUDIT(...)
#endif

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

跟 alog-audit.h 统一

#ifndef DISABLE_AUDIT

inline void set_log_output(ILogOutput* output) {
default_logger.log_output = output;
Expand Down
20 changes: 11 additions & 9 deletions common/throttle.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class throttle {
photon::semaphore sem;
uint64_t last_retrieve = 0;
uint64_t m_limit = -1UL;
uint64_t m_limit_per_slice;
uint64_t m_limit_per_slice = -1UL;
uint64_t m_time_window;
uint64_t m_time_window_per_slice;
uint64_t m_slice_num;
Expand Down Expand Up @@ -62,10 +62,8 @@ class throttle {
uint64_t fulfil_percent = get_fulfill_percent(prio);
uint64_t starving_percent = m_starving_slice_percent[int(prio)];

// TODO:
// if (unlikely(amount > m_limit && m_limit > 0)) {
// return 0;
// }
// TODO: Handle the situation when throttle limit is extremely low
assert(amount < m_limit);

int ret = -1;
int err = ETIMEDOUT;
Expand All @@ -81,12 +79,16 @@ class throttle {
if (sem.count() * 100 < m_limit * fulfil_percent) {
// Request are fulfilled only if they saw enough percent of tokens,
// otherwise wait a `time_window_per_slice`.
photon::thread_usleep(m_time_window_per_slice);
ret = photon::thread_usleep(m_time_window_per_slice);
if (ret != 0) {
// Interrupted, just return
return -1;
}
starving_slice_num++;
continue;
}
break_starving:
ret = sem.wait(amount, m_time_window_per_slice);
ret = sem.wait_interruptible(amount, m_time_window_per_slice);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可打断的限流在一些场合很有用,比如做了优先级限流之后,一些低优先级的任务在后台排队(睡眠),如果这时候又来了新的相同的高优先级任务,需要打断睡眠,相当于提高优先级。

err = errno;
} while (ret < 0 && err == ETIMEDOUT);
if (ret < 0) {
Expand Down Expand Up @@ -136,7 +138,7 @@ class throttle {
}
}

uint64_t m_starving_slice_num[int(Priority::NumPriorities)];
uint64_t m_starving_slice_percent[int(Priority::NumPriorities)];
uint64_t m_starving_slice_num[int(Priority::NumPriorities)] = {};
uint64_t m_starving_slice_percent[int(Priority::NumPriorities)] = {};
};
} // namespace photon
3 changes: 3 additions & 0 deletions net/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ class cURL {
return setopt(CURLOPT_HEADERDATA, (void*)stream),
setopt(CURLOPT_HEADERFUNCTION, &writer<T>);
}
cURL& set_unix_socket(const char* path) {
return _setopt(CURLOPT_UNIX_SOCKET_PATH, path);
}
// Turn on/off verbose, log to ALOG
// ATTENTION: during verbose mode on, make sure ALOG configured.
// modify ALOG configurations during verbose on may cause
Expand Down
26 changes: 21 additions & 5 deletions net/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,14 +725,30 @@ TEST(utils, gethostbyname) {
TEST(utils, resolver) {
auto *resolver = new_default_resolver();
DEFER(delete resolver);
net::IPAddr localhost("127.0.0.1");
net::IPAddr addr = resolver->resolve("localhost");
if (addr.is_ipv4()) { EXPECT_EQ(localhost.to_nl(), addr.to_nl()); }
auto func = [&](net::IPAddr addr_){
if (addr_.is_ipv4()) { EXPECT_EQ(localhost.to_nl(), addr_.to_nl()); }
if (addr.is_ipv4()) {
EXPECT_EQ(net::IPAddr::V4Loopback(), addr);
} else {
EXPECT_EQ(net::IPAddr::V6Loopback(), addr);
}
}

TEST(utils, resolver_filter) {
auto *resolver = new_default_resolver();
DEFER(delete resolver);
auto filter = [&](net::IPAddr addr_) -> bool {
return !addr_.is_ipv4();
};
resolver->resolve("localhost", func);
auto addr = resolver->resolve_filter("localhost", filter);
ASSERT_TRUE(!addr.is_ipv4());
}

TEST(utils, resolver_discard) {
auto *resolver = new_default_resolver();
DEFER(delete resolver);
(void) resolver->resolve("localhost");
resolver->discard_cache("non-exist-host.com");
resolver->discard_cache("localhost");
}

#ifdef __linux__
Expand Down
33 changes: 20 additions & 13 deletions net/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,24 +264,17 @@ class DefaultResolver : public Resolver {
IPAddrNode(IPAddr addr) : addr(addr) {}
};
using IPAddrList = intrusive_list<IPAddrNode>;
public:
DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout)
: dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些没变的话,就尽量不动

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没动,只是把protected的函数移到了上面,就显得有变化

~DefaultResolver() {
for (auto it : dnscache_) {
((IPAddrList*)it->_obj)->delete_all();
}
dnscache_.clear();
}

IPAddr resolve(std::string_view host) override {
IPAddr do_resolve(std::string_view host, Delegate<bool, IPAddr> filter) {
auto ctr = [&]() -> IPAddrList* {
auto addrs = new IPAddrList();
photon::semaphore sem;
std::thread([&]() {
auto now = std::chrono::steady_clock::now();
IPAddrList ret;
auto cb = [&](IPAddr addr) {
auto cb = [&](IPAddr addr) -> int {
if (filter && !filter.fire(addr))
return 0;
ret.push_back(new IPAddrNode(addr));
return 0;
};
Expand All @@ -307,8 +300,22 @@ class DefaultResolver : public Resolver {
return ret->addr;
}

void resolve(std::string_view host, Delegate<void, IPAddr> func) override {
func(resolve(host));
public:
DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout)
: dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {}
~DefaultResolver() {
for (auto it : dnscache_) {
((IPAddrList*)it->_obj)->delete_all();
}
dnscache_.clear();
}

IPAddr resolve(std::string_view host) override {
return do_resolve(host, nullptr);
}

IPAddr resolve_filter(std::string_view host, Delegate<bool, IPAddr> filter) override {
return do_resolve(host, filter);
}

void discard_cache(std::string_view host, IPAddr ip) override {
Expand Down
7 changes: 5 additions & 2 deletions net/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,11 @@ class Resolver : public Object {
// When failed, return an Undefined IPAddr
// Normally dns servers return multiple ips in random order, choosing the first one should suffice.
virtual IPAddr resolve(std::string_view host) = 0;
virtual void resolve(std::string_view host, Delegate<void, IPAddr> func) = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why old API have to be removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

等于是用上面那个resolve的结果自己再执行一遍func?

Copy link
Collaborator

@Coldwings Coldwings May 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func(resolve(host)) is good enough (and used to be just implemented like this).

virtual void discard_cache(std::string_view host, IPAddr ip = IPAddr()) = 0; // discard current cache of ip
void resolve(std::string_view host, Delegate<void, IPAddr> func) { func(resolve(host)); }
// If filter callback returns false, the IP will be abandoned.
virtual IPAddr resolve_filter(std::string_view host, Delegate<bool, IPAddr> filter) = 0;
// Discard cache of a hostname, ip can be specified
virtual void discard_cache(std::string_view host, IPAddr ip = IPAddr()) = 0;
};

/**
Expand Down
4 changes: 2 additions & 2 deletions thread/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1682,7 +1682,7 @@ R"(
splock.lock();
CURRENT->semaphore_count = count;
int ret = 0;
while (!try_substract(count)) {
while (!try_subtract(count)) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

substract is old English, which also appears in French and Spanish. In modern English it is subtract.

ret = waitq::wait_defer(timeout, spinlock_unlock, &splock);
ERRNO err;
splock.lock();
Expand Down Expand Up @@ -1714,7 +1714,7 @@ R"(
prelocked_thread_interrupt(th, -1);
}
}
bool semaphore::try_substract(uint64_t count)
bool semaphore::try_subtract(uint64_t count)
{
while(true)
{
Expand Down
2 changes: 1 addition & 1 deletion thread/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ namespace photon
protected:
std::atomic<uint64_t> m_count;
spinlock splock;
bool try_substract(uint64_t count);
bool try_subtract(uint64_t count);
void try_resume();
};

Expand Down
Loading