Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add fini_hook for photon and for http client #484

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,8 @@ if (PHOTON_BUILD_TESTING)

# add_compile_options(-Wno-error)

include_directories(include ${GFLAGS_INCLUDE_DIRS} ${GOOGLETEST_INCLUDE_DIRS})
add_library(ci-tools STATIC test/ci-tools.cpp)

include_directories(photon_static ${GFLAGS_INCLUDE_DIRS} ${GOOGLETEST_INCLUDE_DIRS})
link_libraries(${GFLAGS_LIBRARIES} ${GOOGLETEST_LIBRARIES} ci-tools)

add_subdirectory(examples)
Expand Down
30 changes: 20 additions & 10 deletions net/http/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ limitations under the License.
#include <photon/net/socket.h>
#include <photon/net/security-context/tls-stream.h>
#include <photon/net/utils.h>
#include <photon/photon.h>

namespace photon {
namespace net {
Expand Down Expand Up @@ -51,9 +52,17 @@ class PooledDialer {
tcpsock.reset(new_tcp_socket_pool(tcp_cli, -1, true));
tlssock.reset(new_tcp_socket_pool(tls_cli, -1, true));
udssock.reset(new_uds_client());
photon::fini_hook({this, &PooledDialer::at_photon_fini});
}

~PooledDialer() {
}

void at_photon_fini() {
resolver.reset();
udssock.reset();
tlssock.reset();
tcpsock.reset();
if (tls_ctx_ownership)
delete tls_ctx;
}
Expand Down Expand Up @@ -127,14 +136,19 @@ enum RoundtripStatus {

class ClientImpl : public Client {
public:
PooledDialer m_dialer;
CommonHeaders<> m_common_headers;
TLSContext *m_tls_ctx;
ICookieJar *m_cookie_jar;
ClientImpl(ICookieJar *cookie_jar, TLSContext *tls_ctx) :
m_dialer(tls_ctx),
m_tls_ctx(tls_ctx),
m_cookie_jar(cookie_jar) {
}

PooledDialer& get_dialer() {
thread_local PooledDialer dialer(m_tls_ctx);
return dialer;
}

using SocketStream_ptr = std::unique_ptr<ISocketStream>;
int redirect(Operation* op) {
if (op->resp.body_size() > 0) {
Expand Down Expand Up @@ -165,22 +179,18 @@ class ClientImpl : public Client {
return ROUNDTRIP_REDIRECT;
}

int concurreny = 0;
int do_roundtrip(Operation* op, Timeout tmo) {
concurreny++;
LOG_DEBUG(VALUE(concurreny));
DEFER(concurreny--);
op->status_code = -1;
if (tmo.timeout() == 0)
LOG_ERROR_RETURN(ETIMEDOUT, ROUNDTRIP_FAILED, "connection timedout");
auto &req = op->req;
ISocketStream* s;
if (m_proxy && !m_proxy_url.empty())
s = m_dialer.dial(m_proxy_url, tmo.timeout());
s = get_dialer().dial(m_proxy_url, tmo.timeout());
else if (!op->uds_path.empty())
s = m_dialer.dial(op->uds_path, tmo.timeout());
s = get_dialer().dial(op->uds_path, tmo.timeout());
else
s = m_dialer.dial(req, tmo.timeout());
s = get_dialer().dial(req, tmo.timeout());
if (!s) {
if (errno == ECONNREFUSED || errno == ENOENT) {
LOG_ERROR_RETURN(0, ROUNDTRIP_FAST_RETRY, "connection refused")
Expand Down Expand Up @@ -288,7 +298,7 @@ class ClientImpl : public Client {
}

ISocketStream* native_connect(std::string_view host, uint16_t port, bool secure, uint64_t timeout) override {
return m_dialer.dial(host, port, secure, timeout);
return get_dialer().dial(host, port, secure, timeout);
}

CommonHeaders<>* common_headers() override {
Expand Down
56 changes: 56 additions & 0 deletions net/http/test/client_function_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,62 @@ TEST(http_client, partial_body) {
EXPECT_EQ(true, buf == "http_clien");
}


TEST(http_client, vcpu) {
system("mkdir -p /tmp/ease_ut/http_test/");
system("echo \"this is a http_client request body text for socket stream\" > /tmp/ease_ut/http_test/ease-httpclient-gettestfile");
auto tcpserver = new_tcp_socket_server();
tcpserver->setsockopt<int>(IPPROTO_TCP, TCP_NODELAY, 1);
tcpserver->bind_v4localhost();
tcpserver->listen();
DEFER(delete tcpserver);
auto server = new_http_server();
DEFER(delete server);
auto fs = photon::fs::new_localfs_adaptor("/tmp/ease_ut/http_test/");
DEFER(delete fs);
auto fs_handler = new_fs_handler(fs);
DEFER(delete fs_handler);
server->add_handler(fs_handler);
tcpserver->set_handler(server->get_connection_handler());
tcpserver->start_loop();
auto target = to_url(tcpserver, "/ease-httpclient-gettestfile");
auto client = new_http_client();
DEFER(delete client);

int vcpu_num = 16;
photon::semaphore sem(0);
std::thread th[vcpu_num];
for (int i = 0; i < vcpu_num; i++) {
th[i] = std::thread([&] {
photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE);
DEFER({
photon::fini();
sem.signal(1);
});

for (int round = 0; round < 10; round++) {
auto op = client->new_operation(Verb::GET, target);
DEFER(client->destroy_operation(op));
op->req.headers.content_length(0);
int ret = client->call(op);
GTEST_ASSERT_EQ(0, ret);

char resp_body_buf[1024];
EXPECT_EQ(sizeof(socket_buf), op->resp.resource_size());
ret = op->resp.read(resp_body_buf, sizeof(socket_buf));
EXPECT_EQ(sizeof(socket_buf), ret);
resp_body_buf[sizeof(socket_buf) - 1] = '\0';
LOG_DEBUG(resp_body_buf);
EXPECT_EQ(0, strcmp(resp_body_buf, socket_buf));
}
});
}

sem.wait(vcpu_num);
for (int i = 0; i < vcpu_num; i++)
th[i].join();
}

TEST(DISABLED_http_client, ipv6) { // make sure runing in a ipv6-ready environment
auto client = new_http_client();
DEFER(delete client);
Expand Down
14 changes: 14 additions & 0 deletions photon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ limitations under the License.
#include "net/curl.h"
#include "net/socket.h"
#include "fs/exportfs.h"
#include "common/callback.h"
#include <vector>

namespace photon {

Expand Down Expand Up @@ -94,7 +96,19 @@ int init(uint64_t event_engine, uint64_t io_engine, const PhotonOptions& options
return 0;
}

static std::vector<Delegate<void>>& get_hook_vector() {
thread_local std::vector<Delegate<void>> hooks;
return hooks;
}

void fini_hook(Delegate<void> handler) {
get_hook_vector().emplace_back(handler);
}

int fini() {
for (auto h : get_hook_vector()) {
h.fire();
}
#ifdef __linux__
FINI_IO(LIBAIO, libaio_wrapper)
FINI_IO(SOCKET_EDGE_TRIGGER, et_poller)
Expand Down
7 changes: 6 additions & 1 deletion photon.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
#pragma once

#include <inttypes.h>
#include <photon/common/callback.h>

namespace photon {

Expand Down Expand Up @@ -68,5 +69,9 @@ int init(uint64_t event_engine = INIT_EVENT_DEFAULT,
*/
int fini();

/**
* @brief add callbacks on fini()
*/
void fini_hook(Delegate<void> handler);
liulanzheng marked this conversation as resolved.
Show resolved Hide resolved

}
}
Loading