Skip to content

Commit

Permalink
refactor: Finish refactor of Craned TaskManager
Browse files Browse the repository at this point in the history
Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>
  • Loading branch information
L-Xiafeng committed Dec 24, 2024
1 parent ba3cc54 commit cb7bcca
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 59 deletions.
14 changes: 7 additions & 7 deletions src/Craned/Craned/Craned.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "CranedServer.h"
#include "CtldClient.h"
#include "DeviceManager.h"
#include "TaskManager.h"
#include "SupervisorKeeper.h"
#include "crane/PluginClient.h"
#include "crane/String.h"

Expand Down Expand Up @@ -617,8 +617,7 @@ void GlobalVariableInit() {
g_thread_pool =
std::make_unique<BS::thread_pool>(std::thread::hardware_concurrency());

g_job_mgr = std::make_unique<Craned::TaskManager>();

g_task_mgr = std::make_unique<Craned::TaskManager>();
g_ctld_client = std::make_unique<Craned::CtldClient>();
g_ctld_client->SetCranedId(g_config.CranedIdOfThisNode);

Expand All @@ -629,6 +628,8 @@ void GlobalVariableInit() {
g_plugin_client = std::make_unique<plugin::PluginClient>();
g_plugin_client->InitChannelAndStub(g_config.Plugin.PlugindSockPath);
}
// SupervisorKeeper will recover above global var
g_supervisor_keeper = std::make_unique<Craned::SupervisorKeeper>();
}

