Skip to content

Commit

Permalink
add fini_hook for photon and for http client (#484)
Browse files Browse the repository at this point in the history
Signed-off-by: liulanzheng <lanzheng.liulz@alibaba-inc.com>
  • Loading branch information
liulanzheng authored May 21, 2024
1 parent 492049b commit 24e35e7
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 13 deletions.
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,8 @@ if (PHOTON_BUILD_TESTING)

# add_compile_options(-Wno-error)

include_directories(include ${GFLAGS_INCLUDE_DIRS} ${GOOGLETEST_INCLUDE_DIRS})
add_library(ci-tools STATIC test/ci-tools.cpp)

include_directories(photon_static ${GFLAGS_INCLUDE_DIRS} ${GOOGLETEST_INCLUDE_DIRS})
link_libraries(${GFLAGS_LIBRARIES} ${GOOGLETEST_LIBRARIES} ci-tools)

add_subdirectory(examples)
Expand Down
30 changes: 20 additions & 10 deletions net/http/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ limitations under the License.
#include <photon/net/socket.h>
#include <photon/net/security-context/tls-stream.h>
#include <photon/net/utils.h>
#include <photon/photon.h>

namespace photon {
namespace net {
Expand Down Expand Up @@ -51,9 +52,17 @@ class PooledDialer {
tcpsock.reset(new_tcp_socket_pool(tcp_cli, -1, true));
tlssock.reset(new_tcp_socket_pool(tls_cli, -1, true));
udssock.reset(new_uds_client());
photon::fini_hook({this, &PooledDialer::at_photon_fini});
}

~PooledDialer() {
}

void at_photon_fini() {
resolver.reset();
udssock.reset();
tlssock.reset();
tcpsock.reset();
if (tls_ctx_ownership)
delete tls_ctx;
}
Expand Down Expand Up @@ -127,14 +136,19 @@ enum RoundtripStatus {

class ClientImpl : public Client {
public:
PooledDialer m_dialer;
CommonHeaders<> m_common_headers;
TLSContext *m_tls_ctx;
ICookieJar *m_cookie_jar;
ClientImpl(ICookieJar *cookie_jar, TLSContext *tls_ctx) :
m_dialer(tls_ctx),
m_tls_ctx(tls_ctx),
m_cookie_jar(cookie_jar) {
}

PooledDialer& get_dialer() {
thread_local PooledDialer dialer(m_tls_ctx);
return dialer;
}

using SocketStream_ptr = std::unique_ptr<ISocketStream>;
int redirect(Operation* op) {
if (op->resp.body_size() > 0) {
Expand Down Expand Up @@ -165,22 +179,18 @@ class ClientImpl : public Client {
return ROUNDTRIP_REDIRECT;
}

int concurreny = 0;
int do_roundtrip(Operation* op, Timeout tmo) {
concurreny++;
LOG_DEBUG(VALUE(concurreny));
DEFER(concurreny--);
op->status_code = -1;
if (tmo.timeout() == 0)
LOG_ERROR_RETURN(ETIMEDOUT, ROUNDTRIP_FAILED, "connection timedout");
auto &req = op->req;
ISocketStream* s;
if (m_proxy && !m_proxy_url.empty())
s = m_dialer.dial(m_proxy_url, tmo.timeout());
s = get_dialer().dial(m_proxy_url, tmo.timeout());
else if (!op->uds_path.empty())
s = m_dialer.dial(op->uds_path, tmo.timeout());
s = get_dialer().dial(op->uds_path, tmo.timeout());
else
s = m_dialer.dial(req, tmo.timeout());
s = get_dialer().dial(req, tmo.timeout());
if (!s) {
if (errno == ECONNREFUSED || errno == ENOENT) {
LOG_ERROR_RETURN(0, ROUNDTRIP_FAST_RETRY, "connection refused")
Expand Down Expand Up @@ -288,7 +298,7 @@ class ClientImpl : public Client {
}

ISocketStream* native_connect(std::string_view host, uint16_t port, bool secure, uint64_t timeout) override {
return m_dialer.dial(host, port, secure, timeout);
return get_dialer().dial(host, port, secure, timeout);
}

CommonHeaders<>* common_headers() override {
Expand Down
56 changes: 56 additions & 0 deletions net/http/test/client_function_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,62 @@ TEST(http_client, partial_body) {
EXPECT_EQ(true, buf == "http_clien");
}


TEST(http_client, vcpu) {
system("mkdir -p /tmp/ease_ut/http_test/");
system("echo \"this is a http_client request body text for socket stream\" > /tmp/ease_ut/http_test/ease-httpclient-gettestfile");
auto tcpserver = new_tcp_socket_server();
tcpserver->setsockopt<int>(IPPROTO_TCP, TCP_NODELAY, 1);
tcpserver->bind_v4localhost();
tcpserver->listen();
DEFER(delete tcpserver);
auto server = new_http_server();
DEFER(delete server);
auto fs = photon::fs::new_localfs_adaptor("/tmp/ease_ut/http_test/");
DEFER(delete fs);
auto fs_handler = new_fs_handler(fs);
DEFER(delete fs_handler);
server->add_handler(fs_handler);
tcpserver->set_handler(server->get_connection_handler());
tcpserver->start_loop();
auto target = to_url(tcpserver, "/ease-httpclient-gettestfile");
auto client = new_http_client();
DEFER(delete client);

int vcpu_num = 16;
photon::semaphore sem(0);
std::thread th[vcpu_num];
for (int i = 0; i < vcpu_num; i++) {
th[i] = std::thread([&] {
photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE);
DEFER({
photon::fini();
sem.signal(1);
});

for (int round = 0; round < 10; round++) {
auto op = client->new_operation(Verb::GET, target);
DEFER(client->destroy_operation(op));
op->req.headers.content_length(0);
int ret = client->call(op);
GTEST_ASSERT_EQ(0, ret);

char resp_body_buf[1024];
EXPECT_EQ(sizeof(socket_buf), op->resp.resource_size());
ret = op->resp.read(resp_body_buf, sizeof(socket_buf));
EXPECT_EQ(sizeof(socket_buf), ret);
resp_body_buf[sizeof(socket_buf) - 1] = '\0';
LOG_DEBUG(resp_body_buf);
EXPECT_EQ(0, strcmp(resp_body_buf, socket_buf));
}
});
}

sem.wait(vcpu_num);
for (int i = 0; i < vcpu_num; i++)
th[i].join();
}

TEST(DISABLED_http_client, ipv6) { // make sure runing in a ipv6-ready environment
auto client = new_http_client();
DEFER(delete client);
Expand Down
14 changes: 14 additions & 0 deletions photon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ limitations under the License.
#include "net/curl.h"
#include "net/socket.h"
#include "fs/exportfs.h"
#include "common/callback.h"
#include <vector>

namespace photon {

Expand Down Expand Up @@ -94,7 +96,19 @@ int init(uint64_t event_engine, uint64_t io_engine, const PhotonOptions& options
return 0;
}

static std::vector<Delegate<void>>& get_hook_vector() {
thread_local std::vector<Delegate<void>> hooks;
return hooks;
}

void fini_hook(Delegate<void> handler) {
get_hook_vector().emplace_back(handler);
}

int fini() {
for (auto h : get_hook_vector()) {
h.fire();
}
#ifdef __linux__
FINI_IO(LIBAIO, libaio_wrapper)
FINI_IO(SOCKET_EDGE_TRIGGER, et_poller)
Expand Down
7 changes: 6 additions & 1 deletion photon.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
#pragma once

#include <inttypes.h>
#include <photon/common/callback.h>

namespace photon {

Expand Down Expand Up @@ -68,5 +69,9 @@ int init(uint64_t event_engine = INIT_EVENT_DEFAULT,
*/
int fini();

/**
* @brief add callbacks on fini()
*/
void fini_hook(Delegate<void> handler);

}
}

0 comments on commit 24e35e7

Please sign in to comment.