Skip to content

Commit

Permalink
feat: add log-net-activities configuration (#2964)
Browse files Browse the repository at this point in the history
* add log-level to config if pika should log the activities of Net connctions
---------

Co-authored-by: buzhimingyonghu <42060366+buzhimingyonghu@users.noreply.github.com>
  • Loading branch information
cheniujh and buzhimingyonghu authored Dec 16, 2024
1 parent 0906644 commit 8ce0cb3
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 23 deletions.
10 changes: 8 additions & 2 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
###########################

# Pika port, the default value is 9221.
# [NOTICE] Port Magic offsets of port+1000 / port+2000 are used by Pika at present.
# Port 10221 is used for Rsync, and port 11221 is used for Replication, while the listening port is 9221.
# [NOTICE] Port Magic offsets of port+1000 / port+10000 are used by Pika at present.
# Port 9221+10000 is used for Rsync, and port 9221+1000 is used for incr Replication, while the listening port is 9221.
port : 9221

db-instance-num : 3
Expand Down Expand Up @@ -74,6 +74,12 @@ log-path : ./log/
# The unit of serverlogs is in [days] and the default value is 7(days).
log-retention-time : 7

# log-net-activities can be config as yes or no, if an invalid value is given, normal will be auto set to no.
# when log-net-activities is yes, connection activities will be logged.
# Default log-net-activities value is no.
# [NOTICE] you can use config set command to change log-net-activities dynamically.
log-net-activities : no

# Directory to store the data of Pika.
db-path : ./db/

Expand Down
19 changes: 11 additions & 8 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,8 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return log_retention_time_;
}
std::string log_level() {
std::shared_lock l(rwlock_);
return log_level_;
bool log_net_activities() {
return log_net_activities_.load(std::memory_order::memory_order_relaxed);
}
std::string db_path() {
std::shared_lock l(rwlock_);
Expand Down Expand Up @@ -831,10 +830,13 @@ class PikaConf : public pstd::BaseConf {
max_compaction_bytes_ = value;
}

void SetLogLevel(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("loglevel", value);
log_level_ = value;
void SetLogNetActivities(std::string& value) {
TryPushDiffCommands("log-net-activities", value);
if (value == "yes") {
log_net_activities_.store(true);
} else {
log_net_activities_.store(false);
}
}

// Rsync Rate limiting configuration
Expand Down Expand Up @@ -958,7 +960,6 @@ class PikaConf : public pstd::BaseConf {
std::string slaveof_;
std::string log_path_;
int log_retention_time_;
std::string log_level_;
std::string db_path_;
int db_instance_num_ = 0;
std::string db_sync_path_;
Expand Down Expand Up @@ -1099,6 +1100,8 @@ class PikaConf : public pstd::BaseConf {
std::atomic_int cache_maxmemory_policy_ = 1;
std::atomic_int cache_maxmemory_samples_ = 5;
std::atomic_int cache_lfu_decay_time_ = 1;
std::atomic<bool> log_net_activities_ = false;


// rocksdb blob
bool enable_blob_files_ = false;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class PikaDispatchThread {

bool ClientKill(const std::string& ip_port);
void ClientKillAll();

void SetLogNetActivities(bool value);
void SetQueueLimit(int queue_limit) { thread_rep_->SetQueueLimit(queue_limit); }

void UnAuthUserAndKillClient(const std::set<std::string> &users, const std::shared_ptr<User>& defaultUser);
Expand Down
2 changes: 1 addition & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ class PikaServer : public pstd::noncopyable {
void CacheConfigInit(cache::CacheConfig &cache_cfg);
void ProcessCronTask();
double HitRatio();

void SetLogNetActivities(bool value);
/*
* disable compact
*/
Expand Down
4 changes: 4 additions & 0 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class ServerThread : public Thread {

int SetTcpNoDelay(int connfd);

void SetLogNetActivities(bool value);

/*
* StartThread will return the error code as pthread_create
* Return 0 if success
Expand Down Expand Up @@ -167,6 +169,8 @@ class ServerThread : public Thread {
*/
std::unique_ptr<NetMultiplexer> net_multiplexer_;

std::atomic<bool> log_net_activities_{false};

private:
friend class HolyThread;
friend class DispatchThread;
Expand Down
10 changes: 7 additions & 3 deletions src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,19 @@ void DispatchThread::HandleNewConn(const int connfd, const std::string& ip_port)
// Slow workers may consume many fds.
// We simply loop to find next legal worker.
NetItem ti(connfd, ip_port);
LOG(INFO) << "accept new conn " << ti.String();
if (log_net_activities_.load(std::memory_order::memory_order_relaxed)) {
LOG(INFO) << "accept new conn " << ti.String();
}
int next_thread = last_thread_;
bool find = false;
for (int cnt = 0; cnt < work_num_; cnt++) {
std::unique_ptr<WorkerThread>& worker_thread = worker_thread_[next_thread];
find = worker_thread->MoveConnIn(ti, false);
if (find) {
last_thread_ = (next_thread + 1) % work_num_;
LOG(INFO) << "find worker(" << next_thread << "), refresh the last_thread_ to " << last_thread_;
if (log_net_activities_.load(std::memory_order::memory_order_relaxed)) {
LOG(INFO) << "find worker(" << next_thread << "), refresh the last_thread_ to " << last_thread_;
}
break;
}
next_thread = (next_thread + 1) % work_num_;
Expand Down Expand Up @@ -189,7 +193,7 @@ void DispatchThread::CleanWaitNodeOfUnBlockedBlrConn(std::shared_ptr<net::RedisC
// removed all the waiting info of this conn/ doing cleaning work
auto pair = blocked_conn_to_keys_.find(conn_unblocked->fd());
if (pair == blocked_conn_to_keys_.end()) {
LOG(WARNING) << "blocking info of blpop/brpop went wrong, blpop/brpop can't working correctly";
LOG(ERROR) << "blocking info of blpop/brpop went wrong, blpop/brpop can't working correctly";
return;
}
auto& blpop_keys_list = pair->second;
Expand Down
4 changes: 4 additions & 0 deletions src/net/src/server_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ void* ServerThread::ThreadMain() {
return nullptr;
}

void ServerThread::SetLogNetActivities(bool value) {
log_net_activities_.store(value, std::memory_order::memory_order_relaxed);
}

#ifdef __ENABLE_SSL
static std::vector<std::unique_ptr<pstd::Mutex>> ssl_mutex_;

Expand Down
26 changes: 19 additions & 7 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,13 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->log_retention_time());
}

if (pstd::stringmatch(pattern.data(), "log-net-activities", 1) != 0) {
elements += 2;
EncodeString(&config_body, "log-net-activities");
auto output_str = g_pika_conf->log_net_activities() ? "yes" : "no";
EncodeString(&config_body, output_str);
}

if (pstd::stringmatch(pattern.data(), "thread-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "thread-num");
Expand Down Expand Up @@ -2167,12 +2174,6 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, g_pika_conf->enable_blob_garbage_collection() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "loglevel", 1) != 0) {
elements += 2;
EncodeString(&config_body, "loglevel");
EncodeString(&config_body, g_pika_conf->log_level());
}

if (pstd::stringmatch(pattern.data(), "min-blob-size", 1) != 0) {
elements += 2;
EncodeString(&config_body, "min-blob-size");
Expand Down Expand Up @@ -2492,6 +2493,15 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
g_pika_conf->SetSlowlogMaxLen(static_cast<int>(ival));
g_pika_server->SlowlogTrim();
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "log-net-activities") {
if (value != "yes" && value != "no") {
res_.AppendStringRaw("-ERR Invalid argument \'" + value +
"\' for CONFIG SET 'log-net-activities', only yes or no is valid\r\n");
return;
}
g_pika_conf->SetLogNetActivities(value);
g_pika_server->SetLogNetActivities(value == "yes");
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "max-cache-statistic-keys") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-statistic-keys'\r\n");
Expand Down Expand Up @@ -3345,7 +3355,9 @@ void QuitCmd::DoInitial() {

void QuitCmd::Do() {
res_.SetRes(CmdRes::kOk);
LOG(INFO) << "QutCmd will close connection " << GetConn()->String();
if (g_pika_conf->log_net_activities()) {
LOG(INFO) << "QuitCmd will close connection " << GetConn()->String();
}
GetConn()->SetClose(true);
}

Expand Down
14 changes: 13 additions & 1 deletion src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,15 @@ int PikaConf::Load() {
if(log_retention_time_ < 0){
LOG(FATAL) << "log-retention-time invalid";
}
GetConfStr("loglevel", &log_level_);

std::string log_net_activities;
GetConfStr("log-net-activities", &log_net_activities);
if (log_net_activities == "yes") {
log_net_activities_.store(true);
} else {
log_net_activities_.store(false);
};

GetConfStr("db-path", &db_path_);
GetConfInt("db-instance-num", &db_instance_num_);
if (db_instance_num_ <= 0) {
Expand Down Expand Up @@ -830,6 +838,9 @@ int PikaConf::ConfigRewrite() {
SetConfStr("slowlog-write-errorlog", slowlog_write_errorlog_.load() ? "yes" : "no");
SetConfInt("slowlog-log-slower-than", slowlog_log_slower_than_.load());
SetConfInt("slowlog-max-len", slowlog_max_len_);
SetConfInt("log-retention-time", log_retention_time_);
SetConfInt("slave-priority", slave_priority_);
SetConfStr("log-net-activities", log_net_activities_ ? "yes" : "no");
SetConfStr("write-binlog", write_binlog_ ? "yes" : "no");
SetConfStr("run-id", run_id_);
SetConfStr("replication-id", replication_id_);
Expand Down Expand Up @@ -889,6 +900,7 @@ int PikaConf::ConfigRewrite() {
SetConfInt("max-cache-files", max_cache_files_);
SetConfInt("max-background-compactions", max_background_compactions_);
SetConfInt("max-background-jobs", max_background_jobs_);
SetConfInt("max-subcompactions", max_subcompactions_);
SetConfInt64("rate-limiter-bandwidth", rate_limiter_bandwidth_);
SetConfInt64("delayed-write-rate", delayed_write_rate_);
SetConfInt64("max-compaction-bytes", max_compaction_bytes_);
Expand Down
1 change: 1 addition & 0 deletions src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ void PikaDispatchThread::UnAuthUserAndKillClient(const std::set<std::string>& us
void PikaDispatchThread::StopThread() {
thread_rep_->StopThread();
}
void PikaDispatchThread::SetLogNetActivities(bool value) { thread_rep_->SetLogNetActivities(value); }

bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const {
if (ip == "127.0.0.1") {
Expand Down
2 changes: 2 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ void PikaServer::Start() {
<< (ret == net::kBindError ? ": bind port " + std::to_string(port_) + " conflict" : ": other error")
<< ", Listen on this port to handle the connected redis client";
}
pika_dispatch_thread_->SetLogNetActivities(g_pika_conf->log_net_activities());
ret = pika_pubsub_thread_->StartThread();
if (ret != net::kSuccess) {
dbs_.clear();
Expand Down Expand Up @@ -1919,3 +1920,4 @@ void PikaServer::CacheConfigInit(cache::CacheConfig& cache_cfg) {
cache_cfg.maxmemory_samples = g_pika_conf->cache_maxmemory_samples();
cache_cfg.lfu_decay_time = g_pika_conf->cache_lfu_decay_time();
}
void PikaServer::SetLogNetActivities(bool value) { pika_dispatch_thread_->SetLogNetActivities(value); }

0 comments on commit 8ce0cb3

Please sign in to comment.