Skip to content

Commit

Permalink
Support ipv6 in http client (#310)
Browse files Browse the repository at this point in the history
* Support ipv6 in http client.

- refactor KernelSocketClient.
- apply dns load balance in DefaultResolver.
- fix bug in DefaultResolver when timeout happen.

* Add big writes/uid/gid/du support to FUSE

- In Fuse 2.x, without big writes, write io size is limited to 4k. Turing it on can enlarge write io size to 128K at maximum.
- Support du by adding st_blocks to xmp_getattr result.
- Add default uid/gid to xmp_getattr to avoid permission problems.

* Fix bug when resolving thread timeout

* Fix unittest for default resovler
  • Loading branch information
HanLee13 authored Dec 25, 2023
1 parent 3e2d213 commit 7cab63e
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 60 deletions.
20 changes: 16 additions & 4 deletions fs/fuse_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,21 @@ namespace photon {
namespace fs{

static IFileSystem* fs = nullptr;
static uid_t cuid = 0, cgid = 0;

#define CHECK_FS() if (!fs) return -EFAULT;

static void* xmp_init(struct fuse_conn_info *conn)
{
REPORT_PERF(xmp_init, 1)
(void) conn;
cuid = geteuid();
cgid = getegid();
if ((unsigned int)conn->capable & FUSE_CAP_ATOMIC_O_TRUNC) {
conn->want |= FUSE_CAP_ATOMIC_O_TRUNC;
}
if ((unsigned int)conn->capable & FUSE_CAP_BIG_WRITES) {
conn->want |= FUSE_CAP_BIG_WRITES;
}
return NULL;
}

Expand All @@ -102,9 +110,13 @@ static int xmp_getattr(const char *path, struct stat *stbuf)
CHECK_FS();
errno = 0;
int res = fs->lstat(path, stbuf);
if(res) \
LOG_ERROR_RETURN(0, errno ? -errno : res, VALUE(path));
errno = 0;
if(res) return -errno;
stbuf->st_blocks = stbuf->st_size / 512 + 1; // du relies on this
stbuf->st_blksize = 4096;
// use current user/group when backendfs doesn't support uid/gid
if (stbuf->st_uid == 0) stbuf->st_uid = cuid;
if (stbuf->st_gid == 0) stbuf->st_gid = cgid;
if (stbuf->st_nlink == 0) stbuf->st_nlink = 1;
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion net/http/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ ISocketStream* PooledDialer::dial(std::string_view host, uint16_t port, bool sec
if (ipaddr.undefined()) LOG_DEBUG("No connectable resolve result");
// When failed, remove resolved result from dns cache so that following retries can try
// different ips.
resolver->discard_cache(strhost.c_str());
resolver->discard_cache(strhost.c_str(), ipaddr);
return nullptr;
}

Expand Down
10 changes: 10 additions & 0 deletions net/http/test/client_function_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,16 @@ TEST(http_client, partial_body) {
EXPECT_EQ(true, buf == "http_clien");
}

TEST(DISABLED_http_client, ipv6) { // make sure runing in a ipv6-ready environment
auto client = new_http_client();
DEFER(delete client);
// here is an ipv6-only website
auto op = client->new_operation(Verb::GET, "http://test6.ustc.edu.cn");
DEFER(delete op);
op->call();
EXPECT_EQ(200, op->resp.status_code());
}

TEST(url, url_escape_unescape) {
EXPECT_EQ(
url_escape("?a=x:b&b=cd&c= feg&d=2/1[+]@alibaba.com&e='!bad';"),
Expand Down
51 changes: 24 additions & 27 deletions net/kernel_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,7 @@ class KernelSocketStream : public SocketStreamBase {

class KernelSocketClient : public SocketClientBase {
public:
KernelSocketClient(int socket_family, bool nonblocking) :
m_socket_family(socket_family),
m_nonblocking(nonblocking) {}
KernelSocketClient(bool nonblocking) : m_nonblocking(nonblocking) {}

ISocketStream* connect(const char* path, size_t count) override {
struct sockaddr_un addr_un;
Expand All @@ -211,11 +209,10 @@ class KernelSocketClient : public SocketClientBase {
}

protected:
int m_socket_family;
bool m_nonblocking;

virtual KernelSocketStream* create_stream() {
return new KernelSocketStream(m_socket_family, m_nonblocking);
virtual KernelSocketStream* create_stream(int socket_family) {
return new KernelSocketStream(socket_family, m_nonblocking);
}

virtual int fd_connect(int fd, const sockaddr* remote, socklen_t addrlen) {
Expand All @@ -224,7 +221,7 @@ class KernelSocketClient : public SocketClientBase {

ISocketStream* do_connect(const sockaddr* remote, socklen_t len_remote,
const sockaddr* local = nullptr, socklen_t len_local = 0) {
auto stream = create_stream();
auto stream = create_stream(remote->sa_family);
auto deleter = [&](KernelSocketStream*) {
auto errno_backup = errno;
delete stream;
Expand Down Expand Up @@ -504,8 +501,8 @@ class ZeroCopySocketClient : public KernelSocketClient {
public:
using KernelSocketClient::KernelSocketClient;
protected:
KernelSocketStream* create_stream() override {
return new ZeroCopySocketStream(m_socket_family, m_nonblocking);
KernelSocketStream* create_stream(int socket_family) override {
return new ZeroCopySocketStream(socket_family, m_nonblocking);
}
};

Expand Down Expand Up @@ -544,8 +541,8 @@ class IouringSocketClient : public KernelSocketClient {
using KernelSocketClient::KernelSocketClient;

protected:
KernelSocketStream* create_stream() override {
return new IouringSocketStream(m_socket_family, m_nonblocking);
KernelSocketStream* create_stream(int socket_family) override {
return new IouringSocketStream(socket_family, m_nonblocking);
}

int fd_connect(int fd, const sockaddr* remote, socklen_t addrlen) override {
Expand Down Expand Up @@ -614,8 +611,8 @@ class IouringFixedFileSocketClient : public IouringSocketClient {
protected:
using IouringSocketClient::IouringSocketClient;

KernelSocketStream* create_stream() override {
return new IouringFixedFileSocketStream(m_socket_family, m_nonblocking);
KernelSocketStream* create_stream(int socket_family) override {
return new IouringFixedFileSocketStream(socket_family, m_nonblocking);
}
};

Expand Down Expand Up @@ -683,8 +680,8 @@ class FstackDpdkSocketClient : public KernelSocketClient {
protected:
using KernelSocketClient::KernelSocketClient;

KernelSocketStream* create_stream() override {
return new FstackDpdkSocketStream(m_socket_family, m_nonblocking);
KernelSocketStream* create_stream(int socket_family) override {
return new FstackDpdkSocketStream(socket_family, m_nonblocking);
}

int fd_connect(int fd, const sockaddr* remote, socklen_t addrlen) override {
Expand Down Expand Up @@ -939,8 +936,8 @@ class ETKernelSocketClient : public KernelSocketClient {
using KernelSocketClient::KernelSocketClient;

protected:
KernelSocketStream* create_stream() override {
return new ETKernelSocketStream(m_socket_family, m_nonblocking);
KernelSocketStream* create_stream(int socket_family) override {
return new ETKernelSocketStream(socket_family, m_nonblocking);
}
};

Expand Down Expand Up @@ -974,10 +971,10 @@ class ETKernelSocketServer : public KernelSocketServer, public NotifyContext {
/* ET Socket - End */

extern "C" ISocketClient* new_tcp_socket_client() {
return new KernelSocketClient(AF_INET, true);
return new KernelSocketClient(true);
}
extern "C" ISocketClient* new_tcp_socket_client_ipv6() {
return new KernelSocketClient(AF_INET6, true);
return new KernelSocketClient(true);
}
extern "C" ISocketServer* new_tcp_socket_server() {
return NewObj<KernelSocketServer>(AF_INET, false, true)->init();
Expand All @@ -986,7 +983,7 @@ extern "C" ISocketServer* new_tcp_socket_server_ipv6() {
return NewObj<KernelSocketServer>(AF_INET6, false, true)->init();
}
extern "C" ISocketClient* new_uds_client() {
return new KernelSocketClient(AF_UNIX, true);
return new KernelSocketClient(true);
}
extern "C" ISocketServer* new_uds_server(bool autoremove) {
return NewObj<KernelSocketServer>(AF_UNIX, autoremove, true)->init();
Expand All @@ -996,14 +993,14 @@ extern "C" ISocketServer* new_zerocopy_tcp_server() {
return NewObj<ZeroCopySocketServer>(AF_INET, false, true)->init();
}
extern "C" ISocketClient* new_zerocopy_tcp_client() {
return new ZeroCopySocketClient(AF_INET, true);
return new ZeroCopySocketClient(true);
}
#ifdef PHOTON_URING
extern "C" ISocketClient* new_iouring_tcp_client() {
if (photon::iouring_register_files_enabled())
return new IouringFixedFileSocketClient(AF_INET, false);
return new IouringFixedFileSocketClient(false);
else
return new IouringSocketClient(AF_INET, false);
return new IouringSocketClient(false);
}
extern "C" ISocketServer* new_iouring_tcp_server() {
if (photon::iouring_register_files_enabled())
Expand All @@ -1013,23 +1010,23 @@ extern "C" ISocketServer* new_iouring_tcp_server() {
}
#endif // PHOTON_URING
extern "C" ISocketClient* new_et_tcp_socket_client() {
return new ETKernelSocketClient(AF_INET, true);
return new ETKernelSocketClient(true);
}
extern "C" ISocketServer* new_et_tcp_socket_server() {
return NewObj<ETKernelSocketServer>(AF_INET, false, true)->init();
}
extern "C" ISocketClient* new_smc_socket_client() {
return new KernelSocketClient(AF_SMC, true);
return new KernelSocketClient(true);
}
extern "C" ISocketServer* new_smc_socket_server() {
return NewObj<KernelSocketServer>(AF_SMC, false, true)->init();
}
#ifdef ENABLE_FSTACK_DPDK
extern "C" ISocketClient* new_fstack_dpdk_socket_client() {
return new FstackDpdkSocketClient(AF_INET, true);
return new FstackDpdkSocketClient(true);
}
extern "C" ISocketServer* new_fstack_dpdk_socket_server() {
return NewObj<FstackDpdkSocketServer>(AF_INET, false, true)->init();
return NewObj<FstackDpdkSocketServer>(false, true)->init();
}
#endif // ENABLE_FSTACK_DPDK
#endif // __linux__
Expand Down
4 changes: 2 additions & 2 deletions net/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,9 @@ TEST(utils, resolver) {
DEFER(delete resolver);
net::IPAddr localhost("127.0.0.1");
net::IPAddr addr = resolver->resolve("localhost");
EXPECT_EQ(localhost.to_nl(), addr.to_nl());
if (addr.is_ipv4()) EXPECT_EQ(localhost.to_nl(), addr.to_nl());
auto func = [&](net::IPAddr addr_){
EXPECT_EQ(localhost.to_nl(), addr_.to_nl());
if (addr_.is_ipv4()) EXPECT_EQ(localhost.to_nl(), addr_.to_nl());
};
resolver->resolve("localhost", func);
resolver->discard_cache("non-exist-host.com");
Expand Down
71 changes: 46 additions & 25 deletions net/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ limitations under the License.
#include <sys/socket.h>
#include <unistd.h>

#include <chrono>
#include <list>
#include <thread>
#include <string>

Expand Down Expand Up @@ -249,49 +251,68 @@ bool Base64Decode(std::string_view in, std::string &out) {
}

class DefaultResolver : public Resolver {
protected:
struct IPAddrNode : public intrusive_list_node<IPAddrNode> {
IPAddr addr;
IPAddrNode(IPAddr addr) : addr(addr) {}
};
using IPAddrList = intrusive_list<IPAddrNode>;
public:
DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout)
: dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {}
~DefaultResolver() { dnscache_.clear(); }
~DefaultResolver() {
for (auto it : dnscache_) {
((IPAddrList*)it->_obj)->delete_all();
}
dnscache_.clear();
}

IPAddr resolve(const char *host) override {
auto ctr = [&]() -> IPAddr * {
std::vector<IPAddr> addrs;
auto ctr = [&]() -> IPAddrList* {
auto addrs = new IPAddrList();
photon::semaphore sem;
std::thread([&]() {
int ret = gethostbyname(host, addrs);
if (ret < 0) {
addrs.clear();
auto now = std::chrono::steady_clock::now();
IPAddrList ret;
auto cb = [&](IPAddr addr) {
ret.push_back(new IPAddrNode(addr));
return 0;
};
_gethostbyname(host, cb);
auto time_elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - now).count();
if ((uint64_t)time_elapsed <= resolve_timeout_) {
addrs->push_back(std::move(ret));
sem.signal(1);
}
sem.signal(1);
}).detach();
auto ret = sem.wait(1, resolve_timeout_);
if (ret < 0 && errno == ETIMEDOUT) {
LOG_WARN("Domain resolution for ` timeout!", host);
return new IPAddr; // undefined addr
} else if (addrs.empty()) {
LOG_WARN("Domain resolution for ` failed", host);
return new IPAddr; // undefined addr
}
// TODO: support ipv6
for (auto& ip : addrs) {
if (ip.is_ipv4())
return new IPAddr(ip);
}
return new IPAddr; // undefined addr
sem.wait(1, resolve_timeout_);
return addrs;
};
return *(dnscache_.borrow(host, ctr));
auto ips = dnscache_.borrow(host, ctr);
if (ips->empty()) LOG_ERRNO_RETURN(0, IPAddr(), "Domain resolution for ` failed", host);
auto ret = ips->front();
ips->node = ret->next(); // access in round robin order
return ret->addr;
}

void resolve(const char *host, Delegate<void, IPAddr> func) override { func(resolve(host)); }

void discard_cache(const char *host) override {
void discard_cache(const char *host, IPAddr ip) override {
auto ipaddr = dnscache_.borrow(host);
ipaddr.recycle(true);
if (ip.undefined() || ipaddr->empty()) ipaddr.recycle(true);
else {
for (auto itr = ipaddr->rbegin(); itr != ipaddr->rend(); itr++) {
if ((*itr)->addr == ip) {
ipaddr->erase(*itr);
break;
}
}
}
}

private:
ObjectCache<std::string, IPAddr *> dnscache_;
ObjectCache<std::string, IPAddrList*> dnscache_;
uint64_t resolve_timeout_;
};

Expand Down
3 changes: 2 additions & 1 deletion net/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,12 @@ class Resolver : public Object {
// Normally dns servers return multiple ips in random order, choosing the first one should suffice.
virtual IPAddr resolve(const char* host) = 0;
virtual void resolve(const char* host, Delegate<void, IPAddr> func) = 0;
virtual void discard_cache(const char* host) = 0; // discard current cache of host:ip
virtual void discard_cache(const char* host, IPAddr ip = IPAddr()) = 0; // discard current cache of ip
};

/**
* @brief A non-blocking Resolver based on gethostbyname.
* Currently, it's not thread safe.
*
* @param cache_ttl cache's lifetime in microseconds.
* @param resolve_timeout timeout in microseconds for domain resolution.
Expand Down
Loading

0 comments on commit 7cab63e

Please sign in to comment.