Skip to content

Commit

Permalink
fix: make pika compactible with redis-sentinel (#2854)
Browse files Browse the repository at this point in the history
* 1. make pika support redis sentinel
2. support client kill type pubsub/normal
3. ensure fd is removed in epoll if server wanna close fd

* fix exit process:
1. ensure NetWork Thread(Dispacher) can be stopped in time
2. ensure all queued Async WriteDB task can be done before exit

---------

Co-authored-by: chejinge <945997690@qq.com>
  • Loading branch information
2 people authored and brother-jin committed Aug 13, 2024
1 parent 88f8d70 commit 6d0a681
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 22 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:

- name: Unit Test
working-directory: ${{ github.workspace }}
run: ./pikatests.sh all
run: ./pikatests.sh all clean

# master on port 9221, slave on port 9231, all with 2 db
- name: Start codis, pika master and pika slave
Expand Down Expand Up @@ -203,7 +203,7 @@ jobs:

- name: Unit Test
working-directory: ${{ github.workspace }}
run: ./pikatests.sh all
run: ./pikatests.sh all clean

- name: Start codis, pika master and pika slave
working-directory: ${{ github.workspace }}/build
Expand Down Expand Up @@ -265,7 +265,7 @@ jobs:
- name: Unit Test
working-directory: ${{ github.workspace }}
run: |
./pikatests.sh all
./pikatests.sh all clean
- name: Start codis, pika master and pika slave
working-directory: ${{ github.workspace }}/build
Expand Down
6 changes: 4 additions & 2 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,10 @@ class ClientCmd : public Cmd {
Cmd* Clone() override { return new ClientCmd(*this); }

private:
std::string operation_, info_;
const static std::string KILLTYPE_NORMAL;
const static std::string KILLTYPE_PUBSUB;

std::string operation_, info_, kill_type_;
void DoInitial() override;
};

Expand All @@ -266,7 +269,6 @@ class InfoCmd : public Cmd {
kInfoCommandStats,
kInfoCache
};

InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
Expand Down
2 changes: 1 addition & 1 deletion include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class PikaDispatchThread {
int max_conn_rbuf_size);
~PikaDispatchThread();
int StartThread();

void StopThread();
uint64_t ThreadClientList(std::vector<ClientInfo>* clients);

bool ClientKill(const std::string& ip_port);
Expand Down
6 changes: 6 additions & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ class PikaReplBgWorker {
explicit PikaReplBgWorker(int queue_size);
int StartThread();
int StopThread();
int TaskQueueSize() {
int pri_size = 0;
int qu_size = 0;
bg_thread_.QueueSize(&pri_size, &qu_size);
return pri_size + qu_size;
}
void Schedule(net::TaskFunc func, void* arg);
void Schedule(net::TaskFunc func, void* arg, std::function<void()>& call_back);
static void HandleBGWorkerWriteBinlog(void* arg);
Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ class PikaServer : public pstd::noncopyable {
void ClientKillAll();
int ClientKill(const std::string& ip_port);
int64_t ClientList(std::vector<ClientInfo>* clients = nullptr);
void ClientKillPubSub();
void ClientKillAllNormal();

/*
* Monitor used
Expand Down
4 changes: 3 additions & 1 deletion src/net/include/net_pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,20 @@ class PubSubThread : public Thread {
bool IsReady(int fd);
int ClientPubSubChannelSize(const std::shared_ptr<NetConn>& conn);
int ClientPubSubChannelPatternSize(const std::shared_ptr<NetConn>& conn);
void NotifyCloseAllConns();

private:
void RemoveConn(const std::shared_ptr<NetConn>& conn);
void CloseConn(const std::shared_ptr<NetConn>& conn);

void CloseAllConns();
int ClientChannelSize(const std::shared_ptr<NetConn>& conn);

int msg_pfd_[2];
bool should_exit_;

mutable pstd::RWMutex rwlock_; /* For external statistics */
std::map<int, std::shared_ptr<ConnHandle>> conns_;
std::atomic<bool> close_all_conn_sig_{false};

pstd::Mutex pub_mutex_;
pstd::CondVar receiver_rsignal_;
Expand Down
31 changes: 30 additions & 1 deletion src/net/src/net_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,34 @@ void PubSubThread::RemoveConn(const std::shared_ptr<NetConn>& conn) {
}

void PubSubThread::CloseConn(const std::shared_ptr<NetConn>& conn) {
CloseFd(conn);
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
{
std::lock_guard l(rwlock_);
conns_.erase(conn->fd());
}
}

void PubSubThread::CloseAllConns() {
{
std::lock_guard l(channel_mutex_);
pubsub_channel_.clear();
}
{
std::lock_guard l(pattern_mutex_);
pubsub_pattern_.clear();
}
{
std::lock_guard l(rwlock_);
for (auto& pair : conns_) {
net_multiplexer_->NetDelEvent(pair.second->conn->fd(), 0);
CloseFd(pair.second->conn);
}
std::map<int, std::shared_ptr<ConnHandle>> tmp;
conns_.swap(tmp);
}
}

int PubSubThread::Publish(const std::string& channel, const std::string& msg) {
// TODO(LIBA-S): change the Publish Mode to Asynchronous
std::lock_guard lk(pub_mutex_);
Expand Down Expand Up @@ -414,6 +434,12 @@ void* PubSubThread::ThreadMain() {
char triger[1];

while (!should_stop()) {

if (close_all_conn_sig_.load()) {
close_all_conn_sig_.store(false);
CloseAllConns();
}

nfds = net_multiplexer_->NetPoll(NET_CRON_INTERVAL);
for (int i = 0; i < nfds; i++) {
pfe = (net_multiplexer_->FiredEvents()) + i;
Expand Down Expand Up @@ -585,4 +611,7 @@ void PubSubThread::Cleanup() {
}
conns_.clear();
}
void PubSubThread::NotifyCloseAllConns() {
close_all_conn_sig_.store(true);
}
}; // namespace net
28 changes: 26 additions & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -762,8 +762,15 @@ void ClientCmd::DoInitial() {
res_.SetRes(CmdRes::kErrOther, "Syntax error, try CLIENT (LIST [order by [addr|idle])");
return;
}
} else if ((strcasecmp(argv_[1].data(), "kill") == 0) && argv_.size() == 3) {
} else if (argv_.size() == 3 && (strcasecmp(argv_[1].data(), "kill") == 0)) {
info_ = argv_[2];
} else if (argv_.size() == 4 &&
(strcasecmp(argv_[1].data(), "kill") == 0) &&
(strcasecmp(argv_[2].data(), "type") == 0) &&
((strcasecmp(argv_[3].data(), KILLTYPE_NORMAL.data()) == 0) || (strcasecmp(argv_[3].data(), KILLTYPE_PUBSUB.data()) == 0))) {
//kill all if user wanna kill a type
info_ = "type";
kill_type_ = argv_[3];
} else {
res_.SetRes(CmdRes::kErrOther, "Syntax error, try CLIENT (LIST [order by [addr|idle]| KILL ip:port)");
return;
Expand Down Expand Up @@ -813,6 +820,16 @@ void ClientCmd::Do() {
} else if ((strcasecmp(operation_.data(), "kill") == 0) && (strcasecmp(info_.data(), "all") == 0)) {
g_pika_server->ClientKillAll();
res_.SetRes(CmdRes::kOk);
} else if ((strcasecmp(operation_.data(), "kill") == 0) && (strcasecmp(info_.data(), "type") == 0)) {
if (kill_type_ == KILLTYPE_NORMAL) {
g_pika_server->ClientKillAllNormal();
res_.SetRes(CmdRes::kOk);
} else if (kill_type_ == KILLTYPE_PUBSUB) {
g_pika_server->ClientKillPubSub();
res_.SetRes(CmdRes::kOk);
} else {
res_.SetRes(CmdRes::kErrOther, "kill type is unknown");
}
} else if (g_pika_server->ClientKill(info_) == 1) {
res_.SetRes(CmdRes::kOk);
} else {
Expand Down Expand Up @@ -867,6 +884,10 @@ const std::string InfoCmd::kDebugSection = "debug";
const std::string InfoCmd::kCommandStatsSection = "commandstats";
const std::string InfoCmd::kCacheSection = "cache";


const std::string ClientCmd::KILLTYPE_NORMAL = "normal";
const std::string ClientCmd::KILLTYPE_PUBSUB = "pubsub";

void InfoCmd::Execute() {
std::shared_ptr<DB> db = g_pika_server->GetDB(db_name_);
Do();
Expand Down Expand Up @@ -1278,6 +1299,7 @@ void InfoCmd::InfoReplication(std::string& info) {
Status s;
uint32_t filenum = 0;
uint64_t offset = 0;
uint64_t slave_repl_offset = 0;
std::string safety_purge;
std::shared_ptr<SyncMasterDB> master_db = nullptr;
for (const auto& t_item : g_pika_server->dbs_) {
Expand All @@ -1289,11 +1311,13 @@ void InfoCmd::InfoReplication(std::string& info) {
continue;
}
master_db->Logger()->GetProducerStatus(&filenum, &offset);
slave_repl_offset += static_cast<uint64_t>(filenum) * static_cast<uint64_t>(g_pika_conf->binlog_file_size());
slave_repl_offset += offset;
tmp_stream << db_name << ":binlog_offset=" << filenum << " " << offset;
s = master_db->GetSafetyPurgeBinlog(&safety_purge);
tmp_stream << ",safety_purge=" << (s.ok() ? safety_purge : "error") << "\r\n";
}

tmp_stream << "slave_repl_offset:" << slave_repl_offset << "\r\n";
info.append(tmp_stream.str());
}

Expand Down
4 changes: 4 additions & 0 deletions src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ void PikaDispatchThread::UnAuthUserAndKillClient(const std::set<std::string>& us
}
}

void PikaDispatchThread::StopThread() {
thread_rep_->StopThread();
}

bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const {
if (ip == "127.0.0.1") {
ip = g_pika_server->host();
Expand Down
17 changes: 17 additions & 0 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ int PikaReplClient::Stop() {
for (auto & binlog_worker : write_binlog_workers_) {
binlog_worker->StopThread();
}

// write DB task is async task, we must wait all writeDB task done and then to exit
// or some data will be loss
bool all_write_db_task_done = true;
do {
for (auto &db_worker: write_db_workers_) {
if (db_worker->TaskQueueSize() != 0) {
all_write_db_task_done = false;
std::this_thread::sleep_for(std::chrono::microseconds(300));
break;
} else {
all_write_db_task_done = true;
}
}
//if there are unfinished async write db task, just continue to wait
} while (!all_write_db_task_done);

for (auto &db_worker: write_db_workers_) {
db_worker->StopThread();
}
Expand Down
31 changes: 19 additions & 12 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,18 @@ PikaServer::PikaServer()

acl_ = std::make_unique<::Acl>();
SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool());
bgsave_thread_.set_thread_name("PikaServer::bgsave_thread_");
purge_thread_.set_thread_name("PikaServer::purge_thread_");
bgslots_cleanup_thread_.set_thread_name("PikaServer::bgslots_cleanup_thread_");
common_bg_thread_.set_thread_name("PikaServer::common_bg_thread_");
key_scan_thread_.set_thread_name("PikaServer::key_scan_thread_");
}

PikaServer::~PikaServer() {
rsync_server_->Stop();
// DispatchThread will use queue of worker thread,
// so we need to delete dispatch before worker.
// DispatchThread will use queue of worker thread
// so we need to Stop dispatch before worker.
pika_dispatch_thread_->StopThread();
pika_client_processor_->Stop();
pika_slow_cmd_thread_pool_->stop_thread_pool();
pika_admin_cmd_thread_pool_->stop_thread_pool();
Expand Down Expand Up @@ -805,13 +811,11 @@ size_t PikaServer::SlowCmdThreadPoolMaxQueueSize() {
}

void PikaServer::BGSaveTaskSchedule(net::TaskFunc func, void* arg) {
bgsave_thread_.set_thread_name("BGSaveTask");
bgsave_thread_.StartThread();
bgsave_thread_.Schedule(func, arg);
}

void PikaServer::PurgelogsTaskSchedule(net::TaskFunc func, void* arg) {
purge_thread_.set_thread_name("PurgelogsTask");
purge_thread_.StartThread();
purge_thread_.Schedule(func, arg);
}
Expand All @@ -823,7 +827,6 @@ void PikaServer::PurgeDir(const std::string& path) {


void PikaServer::PurgeDirTaskSchedule(void (*function)(void*), void* arg) {
purge_thread_.set_thread_name("PurgeDirTask");
purge_thread_.StartThread();
purge_thread_.Schedule(function, arg);
}
Expand Down Expand Up @@ -875,12 +878,21 @@ void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& d
}

void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) {
key_scan_thread_.set_thread_name("KeyScanTask");
key_scan_thread_.StartThread();
key_scan_thread_.Schedule(func, arg);
}

void PikaServer::ClientKillAll() { pika_dispatch_thread_->ClientKillAll(); }
void PikaServer::ClientKillAll() {
pika_dispatch_thread_->ClientKillAll();
pika_pubsub_thread_->NotifyCloseAllConns();
}

void PikaServer::ClientKillPubSub() { pika_pubsub_thread_->NotifyCloseAllConns();
}

void PikaServer::ClientKillAllNormal() {
pika_dispatch_thread_->ClientKillAll();
}

int PikaServer::ClientKill(const std::string& ip_port) {
if (pika_dispatch_thread_->ClientKill(ip_port)) {
Expand Down Expand Up @@ -1576,7 +1588,6 @@ void PikaServer::Bgslotsreload(const std::shared_ptr<DB>& db) {
LOG(INFO) << "Start slot reloading";

// Start new thread if needed
bgsave_thread_.set_thread_name("SlotsReload");
bgsave_thread_.StartThread();
bgsave_thread_.Schedule(&DoBgslotsreload, static_cast<void*>(this));
}
Expand Down Expand Up @@ -1644,7 +1655,6 @@ void PikaServer::Bgslotscleanup(std::vector<int> cleanupSlots, const std::shared
LOG(INFO) << "Start slot cleanup, slots: " << slotsStr << std::endl;

// Start new thread if needed
bgslots_cleanup_thread_.set_thread_name("SlotsCleanup");
bgslots_cleanup_thread_.StartThread();
bgslots_cleanup_thread_.Schedule(&DoBgslotscleanup, static_cast<void*>(this));
}
Expand Down Expand Up @@ -1749,7 +1759,6 @@ void DoBgslotscleanup(void* arg) {
void PikaServer::ResetCacheAsync(uint32_t cache_num, std::shared_ptr<DB> db, cache::CacheConfig *cache_cfg) {
if (PIKA_CACHE_STATUS_OK == db->cache()->CacheStatus()
|| PIKA_CACHE_STATUS_NONE == db->cache()->CacheStatus()) {
common_bg_thread_.set_thread_name("ResetCacheTask");
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
Expand All @@ -1773,7 +1782,6 @@ void PikaServer::ClearCacheDbAsync(std::shared_ptr<DB> db) {
LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
return;
}
common_bg_thread_.set_thread_name("CacheClearThread");
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
Expand Down Expand Up @@ -1841,7 +1849,6 @@ void PikaServer::ClearCacheDbAsyncV2(std::shared_ptr<DB> db) {
LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
return;
}
common_bg_thread_.set_thread_name("V2CacheClearThread");
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
Expand Down

0 comments on commit 6d0a681

Please sign in to comment.