Skip to content

Commit

Permalink
implement vcpu local and vcpu local for http client
Browse files Browse the repository at this point in the history
Signed-off-by: liulanzheng <lanzheng.liulz@alibaba-inc.com>
  • Loading branch information
liulanzheng committed May 20, 2024
1 parent 492049b commit 9a2e291
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 11 deletions.
1 change: 1 addition & 0 deletions include/photon/thread/vcpu_local.h
36 changes: 25 additions & 11 deletions net/http/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ limitations under the License.
#include <photon/net/socket.h>
#include <photon/net/security-context/tls-stream.h>
#include <photon/net/utils.h>
#include <photon/thread/vcpu_local.h>

namespace photon {
namespace net {
Expand All @@ -32,7 +33,7 @@ static const uint64_t kDNSCacheLife = 3600UL * 1000 * 1000;
static constexpr char USERAGENT[] = "PhotonLibOS_HTTP";


class PooledDialer {
class PooledDialer: public VCPULocal {
public:
net::TLSContext* tls_ctx = nullptr;
bool tls_ctx_ownership;
Expand All @@ -54,8 +55,16 @@ class PooledDialer {
}

~PooledDialer() {
}

int vcpu_exit() override {
resolver.reset();
udssock.reset();
tlssock.reset();
tcpsock.reset();
if (tls_ctx_ownership)
delete tls_ctx;
return 0;
}

ISocketStream* dial(std::string_view host, uint16_t port, bool secure,
Expand Down Expand Up @@ -125,16 +134,25 @@ enum RoundtripStatus {
ROUNDTRIP_FAST_RETRY,
};

thread_local PooledDialer *dialer = nullptr;

class ClientImpl : public Client {
public:
PooledDialer m_dialer;
CommonHeaders<> m_common_headers;
TLSContext *m_tls_ctx;
ICookieJar *m_cookie_jar;
ClientImpl(ICookieJar *cookie_jar, TLSContext *tls_ctx) :
m_dialer(tls_ctx),
m_tls_ctx(tls_ctx),
m_cookie_jar(cookie_jar) {
}

PooledDialer* get_dialer() {
if (dialer == nullptr) {
dialer = new PooledDialer(m_tls_ctx);
}
return dialer;
}

using SocketStream_ptr = std::unique_ptr<ISocketStream>;
int redirect(Operation* op) {
if (op->resp.body_size() > 0) {
Expand Down Expand Up @@ -165,22 +183,18 @@ class ClientImpl : public Client {
return ROUNDTRIP_REDIRECT;
}

int concurreny = 0;
int do_roundtrip(Operation* op, Timeout tmo) {
concurreny++;
LOG_DEBUG(VALUE(concurreny));
DEFER(concurreny--);
op->status_code = -1;
if (tmo.timeout() == 0)
LOG_ERROR_RETURN(ETIMEDOUT, ROUNDTRIP_FAILED, "connection timedout");
auto &req = op->req;
ISocketStream* s;
if (m_proxy && !m_proxy_url.empty())
s = m_dialer.dial(m_proxy_url, tmo.timeout());
s = get_dialer()->dial(m_proxy_url, tmo.timeout());
else if (!op->uds_path.empty())
s = m_dialer.dial(op->uds_path, tmo.timeout());
s = get_dialer()->dial(op->uds_path, tmo.timeout());
else
s = m_dialer.dial(req, tmo.timeout());
s = get_dialer()->dial(req, tmo.timeout());
if (!s) {
if (errno == ECONNREFUSED || errno == ENOENT) {
LOG_ERROR_RETURN(0, ROUNDTRIP_FAST_RETRY, "connection refused")
Expand Down Expand Up @@ -288,7 +302,7 @@ class ClientImpl : public Client {
}

ISocketStream* native_connect(std::string_view host, uint16_t port, bool secure, uint64_t timeout) override {
return m_dialer.dial(host, port, secure, timeout);
return get_dialer()->dial(host, port, secure, timeout);
}

CommonHeaders<>* common_headers() override {
Expand Down
56 changes: 56 additions & 0 deletions net/http/test/client_function_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,62 @@ TEST(http_client, partial_body) {
EXPECT_EQ(true, buf == "http_clien");
}


TEST(http_client, vcpu) {
system("mkdir -p /tmp/ease_ut/http_test/");
system("echo \"this is a http_client request body text for socket stream\" > /tmp/ease_ut/http_test/ease-httpclient-gettestfile");
auto tcpserver = new_tcp_socket_server();
tcpserver->setsockopt<int>(IPPROTO_TCP, TCP_NODELAY, 1);
tcpserver->bind_v4localhost();
tcpserver->listen();
DEFER(delete tcpserver);
auto server = new_http_server();
DEFER(delete server);
auto fs = photon::fs::new_localfs_adaptor("/tmp/ease_ut/http_test/");
DEFER(delete fs);
auto fs_handler = new_fs_handler(fs);
DEFER(delete fs_handler);
server->add_handler(fs_handler);
tcpserver->set_handler(server->get_connection_handler());
tcpserver->start_loop();
auto target = to_url(tcpserver, "/ease-httpclient-gettestfile");
auto client = new_http_client();
DEFER(delete client);

int vcpu_num = 16;
photon::semaphore sem(0);
std::thread th[vcpu_num];
for (int i = 0; i < vcpu_num; i++) {
th[i] = std::thread([&] {
photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE);
DEFER({
photon::fini();
sem.signal(1);
});

for (int round = 0; round < 10; round++) {
auto op = client->new_operation(Verb::GET, target);
DEFER(client->destroy_operation(op));
op->req.headers.content_length(0);
int ret = client->call(op);
GTEST_ASSERT_EQ(0, ret);

char resp_body_buf[1024];
EXPECT_EQ(sizeof(socket_buf), op->resp.resource_size());
ret = op->resp.read(resp_body_buf, sizeof(socket_buf));
EXPECT_EQ(sizeof(socket_buf), ret);
resp_body_buf[sizeof(socket_buf) - 1] = '\0';
LOG_DEBUG(resp_body_buf);
EXPECT_EQ(0, strcmp(resp_body_buf, socket_buf));
}
});
}

sem.wait(vcpu_num);
for (int i = 0; i < vcpu_num; i++)
th[i].join();
}

TEST(DISABLED_http_client, ipv6) { // make sure runing in a ipv6-ready environment
auto client = new_http_client();
DEFER(delete client);
Expand Down
2 changes: 2 additions & 0 deletions photon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ limitations under the License.
#include "thread/thread.h"
#include "thread/thread-pool.h"
#include "thread/stack-allocator.h"
#include "thread/vcpu_local.h"
#ifdef ENABLE_FSTACK_DPDK
#include "io/fstack-dpdk.h"
#endif
Expand Down Expand Up @@ -95,6 +96,7 @@ int init(uint64_t event_engine, uint64_t io_engine, const PhotonOptions& options
}

int fini() {
at_vcpu_exit();
#ifdef __linux__
FINI_IO(LIBAIO, libaio_wrapper)
FINI_IO(SOCKET_EDGE_TRIGGER, et_poller)
Expand Down
51 changes: 51 additions & 0 deletions thread/vcpu_local.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2022 The Photon Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "vcpu_local.h"
#include "../common/alog.h"
#include "std-compat.h"
#include <pthread.h>

namespace photon {

static thread_local VCPULocal *list = nullptr;

VCPULocal::VCPULocal() {
LOG_DEBUG("push ", VALUE(this));
if (list == nullptr) {
list = this;
} else {
list->insert_tail(this);
}
}

VCPULocal::~VCPULocal() {
LOG_DEBUG("erase ", VALUE(this));
auto nx = this->remove_from_list();
if (this == list)
list = nx;
}

void at_vcpu_exit() {
if (list == nullptr)
return;
while (list) {
list->vcpu_exit();
delete list;
}
}

} // namespace photon
32 changes: 32 additions & 0 deletions thread/vcpu_local.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright 2022 The Photon Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once
#include <photon/thread/list.h>

namespace photon {

class VCPULocal : public intrusive_list_node<VCPULocal> {
public:
VCPULocal();
virtual ~VCPULocal();

virtual int vcpu_exit() = 0;
};

void at_vcpu_exit();

} // namespace photon

0 comments on commit 9a2e291

Please sign in to comment.