Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ipv6 in http client #310

Merged
merged 4 commits into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions fs/fuse_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,21 @@ namespace photon {
namespace fs{

static IFileSystem* fs = nullptr;
static uid_t cuid = 0, cgid = 0;

#define CHECK_FS() if (!fs) return -EFAULT;

static void* xmp_init(struct fuse_conn_info *conn)
{
REPORT_PERF(xmp_init, 1)
(void) conn;
cuid = geteuid();
cgid = getegid();
if ((unsigned int)conn->capable & FUSE_CAP_ATOMIC_O_TRUNC) {
conn->want |= FUSE_CAP_ATOMIC_O_TRUNC;
}
if ((unsigned int)conn->capable & FUSE_CAP_BIG_WRITES) {
conn->want |= FUSE_CAP_BIG_WRITES;
}
return NULL;
}

Expand All @@ -102,9 +110,13 @@ static int xmp_getattr(const char *path, struct stat *stbuf)
CHECK_FS();
errno = 0;
int res = fs->lstat(path, stbuf);
if(res) \
LOG_ERROR_RETURN(0, errno ? -errno : res, VALUE(path));
errno = 0;
if(res) return -errno;
stbuf->st_blocks = stbuf->st_size / 512 + 1; // du relies on this
stbuf->st_blksize = 4096;
// use current user/group when backendfs doesn't support uid/gid
if (stbuf->st_uid == 0) stbuf->st_uid = cuid;
if (stbuf->st_gid == 0) stbuf->st_gid = cgid;
if (stbuf->st_nlink == 0) stbuf->st_nlink = 1;
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion net/http/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ ISocketStream* PooledDialer::dial(std::string_view host, uint16_t port, bool sec
if (ipaddr.undefined()) LOG_DEBUG("No connectable resolve result");
// When failed, remove resolved result from dns cache so that following retries can try
// different ips.
resolver->discard_cache(strhost.c_str());
resolver->discard_cache(strhost.c_str(), ipaddr);
return nullptr;
}

Expand Down
10 changes: 10 additions & 0 deletions net/http/test/client_function_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,16 @@ TEST(http_client, partial_body) {
EXPECT_EQ(true, buf == "http_clien");
}

TEST(DISABLED_http_client, ipv6) { // make sure runing in a ipv6-ready environment
auto client = new_http_client();
DEFER(delete client);
// here is an ipv6-only website
auto op = client->new_operation(Verb::GET, "http://test6.ustc.edu.cn");
DEFER(delete op);
op->call();
EXPECT_EQ(200, op->resp.status_code());
}

TEST(url, url_escape_unescape) {
EXPECT_EQ(
url_escape("?a=x:b&b=cd&c= feg&d=2/1[+]@alibaba.com&e='!bad';"),
Expand Down
51 changes: 24 additions & 27 deletions net/kernel_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,7 @@ class KernelSocketStream : public SocketStreamBase {

class KernelSocketClient : public SocketClientBase {
public:
KernelSocketClient(int socket_family, bool nonblocking) :
m_socket_family(socket_family),
m_nonblocking(nonblocking) {}
KernelSocketClient(bool nonblocking) : m_nonblocking(nonblocking) {}

ISocketStream* connect(const char* path, size_t count) override {
struct sockaddr_un addr_un;
Expand All @@ -211,11 +209,10 @@ class KernelSocketClient : public SocketClientBase {
}

protected:
int m_socket_family;
bool m_nonblocking;

virtual KernelSocketStream* create_stream() {
return new KernelSocketStream(m_socket_family, m_nonblocking);
virtual KernelSocketStream* create_stream(int socket_family) {
return new KernelSocketStream(socket_family, m_nonblocking);
}

virtual int fd_connect(int fd, const sockaddr* remote, socklen_t addrlen) {
Expand All @@ -224,7 +221,7 @@ class KernelSocketClient : public SocketClientBase {

ISocketStream* do_connect(const sockaddr* remote, socklen_t len_remote,
const sockaddr* local = nullptr, socklen_t len_local = 0) {
auto stream = create_stream();
auto stream = create_stream(remote->sa_family);
auto deleter = [&](KernelSocketStream*) {
auto errno_backup = errno;
delete stream;
Expand Down Expand Up @@ -504,8 +501,8 @@ class ZeroCopySocketClient : public KernelSocketClient {
public:
using KernelSocketClient::KernelSocketClient;
protected:
KernelSocketStream* create_stream() override {
return new ZeroCopySocketStream(m_socket_family, m_nonblocking);
KernelSocketStream* create_stream(int socket_family) override {
return new ZeroCopySocketStream(socket_family, m_nonblocking);
}
};

Expand Down Expand Up @@ -544,8 +541,8 @@ class IouringSocketClient : public KernelSocketClient {
using KernelSocketClient::KernelSocketClient;

protected:
KernelSocketStream* create_stream() override {
return new IouringSocketStream(m_socket_family, m_nonblocking);
KernelSocketStream* create_stream(int socket_family) override {
return new IouringSocketStream(socket_family, m_nonblocking);
}

int fd_connect(int fd, const sockaddr* remote, socklen_t addrlen) override {
Expand Down Expand Up @@ -614,8 +611,8 @@ class IouringFixedFileSocketClient : public IouringSocketClient {
protected:
using IouringSocketClient::IouringSocketClient;

KernelSocketStream* create_stream() override {
return new IouringFixedFileSocketStream(m_socket_family, m_nonblocking);
KernelSocketStream* create_stream(int socket_family) override {
return new IouringFixedFileSocketStream(socket_family, m_nonblocking);
}
};

Expand Down Expand Up @@ -683,8 +680,8 @@ class FstackDpdkSocketClient : public KernelSocketClient {
protected:
using KernelSocketClient::KernelSocketClient;

KernelSocketStream* create_stream() override {
return new FstackDpdkSocketStream(m_socket_family, m_nonblocking);
KernelSocketStream* create_stream(int socket_family) override {
return new FstackDpdkSocketStream(socket_family, m_nonblocking);
}

int fd_connect(int fd, const sockaddr* remote, socklen_t addrlen) override {
Expand Down Expand Up @@ -939,8 +936,8 @@ class ETKernelSocketClient : public KernelSocketClient {
using KernelSocketClient::KernelSocketClient;

protected:
KernelSocketStream* create_stream() override {
return new ETKernelSocketStream(m_socket_family, m_nonblocking);
KernelSocketStream* create_stream(int socket_family) override {
return new ETKernelSocketStream(socket_family, m_nonblocking);
}
};

Expand Down Expand Up @@ -974,10 +971,10 @@ class ETKernelSocketServer : public KernelSocketServer, public NotifyContext {
/* ET Socket - End */

extern "C" ISocketClient* new_tcp_socket_client() {
return new KernelSocketClient(AF_INET, true);
return new KernelSocketClient(true);
}
extern "C" ISocketClient* new_tcp_socket_client_ipv6() {
return new KernelSocketClient(AF_INET6, true);
return new KernelSocketClient(true);
}
extern "C" ISocketServer* new_tcp_socket_server() {
return NewObj<KernelSocketServer>(AF_INET, false, true)->init();
Expand All @@ -986,7 +983,7 @@ extern "C" ISocketServer* new_tcp_socket_server_ipv6() {
return NewObj<KernelSocketServer>(AF_INET6, false, true)->init();
}
extern "C" ISocketClient* new_uds_client() {
return new KernelSocketClient(AF_UNIX, true);
return new KernelSocketClient(true);
}
extern "C" ISocketServer* new_uds_server(bool autoremove) {
return NewObj<KernelSocketServer>(AF_UNIX, autoremove, true)->init();
Expand All @@ -996,14 +993,14 @@ extern "C" ISocketServer* new_zerocopy_tcp_server() {
return NewObj<ZeroCopySocketServer>(AF_INET, false, true)->init();
}
extern "C" ISocketClient* new_zerocopy_tcp_client() {
return new ZeroCopySocketClient(AF_INET, true);
return new ZeroCopySocketClient(true);
}
#ifdef PHOTON_URING
extern "C" ISocketClient* new_iouring_tcp_client() {
if (photon::iouring_register_files_enabled())
return new IouringFixedFileSocketClient(AF_INET, false);
return new IouringFixedFileSocketClient(false);
else
return new IouringSocketClient(AF_INET, false);
return new IouringSocketClient(false);
}
extern "C" ISocketServer* new_iouring_tcp_server() {
if (photon::iouring_register_files_enabled())
Expand All @@ -1013,23 +1010,23 @@ extern "C" ISocketServer* new_iouring_tcp_server() {
}
#endif // PHOTON_URING
extern "C" ISocketClient* new_et_tcp_socket_client() {
return new ETKernelSocketClient(AF_INET, true);
return new ETKernelSocketClient(true);
}
extern "C" ISocketServer* new_et_tcp_socket_server() {
return NewObj<ETKernelSocketServer>(AF_INET, false, true)->init();
}
extern "C" ISocketClient* new_smc_socket_client() {
return new KernelSocketClient(AF_SMC, true);
return new KernelSocketClient(true);
}
extern "C" ISocketServer* new_smc_socket_server() {
return NewObj<KernelSocketServer>(AF_SMC, false, true)->init();
}
#ifdef ENABLE_FSTACK_DPDK
extern "C" ISocketClient* new_fstack_dpdk_socket_client() {
return new FstackDpdkSocketClient(AF_INET, true);
return new FstackDpdkSocketClient(true);
}
extern "C" ISocketServer* new_fstack_dpdk_socket_server() {
return NewObj<FstackDpdkSocketServer>(AF_INET, false, true)->init();
return NewObj<FstackDpdkSocketServer>(false, true)->init();
}
#endif // ENABLE_FSTACK_DPDK
#endif // __linux__
Expand Down
4 changes: 2 additions & 2 deletions net/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,9 @@ TEST(utils, resolver) {
DEFER(delete resolver);
net::IPAddr localhost("127.0.0.1");
net::IPAddr addr = resolver->resolve("localhost");
EXPECT_EQ(localhost.to_nl(), addr.to_nl());
if (addr.is_ipv4()) EXPECT_EQ(localhost.to_nl(), addr.to_nl());
auto func = [&](net::IPAddr addr_){
EXPECT_EQ(localhost.to_nl(), addr_.to_nl());
if (addr_.is_ipv4()) EXPECT_EQ(localhost.to_nl(), addr_.to_nl());
};
resolver->resolve("localhost", func);
resolver->discard_cache("non-exist-host.com");
Expand Down
71 changes: 46 additions & 25 deletions net/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ limitations under the License.
#include <sys/socket.h>
#include <unistd.h>

#include <chrono>
#include <list>
#include <thread>
#include <string>

Expand Down Expand Up @@ -249,49 +251,68 @@ bool Base64Decode(std::string_view in, std::string &out) {
}

class DefaultResolver : public Resolver {
protected:
struct IPAddrNode : public intrusive_list_node<IPAddrNode> {
IPAddr addr;
IPAddrNode(IPAddr addr) : addr(addr) {}
};
using IPAddrList = intrusive_list<IPAddrNode>;
public:
DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout)
: dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {}
~DefaultResolver() { dnscache_.clear(); }
~DefaultResolver() {
for (auto it : dnscache_) {
((IPAddrList*)it->_obj)->delete_all();
}
dnscache_.clear();
}

IPAddr resolve(const char *host) override {
auto ctr = [&]() -> IPAddr * {
std::vector<IPAddr> addrs;
auto ctr = [&]() -> IPAddrList* {
auto addrs = new IPAddrList();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The list doesn't need to be new-ed, as it's only a pointer to the node, initially nullptr. And nodes are directly pushed-back into the list.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ObjectCache need its value be a pointer, otherwise using ObjectCache::Borrow would be a problem.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

photon::semaphore sem;
std::thread([&]() {
int ret = gethostbyname(host, addrs);
if (ret < 0) {
addrs.clear();
auto now = std::chrono::steady_clock::now();
IPAddrList ret;
auto cb = [&](IPAddr addr) {
ret.push_back(new IPAddrNode(addr));
return 0;
};
_gethostbyname(host, cb);
auto time_elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - now).count();
if ((uint64_t)time_elapsed <= resolve_timeout_) {
addrs->push_back(std::move(ret));
sem.signal(1);
}
sem.signal(1);
}).detach();
auto ret = sem.wait(1, resolve_timeout_);
if (ret < 0 && errno == ETIMEDOUT) {
LOG_WARN("Domain resolution for ` timeout!", host);
return new IPAddr; // undefined addr
} else if (addrs.empty()) {
LOG_WARN("Domain resolution for ` failed", host);
return new IPAddr; // undefined addr
}
// TODO: support ipv6
for (auto& ip : addrs) {
if (ip.is_ipv4())
return new IPAddr(ip);
}
return new IPAddr; // undefined addr
sem.wait(1, resolve_timeout_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if time out, sem will be destructed upon return, but it may be referenced afterwards in the resolving thread.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addrs may also be operated (erased or deleted) when the resolving thread is still running.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return addrs;
};
return *(dnscache_.borrow(host, ctr));
auto ips = dnscache_.borrow(host, ctr);
if (ips->empty()) LOG_ERRNO_RETURN(0, IPAddr(), "Domain resolution for ` failed", host);
auto ret = ips->front();
ips->node = ret->next(); // access in round robin order
return ret->addr;
}

void resolve(const char *host, Delegate<void, IPAddr> func) override { func(resolve(host)); }

void discard_cache(const char *host) override {
void discard_cache(const char *host, IPAddr ip) override {
auto ipaddr = dnscache_.borrow(host);
ipaddr.recycle(true);
if (ip.undefined() || ipaddr->empty()) ipaddr.recycle(true);
else {
for (auto itr = ipaddr->rbegin(); itr != ipaddr->rend(); itr++) {
if ((*itr)->addr == ip) {
ipaddr->erase(*itr);
break;
}
}
}
}

private:
ObjectCache<std::string, IPAddr *> dnscache_;
ObjectCache<std::string, IPAddrList*> dnscache_;
uint64_t resolve_timeout_;
};

Expand Down
3 changes: 2 additions & 1 deletion net/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,12 @@ class Resolver : public Object {
// Normally dns servers return multiple ips in random order, choosing the first one should suffice.
virtual IPAddr resolve(const char* host) = 0;
virtual void resolve(const char* host, Delegate<void, IPAddr> func) = 0;
virtual void discard_cache(const char* host) = 0; // discard current cache of host:ip
virtual void discard_cache(const char* host, IPAddr ip = IPAddr()) = 0; // discard current cache of ip
};

/**
* @brief A non-blocking Resolver based on gethostbyname.
* Currently, it's not thread safe.
*
* @param cache_ttl cache's lifetime in microseconds.
* @param resolve_timeout timeout in microseconds for domain resolution.
Expand Down
Loading