void StartServer() {
Expand All @@ -650,12 +651,11 @@ void StartServer() {
g_server->Wait();

// Free global variables
g_job_mgr->Wait();
g_job_mgr.reset();
// CforedManager MUST be destructed after JobManager.
g_cfored_manager.reset();
g_task_mgr->Wait();
g_task_mgr.reset();
g_server.reset();
g_ctld_client.reset();
g_supervisor_keeper.reset();
g_plugin_client.reset();

g_thread_pool->wait();
Expand Down
68 changes: 67 additions & 1 deletion src/Craned/Craned/SupervisorKeeper.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2024 Peking University and Peking University
* Copyright (c) 2024 Peking University and Peking University
* Changsha Institute for Computing and Digital Economy
*
* This program is free software: you can redistribute it and/or modify
Expand All @@ -17,3 +17,69 @@
*/

#include "SupervisorKeeper.h"

#include <protos/Supervisor.grpc.pb.h>
#include <sys/stat.h>

namespace Craned {
void SupervisorClient::InitChannelAndStub(const std::string& endpoint) {
m_channel_ = CreateUnixInsecureChannel(endpoint);
// std::unique_ptr will automatically release the dangling stub.
m_stub_ = crane::grpc::Supervisor::NewStub(m_channel_);
}

SupervisorKeeper::SupervisorKeeper() {
try {
std::filesystem::path path = kDefaultSupervisorUnixSockDir;
if (std::filesystem::exists(path) && std::filesystem::is_directory(path)) {
// 遍历目录中的所有条目
for (const auto& it : std::filesystem::directory_iterator(path)) {
if (std::filesystem::is_socket(it.path())) {
g_thread_pool->detach_task(
[this, it]() { this->RecoverSupervisorMt_(it.path()); });
}
}
} else {
CRANE_WARN("Supervisor socket dir dose not exit, skip recovery.");
}
} catch (const std::exception& e) {
CRANE_ERROR("Error: {}, when recover supervisor", e.what());
}
}

void SupervisorKeeper::AddSupervisor(task_id_t task_id) {
auto sock_path = fmt::format("unix://{}/task_{}.sock",
kDefaultSupervisorUnixSockDir, task_id);
std::shared_ptr stub = std::make_shared<SupervisorClient>();
stub->InitChannelAndStub(sock_path);
absl::WriterMutexLock lk(&m_mutex);
if (auto it = m_supervisor_map.find(task_id); it != m_supervisor_map.end()) {
CRANE_ERROR("Duplicate supervisor for task #{}", task_id);
return;
}
m_supervisor_map.emplace(task_id, stub);
}

std::shared_ptr<SupervisorClient> SupervisorKeeper::GetStub(task_id_t task_id) {
absl::ReaderMutexLock lk(&m_mutex);
if (auto it = m_supervisor_map.find(task_id); it != m_supervisor_map.end()) {
return it->second;
} else {
return nullptr;
}
}
void SupervisorKeeper::RecoverSupervisorMt_(const std::filesystem::path& path) {
std::shared_ptr stub = std::make_shared<SupervisorClient>();
stub->InitChannelAndStub(path);
crane::grpc::TaskToD task;
auto err = stub->CheckTaskStatus(&task);
if (err != CraneErr::kOk) {
CRANE_ERROR("CheckTaskStatus for {} failed: {}", path, err);
return;
}
absl::WriterMutexLock lk(&m_mutex);
m_supervisor_map.emplace(task.task_id(), stub);
g_task_mgr->AddRecoveredTask_(task);
}

} // namespace Craned
34 changes: 28 additions & 6 deletions src/Craned/Craned/SupervisorKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,41 @@
#include "CranedPublicDefs.h"
// Precompiled header comes first.

#include <protos/Supervisor.grpc.pb.h>

#include "TaskManager.h"
#include "crane/AtomicHashMap.h"
namespace Craned {
class SupervisorClient {
public:
CraneExpected<pid_t> ExecuteTask(const ProcessInstance* process);
CraneExpected<EnvMap> QueryTaskEnv();
CraneErr CheckTaskStatus(crane::grpc::TaskToD* task);

CraneErr TerminateTask(bool mark_as_orphaned, bool terminated_by_user);
CraneErr ChangeTaskTimeLimit(absl::Duration time_limit);

void InitChannelAndStub(const std::string& endpoint);

private:
std::shared_ptr<grpc::Channel> m_channel_;

//todo: Rpc wrap func
std::unique_ptr<crane::grpc::Supervisor::Stub> m_stub_;
// todo: Rpc wrap func
};

class SupervisorKeeper {
public:
SupervisorKeeper();
void AddSupervisor(task_id_t task_id);
std::shared_ptr<SupervisorClient> GetStub(task_id_t task_id);

private:
util::AtomicHashMap<absl::flat_hash_map, task_id_t,
std::shared_ptr<SupervisorClient>>
void RecoverSupervisorMt_(const std::filesystem::path& path);
absl::flat_hash_map<task_id_t, std::shared_ptr<SupervisorClient>>
m_supervisor_map;


absl::Mutex m_mutex;
};
} // namespace Craned
} // namespace Craned

inline std::unique_ptr<Craned::SupervisorKeeper> g_supervisor_keeper;
115 changes: 89 additions & 26 deletions src/Craned/Craned/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <sys/wait.h>

#include "CtldClient.h"
#include "SupervisorKeeper.h"
#include "crane/String.h"
#include "protos/PublicDefs.pb.h"
#include "protos/Supervisor.pb.h"
Expand Down Expand Up @@ -181,6 +182,7 @@ CraneErr TaskManager::KillPid_(pid_t pid, int signum) {
void TaskManager::SetSigintCallback(std::function<void()> cb) {
m_sigint_cb_ = std::move(cb);
}
void TaskManager::AddRecoveredTask_(crane::grpc::TaskToD task) {}

CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance,
ProcessInstance* process) {
Expand Down Expand Up @@ -221,9 +223,6 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance,
return CraneErr::kSystemErr;
}

// save the current uid/gid
SavedPrivilege saved_priv{getuid(), getgid()};

pid_t child_pid = fork();

if (child_pid == -1) {
Expand Down Expand Up @@ -274,14 +273,9 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance,
strerror(ostream.GetErrno()));
close(ctrl_fd);

// Communication failure caused by process crash or grpc error.
// Since now the parent cannot ask the child
// process to commit suicide, kill child process here and just return.
// The child process will be reaped in SIGCHLD handler and
// thus only ONE TaskStatusChange will be triggered!
instance->err_before_exec = CraneErr::kProtobufError;
KillPid_(child_pid, SIGKILL);
return CraneErr::kOk;
return CraneErr::kProtobufError;
}

ok = ParseDelimitedFromZeroCopyStream(&child_process_ready, &istream,
Expand All @@ -295,10 +289,9 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance,
instance->task.task_id());
close(ctrl_fd);

// See comments above.
instance->err_before_exec = CraneErr::kProtobufError;
KillPid_(child_pid, SIGKILL);
return CraneErr::kOk;
return CraneErr::kProtobufError;
}

// Do Supervisor Init
Expand All @@ -320,14 +313,9 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance,
strerror(ostream.GetErrno()));
close(ctrl_fd);

// Communication failure caused by process crash or grpc error.
// Since now the parent cannot ask the child
// process to commit suicide, kill child process here and just return.
// The child process will be reaped in SIGCHLD handler and
// thus only ONE TaskStatusChange will be triggered!
instance->err_before_exec = CraneErr::kProtobufError;
KillPid_(child_pid, SIGKILL);
return CraneErr::kOk;
return CraneErr::kProtobufError;
}

crane::grpc::SupervisorReady supervisor_ready;
Expand All @@ -341,7 +329,6 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance,
instance->task.task_id());
close(ctrl_fd);

// See comments above.
instance->err_before_exec = CraneErr::kProtobufError;
KillPid_(child_pid, SIGKILL);
return CraneErr::kOk;
Expand All @@ -350,6 +337,19 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance,
close(ctrl_fd);

// todo: Create Supervisor stub,request task execution
g_supervisor_keeper->AddSupervisor(instance->task.task_id());

