Skip to content

Commit

Permalink
feat: Add logger for Supervisor debugging
Browse files Browse the repository at this point in the history
Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>
  • Loading branch information
L-Xiafeng committed Dec 22, 2024
1 parent 511b42d commit 9ae93dd
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 122 deletions.
6 changes: 4 additions & 2 deletions protos/Supervisor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ message CanStartMessage {
bool ok = 1;
}

message CranedReady {
message InitSupervisorRequest {
uint32 task_id = 1;
bool ok = 2;
string debug_level = 2;
string craned_unix_socket_path = 3;
bool ok = 4;
}

message SupervisorReady {
Expand Down
16 changes: 3 additions & 13 deletions src/CraneCtld/CraneCtld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,14 @@ void ParseConfig(int argc, char** argv) {
g_config.CraneCtldDebugLevel = "info";

// spdlog should be initialized as soon as possible
spdlog::level::level_enum log_level;
if (g_config.CraneCtldDebugLevel == "trace") {
log_level = spdlog::level::trace;
} else if (g_config.CraneCtldDebugLevel == "debug") {
log_level = spdlog::level::debug;
} else if (g_config.CraneCtldDebugLevel == "info") {
log_level = spdlog::level::info;
} else if (g_config.CraneCtldDebugLevel == "warn") {
log_level = spdlog::level::warn;
} else if (g_config.CraneCtldDebugLevel == "error") {
log_level = spdlog::level::err;
std::optional log_level=StrToLogLevel(g_config.CraneCtldDebugLevel);
if (log_level.has_value()) {
InitLogger(log_level.value(), g_config.CraneCtldLogFile, true);
} else {
fmt::print(stderr, "Illegal debug-level format.");
std::exit(1);
}

InitLogger(log_level, g_config.CraneCtldLogFile);

// External configuration file path
if (!parsed_args.count("db-config") && config["DbConfigPath"]) {
db_config_path = config["DbConfigPath"].as<std::string>();
Expand Down
29 changes: 9 additions & 20 deletions src/Craned/Craned/Craned.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
#include <ctime>
#include <cxxopts.hpp>

#include "../Supervisor/CforedClient.h"
#include "CranedServer.h"
#include "CtldClient.h"
#include "DeviceManager.h"
#include "JobManager.h"
#include "crane/PluginClient.h"
#include "crane/String.h"

Expand Down Expand Up @@ -105,25 +105,17 @@ void ParseConfig(int argc, char** argv) {
g_config.CranedDebugLevel = "info";

// spdlog should be initialized as soon as possible
spdlog::level::level_enum log_level;
if (g_config.CranedDebugLevel == "trace") {
log_level = spdlog::level::trace;
} else if (g_config.CranedDebugLevel == "debug") {
log_level = spdlog::level::debug;
} else if (g_config.CranedDebugLevel == "info") {
log_level = spdlog::level::info;
} else if (g_config.CranedDebugLevel == "warn") {
log_level = spdlog::level::warn;
} else if (g_config.CranedDebugLevel == "error") {
log_level = spdlog::level::err;
std::optional log_level = StrToLogLevel(g_config.CranedDebugLevel);
if (log_level.has_value()) {
InitLogger(log_level.value(), g_config.CranedLogFile, true);
} else {
fmt::print(stderr, "Illegal debug-level format.");
std::exit(1);
}

InitLogger(log_level, g_config.CranedLogFile);
#ifdef CRANE_ENABLE_BPF
Craned::CgroupV2::SetBpfDebugLogLevel(static_cast<uint32_t>(log_level));
Craned::CgroupV2::SetBpfDebugLogLevel(
static_cast<uint32_t>(log_level.value()));
#endif
if (config["CranedUnixSockPath"])
g_config.CranedUnixSockPath =
Expand Down Expand Up @@ -625,7 +617,7 @@ void GlobalVariableInit() {
g_thread_pool =
std::make_unique<BS::thread_pool>(std::thread::hardware_concurrency());

g_task_mgr = std::make_unique<Craned::JobManager>();
g_job_mgr = std::make_unique<Craned::JobManager>();

g_ctld_client = std::make_unique<Craned::CtldClient>();
g_ctld_client->SetCranedId(g_config.CranedIdOfThisNode);
Expand All @@ -637,9 +629,6 @@ void GlobalVariableInit() {
g_plugin_client = std::make_unique<plugin::PluginClient>();
g_plugin_client->InitChannelAndStub(g_config.Plugin.PlugindSockPath);
}

g_cfored_manager = std::make_unique<Craned::CforedManager>();
g_cfored_manager->Init();
}

void StartServer() {
Expand All @@ -661,8 +650,8 @@ void StartServer() {
g_server->Wait();

// Free global variables
g_task_mgr->Wait();
g_task_mgr.reset();
g_job_mgr->Wait();
g_job_mgr.reset();
// CforedManager MUST be destructed after JobManager.
g_cfored_manager.reset();
g_server.reset();
Expand Down
16 changes: 8 additions & 8 deletions src/Craned/Craned/CranedServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ grpc::Status CranedServiceImpl::ExecuteTask(

CraneErr err;
for (auto const &task_to_d : request->tasks()) {
err = g_task_mgr->ExecuteTaskAsync(task_to_d);
err = g_job_mgr->ExecuteTaskAsync(task_to_d);
if (err != CraneErr::kOk)
response->add_failed_task_id_list(task_to_d.task_id());
}
Expand All @@ -49,7 +49,7 @@ grpc::Status CranedServiceImpl::TerminateTasks(
absl::StrJoin(request->task_id_list(), ","));

for (task_id_t id : request->task_id_list())
g_task_mgr->TerminateTaskAsync(id);
g_job_mgr->TerminateTaskAsync(id);
response->set_ok(true);

return Status::OK;
Expand All @@ -59,7 +59,7 @@ grpc::Status CranedServiceImpl::TerminateOrphanedTask(
grpc::ServerContext *context,
const crane::grpc::TerminateOrphanedTaskRequest *request,
crane::grpc::TerminateOrphanedTaskReply *response) {
g_task_mgr->MarkTaskAsOrphanedAndTerminateAsync(request->task_id());
g_job_mgr->MarkTaskAsOrphanedAndTerminateAsync(request->task_id());
response->set_ok(true);

return Status::OK;
Expand Down Expand Up @@ -135,7 +135,7 @@ grpc::Status CranedServiceImpl::QueryTaskIdFromPort(

// 3. pid2jobid
do {
auto task_id_expt = g_task_mgr->QueryTaskIdFromPidAsync(pid_i);
auto task_id_expt = g_job_mgr->QueryTaskIdFromPidAsync(pid_i);
if (task_id_expt.has_value()) {
CRANE_TRACE("Task id for pid {} is #{}", pid_i, task_id_expt.value());
response->set_ok(true);
Expand Down Expand Up @@ -387,7 +387,7 @@ Status CranedServiceImpl::QueryTaskEnvVariables(
grpc::ServerContext *context,
const ::crane::grpc::QueryTaskEnvVariablesRequest *request,
crane::grpc::QueryTaskEnvVariablesReply *response) {
auto task_env_map = g_task_mgr->QueryTaskEnvMapAsync(request->task_id());
auto task_env_map = g_job_mgr->QueryTaskEnvMapAsync(request->task_id());
if (task_env_map.has_value()) {
for (const auto &[name, value] : task_env_map.value())
response->mutable_env_map()->emplace(name, value);
Expand Down Expand Up @@ -473,7 +473,7 @@ grpc::Status CranedServiceImpl::CheckTaskStatus(
crane::grpc::CheckTaskStatusReply *response) {
crane::grpc::TaskStatus status{};

bool exist = g_task_mgr->CheckTaskStatusAsync(request->task_id(), &status);
bool exist = g_job_mgr->CheckTaskStatusAsync(request->task_id(), &status);
response->set_ok(exist);
response->set_status(status);

Expand All @@ -484,7 +484,7 @@ grpc::Status CranedServiceImpl::ChangeTaskTimeLimit(
grpc::ServerContext *context,
const crane::grpc::ChangeTaskTimeLimitRequest *request,
crane::grpc::ChangeTaskTimeLimitReply *response) {
bool ok = g_task_mgr->ChangeTaskTimeLimitAsync(
bool ok = g_job_mgr->ChangeTaskTimeLimitAsync(
request->task_id(), absl::Seconds(request->time_limit_seconds()));
response->set_ok(ok);

Expand Down Expand Up @@ -545,7 +545,7 @@ CranedServer::CranedServer(const Config::CranedListenConf &listen_conf) {
listen_conf.UnixSocketListenAddr, craned_listen_addr,
listen_conf.CranedListenPort);

g_task_mgr->SetSigintCallback([p_server = m_server_.get()] {
g_job_mgr->SetSigintCallback([p_server = m_server_.get()] {
p_server->Shutdown();
CRANE_INFO("Grpc Server Shutdown() was called.");
});
Expand Down
1 change: 0 additions & 1 deletion src/Craned/Craned/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <pty.h>
#include <sys/wait.h>

#include "CforedClient.h"
#include "CtldClient.h"
#include "crane/String.h"
#include "protos/PublicDefs.pb.h"
Expand Down
2 changes: 1 addition & 1 deletion src/Craned/Craned/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,4 +382,4 @@ class JobManager {
};
} // namespace Craned

inline std::unique_ptr<Craned::JobManager> g_task_mgr;
inline std::unique_ptr<Craned::JobManager> g_job_mgr;
24 changes: 12 additions & 12 deletions src/Craned/Supervisor/CforedClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ bool CforedClient::TaskProcessStop() {
};

void CforedClient::TaskOutPutForward(const std::string& msg) {
CRANE_TRACE("Receive TaskOutputForward for task #{}: {}", g_task_mgr->task_id,
CRANE_TRACE("Receive TaskOutputForward for task #{}: {}", g_config.TaskId,
msg);
m_output_queue_.enqueue(msg);
}
Expand Down Expand Up @@ -370,12 +370,12 @@ void CforedManager::RegisterCb_() {
});

CRANE_TRACE("Registering fd {} for outputs of task #{}", elem.fd,
g_task_mgr->task_id);
g_config.TaskId);
auto poll_handle = m_loop_->resource<uvw::poll_handle>(elem.fd);
poll_handle->on<uvw::poll_event>(
[this, elem = std::move(elem)](const uvw::poll_event&,
uvw::poll_handle& h) {
CRANE_TRACE("Detect task #{} output.", g_task_mgr->task_id);
CRANE_TRACE("Detect task #{} output.", g_config.TaskId);

constexpr int MAX_BUF_SIZE = 4096;
char buf[MAX_BUF_SIZE];
Expand All @@ -390,14 +390,14 @@ void CforedManager::RegisterCb_() {
// For pty,do nothing, process exit on return -1 and error set to
// EIO
CRANE_TRACE("Read EOF from pty task #{} on cfored {}",
g_task_mgr->task_id, elem.cfored);
g_config.TaskId, elem.cfored);
}
}

if (ret == -1) {
if (!elem.pty) {
CRANE_ERROR("Error when reading task #{} output, error {}",
g_task_mgr->task_id, std::strerror(errno));
g_config.TaskId, std::strerror(errno));
return;
}

Expand All @@ -411,28 +411,28 @@ void CforedManager::RegisterCb_() {
return;
} else {
CRANE_ERROR("Error when reading task #{} output, error {}",
g_task_mgr->task_id, std::strerror(errno));
g_config.TaskId, std::strerror(errno));
return;
}
}

if (read_finished) {
CRANE_TRACE("Task #{} to cfored {} finished its output.",
g_task_mgr->task_id, elem.cfored);
g_config.TaskId, elem.cfored);
h.close();
close(elem.fd);

bool ok_to_free = m_cfored_client_->TaskOutputFinish();
if (ok_to_free) {
CRANE_TRACE("It's ok to unregister task #{} on {}",
g_task_mgr->task_id, elem.cfored);
g_config.TaskId, elem.cfored);
UnregisterIOForward_();
}
return;
}

std::string output(buf, ret);
CRANE_TRACE("Fwd to task #{}: {}", g_task_mgr->task_id, output);
CRANE_TRACE("Fwd to task #{}: {}", g_config.TaskId, output);
m_cfored_client_->TaskOutPutForward(output);
});
int ret = poll_handle->start(uvw::poll_handle::poll_event_flags::READABLE);
Expand All @@ -447,10 +447,10 @@ void CforedManager::TaskProcStopped() { m_task_stop_handle_->send(); }

void CforedManager::TaskStopCb_() {
CRANE_TRACE("Task #{} to cfored {} just stopped its process.",
g_task_mgr->task_id, m_cfored_client_->CforedName());
g_config.TaskId, m_cfored_client_->CforedName());
bool ok_to_free = m_cfored_client_->TaskProcessStop();
if (ok_to_free) {
CRANE_TRACE("It's ok to unregister task #{} on {}", g_task_mgr->task_id,
CRANE_TRACE("It's ok to unregister task #{} on {}", g_config.TaskId,
m_cfored_client_->CforedName());
UnregisterIOForward_();
}
Expand All @@ -460,7 +460,7 @@ void CforedManager::UnregisterIOForward_() { m_unregister_handle_->send(); }

void CforedManager::UnregisterCb_() {
m_cfored_client_.reset();
g_task_mgr->TaskStopAndDoStatusChange();
g_job_mgr->TaskStopAndDoStatusChange();
}

} // namespace Supervisor
5 changes: 3 additions & 2 deletions src/Craned/Supervisor/CranedClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ class CranedClient {
std::shared_ptr<crane::grpc::Craned::Stub> m_stub_;
};

inline std::unique_ptr<CranedClient> g_craned_client;
} // namespace Supervisor
} // namespace Supervisor

inline std::unique_ptr<Supervisor::CranedClient> g_craned_client;
Loading

0 comments on commit 9ae93dd

Please sign in to comment.