Skip to content

Commit

Permalink
RPC support graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
beef9999 committed Dec 11, 2023
1 parent 3ae9e95 commit e1eaa1a
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 80 deletions.
2 changes: 1 addition & 1 deletion examples/rpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ struct ExampleServer {

// Serve provides handler for socket server
int serve(photon::net::ISocketStream* stream) {
return skeleton->serve(stream, false);
return skeleton->serve(stream);
}

void term() {
Expand Down
2 changes: 1 addition & 1 deletion net/basic_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ int connect(int fd, const struct sockaddr *addr, socklen_t addrlen,
if (ret < 0) return -1;
socklen_t n = sizeof(err);
ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &n);
if (ret < 0) return -1;
if (unlikely(ret < 0)) return -1;
if (err) {
errno = err;
return -1;
Expand Down
1 change: 1 addition & 0 deletions net/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ namespace net {
using Handler = Callback<ISocketStream*>;
virtual ISocketServer* set_handler(Handler handler) = 0;
virtual int start_loop(bool block = false) = 0;
// Close the listening fd. It's the user's responsibility to close the active connections.
virtual void terminate() = 0;
};

Expand Down
96 changes: 39 additions & 57 deletions rpc/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ namespace rpc {
bool got_it;
int* stream_serv_count;
photon::condition_variable *stream_cv;
std::shared_ptr<photon::mutex> w_lock;
photon::mutex* w_lock;

Context(SkeletonImpl* sk, IStream* s) :
request(sk->m_allocator), stream(s), sk(sk) { }
Expand Down Expand Up @@ -289,6 +289,7 @@ namespace rpc {
}
int response_sender(iovector* resp)
{
assert(w_lock);
Header h;
h.size = (uint32_t)resp->sum();
h.function = header.function;
Expand All @@ -297,13 +298,11 @@ namespace rpc {
resp->push_front(&h, sizeof(h));
if (stream == nullptr)
LOG_ERRNO_RETURN(0, -1, "socket closed ");
if (w_lock) {
w_lock->lock();
}

w_lock->lock();
ssize_t ret = stream->writev(resp->iovec(), resp->iovcnt());
if (w_lock) {
w_lock->unlock();
}
w_lock->unlock();

if (ret < (ssize_t)(sizeof(h) + h.size)) {
stream->shutdown(ShutdownHow::ReadWrite);
LOG_ERRNO_RETURN(0, -1, "failed to send rpc response to stream ", stream);
Expand All @@ -314,43 +313,38 @@ namespace rpc {
condition_variable m_cond_served;
struct ThreadLink : public intrusive_list_node<ThreadLink>
{
photon::thread* thread = nullptr;
photon::thread* thread = photon::CURRENT;
};
intrusive_list<ThreadLink> m_list;
intrusive_list<ThreadLink> m_list; // Stores the thread ID of every stream
uint64_t m_serving_count = 0;
bool m_concurrent;
bool m_running = true;
photon::ThreadPoolBase *m_thread_pool;
virtual int serve(IStream* stream, bool ownership) override
virtual int serve(IStream* stream) override
{
if (!m_running)
LOG_ERROR_RETURN(ENOTSUP, -1, "the skeleton has closed");
if (unlikely(!m_running))
return -1;

ThreadLink node;
m_list.push_back(&node);
DEFER(m_list.erase(&node));
DEFER(if (ownership) delete stream;);
// stream serve refcount
int stream_serv_count = 0;
photon::mutex w_lock;
photon::condition_variable stream_cv;
// once serve goint to exit, stream may destruct
// once serve exit, stream will destruct
// make sure all requests relies on this stream are finished
DEFER({
while (stream_serv_count > 0) stream_cv.wait_no_lock();
});
if (stream_accept_notify) stream_accept_notify(stream);
DEFER(if (stream_close_notify) stream_close_notify(stream));
auto w_lock = m_concurrent ? std::make_shared<photon::mutex>() : nullptr;
while(m_running)
{

while(likely(m_running)) {
Context context(this, stream);
context.stream_serv_count = &stream_serv_count;
context.stream_cv = &stream_cv;
context.w_lock = w_lock;
node.thread = CURRENT;
context.w_lock = &w_lock;
int ret = context.read_request();
ERRNO err;
node.thread = nullptr;
if (ret < 0) {
// should only shutdown read, for other threads
// might still writing
Expand All @@ -363,16 +357,11 @@ namespace rpc {
}
}

if (!m_concurrent) {
context.serve_request();
} else {
context.got_it = false;
m_thread_pool->thread_create(&async_serve, &context);
// async_serve will be start, add refcount here
stream_serv_count ++;
while(!context.got_it)
thread_yield_to(nullptr);
}
context.got_it = false;
m_thread_pool->thread_create(&async_serve, &context);
stream_serv_count ++;
while(!context.got_it)
thread_yield();
}
return 0;
}
Expand All @@ -381,48 +370,41 @@ namespace rpc {
auto ctx = (Context*)args_;
Context context(std::move(*ctx));
ctx->got_it = true;
thread_yield_to(nullptr);
thread_yield();
context.serve_request();
// serve done, here reduce refcount
(*ctx->stream_serv_count) --;
ctx->stream_cv->notify_all();
return nullptr;
}
virtual int shutdown_no_wait() override {
photon::thread_create11(&SkeletonImpl::shutdown, this);
return 0;
}
virtual int shutdown() override
{
m_running = false;
for (const auto& x: m_list)
if (x->thread)
thread_interrupt(x->thread);
// it should confirm that all threads are finished
// or m_list may not destruct correctly
while (m_serving_count > 0) {
// means shutdown called by rpc serve, should return to give chance to shutdown
if ((m_serving_count == 1) && (m_list.front()->thread == nullptr))
return 0;
m_cond_served.wait_no_lock();
virtual int shutdown(bool no_more_requests) override {
if (no_more_requests)
m_running = false;
while (m_list) {
thread_enable_join(m_list.front()->thread);
if (no_more_requests) {
thread_interrupt(m_list.front()->thread);
}
// Wait all streams destructed. Their attached RPC requests are finished as well.
thread_join((join_handle*) m_list.front()->thread);
}
while (!m_list.empty())
thread_usleep(1000);
return 0;
}
void shutdown_no_wait(bool no_more_requests) override {
thread_create11(&SkeletonImpl::shutdown, this, no_more_requests);
}
virtual ~SkeletonImpl() {
shutdown();
shutdown(true);
photon::delete_thread_pool(m_thread_pool);
}
explicit SkeletonImpl(bool concurrent = true, uint32_t pool_size = 128)
: m_concurrent(concurrent),
explicit SkeletonImpl(uint32_t pool_size = 128) :
m_thread_pool(photon::new_thread_pool(pool_size)) {
m_thread_pool->enable_autoscale();
}
};
Skeleton* new_skeleton(bool concurrent, uint32_t pool_size)
Skeleton* new_skeleton(uint32_t pool_size)
{
return new SkeletonImpl(concurrent, pool_size);
return new SkeletonImpl(pool_size);
}

class StubPoolImpl : public StubPool {
Expand Down
22 changes: 17 additions & 5 deletions rpc/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,25 @@ namespace rpc
virtual int set_close_notify(Notifier notifier) = 0;

// can be invoked concurrently by multiple threads
// if have the ownership of stream, `serve` will delete it before exit
virtual int serve(IStream* stream, bool ownership_stream = false) = 0;
virtual int serve(IStream* stream) = 0;

[[deprecated]]
int serve(IStream* stream, bool /*ownership_stream*/) {
return serve(stream);
}

// set the allocator to allocate memory for recving responses
// the default allocator is defined in iovector.h/cpp
virtual void set_allocator(IOAlloc allocation) = 0;

virtual int shutdown_no_wait() = 0;
/**
* @brief Shutdown the rpc server from outside.
* @warning DO NOT invoke this function within the RPC request.
* You should create a thread to invoke it, or just use shutdown_no_wait.
*/
virtual int shutdown(bool no_more_requests = true) = 0;

virtual int shutdown() = 0;
virtual void shutdown_no_wait(bool no_more_requests = true) = 0;

template <class ServerClass>
int register_service(ServerClass* obj)
Expand Down Expand Up @@ -238,7 +247,10 @@ namespace rpc
extern "C" StubPool* new_uds_stub_pool(const char* path, uint64_t expiration,
uint64_t connect_timeout,
uint64_t rpc_timeout);
extern "C" Skeleton* new_skeleton(bool concurrent = true, uint32_t pool_size = 128);
extern "C" Skeleton* new_skeleton(uint32_t pool_size = 128);
[[deprecated]] inline Skeleton* new_skeleton(bool /*concurrent*/, uint32_t pool_size = 128) {
return new_skeleton(pool_size);
}

struct __example__operation1__ // defination of operator
{
Expand Down
2 changes: 1 addition & 1 deletion rpc/test/test-rpc-message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class TestRPCServer {
}

int serve(photon::net::ISocketStream* stream) {
return skeleton->serve(stream, false);
return skeleton->serve(stream);
}

int run() {
Expand Down
Loading

0 comments on commit e1eaa1a

Please sign in to comment.