auto stub = g_supervisor_keeper->GetStub(instance->task.task_id());
auto task_id = stub->ExecuteTask(process);
if (!task_id) {
CRANE_ERROR("Supervisor failed to execute task #{}",
instance->task.task_id());
instance->err_before_exec = CraneErr::kSupervisorError;
KillPid_(child_pid, SIGKILL);
return CraneErr::kSupervisorError;
}
process->pid = task_id.value();

return CraneErr::kOk;

AskChildToSuicide:
Expand Down Expand Up @@ -596,12 +596,21 @@ void TaskManager::EvCleanGrpcQueryTaskEnvQueueCb_() {
elem.env_prom.set_value(std::unexpected(CraneErr::kSystemErr));
else {
auto& instance = task_iter->second;
// todo: Forward this to supervisor
// std::unordered_map<std::string, std::string> env_map;
// for (const auto& [name, value] : instance->GetTaskEnvMap()) {
// env_map.emplace(name, value);
// }
// elem.env_prom.set_value(env_map);
auto stub = g_supervisor_keeper->GetStub(instance->task.task_id());
if (!stub) {
CRANE_ERROR("Supervisor for task #{} not found",
instance->task.task_id());
elem.env_prom.set_value(std::unexpected(CraneErr::kSupervisorError));
continue;
}
auto env_map = stub->QueryTaskEnv();
if (!env_map) {
CRANE_ERROR("Failed to query env map for task #{}",
instance->task.task_id());
elem.env_prom.set_value(std::unexpected(CraneErr::kSupervisorError));
continue;
}
elem.env_prom.set_value(env_map);
}
}
}
Expand Down Expand Up @@ -636,7 +645,51 @@ void TaskManager::EvCleanGrpcQueryTaskIdFromPidQueueCb_() {
void TaskManager::EvCleanTerminateTaskQueueCb_() {
TaskTerminateQueueElem elem;
while (m_task_terminate_queue_.try_dequeue(elem)) {
// todo:Just forward to Supervisor
CRANE_TRACE(
"Receive TerminateRunningTask Request from internal queue. "
"Task id: {}",
elem.task_id);

auto iter = m_task_map_.find(elem.task_id);
if (iter == m_task_map_.end()) {
CRANE_DEBUG("Terminating a non-existent task #{}.", elem.task_id);

// Note if Ctld wants to terminate some tasks that are not running,
// it might indicate other nodes allocated to the task might have
// crashed. We should mark the task as kind of not runnable by removing
// its cgroup.
//
// Considering such a situation:
// In Task Scheduler of Ctld,
// the task index from node id to task id have just been added and
// Ctld are sending CreateCgroupForTasks.
// Right at the moment, one Craned allocated to this task and
// designated as the executing node crashes,
// but it has been sent a CreateCgroupForTasks and replied.
// Then the CranedKeeper search the task index and
// send TerminateTasksOnCraned to all Craned allocated to this task
// including this node.
// In order to give Ctld kind of feedback without adding complicated
// synchronizing mechanism in ScheduleThread_(),
// we just remove the cgroup for such task, Ctld will fail in the
// following ExecuteTasks and the task will go to the right place as
// well as the completed queue.
g_cg_mgr->ReleaseCgroupByTaskIdOnly(elem.task_id);
continue;
}
auto* instance = iter->second.get();
instance->orphaned = elem.mark_as_orphaned;

auto stub = g_supervisor_keeper->GetStub(elem.task_id);
if (!stub) {
CRANE_ERROR("Supervisor for task #{} not found", elem.task_id);
continue;
}
auto err =
stub->TerminateTask(elem.mark_as_orphaned, elem.terminated_by_user);
if (err != CraneErr::kOk) {
CRANE_ERROR("Failed to terminate task #{}", elem.task_id);
}
}
}

Expand Down Expand Up @@ -713,7 +766,17 @@ void TaskManager::EvCleanChangeTaskTimeLimitQueueCb_() {
while (m_task_time_limit_change_queue_.try_dequeue(elem)) {
auto iter = m_task_map_.find(elem.task_id);
if (iter != m_task_map_.end()) {
// todo: Forward Rpc to Supervisor
auto stub = g_supervisor_keeper->GetStub(elem.task_id);
if (!stub) {
CRANE_ERROR("Supervisor for task #{} not found", elem.task_id);
continue;
}
auto err = stub->ChangeTaskTimeLimit(elem.time_limit);
if (err != CraneErr::kOk) {
CRANE_ERROR("Failed to change task time limit for task #{}",
elem.task_id);
continue;
}
elem.ok_prom.set_value(true);
} else {
CRANE_ERROR("Try to update the time limit of a non-existent task #{}.",
Expand Down
Loading

0 comments on commit cb7bcca

Please sign in to comment.