diff --git a/examples/rpc/server.h b/examples/rpc/server.h index ff6ce94e..a7348957 100644 --- a/examples/rpc/server.h +++ b/examples/rpc/server.h @@ -59,7 +59,7 @@ struct ExampleServer { // Serve provides handler for socket server int serve(photon::net::ISocketStream* stream) { - return skeleton->serve(stream, false); + return skeleton->serve(stream); } void term() { diff --git a/net/basic_socket.cpp b/net/basic_socket.cpp index bd54f8fd..8446b8dd 100644 --- a/net/basic_socket.cpp +++ b/net/basic_socket.cpp @@ -97,7 +97,7 @@ int connect(int fd, const struct sockaddr *addr, socklen_t addrlen, if (ret < 0) return -1; socklen_t n = sizeof(err); ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &n); - if (ret < 0) return -1; + if (unlikely(ret < 0)) return -1; if (err) { errno = err; return -1; diff --git a/net/socket.h b/net/socket.h index 75592a66..c0914ef8 100644 --- a/net/socket.h +++ b/net/socket.h @@ -255,6 +255,7 @@ namespace net { using Handler = Callback; virtual ISocketServer* set_handler(Handler handler) = 0; virtual int start_loop(bool block = false) = 0; + // Close the listening fd. It's the user's responsibility to close the active connections. virtual void terminate() = 0; }; diff --git a/rpc/rpc.cpp b/rpc/rpc.cpp index 8082674f..d55a778e 100644 --- a/rpc/rpc.cpp +++ b/rpc/rpc.cpp @@ -222,7 +222,7 @@ namespace rpc { bool got_it; int* stream_serv_count; photon::condition_variable *stream_cv; - std::shared_ptr w_lock; + photon::mutex* w_lock; Context(SkeletonImpl* sk, IStream* s) : request(sk->m_allocator), stream(s), sk(sk) { } @@ -289,6 +289,7 @@ namespace rpc { } int response_sender(iovector* resp) { + assert(w_lock); Header h; h.size = (uint32_t)resp->sum(); h.function = header.function; @@ -297,13 +298,11 @@ namespace rpc { resp->push_front(&h, sizeof(h)); if (stream == nullptr) LOG_ERRNO_RETURN(0, -1, "socket closed "); - if (w_lock) { - w_lock->lock(); - } + + w_lock->lock(); ssize_t ret = stream->writev(resp->iovec(), resp->iovcnt()); - if (w_lock) { - w_lock->unlock(); - } + w_lock->unlock(); + if (ret < (ssize_t)(sizeof(h) + h.size)) { stream->shutdown(ShutdownHow::ReadWrite); LOG_ERRNO_RETURN(0, -1, "failed to send rpc response to stream ", stream); @@ -314,43 +313,38 @@ namespace rpc { condition_variable m_cond_served; struct ThreadLink : public intrusive_list_node { - photon::thread* thread = nullptr; + photon::thread* thread = photon::CURRENT; }; - intrusive_list m_list; + intrusive_list m_list; // Stores the thread ID of every stream uint64_t m_serving_count = 0; - bool m_concurrent; bool m_running = true; photon::ThreadPoolBase *m_thread_pool; - virtual int serve(IStream* stream, bool ownership) override + virtual int serve(IStream* stream) override { - if (!m_running) - LOG_ERROR_RETURN(ENOTSUP, -1, "the skeleton has closed"); + if (unlikely(!m_running)) + return -1; ThreadLink node; m_list.push_back(&node); DEFER(m_list.erase(&node)); - DEFER(if (ownership) delete stream;); // stream serve refcount int stream_serv_count = 0; + photon::mutex w_lock; photon::condition_variable stream_cv; - // once serve goint to exit, stream may destruct + // once serve exit, stream will destruct // make sure all requests relies on this stream are finished DEFER({ while (stream_serv_count > 0) stream_cv.wait_no_lock(); }); if (stream_accept_notify) stream_accept_notify(stream); DEFER(if (stream_close_notify) stream_close_notify(stream)); - auto w_lock = m_concurrent ? std::make_shared() : nullptr; - while(m_running) - { + + while(likely(m_running)) { Context context(this, stream); context.stream_serv_count = &stream_serv_count; context.stream_cv = &stream_cv; - context.w_lock = w_lock; - node.thread = CURRENT; + context.w_lock = &w_lock; int ret = context.read_request(); - ERRNO err; - node.thread = nullptr; if (ret < 0) { // should only shutdown read, for other threads // might still writing @@ -363,16 +357,11 @@ namespace rpc { } } - if (!m_concurrent) { - context.serve_request(); - } else { - context.got_it = false; - m_thread_pool->thread_create(&async_serve, &context); - // async_serve will be start, add refcount here - stream_serv_count ++; - while(!context.got_it) - thread_yield_to(nullptr); - } + context.got_it = false; + m_thread_pool->thread_create(&async_serve, &context); + stream_serv_count ++; + while(!context.got_it) + thread_yield(); } return 0; } @@ -381,48 +370,45 @@ namespace rpc { auto ctx = (Context*)args_; Context context(std::move(*ctx)); ctx->got_it = true; - thread_yield_to(nullptr); + thread_yield(); context.serve_request(); // serve done, here reduce refcount (*ctx->stream_serv_count) --; ctx->stream_cv->notify_all(); return nullptr; } - virtual int shutdown_no_wait() override { - photon::thread_create11(&SkeletonImpl::shutdown, this); + virtual int shutdown(bool no_more_requests) override { + m_running = !no_more_requests; + while (m_list) { + auto th = m_list.front()->thread; + thread_enable_join(th); + if (no_more_requests) { + thread_interrupt(th); + } + // Wait all streams destructed. Their attached RPC requests are finished as well. + thread_join((join_handle*) th); + } return 0; } - virtual int shutdown() override - { + int shutdown_no_wait() override { m_running = false; - for (const auto& x: m_list) - if (x->thread) - thread_interrupt(x->thread); - // it should confirm that all threads are finished - // or m_list may not destruct correctly - while (m_serving_count > 0) { - // means shutdown called by rpc serve, should return to give chance to shutdown - if ((m_serving_count == 1) && (m_list.front()->thread == nullptr)) - return 0; - m_cond_served.wait_no_lock(); + for (auto* each: m_list) { + thread_interrupt(each->thread); } - while (!m_list.empty()) - thread_usleep(1000); return 0; } virtual ~SkeletonImpl() { - shutdown(); + shutdown(true); photon::delete_thread_pool(m_thread_pool); } - explicit SkeletonImpl(bool concurrent = true, uint32_t pool_size = 128) - : m_concurrent(concurrent), + explicit SkeletonImpl(uint32_t pool_size = 128) : m_thread_pool(photon::new_thread_pool(pool_size)) { m_thread_pool->enable_autoscale(); } }; - Skeleton* new_skeleton(bool concurrent, uint32_t pool_size) + Skeleton* new_skeleton(uint32_t pool_size) { - return new SkeletonImpl(concurrent, pool_size); + return new SkeletonImpl(pool_size); } class StubPoolImpl : public StubPool { diff --git a/rpc/rpc.h b/rpc/rpc.h index 0b2b19fa..eb195342 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -161,16 +161,25 @@ namespace rpc virtual int set_close_notify(Notifier notifier) = 0; // can be invoked concurrently by multiple threads - // if have the ownership of stream, `serve` will delete it before exit - virtual int serve(IStream* stream, bool ownership_stream = false) = 0; + virtual int serve(IStream* stream) = 0; + + __attribute__((deprecated)) + int serve(IStream* stream, bool /*ownership_stream*/) { + return serve(stream); + } // set the allocator to allocate memory for recving responses // the default allocator is defined in iovector.h/cpp virtual void set_allocator(IOAlloc allocation) = 0; - virtual int shutdown_no_wait() = 0; + /** + * @brief Shutdown the rpc server from outside. + * @warning DO NOT invoke this function within the RPC request. + * You should create a thread to invoke it, or just use shutdown_no_wait. + */ + virtual int shutdown(bool no_more_requests = true) = 0; - virtual int shutdown() = 0; + virtual int shutdown_no_wait() = 0; template int register_service(ServerClass* obj) @@ -238,7 +247,12 @@ namespace rpc extern "C" StubPool* new_uds_stub_pool(const char* path, uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout); - extern "C" Skeleton* new_skeleton(bool concurrent = true, uint32_t pool_size = 128); + extern "C" Skeleton* new_skeleton(uint32_t pool_size = 128); + + __attribute__((deprecated)) + inline Skeleton* new_skeleton(bool /*concurrent*/, uint32_t pool_size = 128) { + return new_skeleton(pool_size); + } struct __example__operation1__ // defination of operator { diff --git a/rpc/test/test-rpc-message.cpp b/rpc/test/test-rpc-message.cpp index 6597e0cd..ef188830 100644 --- a/rpc/test/test-rpc-message.cpp +++ b/rpc/test/test-rpc-message.cpp @@ -156,7 +156,7 @@ class TestRPCServer { } int serve(photon::net::ISocketStream* stream) { - return skeleton->serve(stream, false); + return skeleton->serve(stream); } int run() { diff --git a/rpc/test/test.cpp b/rpc/test/test.cpp index 3b06f18f..29f85bda 100644 --- a/rpc/test/test.cpp +++ b/rpc/test/test.cpp @@ -16,16 +16,30 @@ limitations under the License. #include "../../rpc/rpc.cpp" #include +#include #include #include #include #include #include #include +#include +#include "../../test/ci-tools.h" + using namespace std; using namespace photon; using namespace rpc; +class RpcTest : public testing::Test { +public: + void SetUp() override { + GTEST_ASSERT_EQ(0, photon::init(ci_ev_engine, photon::INIT_IO_NONE)); + } + void TearDown() override { + photon::fini(); + } +}; + std::string S = "1234567890"; struct Args { @@ -38,15 +52,10 @@ struct Args } void verify() { - LOG_DEBUG(VALUE(a)); EXPECT_EQ(a, 123); - LOG_DEBUG(VALUE(b)); EXPECT_EQ(b, 123); - LOG_DEBUG(VALUE(c)); EXPECT_EQ(c, 123); - LOG_DEBUG(VALUE(d)); EXPECT_EQ(d, 123); - LOG_DEBUG(VALUE(s)); EXPECT_EQ(s, S); } uint64_t serialize(iovector& iov) @@ -142,13 +151,13 @@ int server_exit_function(void* instance, iovector* request, rpc::Skeleton::Respo bool skeleton_exited; photon::condition_variable skeleton_exit; -rpc::Skeleton* g_sk; + void* rpc_skeleton(void* args) { - skeleton_exited = false; auto s = (IStream*)args; auto sk = new_skeleton(); - g_sk = sk; + DEFER(delete sk); + sk->add_function(FID, rpc::Skeleton::Function((void*)123, &server_function)); sk->add_function(-1, rpc::Skeleton::Function(sk, &server_exit_function)); sk->serve(s); @@ -171,7 +180,7 @@ void do_call(StubImpl& stub, uint64_t function) EXPECT_EQ(memcmp(STR, resp_iov.iov.back().iov_base, LEN(STR)), 0); } -TEST(rpc, call) +TEST_F(RpcTest, call) { unique_ptr ds( new_duplex_memory_stream(16) ); thread_create(&rpc_skeleton, ds->endpoint_a); @@ -207,13 +216,14 @@ void* do_concurrent_call_shut(void* arg) return nullptr; } -TEST(rpc, concurrent) +TEST_F(RpcTest, concurrent) { // log_output_level = 1; LOG_INFO("Creating 1,000 threads, each doing 1,000 RPC calls"); // ds will be destruct just after function returned // but server will not // therefore, it will cause assert when destruction + skeleton_exited = false; unique_ptr ds( new_duplex_memory_stream(16) ); thread_create(&rpc_skeleton, ds->endpoint_a); @@ -232,7 +242,6 @@ TEST(rpc, concurrent) ds->close(); if (!skeleton_exited) skeleton_exit.wait_no_lock(); - log_output_level = 0; } void do_call_timeout(StubImpl& stub, uint64_t function) @@ -280,10 +289,9 @@ int server_function_timeout(void* instance, iovector* request, rpc::Skeleton::Re void* rpc_skeleton_timeout(void* args) { - skeleton_exited = false; auto s = (IStream*)args; auto sk = new_skeleton(); - g_sk = sk; + DEFER(delete sk); sk->add_function(FID, rpc::Skeleton::Function((void*)123, &server_function_timeout)); sk->add_function(-1, rpc::Skeleton::Function(sk, &server_exit_function)); sk->serve(s); @@ -293,13 +301,14 @@ void* rpc_skeleton_timeout(void* args) return nullptr; } -TEST(rpc, timeout) { +TEST_F(RpcTest, timeout) { LOG_INFO("Creating 1,000 threads, each doing 1,000 RPC calls"); // ds will be destruct just after function returned // but server will not // therefore, it will cause assert when destruction unique_ptr ds( new_duplex_memory_stream(655360) ); + skeleton_exited = false; thread_create(&rpc_skeleton_timeout, ds->endpoint_a); LOG_DEBUG("asdf1"); @@ -320,9 +329,159 @@ TEST(rpc, timeout) { log_output_level = 0; } +class RpcServer { +public: + RpcServer(Skeleton* skeleton, net::ISocketServer* socket) : m_socket(socket), m_skeleton(skeleton) { + m_skeleton->register_service(this); + m_socket->set_handler({this, &RpcServer::serve}); + } + struct Operation { + const static uint32_t IID = 0x1; + const static uint32_t FID = 0x2; + struct Request : public photon::rpc::Message { + int code = 0; + PROCESS_FIELDS(code); + }; + struct Response : public photon::rpc::Message { + int code = 0; + PROCESS_FIELDS(code); + }; + }; + int do_rpc_service(Operation::Request* req, Operation::Response* resp, IOVector* iov, IStream* stream) { + resp->code = req->code; + return 0; + } + int serve(photon::net::ISocketStream* stream) { + return m_skeleton->serve(stream); + } + int run() { + m_socket->setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); + if (m_socket->bind(9527, net::IPAddr::V6Any()) != 0) + LOG_ERRNO_RETURN(0, -1, "bind failed"); + if (m_socket->listen() != 0) + LOG_ERRNO_RETURN(0, -1, "listen failed"); + return m_socket->start_loop(false); + } + net::ISocketServer* m_socket; + Skeleton* m_skeleton; +}; + +static int do_call_2(Stub* stub) { + RpcServer::Operation::Request req; + RpcServer::Operation::Response resp; + return stub->call(req, resp); +} + +TEST_F(RpcTest, shutdown) { + auto socket_server = photon::net::new_tcp_socket_server_ipv6(); + GTEST_ASSERT_NE(nullptr, socket_server); + DEFER(delete socket_server); + auto sk = photon::rpc::new_skeleton(); + GTEST_ASSERT_NE(nullptr, sk); + DEFER(delete sk); + + RpcServer rpc_server(sk, socket_server); + GTEST_ASSERT_EQ(0, rpc_server.run()); + + auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + DEFER(delete pool); + + photon::net::EndPoint ep(net::IPAddr::V4Loopback(), 9527); + auto stub = pool->get_stub(ep, false); + ASSERT_NE(nullptr, stub); + DEFER(pool->put_stub(ep, true)); + + photon::thread_create11([&]{ + photon::thread_sleep(1); + sk->shutdown(); + delete sk; + sk = nullptr; + }); + + auto start = std::chrono::steady_clock::now(); + while (true) { + int ret = do_call_2(stub); + if (ret < 0) { + GTEST_ASSERT_EQ(ECONNRESET, errno); + break; + } + } + auto end = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + GTEST_ASSERT_GT(duration, 900); + GTEST_ASSERT_LT(duration, 1100); +} + +TEST_F(RpcTest, passive_shutdown) { + auto socket_server = photon::net::new_tcp_socket_server_ipv6(); + GTEST_ASSERT_NE(nullptr, socket_server); + DEFER(delete socket_server); + auto sk = photon::rpc::new_skeleton(); + GTEST_ASSERT_NE(nullptr, sk); + DEFER(delete sk); + + RpcServer rpc_server(sk, socket_server); + GTEST_ASSERT_EQ(0, rpc_server.run()); + + photon::net::EndPoint ep(net::IPAddr::V4Loopback(), 9527); + + photon::thread_create11([&]{ + // Should always succeed in 3 seconds + auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + DEFER(delete pool); + auto stub = pool->get_stub(ep, false); + if (!stub) abort(); + DEFER(pool->put_stub(ep, true)); + for (int i = 0 ; i < 30; ++i) { + int ret = do_call_2(stub); + if (ret < 0) { + LOG_ERROR(VALUE(ret)); + abort(); + } + photon::thread_usleep(100'000); + } + }); + + photon::thread_create11([&]{ + photon::thread_sleep(2); + // Should get connection refused after 2 seconds. Because socket closed listen fd at 1 second. + auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + DEFER(delete pool); + auto stub = pool->get_stub(ep, false); + if (stub) { + LOG_ERROR("should not get stub"); + abort(); + } + if (errno != ECONNREFUSED) { + LOG_ERROR(ERRNO()); + abort(); + } + }); + + auto start = std::chrono::steady_clock::now(); + + photon::thread_sleep(1); + socket_server->terminate(); + delete socket_server; + socket_server = nullptr; + + LOG_INFO("begin passive shutdown"); + sk->shutdown(false); + LOG_INFO("end passive shutdown"); + delete sk; + sk = nullptr; + + auto end = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + + // The passive shutdown took 3 seconds, until client closed the connection + GTEST_ASSERT_GT(duration, 2900); + GTEST_ASSERT_LT(duration, 3200); +} + int main(int argc, char** arg) { - ::photon::vcpu_init(); + ci_parse_env(); ::testing::InitGoogleTest(&argc, arg); return RUN_ALL_TESTS(); }