From e2f362d5fdf2c5f46f584a8f45bf45d724bdd9bd Mon Sep 17 00:00:00 2001 From: Bob Chen Date: Tue, 28 May 2024 15:42:41 +0800 Subject: [PATCH] Interruptible throttle; DNS Resolver support filter --- common/alog.h | 4 ++++ common/throttle.h | 20 +++++++++++--------- net/curl.h | 3 +++ net/test/test.cpp | 26 +++++++++++++++++++++----- net/utils.cpp | 33 ++++++++++++++++++++------------- net/utils.h | 7 +++++-- thread/thread.cpp | 4 ++-- thread/thread.h | 2 +- 8 files changed, 67 insertions(+), 32 deletions(-) diff --git a/common/alog.h b/common/alog.h index 276ecce0..6011bbc4 100644 --- a/common/alog.h +++ b/common/alog.h @@ -488,7 +488,11 @@ struct LogBuilder { ALOG_TEMP, __VA_ARGS__); \ errno = _err_bak; \ } +#ifndef DISABLE_AUDIT #define LOG_AUDIT(...) (__LOG__((), default_audit_logger, ALOG_AUDIT, __VA_ARGS__)) +#else +#define LOG_AUDIT(...) +#endif inline void set_log_output(ILogOutput* output) { default_logger.log_output = output; diff --git a/common/throttle.h b/common/throttle.h index b863dc8d..c3fcbb45 100644 --- a/common/throttle.h +++ b/common/throttle.h @@ -10,7 +10,7 @@ class throttle { photon::semaphore sem; uint64_t last_retrieve = 0; uint64_t m_limit = -1UL; - uint64_t m_limit_per_slice; + uint64_t m_limit_per_slice = -1UL; uint64_t m_time_window; uint64_t m_time_window_per_slice; uint64_t m_slice_num; @@ -62,10 +62,8 @@ class throttle { uint64_t fulfil_percent = get_fulfill_percent(prio); uint64_t starving_percent = m_starving_slice_percent[int(prio)]; - // TODO: - // if (unlikely(amount > m_limit && m_limit > 0)) { - // return 0; - // } + // TODO: Handle the situation when throttle limit is extremely low + assert(amount < m_limit); int ret = -1; int err = ETIMEDOUT; @@ -81,12 +79,16 @@ class throttle { if (sem.count() * 100 < m_limit * fulfil_percent) { // Request are fulfilled only if they saw enough percent of tokens, // otherwise wait a `time_window_per_slice`. - photon::thread_usleep(m_time_window_per_slice); + ret = photon::thread_usleep(m_time_window_per_slice); + if (ret != 0) { + // Interrupted, just return + return -1; + } starving_slice_num++; continue; } break_starving: - ret = sem.wait(amount, m_time_window_per_slice); + ret = sem.wait_interruptible(amount, m_time_window_per_slice); err = errno; } while (ret < 0 && err == ETIMEDOUT); if (ret < 0) { @@ -136,7 +138,7 @@ class throttle { } } - uint64_t m_starving_slice_num[int(Priority::NumPriorities)]; - uint64_t m_starving_slice_percent[int(Priority::NumPriorities)]; + uint64_t m_starving_slice_num[int(Priority::NumPriorities)] = {}; + uint64_t m_starving_slice_percent[int(Priority::NumPriorities)] = {}; }; } // namespace photon diff --git a/net/curl.h b/net/curl.h index 8a4a4db5..edbcaf19 100644 --- a/net/curl.h +++ b/net/curl.h @@ -304,6 +304,9 @@ class cURL { return setopt(CURLOPT_HEADERDATA, (void*)stream), setopt(CURLOPT_HEADERFUNCTION, &writer); } + cURL& set_unix_socket(const char* path) { + return _setopt(CURLOPT_UNIX_SOCKET_PATH, path); + } // Turn on/off verbose, log to ALOG // ATTENTION: during verbose mode on, make sure ALOG configured. // modify ALOG configurations during verbose on may cause diff --git a/net/test/test.cpp b/net/test/test.cpp index e1a871ba..b56dc9ba 100644 --- a/net/test/test.cpp +++ b/net/test/test.cpp @@ -725,14 +725,30 @@ TEST(utils, gethostbyname) { TEST(utils, resolver) { auto *resolver = new_default_resolver(); DEFER(delete resolver); - net::IPAddr localhost("127.0.0.1"); net::IPAddr addr = resolver->resolve("localhost"); - if (addr.is_ipv4()) { EXPECT_EQ(localhost.to_nl(), addr.to_nl()); } - auto func = [&](net::IPAddr addr_){ - if (addr_.is_ipv4()) { EXPECT_EQ(localhost.to_nl(), addr_.to_nl()); } + if (addr.is_ipv4()) { + EXPECT_EQ(net::IPAddr::V4Loopback(), addr); + } else { + EXPECT_EQ(net::IPAddr::V6Loopback(), addr); + } +} + +TEST(utils, resolver_filter) { + auto *resolver = new_default_resolver(); + DEFER(delete resolver); + auto filter = [&](net::IPAddr addr_) -> bool { + return !addr_.is_ipv4(); }; - resolver->resolve("localhost", func); + auto addr = resolver->resolve_filter("localhost", filter); + ASSERT_TRUE(!addr.is_ipv4()); +} + +TEST(utils, resolver_discard) { + auto *resolver = new_default_resolver(); + DEFER(delete resolver); + (void) resolver->resolve("localhost"); resolver->discard_cache("non-exist-host.com"); + // resolver->discard_cache("localhost"); } #ifdef __linux__ diff --git a/net/utils.cpp b/net/utils.cpp index 6e1139a3..d0cad785 100644 --- a/net/utils.cpp +++ b/net/utils.cpp @@ -264,24 +264,17 @@ class DefaultResolver : public Resolver { IPAddrNode(IPAddr addr) : addr(addr) {} }; using IPAddrList = intrusive_list; -public: - DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout) - : dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {} - ~DefaultResolver() { - for (auto it : dnscache_) { - ((IPAddrList*)it->_obj)->delete_all(); - } - dnscache_.clear(); - } - IPAddr resolve(std::string_view host) override { + IPAddr do_resolve(std::string_view host, Delegate filter) { auto ctr = [&]() -> IPAddrList* { auto addrs = new IPAddrList(); photon::semaphore sem; std::thread([&]() { auto now = std::chrono::steady_clock::now(); IPAddrList ret; - auto cb = [&](IPAddr addr) { + auto cb = [&](IPAddr addr) -> int { + if (filter && !filter.fire(addr)) + return 0; ret.push_back(new IPAddrNode(addr)); return 0; }; @@ -307,8 +300,22 @@ class DefaultResolver : public Resolver { return ret->addr; } - void resolve(std::string_view host, Delegate func) override { - func(resolve(host)); +public: + DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout) + : dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {} + ~DefaultResolver() { + for (auto it : dnscache_) { + ((IPAddrList*)it->_obj)->delete_all(); + } + dnscache_.clear(); + } + + IPAddr resolve(std::string_view host) override { + return do_resolve(host, nullptr); + } + + IPAddr resolve_filter(std::string_view host, Delegate filter) override { + return do_resolve(host, filter); } void discard_cache(std::string_view host, IPAddr ip) override { diff --git a/net/utils.h b/net/utils.h index c80c6d92..f3a1f195 100644 --- a/net/utils.h +++ b/net/utils.h @@ -157,8 +157,11 @@ class Resolver : public Object { // When failed, return an Undefined IPAddr // Normally dns servers return multiple ips in random order, choosing the first one should suffice. virtual IPAddr resolve(std::string_view host) = 0; - virtual void resolve(std::string_view host, Delegate func) = 0; - virtual void discard_cache(std::string_view host, IPAddr ip = IPAddr()) = 0; // discard current cache of ip + void resolve(std::string_view host, Delegate func) { func(resolve(host)); } + // If filter callback returns false, the IP will be abandoned. + virtual IPAddr resolve_filter(std::string_view host, Delegate filter) = 0; + // Discard cache of a hostname, ip can be specified + virtual void discard_cache(std::string_view host, IPAddr ip = IPAddr()) = 0; }; /** diff --git a/thread/thread.cpp b/thread/thread.cpp index 8ebb7169..2b37f5ea 100644 --- a/thread/thread.cpp +++ b/thread/thread.cpp @@ -1682,7 +1682,7 @@ R"( splock.lock(); CURRENT->semaphore_count = count; int ret = 0; - while (!try_substract(count)) { + while (!try_subtract(count)) { ret = waitq::wait_defer(timeout, spinlock_unlock, &splock); ERRNO err; splock.lock(); @@ -1714,7 +1714,7 @@ R"( prelocked_thread_interrupt(th, -1); } } - bool semaphore::try_substract(uint64_t count) + bool semaphore::try_subtract(uint64_t count) { while(true) { diff --git a/thread/thread.h b/thread/thread.h index 5ac3a120..31e14cf4 100644 --- a/thread/thread.h +++ b/thread/thread.h @@ -427,7 +427,7 @@ namespace photon protected: std::atomic m_count; spinlock splock; - bool try_substract(uint64_t count); + bool try_subtract(uint64_t count); void try_resume(); };