diff --git a/include/photon/thread/vcpu_local.h b/include/photon/thread/vcpu_local.h new file mode 120000 index 00000000..31f451b7 --- /dev/null +++ b/include/photon/thread/vcpu_local.h @@ -0,0 +1 @@ +../../../thread/vcpu_local.h \ No newline at end of file diff --git a/net/http/client.cpp b/net/http/client.cpp index eadb6c27..922ede92 100644 --- a/net/http/client.cpp +++ b/net/http/client.cpp @@ -24,6 +24,7 @@ limitations under the License. #include #include #include +#include namespace photon { namespace net { @@ -32,7 +33,7 @@ static const uint64_t kDNSCacheLife = 3600UL * 1000 * 1000; static constexpr char USERAGENT[] = "PhotonLibOS_HTTP"; -class PooledDialer { +class PooledDialer: public VCPULocal { public: net::TLSContext* tls_ctx = nullptr; bool tls_ctx_ownership; @@ -54,8 +55,16 @@ class PooledDialer { } ~PooledDialer() { + } + + int vcpu_exit() override { + resolver.reset(); + udssock.reset(); + tlssock.reset(); + tcpsock.reset(); if (tls_ctx_ownership) delete tls_ctx; + return 0; } ISocketStream* dial(std::string_view host, uint16_t port, bool secure, @@ -125,16 +134,25 @@ enum RoundtripStatus { ROUNDTRIP_FAST_RETRY, }; +thread_local PooledDialer *dialer = nullptr; + class ClientImpl : public Client { public: - PooledDialer m_dialer; CommonHeaders<> m_common_headers; + TLSContext *m_tls_ctx; ICookieJar *m_cookie_jar; ClientImpl(ICookieJar *cookie_jar, TLSContext *tls_ctx) : - m_dialer(tls_ctx), + m_tls_ctx(tls_ctx), m_cookie_jar(cookie_jar) { } + PooledDialer* get_dialer() { + if (dialer == nullptr) { + dialer = new PooledDialer(m_tls_ctx); + } + return dialer; + } + using SocketStream_ptr = std::unique_ptr; int redirect(Operation* op) { if (op->resp.body_size() > 0) { @@ -165,22 +183,18 @@ class ClientImpl : public Client { return ROUNDTRIP_REDIRECT; } - int concurreny = 0; int do_roundtrip(Operation* op, Timeout tmo) { - concurreny++; - LOG_DEBUG(VALUE(concurreny)); - DEFER(concurreny--); op->status_code = -1; if (tmo.timeout() == 0) LOG_ERROR_RETURN(ETIMEDOUT, ROUNDTRIP_FAILED, "connection timedout"); auto &req = op->req; ISocketStream* s; if (m_proxy && !m_proxy_url.empty()) - s = m_dialer.dial(m_proxy_url, tmo.timeout()); + s = get_dialer()->dial(m_proxy_url, tmo.timeout()); else if (!op->uds_path.empty()) - s = m_dialer.dial(op->uds_path, tmo.timeout()); + s = get_dialer()->dial(op->uds_path, tmo.timeout()); else - s = m_dialer.dial(req, tmo.timeout()); + s = get_dialer()->dial(req, tmo.timeout()); if (!s) { if (errno == ECONNREFUSED || errno == ENOENT) { LOG_ERROR_RETURN(0, ROUNDTRIP_FAST_RETRY, "connection refused") @@ -288,7 +302,7 @@ class ClientImpl : public Client { } ISocketStream* native_connect(std::string_view host, uint16_t port, bool secure, uint64_t timeout) override { - return m_dialer.dial(host, port, secure, timeout); + return get_dialer()->dial(host, port, secure, timeout); } CommonHeaders<>* common_headers() override { diff --git a/net/http/test/client_function_test.cpp b/net/http/test/client_function_test.cpp index 851e079b..d5b029fd 100644 --- a/net/http/test/client_function_test.cpp +++ b/net/http/test/client_function_test.cpp @@ -530,6 +530,62 @@ TEST(http_client, partial_body) { EXPECT_EQ(true, buf == "http_clien"); } + +TEST(http_client, vcpu) { +system("mkdir -p /tmp/ease_ut/http_test/"); + system("echo \"this is a http_client request body text for socket stream\" > /tmp/ease_ut/http_test/ease-httpclient-gettestfile"); + auto tcpserver = new_tcp_socket_server(); + tcpserver->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1); + tcpserver->bind_v4localhost(); + tcpserver->listen(); + DEFER(delete tcpserver); + auto server = new_http_server(); + DEFER(delete server); + auto fs = photon::fs::new_localfs_adaptor("/tmp/ease_ut/http_test/"); + DEFER(delete fs); + auto fs_handler = new_fs_handler(fs); + DEFER(delete fs_handler); + server->add_handler(fs_handler); + tcpserver->set_handler(server->get_connection_handler()); + tcpserver->start_loop(); + auto target = to_url(tcpserver, "/ease-httpclient-gettestfile"); + auto client = new_http_client(); + DEFER(delete client); + + int vcpu_num = 16; + photon::semaphore sem(0); + std::thread th[vcpu_num]; + for (int i = 0; i < vcpu_num; i++) { + th[i] = std::thread([&] { + photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE); + DEFER({ + photon::fini(); + sem.signal(1); + }); + + for (int round = 0; round < 10; round++) { + auto op = client->new_operation(Verb::GET, target); + DEFER(client->destroy_operation(op)); + op->req.headers.content_length(0); + int ret = client->call(op); + GTEST_ASSERT_EQ(0, ret); + + char resp_body_buf[1024]; + EXPECT_EQ(sizeof(socket_buf), op->resp.resource_size()); + ret = op->resp.read(resp_body_buf, sizeof(socket_buf)); + EXPECT_EQ(sizeof(socket_buf), ret); + resp_body_buf[sizeof(socket_buf) - 1] = '\0'; + LOG_DEBUG(resp_body_buf); + EXPECT_EQ(0, strcmp(resp_body_buf, socket_buf)); + } + }); + } + + sem.wait(vcpu_num); + for (int i = 0; i < vcpu_num; i++) + th[i].join(); +} + TEST(DISABLED_http_client, ipv6) { // make sure runing in a ipv6-ready environment auto client = new_http_client(); DEFER(delete client); diff --git a/photon.cpp b/photon.cpp index b20379c8..de36f9c0 100644 --- a/photon.cpp +++ b/photon.cpp @@ -22,6 +22,7 @@ limitations under the License. #include "thread/thread.h" #include "thread/thread-pool.h" #include "thread/stack-allocator.h" +#include "thread/vcpu_local.h" #ifdef ENABLE_FSTACK_DPDK #include "io/fstack-dpdk.h" #endif @@ -95,6 +96,7 @@ int init(uint64_t event_engine, uint64_t io_engine, const PhotonOptions& options } int fini() { + at_vcpu_exit(); #ifdef __linux__ FINI_IO(LIBAIO, libaio_wrapper) FINI_IO(SOCKET_EDGE_TRIGGER, et_poller) diff --git a/thread/vcpu_local.cpp b/thread/vcpu_local.cpp new file mode 100644 index 00000000..23f4aed4 --- /dev/null +++ b/thread/vcpu_local.cpp @@ -0,0 +1,51 @@ +/* +Copyright 2022 The Photon Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "vcpu_local.h" +#include "../common/alog.h" +#include "std-compat.h" +#include + +namespace photon { + +static thread_local VCPULocal *list = nullptr; + +VCPULocal::VCPULocal() { + LOG_DEBUG("push ", VALUE(this)); + if (list == nullptr) { + list = this; + } else { + list->insert_tail(this); + } +} + +VCPULocal::~VCPULocal() { + LOG_DEBUG("erase ", VALUE(this)); + auto nx = this->remove_from_list(); + if (this == list) + list = nx; +} + +void at_vcpu_exit() { + if (list == nullptr) + return; + while (list) { + list->vcpu_exit(); + delete list; + } +} + +} // namespace photon diff --git a/thread/vcpu_local.h b/thread/vcpu_local.h new file mode 100644 index 00000000..78b77445 --- /dev/null +++ b/thread/vcpu_local.h @@ -0,0 +1,32 @@ +/* +Copyright 2022 The Photon Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#pragma once +#include + +namespace photon { + +class VCPULocal : public intrusive_list_node { +public: + VCPULocal(); + virtual ~VCPULocal(); + + virtual int vcpu_exit() = 0; +}; + +void at_vcpu_exit(); + +} // namespace photon