diff --git a/src/Craned/Craned/Craned.cpp b/src/Craned/Craned/Craned.cpp index 31f3bd1f6..eea828f61 100644 --- a/src/Craned/Craned/Craned.cpp +++ b/src/Craned/Craned/Craned.cpp @@ -29,7 +29,7 @@ #include "CranedServer.h" #include "CtldClient.h" #include "DeviceManager.h" -#include "TaskManager.h" +#include "SupervisorKeeper.h" #include "crane/PluginClient.h" #include "crane/String.h" @@ -617,8 +617,7 @@ void GlobalVariableInit() { g_thread_pool = std::make_unique(std::thread::hardware_concurrency()); - g_job_mgr = std::make_unique(); - + g_task_mgr = std::make_unique(); g_ctld_client = std::make_unique(); g_ctld_client->SetCranedId(g_config.CranedIdOfThisNode); @@ -629,6 +628,8 @@ void GlobalVariableInit() { g_plugin_client = std::make_unique(); g_plugin_client->InitChannelAndStub(g_config.Plugin.PlugindSockPath); } + // SupervisorKeeper will recover above global var + g_supervisor_keeper = std::make_unique(); } void StartServer() { @@ -650,12 +651,11 @@ void StartServer() { g_server->Wait(); // Free global variables - g_job_mgr->Wait(); - g_job_mgr.reset(); - // CforedManager MUST be destructed after JobManager. - g_cfored_manager.reset(); + g_task_mgr->Wait(); + g_task_mgr.reset(); g_server.reset(); g_ctld_client.reset(); + g_supervisor_keeper.reset(); g_plugin_client.reset(); g_thread_pool->wait(); diff --git a/src/Craned/Craned/SupervisorKeeper.cpp b/src/Craned/Craned/SupervisorKeeper.cpp index 4dea89f52..3c613bba3 100644 --- a/src/Craned/Craned/SupervisorKeeper.cpp +++ b/src/Craned/Craned/SupervisorKeeper.cpp @@ -1,5 +1,5 @@ /** -* Copyright (c) 2024 Peking University and Peking University + * Copyright (c) 2024 Peking University and Peking University * Changsha Institute for Computing and Digital Economy * * This program is free software: you can redistribute it and/or modify @@ -17,3 +17,69 @@ */ #include "SupervisorKeeper.h" + +#include +#include + +namespace Craned { +void SupervisorClient::InitChannelAndStub(const std::string& endpoint) { + m_channel_ = CreateUnixInsecureChannel(endpoint); + // std::unique_ptr will automatically release the dangling stub. + m_stub_ = crane::grpc::Supervisor::NewStub(m_channel_); +} + +SupervisorKeeper::SupervisorKeeper() { + try { + std::filesystem::path path = kDefaultSupervisorUnixSockDir; + if (std::filesystem::exists(path) && std::filesystem::is_directory(path)) { + // 遍历目录中的所有条目 + for (const auto& it : std::filesystem::directory_iterator(path)) { + if (std::filesystem::is_socket(it.path())) { + g_thread_pool->detach_task( + [this, it]() { this->RecoverSupervisorMt_(it.path()); }); + } + } + } else { + CRANE_WARN("Supervisor socket dir dose not exit, skip recovery."); + } + } catch (const std::exception& e) { + CRANE_ERROR("Error: {}, when recover supervisor", e.what()); + } +} + +void SupervisorKeeper::AddSupervisor(task_id_t task_id) { + auto sock_path = fmt::format("unix://{}/task_{}.sock", + kDefaultSupervisorUnixSockDir, task_id); + std::shared_ptr stub = std::make_shared(); + stub->InitChannelAndStub(sock_path); + absl::WriterMutexLock lk(&m_mutex); + if (auto it = m_supervisor_map.find(task_id); it != m_supervisor_map.end()) { + CRANE_ERROR("Duplicate supervisor for task #{}", task_id); + return; + } + m_supervisor_map.emplace(task_id, stub); +} + +std::shared_ptr SupervisorKeeper::GetStub(task_id_t task_id) { + absl::ReaderMutexLock lk(&m_mutex); + if (auto it = m_supervisor_map.find(task_id); it != m_supervisor_map.end()) { + return it->second; + } else { + return nullptr; + } +} +void SupervisorKeeper::RecoverSupervisorMt_(const std::filesystem::path& path) { + std::shared_ptr stub = std::make_shared(); + stub->InitChannelAndStub(path); + crane::grpc::TaskToD task; + auto err = stub->CheckTaskStatus(&task); + if (err != CraneErr::kOk) { + CRANE_ERROR("CheckTaskStatus for {} failed: {}", path, err); + return; + } + absl::WriterMutexLock lk(&m_mutex); + m_supervisor_map.emplace(task.task_id(), stub); + g_task_mgr->AddRecoveredTask_(task); +} + +} // namespace Craned \ No newline at end of file diff --git a/src/Craned/Craned/SupervisorKeeper.h b/src/Craned/Craned/SupervisorKeeper.h index 4726a9601..980f52a98 100644 --- a/src/Craned/Craned/SupervisorKeeper.h +++ b/src/Craned/Craned/SupervisorKeeper.h @@ -21,19 +21,41 @@ #include "CranedPublicDefs.h" // Precompiled header comes first. +#include + +#include "TaskManager.h" #include "crane/AtomicHashMap.h" namespace Craned { class SupervisorClient { + public: + CraneExpected ExecuteTask(const ProcessInstance* process); + CraneExpected QueryTaskEnv(); + CraneErr CheckTaskStatus(crane::grpc::TaskToD* task); + + CraneErr TerminateTask(bool mark_as_orphaned, bool terminated_by_user); + CraneErr ChangeTaskTimeLimit(absl::Duration time_limit); + + void InitChannelAndStub(const std::string& endpoint); + + private: + std::shared_ptr m_channel_; - //todo: Rpc wrap func + std::unique_ptr m_stub_; + // todo: Rpc wrap func }; class SupervisorKeeper { + public: + SupervisorKeeper(); + void AddSupervisor(task_id_t task_id); + std::shared_ptr GetStub(task_id_t task_id); + private: - util::AtomicHashMap> + void RecoverSupervisorMt_(const std::filesystem::path& path); + absl::flat_hash_map> m_supervisor_map; - - + absl::Mutex m_mutex; }; -} // namespace Craned \ No newline at end of file +} // namespace Craned + +inline std::unique_ptr g_supervisor_keeper; \ No newline at end of file diff --git a/src/Craned/Craned/TaskManager.cpp b/src/Craned/Craned/TaskManager.cpp index fea830477..1b1f99109 100644 --- a/src/Craned/Craned/TaskManager.cpp +++ b/src/Craned/Craned/TaskManager.cpp @@ -25,6 +25,7 @@ #include #include "CtldClient.h" +#include "SupervisorKeeper.h" #include "crane/String.h" #include "protos/PublicDefs.pb.h" #include "protos/Supervisor.pb.h" @@ -181,6 +182,7 @@ CraneErr TaskManager::KillPid_(pid_t pid, int signum) { void TaskManager::SetSigintCallback(std::function cb) { m_sigint_cb_ = std::move(cb); } +void TaskManager::AddRecoveredTask_(crane::grpc::TaskToD task) {} CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance, ProcessInstance* process) { @@ -221,9 +223,6 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance, return CraneErr::kSystemErr; } - // save the current uid/gid - SavedPrivilege saved_priv{getuid(), getgid()}; - pid_t child_pid = fork(); if (child_pid == -1) { @@ -274,14 +273,9 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance, strerror(ostream.GetErrno())); close(ctrl_fd); - // Communication failure caused by process crash or grpc error. - // Since now the parent cannot ask the child - // process to commit suicide, kill child process here and just return. - // The child process will be reaped in SIGCHLD handler and - // thus only ONE TaskStatusChange will be triggered! instance->err_before_exec = CraneErr::kProtobufError; KillPid_(child_pid, SIGKILL); - return CraneErr::kOk; + return CraneErr::kProtobufError; } ok = ParseDelimitedFromZeroCopyStream(&child_process_ready, &istream, @@ -295,10 +289,9 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance, instance->task.task_id()); close(ctrl_fd); - // See comments above. instance->err_before_exec = CraneErr::kProtobufError; KillPid_(child_pid, SIGKILL); - return CraneErr::kOk; + return CraneErr::kProtobufError; } // Do Supervisor Init @@ -320,14 +313,9 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance, strerror(ostream.GetErrno())); close(ctrl_fd); - // Communication failure caused by process crash or grpc error. - // Since now the parent cannot ask the child - // process to commit suicide, kill child process here and just return. - // The child process will be reaped in SIGCHLD handler and - // thus only ONE TaskStatusChange will be triggered! instance->err_before_exec = CraneErr::kProtobufError; KillPid_(child_pid, SIGKILL); - return CraneErr::kOk; + return CraneErr::kProtobufError; } crane::grpc::SupervisorReady supervisor_ready; @@ -341,7 +329,6 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance, instance->task.task_id()); close(ctrl_fd); - // See comments above. instance->err_before_exec = CraneErr::kProtobufError; KillPid_(child_pid, SIGKILL); return CraneErr::kOk; @@ -350,6 +337,19 @@ CraneErr TaskManager::SpawnSupervisor_(TaskInstance* instance, close(ctrl_fd); // todo: Create Supervisor stub,request task execution + g_supervisor_keeper->AddSupervisor(instance->task.task_id()); + + auto stub = g_supervisor_keeper->GetStub(instance->task.task_id()); + auto task_id = stub->ExecuteTask(process); + if (!task_id) { + CRANE_ERROR("Supervisor failed to execute task #{}", + instance->task.task_id()); + instance->err_before_exec = CraneErr::kSupervisorError; + KillPid_(child_pid, SIGKILL); + return CraneErr::kSupervisorError; + } + process->pid = task_id.value(); + return CraneErr::kOk; AskChildToSuicide: @@ -596,12 +596,21 @@ void TaskManager::EvCleanGrpcQueryTaskEnvQueueCb_() { elem.env_prom.set_value(std::unexpected(CraneErr::kSystemErr)); else { auto& instance = task_iter->second; - // todo: Forward this to supervisor - // std::unordered_map env_map; - // for (const auto& [name, value] : instance->GetTaskEnvMap()) { - // env_map.emplace(name, value); - // } - // elem.env_prom.set_value(env_map); + auto stub = g_supervisor_keeper->GetStub(instance->task.task_id()); + if (!stub) { + CRANE_ERROR("Supervisor for task #{} not found", + instance->task.task_id()); + elem.env_prom.set_value(std::unexpected(CraneErr::kSupervisorError)); + continue; + } + auto env_map = stub->QueryTaskEnv(); + if (!env_map) { + CRANE_ERROR("Failed to query env map for task #{}", + instance->task.task_id()); + elem.env_prom.set_value(std::unexpected(CraneErr::kSupervisorError)); + continue; + } + elem.env_prom.set_value(env_map); } } } @@ -636,7 +645,51 @@ void TaskManager::EvCleanGrpcQueryTaskIdFromPidQueueCb_() { void TaskManager::EvCleanTerminateTaskQueueCb_() { TaskTerminateQueueElem elem; while (m_task_terminate_queue_.try_dequeue(elem)) { - // todo:Just forward to Supervisor + CRANE_TRACE( + "Receive TerminateRunningTask Request from internal queue. " + "Task id: {}", + elem.task_id); + + auto iter = m_task_map_.find(elem.task_id); + if (iter == m_task_map_.end()) { + CRANE_DEBUG("Terminating a non-existent task #{}.", elem.task_id); + + // Note if Ctld wants to terminate some tasks that are not running, + // it might indicate other nodes allocated to the task might have + // crashed. We should mark the task as kind of not runnable by removing + // its cgroup. + // + // Considering such a situation: + // In Task Scheduler of Ctld, + // the task index from node id to task id have just been added and + // Ctld are sending CreateCgroupForTasks. + // Right at the moment, one Craned allocated to this task and + // designated as the executing node crashes, + // but it has been sent a CreateCgroupForTasks and replied. + // Then the CranedKeeper search the task index and + // send TerminateTasksOnCraned to all Craned allocated to this task + // including this node. + // In order to give Ctld kind of feedback without adding complicated + // synchronizing mechanism in ScheduleThread_(), + // we just remove the cgroup for such task, Ctld will fail in the + // following ExecuteTasks and the task will go to the right place as + // well as the completed queue. + g_cg_mgr->ReleaseCgroupByTaskIdOnly(elem.task_id); + continue; + } + auto* instance = iter->second.get(); + instance->orphaned = elem.mark_as_orphaned; + + auto stub = g_supervisor_keeper->GetStub(elem.task_id); + if (!stub) { + CRANE_ERROR("Supervisor for task #{} not found", elem.task_id); + continue; + } + auto err = + stub->TerminateTask(elem.mark_as_orphaned, elem.terminated_by_user); + if (err != CraneErr::kOk) { + CRANE_ERROR("Failed to terminate task #{}", elem.task_id); + } } } @@ -713,7 +766,17 @@ void TaskManager::EvCleanChangeTaskTimeLimitQueueCb_() { while (m_task_time_limit_change_queue_.try_dequeue(elem)) { auto iter = m_task_map_.find(elem.task_id); if (iter != m_task_map_.end()) { - // todo: Forward Rpc to Supervisor + auto stub = g_supervisor_keeper->GetStub(elem.task_id); + if (!stub) { + CRANE_ERROR("Supervisor for task #{} not found", elem.task_id); + continue; + } + auto err = stub->ChangeTaskTimeLimit(elem.time_limit); + if (err != CraneErr::kOk) { + CRANE_ERROR("Failed to change task time limit for task #{}", + elem.task_id); + continue; + } elem.ok_prom.set_value(true); } else { CRANE_ERROR("Try to update the time limit of a non-existent task #{}.", diff --git a/src/Craned/Craned/TaskManager.h b/src/Craned/Craned/TaskManager.h index dfdebe64f..c26449e22 100644 --- a/src/Craned/Craned/TaskManager.h +++ b/src/Craned/Craned/TaskManager.h @@ -48,8 +48,6 @@ struct TaskInstance { // Task execution results bool orphaned{false}; CraneErr err_before_exec{CraneErr::kOk}; - bool cancelled_by_user{false}; - bool terminated_by_timeout{false}; absl::flat_hash_map> processes; }; @@ -72,16 +70,12 @@ class TaskManager { CraneExpected QueryTaskEnvMapAsync(task_id_t task_id); - // todo: Send Rpc to Supervisor void TerminateTaskAsync(uint32_t task_id); - // todo: Send Rpc to Supervisor void MarkTaskAsOrphanedAndTerminateAsync(task_id_t task_id); - // todo: Send Rpc to Supervisor bool CheckTaskStatusAsync(task_id_t task_id, crane::grpc::TaskStatus* status); - // todo: Send Rpc to Supervisor bool ChangeTaskTimeLimitAsync(task_id_t task_id, absl::Duration time_limit); // Wait internal libevent base loop to exit... @@ -94,15 +88,13 @@ class TaskManager { */ void SetSigintCallback(std::function cb); + // Called from SupervisorKeeper which guarantee no data race. + void AddRecoveredTask_(crane::grpc::TaskToD task); + private: template using ConcurrentQueue = moodycamel::ConcurrentQueue; - struct SavedPrivilege { - uid_t uid; - gid_t gid; - }; - struct EvQueueQueryTaskIdFromPid { std::promise> task_id_prom; pid_t pid; @@ -267,4 +259,4 @@ class TaskManager { }; } // namespace Craned -inline std::unique_ptr g_job_mgr; \ No newline at end of file +inline std::unique_ptr g_task_mgr; \ No newline at end of file diff --git a/src/Craned/Supervisor/SupervisorPublicDefs.h b/src/Craned/Supervisor/SupervisorPublicDefs.h index 80b2b5953..b8c346d9c 100644 --- a/src/Craned/Supervisor/SupervisorPublicDefs.h +++ b/src/Craned/Supervisor/SupervisorPublicDefs.h @@ -25,8 +25,6 @@ namespace Supervisor { -inline constexpr std::string kSupervisorPidFileDir = "/run/crane"; - using EnvMap = std::unordered_map; struct TaskStatusChangeQueueElem { diff --git a/src/Craned/Supervisor/SupervisorServer.cpp b/src/Craned/Supervisor/SupervisorServer.cpp index 6e7129a27..49b756266 100644 --- a/src/Craned/Supervisor/SupervisorServer.cpp +++ b/src/Craned/Supervisor/SupervisorServer.cpp @@ -29,8 +29,8 @@ grpc::Status Supervisor::SupervisorServiceImpl::StartTask( Supervisor::SupervisorServer::SupervisorServer() { m_service_impl_ = std::make_unique(); - auto unix_socket_path = - fmt::format("unix:/tmp/crane/task_{}.sock", g_config.TaskId); + auto unix_socket_path = fmt::format( + "unix://{}/task_{}.sock", kDefaultSupervisorUnixSockDir, g_config.TaskId); grpc::ServerBuilder builder; ServerBuilderAddUnixInsecureListeningPort(&builder, unix_socket_path); builder.RegisterService(m_service_impl_.get()); diff --git a/src/Craned/Supervisor/TaskManager.cpp b/src/Craned/Supervisor/TaskManager.cpp index 97823192a..575e2a979 100644 --- a/src/Craned/Supervisor/TaskManager.cpp +++ b/src/Craned/Supervisor/TaskManager.cpp @@ -224,7 +224,7 @@ void TaskManager::ActivateTaskStatusChange_(crane::grpc::TaskStatus new_status, } std::string TaskManager::ParseFilePathPattern_(const std::string& path_pattern, - const std::string& cwd) const { + const std::string& cwd) { std::string resolved_path_pattern; if (path_pattern.empty()) { diff --git a/src/Craned/Supervisor/TaskManager.h b/src/Craned/Supervisor/TaskManager.h index 43dbeabea..b66ef4921 100644 --- a/src/Craned/Supervisor/TaskManager.h +++ b/src/Craned/Supervisor/TaskManager.h @@ -147,8 +147,8 @@ class TaskManager { uint32_t exit_code, std::optional reason); - std::string ParseFilePathPattern_(const std::string& path_pattern, - const std::string& cwd) const; + static std::string ParseFilePathPattern_(const std::string& path_pattern, + const std::string& cwd); void LaunchTaskInstance_(); CraneErr SpawnTaskInstance_(); CraneErr KillTaskInstance_(int signum); diff --git a/src/Utilities/PublicHeader/include/crane/PublicHeader.h b/src/Utilities/PublicHeader/include/crane/PublicHeader.h index 4cf91633e..0871d2953 100644 --- a/src/Utilities/PublicHeader/include/crane/PublicHeader.h +++ b/src/Utilities/PublicHeader/include/crane/PublicHeader.h @@ -57,6 +57,7 @@ enum class CraneErr : uint16_t { kProtobufError, kLibEventError, kNoAvailNode, + kSupervisorError, __ERR_SIZE // NOLINT(bugprone-reserved-identifier) }; @@ -88,6 +89,8 @@ inline const char* kDefaultCranedUnixSockPath = "craned/craned.sock"; inline const char* kDefaultCranedMutexFile = "craned/craned.lock"; inline const char* kDefaultCranedLogPath = "craned/craned.log"; +inline const char* kDefaultSupervisorUnixSockDir = "/tmp/crane"; + inline const char* kDefaultPlugindUnixSockPath = "cplugind/cplugind.sock"; constexpr uint64_t kTaskMinTimeLimitSec = 11;