From 16a5bf0efe61f63bd04be58dfc1db42e9a5390d8 Mon Sep 17 00:00:00 2001 From: "xunyi.lh" Date: Wed, 20 Dec 2023 18:05:03 +0800 Subject: [PATCH 1/4] Support ipv6 in http client. - refactor KernelSocketClient. - apply dns load balance in DefaultResolver. - fix bug in DefaultResolver when a timeout happens. --- net/http/client.cpp | 2 +- net/http/test/client_function_test.cpp | 10 +++++ net/kernel_socket.cpp | 51 +++++++++++------------- net/utils.cpp | 55 ++++++++++++++------------ net/utils.h | 5 ++- 5 files changed, 68 insertions(+), 55 deletions(-) diff --git a/net/http/client.cpp b/net/http/client.cpp index 59f08faf..dc388f74 100644 --- a/net/http/client.cpp +++ b/net/http/client.cpp @@ -91,7 +91,7 @@ ISocketStream* PooledDialer::dial(std::string_view host, uint16_t port, bool sec if (ipaddr.undefined()) LOG_DEBUG("No connectable resolve result"); // When failed, remove resolved result from dns cache so that following retries can try // different ips. - resolver->discard_cache(strhost.c_str()); + resolver->discard_cache(strhost.c_str(), ipaddr); return nullptr; } diff --git a/net/http/test/client_function_test.cpp b/net/http/test/client_function_test.cpp index 1fb7cf9f..8e0b48cd 100644 --- a/net/http/test/client_function_test.cpp +++ b/net/http/test/client_function_test.cpp @@ -513,6 +513,16 @@ TEST(http_client, partial_body) { EXPECT_EQ(true, buf == "http_clien"); } +TEST(DISABLED_http_client, ipv6) { // make sure running in an ipv6-ready environment + auto client = new_http_client(); + DEFER(delete client); + // here is an ipv6-only website + auto op = client->new_operation(Verb::GET, "http://test6.ustc.edu.cn"); + DEFER(delete op); + op->call(); + EXPECT_EQ(200, op->resp.status_code()); +} + TEST(url, url_escape_unescape) { EXPECT_EQ( url_escape("?a=x:b&b=cd&c= feg&d=2/1[+]@alibaba.com&e='!bad';"), diff --git a/net/kernel_socket.cpp b/net/kernel_socket.cpp index 59749dd8..08715afa 100644 --- a/net/kernel_socket.cpp +++ b/net/kernel_socket.cpp @@ -191,9 +191,7 @@ 
class KernelSocketStream : public SocketStreamBase { class KernelSocketClient : public SocketClientBase { public: - KernelSocketClient(int socket_family, bool nonblocking) : - m_socket_family(socket_family), - m_nonblocking(nonblocking) {} + KernelSocketClient(bool nonblocking) : m_nonblocking(nonblocking) {} ISocketStream* connect(const char* path, size_t count) override { struct sockaddr_un addr_un; @@ -211,11 +209,10 @@ class KernelSocketClient : public SocketClientBase { } protected: - int m_socket_family; bool m_nonblocking; - virtual KernelSocketStream* create_stream() { - return new KernelSocketStream(m_socket_family, m_nonblocking); + virtual KernelSocketStream* create_stream(int socket_family) { + return new KernelSocketStream(socket_family, m_nonblocking); } virtual int fd_connect(int fd, const sockaddr* remote, socklen_t addrlen) { @@ -224,7 +221,7 @@ class KernelSocketClient : public SocketClientBase { ISocketStream* do_connect(const sockaddr* remote, socklen_t len_remote, const sockaddr* local = nullptr, socklen_t len_local = 0) { - auto stream = create_stream(); + auto stream = create_stream(remote->sa_family); auto deleter = [&](KernelSocketStream*) { auto errno_backup = errno; delete stream; @@ -504,8 +501,8 @@ class ZeroCopySocketClient : public KernelSocketClient { public: using KernelSocketClient::KernelSocketClient; protected: - KernelSocketStream* create_stream() override { - return new ZeroCopySocketStream(m_socket_family, m_nonblocking); + KernelSocketStream* create_stream(int socket_family) override { + return new ZeroCopySocketStream(socket_family, m_nonblocking); } }; @@ -544,8 +541,8 @@ class IouringSocketClient : public KernelSocketClient { using KernelSocketClient::KernelSocketClient; protected: - KernelSocketStream* create_stream() override { - return new IouringSocketStream(m_socket_family, m_nonblocking); + KernelSocketStream* create_stream(int socket_family) override { + return new IouringSocketStream(socket_family, m_nonblocking); 
} int fd_connect(int fd, const sockaddr* remote, socklen_t addrlen) override { @@ -614,8 +611,8 @@ class IouringFixedFileSocketClient : public IouringSocketClient { protected: using IouringSocketClient::IouringSocketClient; - KernelSocketStream* create_stream() override { - return new IouringFixedFileSocketStream(m_socket_family, m_nonblocking); + KernelSocketStream* create_stream(int socket_family) override { + return new IouringFixedFileSocketStream(socket_family, m_nonblocking); } }; @@ -683,8 +680,8 @@ class FstackDpdkSocketClient : public KernelSocketClient { protected: using KernelSocketClient::KernelSocketClient; - KernelSocketStream* create_stream() override { - return new FstackDpdkSocketStream(m_socket_family, m_nonblocking); + KernelSocketStream* create_stream(int socket_family) override { + return new FstackDpdkSocketStream(socket_family, m_nonblocking); } int fd_connect(int fd, const sockaddr* remote, socklen_t addrlen) override { @@ -939,8 +936,8 @@ class ETKernelSocketClient : public KernelSocketClient { using KernelSocketClient::KernelSocketClient; protected: - KernelSocketStream* create_stream() override { - return new ETKernelSocketStream(m_socket_family, m_nonblocking); + KernelSocketStream* create_stream(int socket_family) override { + return new ETKernelSocketStream(socket_family, m_nonblocking); } }; @@ -974,10 +971,10 @@ class ETKernelSocketServer : public KernelSocketServer, public NotifyContext { /* ET Socket - End */ extern "C" ISocketClient* new_tcp_socket_client() { - return new KernelSocketClient(AF_INET, true); + return new KernelSocketClient(true); } extern "C" ISocketClient* new_tcp_socket_client_ipv6() { - return new KernelSocketClient(AF_INET6, true); + return new KernelSocketClient(true); } extern "C" ISocketServer* new_tcp_socket_server() { return NewObj(AF_INET, false, true)->init(); @@ -986,7 +983,7 @@ extern "C" ISocketServer* new_tcp_socket_server_ipv6() { return NewObj(AF_INET6, false, true)->init(); } extern "C" 
ISocketClient* new_uds_client() { - return new KernelSocketClient(AF_UNIX, true); + return new KernelSocketClient(true); } extern "C" ISocketServer* new_uds_server(bool autoremove) { return NewObj(AF_UNIX, autoremove, true)->init(); @@ -996,14 +993,14 @@ extern "C" ISocketServer* new_zerocopy_tcp_server() { return NewObj(AF_INET, false, true)->init(); } extern "C" ISocketClient* new_zerocopy_tcp_client() { - return new ZeroCopySocketClient(AF_INET, true); + return new ZeroCopySocketClient(true); } #ifdef PHOTON_URING extern "C" ISocketClient* new_iouring_tcp_client() { if (photon::iouring_register_files_enabled()) - return new IouringFixedFileSocketClient(AF_INET, false); + return new IouringFixedFileSocketClient(false); else - return new IouringSocketClient(AF_INET, false); + return new IouringSocketClient(false); } extern "C" ISocketServer* new_iouring_tcp_server() { if (photon::iouring_register_files_enabled()) @@ -1013,23 +1010,23 @@ extern "C" ISocketServer* new_iouring_tcp_server() { } #endif // PHOTON_URING extern "C" ISocketClient* new_et_tcp_socket_client() { - return new ETKernelSocketClient(AF_INET, true); + return new ETKernelSocketClient(true); } extern "C" ISocketServer* new_et_tcp_socket_server() { return NewObj(AF_INET, false, true)->init(); } extern "C" ISocketClient* new_smc_socket_client() { - return new KernelSocketClient(AF_SMC, true); + return new KernelSocketClient(true); } extern "C" ISocketServer* new_smc_socket_server() { return NewObj(AF_SMC, false, true)->init(); } #ifdef ENABLE_FSTACK_DPDK extern "C" ISocketClient* new_fstack_dpdk_socket_client() { - return new FstackDpdkSocketClient(AF_INET, true); + return new FstackDpdkSocketClient(true); } extern "C" ISocketServer* new_fstack_dpdk_socket_server() { - return NewObj(AF_INET, false, true)->init(); + return NewObj(false, true)->init(); } #endif // ENABLE_FSTACK_DPDK #endif // __linux__ diff --git a/net/utils.cpp b/net/utils.cpp index 82985ffb..e2721284 100644 --- a/net/utils.cpp +++ 
b/net/utils.cpp @@ -21,6 +21,8 @@ limitations under the License. #include #include +#include +#include #include #include @@ -255,43 +257,46 @@ class DefaultResolver : public Resolver { ~DefaultResolver() { dnscache_.clear(); } IPAddr resolve(const char *host) override { - auto ctr = [&]() -> IPAddr * { - std::vector addrs; + auto ctr = [&]() -> std::list* { + auto addrs = new std::list(); photon::semaphore sem; std::thread([&]() { - int ret = gethostbyname(host, addrs); - if (ret < 0) { - addrs.clear(); - } - sem.signal(1); + auto now = std::chrono::steady_clock::now(); + gethostbyname(host, *addrs); + auto time_elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - now).count(); + if ((uint64_t)time_elapsed <= resolve_timeout_) sem.signal(1); }).detach(); - auto ret = sem.wait(1, resolve_timeout_); - if (ret < 0 && errno == ETIMEDOUT) { - LOG_WARN("Domain resolution for ` timeout!", host); - return new IPAddr; // undefined addr - } else if (addrs.empty()) { - LOG_WARN("Domain resolution for ` failed", host); - return new IPAddr; // undefined addr - } - // TODO: support ipv6 - for (auto& ip : addrs) { - if (ip.is_ipv4()) - return new IPAddr(ip); - } - return new IPAddr; // undefined addr + sem.wait(1, resolve_timeout_); + return addrs; }; - return *(dnscache_.borrow(host, ctr)); + auto ips = dnscache_.borrow(host, ctr); + if (ips->empty()) LOG_ERRNO_RETURN(0, IPAddr(), "Domain resolution for ` failed", host); + auto ret = ips->front(); + if (ips->size() > 1) { // access in round robin order + ips->pop_front(); + ips->push_back(ret); + } + return ret; } void resolve(const char *host, Delegate func) override { func(resolve(host)); } - void discard_cache(const char *host) override { + void discard_cache(const char *host, IPAddr ip) override { auto ipaddr = dnscache_.borrow(host); - ipaddr.recycle(true); + if (ip.undefined() || ipaddr->empty()) ipaddr.recycle(true); + else { + for (auto it = ipaddr->begin(); it != ipaddr->end(); ++it) { + if (*it == 
ip) { + ipaddr->erase(it); + break; + } + } + } } private: - ObjectCache dnscache_; + ObjectCache*> dnscache_; uint64_t resolve_timeout_; }; diff --git a/net/utils.h b/net/utils.h index 3f63f27c..006b4307 100644 --- a/net/utils.h +++ b/net/utils.h @@ -109,7 +109,8 @@ inline int gethostbyname(const char* name, IPAddr* buf, int bufsize = 1) { * @param ret `std::vector` reference to get results * @return sum of resolved address number. -1 means error. */ -inline int gethostbyname(const char* name, std::vector& ret) { +template +inline int gethostbyname(const char* name, Container& ret) { ret.clear(); auto cb = [&](IPAddr addr) { ret.push_back(addr); @@ -157,7 +158,7 @@ class Resolver : public Object { // Normally dns servers return multiple ips in random order, choosing the first one should suffice. virtual IPAddr resolve(const char* host) = 0; virtual void resolve(const char* host, Delegate func) = 0; - virtual void discard_cache(const char* host) = 0; // discard current cache of host:ip + virtual void discard_cache(const char* host, IPAddr ip = IPAddr()) = 0; // discard current cache of ip }; /** From b5d2ed47744676a0adb63489355e835696975f37 Mon Sep 17 00:00:00 2001 From: "xunyi.lh" Date: Wed, 20 Dec 2023 18:05:27 +0800 Subject: [PATCH 2/4] Add big writes/uid/gid/du support to FUSE - In Fuse 2.x, without big writes, write io size is limited to 4k. Turning it on can enlarge write io size to 128K at maximum. - Support du by adding st_blocks to xmp_getattr result. - Add default uid/gid to xmp_getattr to avoid permission problems. 
--- fs/fuse_adaptor.cpp | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/fs/fuse_adaptor.cpp b/fs/fuse_adaptor.cpp index 655eb55a..f6821712 100644 --- a/fs/fuse_adaptor.cpp +++ b/fs/fuse_adaptor.cpp @@ -80,13 +80,21 @@ namespace photon { namespace fs{ static IFileSystem* fs = nullptr; +static uid_t cuid = 0, cgid = 0; #define CHECK_FS() if (!fs) return -EFAULT; static void* xmp_init(struct fuse_conn_info *conn) { REPORT_PERF(xmp_init, 1) - (void) conn; + cuid = geteuid(); + cgid = getegid(); + if ((unsigned int)conn->capable & FUSE_CAP_ATOMIC_O_TRUNC) { + conn->want |= FUSE_CAP_ATOMIC_O_TRUNC; + } + if ((unsigned int)conn->capable & FUSE_CAP_BIG_WRITES) { + conn->want |= FUSE_CAP_BIG_WRITES; + } return NULL; } @@ -102,9 +110,13 @@ static int xmp_getattr(const char *path, struct stat *stbuf) CHECK_FS(); errno = 0; int res = fs->lstat(path, stbuf); - if(res) \ - LOG_ERROR_RETURN(0, errno ? -errno : res, VALUE(path)); - errno = 0; + if(res) return -errno; + stbuf->st_blocks = stbuf->st_size / 512 + 1; // du relies on this + stbuf->st_blksize = 4096; + // use current user/group when backendfs doesn't support uid/gid + if (stbuf->st_uid == 0) stbuf->st_uid = cuid; + if (stbuf->st_gid == 0) stbuf->st_gid = cgid; + if (stbuf->st_nlink == 0) stbuf->st_nlink = 1; return 0; } From 9107c43cd7f36b7453bd1992ed995491628b6f12 Mon Sep 17 00:00:00 2001 From: "xunyi.lh" Date: Fri, 22 Dec 2023 15:48:43 +0800 Subject: [PATCH 3/4] Fix bug when resolving thread timeout --- net/utils.cpp | 36 +++++++++++++++++++++++++----------- net/utils.h | 4 ++-- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/net/utils.cpp b/net/utils.cpp index e2721284..efbe1bac 100644 --- a/net/utils.cpp +++ b/net/utils.cpp @@ -251,21 +251,35 @@ bool Base64Decode(std::string_view in, std::string &out) { } class DefaultResolver : public Resolver { +protected: + struct IPAddrNode : public intrusive_list_node { + IPAddr addr; + IPAddrNode(IPAddr addr) : addr(addr) 
{} + }; + using IPAddrList = intrusive_list; public: DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout) : dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {} ~DefaultResolver() { dnscache_.clear(); } IPAddr resolve(const char *host) override { - auto ctr = [&]() -> std::list* { - auto addrs = new std::list(); + auto ctr = [&]() -> IPAddrList* { + auto addrs = new IPAddrList(); photon::semaphore sem; std::thread([&]() { auto now = std::chrono::steady_clock::now(); - gethostbyname(host, *addrs); + IPAddrList ret; + auto cb = [&](IPAddr addr) { + ret.push_back(new IPAddrNode(addr)); + return 0; + }; + _gethostbyname(host, cb); auto time_elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - now).count(); - if ((uint64_t)time_elapsed <= resolve_timeout_) sem.signal(1); + if ((uint64_t)time_elapsed <= resolve_timeout_) { + addrs->push_back(std::move(ret)); + sem.signal(1); + } }).detach(); sem.wait(1, resolve_timeout_); return addrs; @@ -273,11 +287,11 @@ class DefaultResolver : public Resolver { auto ips = dnscache_.borrow(host, ctr); if (ips->empty()) LOG_ERRNO_RETURN(0, IPAddr(), "Domain resolution for ` failed", host); auto ret = ips->front(); - if (ips->size() > 1) { // access in round robin order - ips->pop_front(); - ips->push_back(ret); + if (!ret->single()) { // access in round robin order + auto front = ips->pop_front(); + ips->push_back(front); } - return ret; + return ret->addr; } void resolve(const char *host, Delegate func) override { func(resolve(host)); } @@ -286,8 +300,8 @@ class DefaultResolver : public Resolver { auto ipaddr = dnscache_.borrow(host); if (ip.undefined() || ipaddr->empty()) ipaddr.recycle(true); else { - for (auto it = ipaddr->begin(); it != ipaddr->end(); ++it) { - if (*it == ip) { + for (auto it : *ipaddr) { + if (it->addr == ip) { ipaddr->erase(it); break; } @@ -296,7 +310,7 @@ class DefaultResolver : public Resolver { } private: - ObjectCache*> dnscache_; + ObjectCache dnscache_; uint64_t 
resolve_timeout_; }; diff --git a/net/utils.h b/net/utils.h index 006b4307..bf61c3b0 100644 --- a/net/utils.h +++ b/net/utils.h @@ -109,8 +109,7 @@ inline int gethostbyname(const char* name, IPAddr* buf, int bufsize = 1) { * @param ret `std::vector` reference to get results * @return sum of resolved address number. -1 means error. */ -template -inline int gethostbyname(const char* name, Container& ret) { +inline int gethostbyname(const char* name, std::vector& ret) { ret.clear(); auto cb = [&](IPAddr addr) { ret.push_back(addr); @@ -163,6 +162,7 @@ class Resolver : public Object { /** * @brief A non-blocking Resolver based on gethostbyname. + * Currently, it's not thread safe. * * @param cache_ttl cache's lifetime in microseconds. * @param resolve_timeout timeout in microseconds for domain resolution. From 1c653cd221be8dde48ae24924c67b255e6cfa3aa Mon Sep 17 00:00:00 2001 From: "xunyi.lh" Date: Sun, 24 Dec 2023 11:48:55 +0800 Subject: [PATCH 4/4] Fix unittest for default resolver --- net/test/test.cpp | 4 ++-- net/utils.cpp | 18 +++++++++-------- thread/list.h | 51 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 10 deletions(-) diff --git a/net/test/test.cpp b/net/test/test.cpp index a61d6f72..ad182a06 100644 --- a/net/test/test.cpp +++ b/net/test/test.cpp @@ -714,9 +714,9 @@ TEST(utils, resolver) { DEFER(delete resolver); net::IPAddr localhost("127.0.0.1"); net::IPAddr addr = resolver->resolve("localhost"); - EXPECT_EQ(localhost.to_nl(), addr.to_nl()); + if (addr.is_ipv4()) EXPECT_EQ(localhost.to_nl(), addr.to_nl()); auto func = [&](net::IPAddr addr_){ - EXPECT_EQ(localhost.to_nl(), addr_.to_nl()); + if (addr_.is_ipv4()) EXPECT_EQ(localhost.to_nl(), addr_.to_nl()); }; resolver->resolve("localhost", func); resolver->discard_cache("non-exist-host.com"); diff --git a/net/utils.cpp b/net/utils.cpp index efbe1bac..ad9209d8 100644 --- a/net/utils.cpp +++ b/net/utils.cpp @@ -260,7 +260,12 @@ class DefaultResolver : public Resolver { 
public: DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout) : dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {} - ~DefaultResolver() { dnscache_.clear(); } + ~DefaultResolver() { + for (auto it : dnscache_) { + ((IPAddrList*)it->_obj)->delete_all(); + } + dnscache_.clear(); + } IPAddr resolve(const char *host) override { auto ctr = [&]() -> IPAddrList* { @@ -287,10 +292,7 @@ class DefaultResolver : public Resolver { auto ips = dnscache_.borrow(host, ctr); if (ips->empty()) LOG_ERRNO_RETURN(0, IPAddr(), "Domain resolution for ` failed", host); auto ret = ips->front(); - if (!ret->single()) { // access in round robin order - auto front = ips->pop_front(); - ips->push_back(front); - } + ips->node = ret->next(); // access in round robin order return ret->addr; } @@ -300,9 +302,9 @@ class DefaultResolver : public Resolver { auto ipaddr = dnscache_.borrow(host); if (ip.undefined() || ipaddr->empty()) ipaddr.recycle(true); else { - for (auto it : *ipaddr) { - if (it->addr == ip) { - ipaddr->erase(it); + for (auto itr = ipaddr->rbegin(); itr != ipaddr->rend(); itr++) { + if ((*itr)->addr == ip) { + ipaddr->erase(*itr); break; } } diff --git a/thread/list.h b/thread/list.h index 435f8aa8..3f42a8de 100644 --- a/thread/list.h +++ b/thread/list.h @@ -221,6 +221,48 @@ class intrusive_list_node : public __intrusive_list_node { return {nullptr, nullptr}; } + + struct reverse_iterator + { + __intrusive_list_node* ptr; + __intrusive_list_node* end; + T* operator*() + { + return static_cast(ptr); + } + reverse_iterator& operator++() + { + ptr = ptr->__prev_ptr; + if (ptr == end) + ptr = nullptr; + return *this; + } + reverse_iterator operator++(int) + { + auto rst = *this; + ptr = ptr->__prev_ptr; + if (ptr == end) + ptr = nullptr; + return rst; + } + bool operator == (const reverse_iterator& rhs) const + { + return ptr == rhs.ptr; + } + bool operator != (const reverse_iterator& rhs) const + { + return !(*this == rhs); + } + }; + + reverse_iterator rbegin() + { + 
return {this->__prev_ptr, this->__prev_ptr}; + } + reverse_iterator rend() + { + return {nullptr, nullptr}; + } }; @@ -321,6 +363,7 @@ class intrusive_list return node == nullptr; } typedef typename NodeType::iterator iterator; + typedef typename NodeType::reverse_iterator reverse_iterator; iterator begin() { return node ? node->begin() : end(); @@ -329,6 +372,14 @@ class intrusive_list { return {nullptr, nullptr}; } + reverse_iterator rbegin() + { + return node ? node->rbegin() : rend(); + } + reverse_iterator rend() + { + return {nullptr, nullptr}; + } intrusive_list split_front_inclusive(NodeType* ptr) { auto ret = node;