rpc graceful shutdown #291

Merged
2 changes: 1 addition & 1 deletion examples/rpc/server.h
@@ -59,7 +59,7 @@ struct ExampleServer {

// Serve provides handler for socket server
int serve(photon::net::ISocketStream* stream) {
- return skeleton->serve(stream, false);
+ return skeleton->serve(stream);
}

void term() {
2 changes: 1 addition & 1 deletion net/basic_socket.cpp
@@ -97,7 +97,7 @@ int connect(int fd, const struct sockaddr *addr, socklen_t addrlen,
if (ret < 0) return -1;
socklen_t n = sizeof(err);
ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &n);
- if (ret < 0) return -1;
+ if (unlikely(ret < 0)) return -1;
if (err) {
errno = err;
return -1;
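For reference, unlikely() is a branch-prediction hint. A minimal sketch of the usual GCC/Clang idiom behind such macros (Photon's actual definition may differ):

    // Common compiler-hint idiom; illustration only, not Photon's exact macro.
    #define likely(x)   __builtin_expect(!!(x), 1)
    #define unlikely(x) __builtin_expect(!!(x), 0)

The hint only nudges the compiler to move the error path off the hot path; the function's behavior is unchanged.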
1 change: 1 addition & 0 deletions net/socket.h
@@ -255,6 +255,7 @@ namespace net {
using Handler = Callback<ISocketStream*>;
virtual ISocketServer* set_handler(Handler handler) = 0;
virtual int start_loop(bool block = false) = 0;
+ // Close the listening fd. It's the user's responsibility to close the active connections.
virtual void terminate() = 0;
};
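Given that contract, a graceful teardown might look like the sketch below; the factory and the bind/listen/start_loop calls are assumed from Photon's usual socket-server API and are not part of this diff:

    // Sketch under assumed API: new_tcp_socket_server(), bind(), listen().
    auto* server = photon::net::new_tcp_socket_server();
    server->set_handler({&example_server, &ExampleServer::serve});
    server->bind(9527);           // port chosen for illustration
    server->listen();
    server->start_loop(false);    // run the accept loop without blocking
    // ... later, during shutdown:
    server->terminate();          // closes only the listening fd; active
                                  // connections must be closed by the caller
    delete server;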

94 changes: 40 additions & 54 deletions rpc/rpc.cpp
@@ -222,7 +222,7 @@ namespace rpc {
bool got_it;
int* stream_serv_count;
photon::condition_variable *stream_cv;
- std::shared_ptr<photon::mutex> w_lock;
+ photon::mutex* w_lock;

Collaborator Author commented:
The code now has a condition variable that guarantees the mutex is destructed only after all requests have finished, so a shared pointer is no longer needed here.

Context(SkeletonImpl* sk, IStream* s) :
request(sk->m_allocator), stream(s), sk(sk) { }
@@ -289,6 +289,7 @@ namespace rpc {
}
int response_sender(iovector* resp)
{
+ assert(w_lock);
Header h;
h.size = (uint32_t)resp->sum();
h.function = header.function;
Expand All @@ -297,13 +298,11 @@ namespace rpc {
resp->push_front(&h, sizeof(h));
if (stream == nullptr)
LOG_ERRNO_RETURN(0, -1, "socket closed ");
- if (w_lock) {
- w_lock->lock();
- }
+ w_lock->lock();
ssize_t ret = stream->writev(resp->iovec(), resp->iovcnt());
- if (w_lock) {
- w_lock->unlock();
- }
+ w_lock->unlock();

if (ret < (ssize_t)(sizeof(h) + h.size)) {
stream->shutdown(ShutdownHow::ReadWrite);
LOG_ERRNO_RETURN(0, -1, "failed to send rpc response to stream ", stream);
@@ -314,43 +313,38 @@ namespace rpc {
condition_variable m_cond_served;
struct ThreadLink : public intrusive_list_node<ThreadLink>
{
- photon::thread* thread = nullptr;
+ photon::thread* thread = photon::CURRENT;
};
- intrusive_list<ThreadLink> m_list;
+ intrusive_list<ThreadLink> m_list; // Stores the thread ID of every stream
uint64_t m_serving_count = 0;
- bool m_concurrent;
Collaborator Author commented:
Removed the synchronous serving mode to reduce code complexity.

bool m_running = true;
photon::ThreadPoolBase *m_thread_pool;
- virtual int serve(IStream* stream, bool ownership) override
+ virtual int serve(IStream* stream) override
{
- if (!m_running)
- LOG_ERROR_RETURN(ENOTSUP, -1, "the skeleton has closed");
+ if (unlikely(!m_running))
+ return -1;

ThreadLink node;
m_list.push_back(&node);
DEFER(m_list.erase(&node));
- DEFER(if (ownership) delete stream;);
// stream serve refcount
int stream_serv_count = 0;
+ photon::mutex w_lock;
photon::condition_variable stream_cv;
- // once serve goint to exit, stream may destruct
+ // once serve exits, the stream will destruct
// make sure all requests relying on this stream are finished
DEFER({
while (stream_serv_count > 0) stream_cv.wait_no_lock();
});
if (stream_accept_notify) stream_accept_notify(stream);
DEFER(if (stream_close_notify) stream_close_notify(stream));
- auto w_lock = m_concurrent ? std::make_shared<photon::mutex>() : nullptr;
- while(m_running)
- {
+ while(likely(m_running)) {
Context context(this, stream);
context.stream_serv_count = &stream_serv_count;
context.stream_cv = &stream_cv;
- context.w_lock = w_lock;
- node.thread = CURRENT;
+ context.w_lock = &w_lock;
int ret = context.read_request();
ERRNO err;
- node.thread = nullptr;
if (ret < 0) {
// should only shutdown read, for other threads
// might still writing
@@ -363,16 +357,11 @@ namespace rpc {
}
}

- if (!m_concurrent) {
- context.serve_request();
- } else {
- context.got_it = false;
- m_thread_pool->thread_create(&async_serve, &context);
- // async_serve will be start, add refcount here
- stream_serv_count ++;
- while(!context.got_it)
- thread_yield_to(nullptr);
- }
+ context.got_it = false;
+ m_thread_pool->thread_create(&async_serve, &context);
+ stream_serv_count ++;
+ while(!context.got_it)
+ thread_yield();
}
return 0;
}
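The guarantee discussed in the comment thread above comes from pairing a refcount with a condition variable: serve() cannot unwind its stack frame (which owns w_lock and keeps the stream alive) while any pooled handler is still running. A distilled sketch of just that pattern, using the primitives visible in this diff:

    // Distilled from serve() above; an illustration, not a drop-in replacement.
    int inflight = 0;                  // stream_serv_count in the diff
    photon::condition_variable cv;     // stream_cv in the diff
    DEFER({
        // Block unwinding until every spawned handler finishes, so w_lock
        // and the stream outlive all in-flight requests on this stream.
        while (inflight > 0) cv.wait_no_lock();
    });
    // For each request handed to the thread pool:
    inflight++;                        // incremented before thread_create
    // ... the async handler runs serve_request(), then on completion:
    inflight--;
    cv.notify_all();                   // wakes the DEFER once the count reaches zero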
@@ -381,48 +370,45 @@ namespace rpc {
auto ctx = (Context*)args_;
Context context(std::move(*ctx));
ctx->got_it = true;
- thread_yield_to(nullptr);
+ thread_yield();
Collaborator Author commented:
The first step of thread_yield_to involves an unlikely branch check, whereas thread_yield is a direct call.

context.serve_request();
// serve done; reduce the refcount here
(*ctx->stream_serv_count) --;
ctx->stream_cv->notify_all();
return nullptr;
}
- virtual int shutdown_no_wait() override {
- photon::thread_create11(&SkeletonImpl::shutdown, this);
+ virtual int shutdown(bool no_more_requests) override {
+ m_running = !no_more_requests;
+ while (m_list) {
+ auto th = m_list.front()->thread;
+ thread_enable_join(th);
+ if (no_more_requests) {
+ thread_interrupt(th);
+ }
+ // Wait until all streams are destructed; their attached RPC requests will have finished as well.
+ thread_join((join_handle*) th);
+ }
return 0;
}
- virtual int shutdown() override
- {
+ int shutdown_no_wait() override {
m_running = false;
- for (const auto& x: m_list)
- if (x->thread)
- thread_interrupt(x->thread);
- // it should confirm that all threads are finished
- // or m_list may not destruct correctly
- while (m_serving_count > 0) {
- // means shutdown called by rpc serve, should return to give chance to shutdown
- if ((m_serving_count == 1) && (m_list.front()->thread == nullptr))
- return 0;
- m_cond_served.wait_no_lock();
Collaborator Author commented:
The current code already guarantees that all requests finish before the stream is destructed, so this block can be deleted.

+ for (auto* each: m_list) {
+ thread_interrupt(each->thread);
+ }
+ while (!m_list.empty())
+ thread_usleep(1000);
return 0;
}
virtual ~SkeletonImpl() {
- shutdown();
+ shutdown(true);
photon::delete_thread_pool(m_thread_pool);
}
- explicit SkeletonImpl(bool concurrent = true, uint32_t pool_size = 128)
- : m_concurrent(concurrent),
+ explicit SkeletonImpl(uint32_t pool_size = 128) :
m_thread_pool(photon::new_thread_pool(pool_size)) {
m_thread_pool->enable_autoscale();
}
};
- Skeleton* new_skeleton(bool concurrent, uint32_t pool_size)
+ Skeleton* new_skeleton(uint32_t pool_size)
{
- return new SkeletonImpl(concurrent, pool_size);
+ return new SkeletonImpl(pool_size);
}
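Putting the new API together: per the header's warning, shutdown() must not be invoked from inside an RPC handler, so an external caller would spawn a photon thread for it (thread_create11 is the same helper the removed shutdown_no_wait used). A hedged usage sketch, with service registration elided:

    auto* sk = photon::rpc::new_skeleton();   // pool_size defaults to 128
    // ... register services; pass each accepted stream to sk->serve(stream) ...
    photon::thread_create11([sk] {
        sk->shutdown(true);   // refuse new requests, interrupt blocked reads,
                              // then join every stream's serving thread
        delete sk;            // assumed cleanup; ~SkeletonImpl re-runs shutdown
    });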

class StubPoolImpl : public StubPool {
24 changes: 19 additions & 5 deletions rpc/rpc.h
@@ -161,16 +161,25 @@ namespace rpc
virtual int set_close_notify(Notifier notifier) = 0;

// can be invoked concurrently by multiple threads
- // if have the ownership of stream, `serve` will delete it before exit
- virtual int serve(IStream* stream, bool ownership_stream = false) = 0;
+ virtual int serve(IStream* stream) = 0;

+ __attribute__((deprecated))
+ int serve(IStream* stream, bool /*ownership_stream*/) {
+ return serve(stream);
+ }

// set the allocator to allocate memory for receiving responses
// the default allocator is defined in iovector.h/cpp
virtual void set_allocator(IOAlloc allocation) = 0;

+ virtual int shutdown_no_wait() = 0;
@lihuiba commented on Dec 11, 2023:
int shutdown_no_wait(bool no_more_requests = true) {
    return shutdown(no_more_requests, false);
}

+ /**
+ * @brief Shutdown the rpc server from outside.
+ * @warning DO NOT invoke this function within an RPC request handler.
+ * You should create a thread to invoke it, or just use shutdown_no_wait.
+ */
+ virtual int shutdown(bool no_more_requests = true) = 0;

- virtual int shutdown() = 0;
- virtual int shutdown_no_wait() = 0;

template <class ServerClass>
int register_service(ServerClass* obj)
@@ -238,7 +247,12 @@ namespace rpc
extern "C" StubPool* new_uds_stub_pool(const char* path, uint64_t expiration,
uint64_t connect_timeout,
uint64_t rpc_timeout);
extern "C" Skeleton* new_skeleton(bool concurrent = true, uint32_t pool_size = 128);
extern "C" Skeleton* new_skeleton(uint32_t pool_size = 128);

+ __attribute__((deprecated))
+ inline Skeleton* new_skeleton(bool /*concurrent*/, uint32_t pool_size = 128) {
+ return new_skeleton(pool_size);
+ }

struct __example__operation1__ // definition of an operation
{
2 changes: 1 addition & 1 deletion rpc/test/test-rpc-message.cpp
@@ -156,7 +156,7 @@ class TestRPCServer {
}

int serve(photon::net::ISocketStream* stream) {
- return skeleton->serve(stream, false);
+ return skeleton->serve(stream);
}

int run() {