Skip to content

Commit

Permalink
fix RPC stop by timeout during calling
Browse files Browse the repository at this point in the history
  • Loading branch information
Coldwings committed Sep 3, 2024
1 parent b4c75d7 commit 2012ff4
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 35 deletions.
24 changes: 19 additions & 5 deletions rpc/out-of-order-execution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ namespace rpc {
uint64_t m_tag = 0;
bool m_running = true;

// rlock used as both reader lock and wait notifier.
// add yield in lock will break the assuption that threads
// not holding lock should kept in sleep.
// so do not yield, just put into sleep when needed
// make sure it able to wake by interrupts
OooEngine(): m_mutex_r(0) {}

~OooEngine() {
shutdown();
}
Expand All @@ -55,7 +62,7 @@ namespace rpc {
{
m_issuing ++;
DEFER(m_issuing --);
scoped_lock lock(m_mutex_w);
SCOPED_LOCK(m_mutex_w);
if (!m_running)
LOG_ERROR_RETURN(ESHUTDOWN, -1, "engine is been shuting down");
if (!args.flag_tag_valid)
Expand Down Expand Up @@ -89,13 +96,21 @@ namespace rpc {
{
// lock with param 1 means allow entry without lock
// when interuptted
scoped_lock lock(m_mutex_r, 1);
int lockret = m_mutex_r.lock(args.timeout);
ERRNO err;
DEFER(if (lockret == 0) m_mutex_r.unlock());

// when wait_completion returned,
// always have tag removed from the map
// notify the waiting function (like shutdown())
DEFER(m_cond_collected.notify_one());

if (lockret < 0 && err.no == ETIMEDOUT) {
// Timed out so return as failure
m_map.erase(args.tag);
LOG_ERROR_RETURN(ETIMEDOUT, -1, "timeout wait for completion");
}

auto o_tag = args.tag;
{
auto o_it = m_map.find(o_tag);
Expand All @@ -106,13 +121,12 @@ namespace rpc {
{
LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT));
}
if (o_it->second->collected) {
if (args.collected) {
// my completion has been done
// just collect it, clear the trace,
// then return result
auto ret = o_it->second->ret;
m_map.erase(o_it);
return ret;
return args.ret;
}
}
//Hold the lock, but not get the result.
Expand Down
16 changes: 10 additions & 6 deletions rpc/out-of-order-execution.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ collection of result. The first 2 parts are realized via callbacks.
*/

#pragma once
#include <cinttypes>
#include <photon/common/callback.h>
#include <photon/common/timeout.h>
#include <atomic>

namespace photon{

Expand Down Expand Up @@ -75,17 +76,20 @@ namespace rpc {
// It's guaranteed not to be called concurrently.
CallbackType do_collect;

// whether or not the `tag` field is valid
bool flag_tag_valid = false;

// thread that binding with this argument
thread * th;

// whether the context result is collected
bool collected;
// Timeout for wait
Timeout timeout;

// return value of collection
int ret;

// whether or not the `tag` field is valid
bool flag_tag_valid = false;

// whether the context result is collected
volatile bool collected = false;
};


Expand Down
35 changes: 17 additions & 18 deletions rpc/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,7 @@ namespace rpc {
FunctionID function;
};
iovector *request, *response;
Timeout timeout;
OooArgs(StubImpl* stub, FunctionID function, iovector* req, iovector* resp, uint64_t timeout_):
timeout(timeout_)
OooArgs(StubImpl* stub, FunctionID function, iovector* req, iovector* resp, Timeout timeout_)
{
request = req;
response = resp;
Expand All @@ -148,14 +146,14 @@ namespace rpc {
do_issue.bind(stub, &StubImpl::do_send);
do_completion.bind(stub, &StubImpl::do_recv_header);
do_collect.bind(stub, &StubImpl::do_recv_body);
timeout = timeout_;
}
};

int do_call(FunctionID function, iovector* request, iovector* response, uint64_t timeout) override {
int do_call(FunctionID function, iovector* request, iovector* response, Timeout tmo) override {
scoped_rwlock rl(m_rwlock, photon::RLOCK);
Timeout tmo(timeout);
if (tmo.expiration() < photon::now) {
LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timed out before rpc start", VALUE(timeout), VALUE(tmo.timeout()));
LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timed out before rpc start", VALUE(tmo.timeout()));
}
int ret = 0;
OooArgs args(this, function, request, response, tmo.timeout());
Expand Down Expand Up @@ -421,12 +419,12 @@ namespace rpc {

class StubPoolImpl : public StubPool {
public:
explicit StubPoolImpl(uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout) {
explicit StubPoolImpl(uint64_t expiration, int64_t connect_timeout, uint64_t transfer_timeout) {
tls_ctx = net::new_tls_context(nullptr, nullptr, nullptr);
tcpclient = net::new_tcp_socket_client();
tcpclient->timeout(connect_timeout);
m_pool = new ObjectCache<net::EndPoint, rpc::Stub*>(expiration);
m_rpc_timeout = rpc_timeout;
m_transfer_timeout = transfer_timeout;
}

~StubPoolImpl() {
Expand Down Expand Up @@ -456,7 +454,7 @@ namespace rpc {
}

uint64_t get_timeout() const override {
return m_rpc_timeout;
return m_transfer_timeout;
}

protected:
Expand All @@ -465,7 +463,7 @@ namespace rpc {
if (!sock)
LOG_ERRNO_RETURN(0, nullptr, "failed to connect to ", ep);
LOG_DEBUG("connected to ", ep);
sock->timeout(m_rpc_timeout);
sock->timeout(m_transfer_timeout);
if (tls) {
sock = net::new_tls_stream(tls_ctx, sock, net::SecurityRole::Client, true);
}
Expand All @@ -475,16 +473,16 @@ namespace rpc {
ObjectCache<net::EndPoint, rpc::Stub*>* m_pool;
net::ISocketClient *tcpclient;
net::TLSContext* tls_ctx = nullptr;
uint64_t m_rpc_timeout;
uint64_t m_transfer_timeout;
};

// dummy pool, for unix domain socket connection to only one point only
// so no-mather what connection in, gets domain socket
class UDSStubPoolImpl : public StubPoolImpl {
public:
explicit UDSStubPoolImpl(const char* path, uint64_t expiration,
uint64_t connect_timeout, uint64_t rpc_timeout)
: StubPoolImpl(expiration, connect_timeout, rpc_timeout),
uint64_t connect_timeout, uint64_t transfer_timeout)
: StubPoolImpl(expiration, connect_timeout, transfer_timeout),
m_path(path), m_client(net::new_uds_client()) {
m_client->timeout(connect_timeout);
}
Expand All @@ -500,7 +498,7 @@ namespace rpc {
LOG_ERRNO_RETURN(0, nullptr,
"Connect to unix domain socket failed");
}
sock->timeout(m_rpc_timeout);
sock->timeout(m_transfer_timeout);
return new_rpc_stub(sock, true);
});
}
Expand All @@ -510,15 +508,16 @@ namespace rpc {
net::ISocketClient * m_client;
};

StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout) {
return new StubPoolImpl(expiration, connect_timeout, rpc_timeout);
StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout,
uint64_t transfer_timeout) {
return new StubPoolImpl(expiration, connect_timeout, transfer_timeout);
}

StubPool* new_uds_stub_pool(const char* path, uint64_t expiration,
uint64_t connect_timeout,
uint64_t rpc_timeout) {
uint64_t transfer_timeout) {
return new UDSStubPoolImpl(path, expiration, connect_timeout,
rpc_timeout);
transfer_timeout);
}
} // namespace rpc
}
29 changes: 23 additions & 6 deletions rpc/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@ namespace rpc
/**
* @param req Request of Message
* @param resp Response of Message
* @param timeout timeout in milliseconds, -1UL for no timeout
* @return The number of bytes received, -1 for failure
* @note Request and Response should assign to external memory buffers if they have variable-length fields.
* Via this, we can achieve zero-copy send and receive.
* For Response, there could be only 1 buffer at most. For Request, there is no limit.
* Attention: RPC stub do not support multi vCPU, when multiple vCPUs are used, the RPC stub should be
* vCPU local object.
*/
template<typename Operation>
int call(typename Operation::Request& req,
Expand Down Expand Up @@ -107,14 +110,15 @@ namespace rpc
/**
* @param req Request of Message
* @param resp_iov iovector for the Response
* @param timeout timeout in milliseconds, -1UL for no timeout.
* @return Pointer of the Response. nullptr for failure. No need to delete.
* @note For this call, we don't need to assign buffers for the Response any more.
* `resp_iov` will use its internal allocator to fulfill the memory requirement.
* The only difference between these two calls is the allocator's overhead.
*/
template<typename Operation>
typename Operation::Response* call(typename Operation::Request& req, iovector& resp_iov,
uint64_t timeout = -1UL) {
Timeout timeout = {}) {
assert(resp_iov.iovcnt() == 0);
SerializerIOV reqmsg;
reqmsg.serialize(req);
Expand All @@ -136,7 +140,7 @@ namespace rpc
protected:
// This call can be invoked concurrently, and may return out-of-order.
// Return the number of bytes received.
virtual int do_call(FunctionID function, iovector* request, iovector* response, uint64_t timeout) = 0;
virtual int do_call(FunctionID function, iovector* request, iovector* response, Timeout timeout) = 0;
};

class Skeleton : public Object
Expand Down Expand Up @@ -243,10 +247,23 @@ namespace rpc
};

extern "C" Stub* new_rpc_stub(IStream* stream, bool ownership = false);
extern "C" StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout);
extern "C" StubPool* new_uds_stub_pool(const char* path, uint64_t expiration,
uint64_t connect_timeout,
uint64_t rpc_timeout);
/**
About timeout:
1. When a socket/stub not used by any caller for `expiration` microsecs, it will be dropped.
2. When socket connecting, it will fail by be timed out after `connect_timeout` microsecs.
3. When socket needs to read/write (like sending request and waiting for any response), but got
nothing in `transfer_timeout` microsecs, it will be timed out. It's just a timeout for a single
read/write operation, not a RPC timeout, which is controlled by the caller when invoking `Stub::call`.
4. `Stub::call` measures the time from invoking `call` before sending request to received
response head. Receiving response body is not considered.
**/
extern "C" StubPool* new_stub_pool(uint64_t expiration,
uint64_t connect_timeout,
uint64_t transfer_timeout);
extern "C" StubPool* new_uds_stub_pool(const char* path,
uint64_t expiration,
uint64_t connect_timeout,
uint64_t transfer_timeout);
extern "C" Skeleton* new_skeleton(uint32_t pool_size = 128);

__attribute__((deprecated))
Expand Down
Loading

0 comments on commit 2012ff4

Please sign in to comment.