From 2012ff4d8e91a1daff623c3316f026c9d7333334 Mon Sep 17 00:00:00 2001
From: Coldwings
Date: Tue, 3 Sep 2024 16:30:12 +0800
Subject: [PATCH] fix RPC stop by timeout during calling

---
 rpc/out-of-order-execution.cpp |  24 +++++--
 rpc/out-of-order-execution.h   |  16 +++--
 rpc/rpc.cpp                    |  35 +++++-----
 rpc/rpc.h                      |  29 ++++++--
 rpc/test/test.cpp              | 117 +++++++++++++++++++++++++++++++++
 5 files changed, 186 insertions(+), 35 deletions(-)

diff --git a/rpc/out-of-order-execution.cpp b/rpc/out-of-order-execution.cpp
index 04d653aa..7fdc30e7 100644
--- a/rpc/out-of-order-execution.cpp
+++ b/rpc/out-of-order-execution.cpp
@@ -34,6 +34,13 @@ namespace rpc {
         uint64_t m_tag = 0;
         bool m_running = true;

+        // The rlock is used both as the reader lock and as the wait notifier.
+        // Adding a yield inside the lock would break the assumption that
+        // threads not holding the lock are kept asleep, so do not yield;
+        // just put the thread to sleep when needed, and make sure it can
+        // be woken by interrupts.
+        OooEngine(): m_mutex_r(0) {}
+
         ~OooEngine() {
             shutdown();
         }
@@ -55,7 +62,7 @@ namespace rpc {
     {
         m_issuing ++;
         DEFER(m_issuing --);
-        scoped_lock lock(m_mutex_w);
+        SCOPED_LOCK(m_mutex_w);
         if (!m_running)
             LOG_ERROR_RETURN(ESHUTDOWN, -1, "engine is been shuting down");
         if (!args.flag_tag_valid)
@@ -89,13 +96,21 @@ namespace rpc {
     {
         // lock with param 1 means allow entry without lock
         // when interuptted
-        scoped_lock lock(m_mutex_r, 1);
+        int lockret = m_mutex_r.lock(args.timeout);
+        ERRNO err;
+        DEFER(if (lockret == 0) m_mutex_r.unlock());

         // when wait_completion returned,
         // always have tag removed from the map
         // notify the waiting function (like shutdown())
         DEFER(m_cond_collected.notify_one());

+        if (lockret < 0 && err.no == ETIMEDOUT) {
+            // Timed out so return as failure
+            m_map.erase(args.tag);
+            LOG_ERROR_RETURN(ETIMEDOUT, -1, "timeout wait for completion");
+        }
+
         auto o_tag = args.tag;
         {
             auto o_it = m_map.find(o_tag);
@@ -106,13 +121,12 @@ namespace rpc {
             {
                 LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT));
             }
-            if (o_it->second->collected) {
+            if (args.collected) {
                 // my completion has been done
                 // just collect it, clear the trace,
                 // then return result
-                auto ret = o_it->second->ret;
                 m_map.erase(o_it);
-                return ret;
+                return args.ret;
             }
         }
         //Hold the lock, but not get the result.
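The hunk above is the core of the fix: wait_completion() now acquires the reader mutex with the caller's deadline instead of blocking indefinitely, and on ETIMEDOUT it erases its own in-flight tag before returning, so a timed-out RPC fails instead of hanging and leaves no stale entry behind. The fragment below is a minimal illustrative sketch of that pattern, not part of the patch; `wait_result`, `pending`, and the include paths are assumptions standing in for OooEngine::wait_completion, OooEngine::m_map, and the project's own headers.

    #include <errno.h>
    #include <stdint.h>
    #include <unordered_map>
    #include <photon/thread/thread.h>    // photon::mutex (assumed header path)
    #include <photon/common/timeout.h>   // Timeout (assumed header path)

    static photon::mutex wait_lock;                     // doubles as the wait notifier
    static std::unordered_map<uint64_t, int> pending;   // stand-in for OooEngine::m_map

    // Wait for the result tagged `tag`, but give up once `deadline` has passed.
    int wait_result(uint64_t tag, Timeout deadline) {
        int lockret = wait_lock.lock(deadline);  // sleeps; woken by an unlock or by timeout
        if (lockret < 0 && errno == ETIMEDOUT) {
            pending.erase(tag);                  // drop the stale in-flight record
            return -1;                           // fail this call instead of hanging forever
        }
        // ... holding the lock: run do_completion()/do_collect() and pick up the result ...
        wait_lock.unlock();
        return 0;
    }

Bounding the lock acquisition by the caller's deadline is what lets one stuck call time out on its own while later calls on the same connection keep working, which is exactly what the new test at the end of this patch exercises.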
diff --git a/rpc/out-of-order-execution.h b/rpc/out-of-order-execution.h
index 038541a4..52bdafb8 100644
--- a/rpc/out-of-order-execution.h
+++ b/rpc/out-of-order-execution.h
@@ -25,8 +25,9 @@ collection of result.
 The first 2 parts are realized via callbacks.
 */
 #pragma once
-#include
 #include
+#include
+#include

 namespace photon{

@@ -75,17 +76,20 @@ namespace rpc {
         // It's guaranteed not to be called concurrently.
         CallbackType do_collect;

-        // whether or not the `tag` field is valid
-        bool flag_tag_valid = false;
-        // thread that binding with this argument
         thread * th;
-        // whether the context result is collected
-        bool collected;
+        // Timeout for wait
+        Timeout timeout;
         // return value of collection
         int ret;
+
+        // whether or not the `tag` field is valid
+        bool flag_tag_valid = false;
+
+        // whether the context result is collected
+        volatile bool collected = false;
     };
diff --git a/rpc/rpc.cpp b/rpc/rpc.cpp
index 01c6b88d..570f1d90 100644
--- a/rpc/rpc.cpp
+++ b/rpc/rpc.cpp
@@ -137,9 +137,7 @@ namespace rpc {
                 FunctionID function;
             };
             iovector *request, *response;
-            Timeout timeout;
-            OooArgs(StubImpl* stub, FunctionID function, iovector* req, iovector* resp, uint64_t timeout_):
-                timeout(timeout_)
+            OooArgs(StubImpl* stub, FunctionID function, iovector* req, iovector* resp, Timeout timeout_)
             {
                 request = req;
                 response = resp;
@@ -148,14 +146,14 @@ namespace rpc {
                 do_issue.bind(stub, &StubImpl::do_send);
                 do_completion.bind(stub, &StubImpl::do_recv_header);
                 do_collect.bind(stub, &StubImpl::do_recv_body);
+                timeout = timeout_;
            }
        };

-        int do_call(FunctionID function, iovector* request, iovector* response, uint64_t timeout) override {
+        int do_call(FunctionID function, iovector* request, iovector* response, Timeout tmo) override {
            scoped_rwlock rl(m_rwlock, photon::RLOCK);
-            Timeout tmo(timeout);
            if (tmo.expiration() < photon::now) {
-                LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timed out before rpc start", VALUE(timeout), VALUE(tmo.timeout()));
+                LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timed out before rpc start", VALUE(tmo.timeout()));
            }
            int ret = 0;
            OooArgs args(this, function, request, response, tmo.timeout());
@@ -421,12 +419,12 @@ namespace rpc {

    class StubPoolImpl : public StubPool {
    public:
-        explicit StubPoolImpl(uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout) {
+        explicit StubPoolImpl(uint64_t expiration, int64_t connect_timeout, uint64_t transfer_timeout) {
            tls_ctx = net::new_tls_context(nullptr, nullptr, nullptr);
            tcpclient = net::new_tcp_socket_client();
            tcpclient->timeout(connect_timeout);
            m_pool = new ObjectCache(expiration);
-            m_rpc_timeout = rpc_timeout;
+            m_transfer_timeout = transfer_timeout;
        }

        ~StubPoolImpl() {
@@ -456,7 +454,7 @@ namespace rpc {
        }

        uint64_t get_timeout() const override {
-            return m_rpc_timeout;
+            return m_transfer_timeout;
        }

    protected:
@@ -465,7 +463,7 @@ namespace rpc {
            if (!sock)
                LOG_ERRNO_RETURN(0, nullptr, "failed to connect to ", ep);
            LOG_DEBUG("connected to ", ep);
-            sock->timeout(m_rpc_timeout);
+            sock->timeout(m_transfer_timeout);
            if (tls) {
                sock = net::new_tls_stream(tls_ctx, sock, net::SecurityRole::Client, true);
            }
@@ -475,7 +473,7 @@ namespace rpc {
        ObjectCache* m_pool;
        net::ISocketClient *tcpclient;
        net::TLSContext* tls_ctx = nullptr;
-        uint64_t m_rpc_timeout;
+        uint64_t m_transfer_timeout;
    };

    // dummy pool, for unix domain socket connection to only one point only
@@ -483,8 +481,8 @@ namespace rpc {
    class UDSStubPoolImpl : public StubPoolImpl {
    public:
        explicit UDSStubPoolImpl(const char* path, uint64_t expiration,
-                                 uint64_t connect_timeout, uint64_t rpc_timeout)
-            : StubPoolImpl(expiration, connect_timeout, rpc_timeout),
+                                 uint64_t connect_timeout, uint64_t transfer_timeout)
+            : StubPoolImpl(expiration, connect_timeout, transfer_timeout),
              m_path(path), m_client(net::new_uds_client()) {
            m_client->timeout(connect_timeout);
        }
@@ -500,7 +498,7 @@ namespace rpc {
                LOG_ERRNO_RETURN(0, nullptr, "Connect to unix domain socket failed");
            }
-            sock->timeout(m_rpc_timeout);
+            sock->timeout(m_transfer_timeout);
            return new_rpc_stub(sock, true);
        });
    }
@@ -510,15 +508,16 @@ namespace rpc {
        net::ISocketClient * m_client;
    };

-    StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout) {
-        return new StubPoolImpl(expiration, connect_timeout, rpc_timeout);
+    StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout,
+                            uint64_t transfer_timeout) {
+        return new StubPoolImpl(expiration, connect_timeout, transfer_timeout);
    }

    StubPool* new_uds_stub_pool(const char* path, uint64_t expiration,
                                uint64_t connect_timeout,
-                                uint64_t rpc_timeout) {
+                                uint64_t transfer_timeout) {
        return new UDSStubPoolImpl(path, expiration, connect_timeout,
-                                   rpc_timeout);
+                                   transfer_timeout);
    }
 } // namespace rpc
 }
diff --git a/rpc/rpc.h b/rpc/rpc.h
index eb195342..791c57b2 100644
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -59,10 +59,13 @@ namespace rpc
        /**
         * @param req Request of Message
         * @param resp Response of Message
+        * @param timeout RPC timeout in microseconds, -1UL for no timeout
         * @return The number of bytes received, -1 for failure
         * @note Request and Response should assign to external memory buffers if they have variable-length fields.
         *       Via this, we can achieve zero-copy send and receive.
         *       For Response, there could be only 1 buffer at most. For Request, there is no limit.
+        *       Attention: an RPC stub does not support multiple vCPUs; when multiple vCPUs are used,
+        *       the RPC stub should be a vCPU-local object.
         */
        template<typename Operation>
        int call(typename Operation::Request& req,
@@ -107,6 +110,7 @@ namespace rpc
        /**
         * @param req Request of Message
         * @param resp_iov iovector for the Response
+        * @param timeout RPC timeout in microseconds, -1UL for no timeout.
         * @return Pointer of the Response. nullptr for failure. No need to delete.
         * @note For this call, we don't need to assign buffers for the Response any more.
         *       `resp_iov` will use its internal allocator to fulfill the memory requirement.
         */
        template<typename Operation>
        typename Operation::Response* call(typename Operation::Request& req, iovector& resp_iov,
-                                           uint64_t timeout = -1UL) {
+                                           Timeout timeout = {}) {
            assert(resp_iov.iovcnt() == 0);
            SerializerIOV reqmsg;
            reqmsg.serialize(req);
@@ -136,7 +140,7 @@ namespace rpc
    protected:
        // This call can be invoked concurrently, and may return out-of-order.
        // Return the number of bytes received.
-        virtual int do_call(FunctionID function, iovector* request, iovector* response, uint64_t timeout) = 0;
+        virtual int do_call(FunctionID function, iovector* request, iovector* response, Timeout timeout) = 0;
    };

    class Skeleton : public Object
@@ -243,10 +247,23 @@ namespace rpc
    };

    extern "C" Stub* new_rpc_stub(IStream* stream, bool ownership = false);
-    extern "C" StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout);
-    extern "C" StubPool* new_uds_stub_pool(const char* path, uint64_t expiration,
-                                           uint64_t connect_timeout,
-                                           uint64_t rpc_timeout);
+    /**
+     About timeout:
+     1. When a socket/stub has not been used by any caller for `expiration` microsecs, it will be dropped.
+     2. When a socket is connecting, it will fail with a timeout after `connect_timeout` microsecs.
+     3. When a socket needs to read/write (e.g. sending a request or waiting for a response) but gets
+        nothing within `transfer_timeout` microsecs, it times out. This is a timeout for a single
+        read/write operation, not an RPC timeout, which is controlled by the caller when invoking `Stub::call`.
+     4. The timeout of `Stub::call` covers the time from invoking `call` (before sending the request)
+        until the response header has been received; receiving the response body is not counted.
+    **/
+    extern "C" StubPool* new_stub_pool(uint64_t expiration,
+                                       uint64_t connect_timeout,
+                                       uint64_t transfer_timeout);
+    extern "C" StubPool* new_uds_stub_pool(const char* path,
+                                           uint64_t expiration,
+                                           uint64_t connect_timeout,
+                                           uint64_t transfer_timeout);
    extern "C" Skeleton* new_skeleton(uint32_t pool_size = 128);

    __attribute__((deprecated))
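To make the relationship between the three pool-level timeouts documented above and the per-call timeout concrete, here is a small usage sketch. It is illustrative only and not part of the patch; the `Echo` operation, the numeric values, and the include paths are made up for the example, while the pool and stub calls mirror the test added below.

    #include <photon/net/socket.h>   // photon::net::EndPoint (assumed header path)
    #include <photon/rpc/rpc.h>      // Stub, StubPool, Message (assumed header path)

    // A made-up operation for the example.
    struct Echo {
        const static uint32_t IID = 0x10;
        const static uint32_t FID = 0x1;
        struct Request : public photon::rpc::Message {
            int code = 0;
            PROCESS_FIELDS(code);
        };
        struct Response : public photon::rpc::Message {
            int code = 0;
            PROCESS_FIELDS(code);
        };
    };

    int call_echo(const photon::net::EndPoint& ep) {
        // expiration: drop an idle stub after 10s; connect_timeout: 1s;
        // transfer_timeout: 1s per single socket read/write (all in microseconds).
        auto pool = photon::rpc::new_stub_pool(10'000'000, 1'000'000, 1'000'000);
        auto stub = pool->get_stub(ep, false);
        if (!stub) { delete pool; return -1; }

        Echo::Request req;
        Echo::Response resp;
        // Per-call budget: 500ms from issuing the call until the response header arrives.
        int ret = stub->call<Echo>(req, resp, 500'000);

        pool->put_stub(ep, false);
        delete pool;
        return ret;
    }

The per-call timeout only bounds how long this caller waits for its own completion; each individual socket read/write underneath is still guarded by `transfer_timeout`.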
diff --git a/rpc/test/test.cpp b/rpc/test/test.cpp
index 86bf890d..dfa877ab 100644
--- a/rpc/test/test.cpp
+++ b/rpc/test/test.cpp
@@ -485,6 +485,123 @@ TEST_F(RpcTest, passive_shutdown) {
     GTEST_ASSERT_LT(duration, 3500);
 }

+class RpcServerTimeout {
+public:
+    RpcServerTimeout(Skeleton* skeleton, net::ISocketServer* socket) : m_socket(socket), m_skeleton(skeleton) {
+        m_skeleton->register_service<Operation, OperationT>(this);
+        m_socket->set_handler({this, &RpcServerTimeout::serve});
+    }
+    struct Operation {
+        const static uint32_t IID = 0x1;
+        const static uint32_t FID = 0x2;
+        struct Request : public photon::rpc::Message {
+            int code = 0;
+            PROCESS_FIELDS(code);
+        };
+        struct Response : public photon::rpc::Message {
+            int code = 0;
+            PROCESS_FIELDS(code);
+        };
+    };
+
+    struct OperationT {
+        const static uint32_t IID = 0x1;
+        const static uint32_t FID = 0x3;
+        struct Request : public photon::rpc::Message {
+            int code = 0;
+            PROCESS_FIELDS(code);
+        };
+        struct Response : public photon::rpc::Message {
+            int code = 0;
+            PROCESS_FIELDS(code);
+        };
+    };
+    int do_rpc_service(Operation::Request* req, Operation::Response* resp, IOVector* iov, IStream* stream) {
+        resp->code = req->code;
+        return 0;
+    }
+    int do_rpc_service(OperationT::Request* req, OperationT::Response* resp, IOVector* iov, IStream* stream) {
+        resp->code = req->code;
+        photon::thread_usleep(5UL*1000*1000);
+        return 0;
+    }
+
+    int serve(photon::net::ISocketStream* stream) {
+        return m_skeleton->serve(stream);
+    }
+    int run() {
+        if (m_socket->bind_v4localhost() != 0)
+        // if (m_socket->bind(9527, net::IPAddr::V6Any()) != 0)
+            LOG_ERRNO_RETURN(0, -1, "bind failed");
+        if (m_socket->listen() != 0)
+            LOG_ERRNO_RETURN(0, -1, "listen failed");
+        m_endpoint = m_socket->getsockname();
+        LOG_DEBUG("bound to ", m_endpoint);
+        return m_socket->start_loop(false);
+    }
+    net::ISocketServer* m_socket;
+    Skeleton* m_skeleton;
+    photon::net::EndPoint m_endpoint;
+};
+
+static int do_call_hb(Stub* stub) {
+    RpcServerTimeout::Operation::Request req;
+    RpcServerTimeout::Operation::Response resp;
+    // Propagate the call result so the heartbeat loop can detect failures
+    return stub->call<RpcServerTimeout::Operation>(req, resp);
+}
+
+TEST_F(RpcTest, timeout_with_hb) {
+    auto socket_server = photon::net::new_tcp_socket_server();
+    GTEST_ASSERT_NE(nullptr, socket_server);
+    DEFER(delete socket_server);
+    auto sk = photon::rpc::new_skeleton();
+    GTEST_ASSERT_NE(nullptr, sk);
+    DEFER(delete sk);
+
+    RpcServerTimeout rpc_server(sk, socket_server);
+    GTEST_ASSERT_EQ(0, rpc_server.run());
+
+    // photon::net::EndPoint ep(net::IPAddr::V4Loopback(), 9527);
+    auto& ep = rpc_server.m_endpoint;
+    auto pool = photon::rpc::new_stub_pool(-1, -1, 1'000'000);
+    DEFER(delete pool);
+    auto th1 = photon::thread_enable_join(photon::thread_create11([&]{
+        // Heartbeat calls should keep succeeding for the whole 7-second window,
+        // even while the slow call below is timing out
+        auto stub = pool->get_stub(ep, false);
+        if (!stub) abort();
+        DEFER(pool->put_stub(ep, false));
+        Timeout timeout(7'000'000);
+        while (timeout.expiration() > photon::now) {
+            int ret = do_call_hb(stub);
+            if (ret < 0) {
+                LOG_ERROR(VALUE(ret));
+                abort();
+            }
+            photon::thread_yield();
+        }
+    }));
+    photon::thread_yield_to((photon::thread*)th1);
+
+    auto th2 = photon::thread_enable_join(photon::thread_create11([&]{
+        // The OperationT handler sleeps for 5 seconds, so this call should fail with a
+        // timeout after roughly 1 second (the per-call timeout), well within 2 seconds
+        auto stub = pool->get_stub(ep, false);
+        if (!stub) {
+            abort();
+        }
+        DEFER(pool->put_stub(ep, false));
+        RpcServerTimeout::OperationT::Request req;
+        RpcServerTimeout::OperationT::Response resp;
+        auto before = photon::now;
+        auto ret = stub->call<RpcServerTimeout::OperationT>(req, resp, 1'000'000);
+        ERRNO err;
+        EXPECT_EQ(ret, -1);
+        EXPECT_LE(photon::now - before, 2'000'000);
+    }));
+
+    photon::thread_join(th2);
+    photon::thread_join(th1);
+}

 int main(int argc, char** arg) {
     ::testing::InitGoogleTest(&argc, arg);