From 96ddb45b29ba21d2079f225d4314ceb26984dc6d Mon Sep 17 00:00:00 2001 From: Li Junlin Date: Mon, 23 Dec 2024 15:16:27 +0800 Subject: [PATCH] feat: Craned fork,exec and init Supervisor. Signed-off-by: Li Junlin --- protos/Supervisor.proto | 1 - src/Craned/Craned/CranedPublicDefs.h | 2 + src/Craned/Craned/JobManager.cpp | 333 ++++++--------------------- src/Craned/Craned/JobManager.h | 39 +--- 4 files changed, 78 insertions(+), 297 deletions(-) diff --git a/protos/Supervisor.proto b/protos/Supervisor.proto index 6a330a1b..bc0b2357 100644 --- a/protos/Supervisor.proto +++ b/protos/Supervisor.proto @@ -31,7 +31,6 @@ message InitSupervisorRequest { uint32 task_id = 1; string debug_level = 2; string craned_unix_socket_path = 3; - bool ok = 4; } message SupervisorReady { diff --git a/src/Craned/Craned/CranedPublicDefs.h b/src/Craned/Craned/CranedPublicDefs.h index 3674aff3..cc5c7ca6 100644 --- a/src/Craned/Craned/CranedPublicDefs.h +++ b/src/Craned/Craned/CranedPublicDefs.h @@ -78,6 +78,8 @@ struct Config { std::string CranedScriptDir; std::string CranedUnixSockPath; + std::string SupervisorPath; + bool CranedForeground{}; std::string Hostname; diff --git a/src/Craned/Craned/JobManager.cpp b/src/Craned/Craned/JobManager.cpp index 84dbd444..7bf638f9 100644 --- a/src/Craned/Craned/JobManager.cpp +++ b/src/Craned/Craned/JobManager.cpp @@ -225,25 +225,20 @@ void JobManager::Wait() { if (m_uvw_thread_.joinable()) m_uvw_thread_.join(); } -CraneErr JobManager::KillProcessInstance_(const ProcessInstance* proc, - int signum) { +CraneErr JobManager::KillPid_(pid_t pid, int signum) { // Todo: Add timer which sends SIGTERM for those tasks who // will not quit when receiving SIGINT. - if (proc) { - CRANE_TRACE("Killing pid {} with signal {}", proc->GetPid(), signum); + CRANE_TRACE("Killing pid {} with signal {}", pid, signum); - // Send the signal to the whole process group. - int err = kill(-proc->GetPid(), signum); + // Send the signal to the whole process group. + int err = kill(-pid, signum); - if (err == 0) - return CraneErr::kOk; - else { - CRANE_TRACE("kill failed. error: {}", strerror(errno)); - return CraneErr::kGenericFailure; - } + if (err == 0) + return CraneErr::kOk; + else { + CRANE_TRACE("kill failed. error: {}", strerror(errno)); + return CraneErr::kGenericFailure; } - - return CraneErr::kNonExistent; } void JobManager::SetSigintCallback(std::function cb) { @@ -262,8 +257,13 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, int ctrl_sock_pair[2]; // Socket pair for passing control messages. - // Socket pair for forwarding IO of crun tasks. Craned read from index 0. - int crun_io_sock_pair[2]; + // int supervisor_ctrl_pipe[2]; // for init Supervisor + // + // // 创建管道 + // if (pipe(supervisor_ctrl_pipe) == -1) { + // CRANE_ERROR("Pipe creation failed!"); + // return CraneErr::kSystemErr; + // } // The ResourceInNode structure should be copied here for being accessed in // the child process. @@ -287,43 +287,7 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, // save the current uid/gid SavedPrivilege saved_priv{getuid(), getgid()}; - int rc = setegid(instance->pwd_entry.Gid()); - if (rc == -1) { - CRANE_ERROR("error: setegid. {}", strerror(errno)); - return CraneErr::kSystemErr; - } - __gid_t gid_a[1] = {instance->pwd_entry.Gid()}; - setgroups(1, gid_a); - rc = seteuid(instance->pwd_entry.Uid()); - if (rc == -1) { - CRANE_ERROR("error: seteuid. {}", strerror(errno)); - return CraneErr::kSystemErr; - } - - pid_t child_pid; - bool launch_pty{false}; - - if (instance->IsCrun()) { - auto* crun_meta = - dynamic_cast(instance->meta.get()); - launch_pty = instance->task.interactive_meta().pty(); - CRANE_DEBUG("Launch crun task #{} pty:{}", instance->task.task_id(), - launch_pty); - - if (launch_pty) { - child_pid = forkpty(&crun_meta->msg_fd, nullptr, nullptr, nullptr); - } else { - if (socketpair(AF_UNIX, SOCK_STREAM, 0, crun_io_sock_pair) != 0) { - CRANE_ERROR("Failed to create socket pair for task io forward: {}", - strerror(errno)); - return CraneErr::kSystemErr; - } - crun_meta->msg_fd = crun_io_sock_pair[0]; - child_pid = fork(); - } - } else { - child_pid = fork(); - } + pid_t child_pid = fork(); if (child_pid == -1) { CRANE_ERROR("fork() failed for task #{}: {}", instance->task.task_id(), @@ -332,26 +296,11 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, } if (child_pid > 0) { // Parent proc - process->SetPid(child_pid); CRANE_DEBUG("Subprocess was created for task #{} pid: {}", instance->task.task_id(), child_pid); - if (instance->IsCrun()) { - auto* meta = dynamic_cast(instance->meta.get()); - g_cfored_manager->RegisterIOForward( - instance->task.interactive_meta().cfored_name(), - instance->task.task_id(), meta->msg_fd, launch_pty); - } - int ctrl_fd = ctrl_sock_pair[0]; close(ctrl_sock_pair[1]); - if (instance->IsCrun() && !launch_pty) { - close(crun_io_sock_pair[1]); - } - - setegid(saved_priv.gid); - seteuid(saved_priv.uid); - setgroups(0, nullptr); bool ok; FileInputStream istream(ctrl_fd); @@ -359,24 +308,6 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, CanStartMessage msg; ChildProcessReady child_process_ready; - // Add event for stdout/stderr of the new subprocess - // struct bufferevent* ev_buf_event; - // ev_buf_event = - // bufferevent_socket_new(m_ev_base_, fd, BEV_OPT_CLOSE_ON_FREE); - // if (!ev_buf_event) { - // CRANE_ERROR( - // "Error constructing bufferevent for the subprocess of task #!", - // instance->task.task_id()); - // err = CraneErr::kLibEventError; - // goto AskChildToSuicide; - // } - // bufferevent_setcb(ev_buf_event, EvSubprocessReadCb_, nullptr, nullptr, - // (void*)process.get()); - // bufferevent_enable(ev_buf_event, EV_READ); - // bufferevent_disable(ev_buf_event, EV_WRITE); - // process->SetEvBufEvent(ev_buf_event); - - // Migrate the new subprocess to newly created cgroup if (!instance->cgroup->MigrateProcIn(child_pid)) { CRANE_ERROR( "Terminate the subprocess of task #{} due to failure of cgroup " @@ -412,7 +343,7 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, // The child process will be reaped in SIGCHLD handler and // thus only ONE TaskStatusChange will be triggered! instance->err_before_exec = CraneErr::kProtobufError; - KillProcessInstance_(process, SIGKILL); + KillPid_(child_pid, SIGKILL); return CraneErr::kOk; } @@ -429,7 +360,53 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, // See comments above. instance->err_before_exec = CraneErr::kProtobufError; - KillProcessInstance_(process, SIGKILL); + KillPid_(child_pid, SIGKILL); + return CraneErr::kOk; + } + + // Do Supervisor Init + crane::grpc::InitSupervisorRequest init_request; + init_request.set_debug_level("off"); + init_request.set_craned_unix_socket_path(g_config.CranedUnixSockPath); + init_request.set_task_id(instance->task.task_id()); + + ok = SerializeDelimitedToZeroCopyStream(init_request, &ostream); + if (!ok) { + CRANE_ERROR("Failed to serialize msg to ostream: {}", + strerror(ostream.GetErrno())); + } + + if (ok) ok &= ostream.Flush(); + if (!ok) { + CRANE_ERROR("Failed to send init msg to supervisor for task #{}: {}", + child_pid, instance->task.task_id(), + strerror(ostream.GetErrno())); + close(ctrl_fd); + + // Communication failure caused by process crash or grpc error. + // Since now the parent cannot ask the child + // process to commit suicide, kill child process here and just return. + // The child process will be reaped in SIGCHLD handler and + // thus only ONE TaskStatusChange will be triggered! + instance->err_before_exec = CraneErr::kProtobufError; + KillPid_(child_pid, SIGKILL); + return CraneErr::kOk; + } + + crane::grpc::SupervisorReady supervisor_ready; + ok = ParseDelimitedFromZeroCopyStream(&supervisor_ready, &istream, nullptr); + if (!ok || !msg.ok()) { + if (!ok) + CRANE_ERROR("Socket child endpoint failed: {}", + strerror(istream.GetErrno())); + if (!msg.ok()) + CRANE_ERROR("False from subprocess {} of task #{}", child_pid, + instance->task.task_id()); + close(ctrl_fd); + + // See comments above. + instance->err_before_exec = CraneErr::kProtobufError; + KillPid_(child_pid, SIGKILL); return CraneErr::kOk; } @@ -447,7 +424,7 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, // See comments above. instance->err_before_exec = CraneErr::kProtobufError; - KillProcessInstance_(process, SIGKILL); + KillPid_(child_pid, SIGKILL); } // See comments above. @@ -459,20 +436,6 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, // Disable SIGABRT backtrace from child processes. signal(SIGABRT, SIG_DFL); - const std::string& cwd = instance->task.cwd(); - rc = chdir(cwd.c_str()); - if (rc == -1) { - // CRANE_ERROR("[Child Process] Error: chdir to {}. {}", cwd.c_str(), - // strerror(errno)); - std::abort(); - } - - setreuid(instance->pwd_entry.Uid(), instance->pwd_entry.Uid()); - setregid(instance->pwd_entry.Gid(), instance->pwd_entry.Gid()); - - // Set pgid to the pid of task root process. - setpgid(0, 0); - close(ctrl_sock_pair[0]); int ctrl_fd = ctrl_sock_pair[1]; @@ -495,47 +458,6 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, std::abort(); } - if (instance->task.type() == crane::grpc::Batch) { - int stdout_fd, stderr_fd; - - const std::string& stdout_file_path = - process->batch_meta.parsed_output_file_pattern; - const std::string& stderr_file_path = - process->batch_meta.parsed_error_file_pattern; - - stdout_fd = - open(stdout_file_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644); - if (stdout_fd == -1) { - // CRANE_ERROR("[Child Process] Error: open {}. {}", stdout_file_path, - // strerror(errno)); - std::abort(); - } - dup2(stdout_fd, 1); - - if (stderr_file_path.empty()) { - dup2(stdout_fd, 2); - } else { - stderr_fd = - open(stderr_file_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644); - if (stderr_fd == -1) { - // CRANE_ERROR("[Child Process] Error: open {}. {}", stderr_file_path, - // strerror(errno)); - std::abort(); - } - dup2(stderr_fd, 2); // stderr -> error file - close(stderr_fd); - } - close(stdout_fd); - - } else if (instance->IsCrun() && !launch_pty) { - close(crun_io_sock_pair[0]); - - dup2(crun_io_sock_pair[1], 0); - dup2(crun_io_sock_pair[1], 1); - dup2(crun_io_sock_pair[1], 2); - close(crun_io_sock_pair[1]); - } - child_process_ready.set_ok(true); ok = SerializeDelimitedToZeroCopyStream(child_process_ready, &ostream); ok &= ostream.Flush(); @@ -544,12 +466,12 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, std::abort(); } - close(ctrl_fd); + // Message will send to stdin of Supervisor for its init. + dup2(ctrl_fd, STDIN_FILENO); // Close stdin for batch tasks. // If these file descriptors are not closed, a program like mpirun may // keep waiting for the input from stdin or other fds and will never end. - if (instance->task.type() == crane::grpc::Batch) close(0); util::os::CloseFdFrom(3); EnvMap task_env_map = instance->GetTaskEnvMap(); @@ -574,7 +496,7 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, std::vector argv; // Argv[0] is the program name which can be anything. - argv.emplace_back("CraneScript"); + argv.emplace_back("Supervisor"); if (instance->task.get_user_env()) { // If --get-user-env is specified, @@ -583,10 +505,7 @@ CraneErr JobManager::SpawnProcessInInstance_(TaskInstance* instance, argv.emplace_back("--login"); } - argv.emplace_back(process->GetExecPath().c_str()); - for (auto&& arg : process->GetArgList()) { - argv.push_back(arg.c_str()); - } + argv.emplace_back(g_config.SupervisorPath.c_str()); argv.push_back(nullptr); execv("/bin/bash", const_cast(argv.data())); @@ -644,13 +563,6 @@ void JobManager::EvCleanGrpcExecuteTaskQueueCb_() { continue; } - // Add a timer to limit the execution time of a task. - // Note: event_new and event_add in this function is not thread safe, - // so we move it outside the multithreading part. - int64_t sec = instance->task.time_limit().seconds(); - AddTerminationTimer_(instance, sec); - CRANE_TRACE("Add a timer of {} seconds for task #{}", sec, task_id); - g_thread_pool->detach_task( [this, instance]() { LaunchTaskInstanceMt_(instance); }); } @@ -697,59 +609,8 @@ void JobManager::LaunchTaskInstanceMt_(TaskInstance* instance) { // Calloc tasks have no scripts to run. Just return. if (instance->IsCalloc()) return; - instance->meta->parsed_sh_script_path = - fmt::format("{}/Crane-{}.sh", g_config.CranedScriptDir, task_id); - auto& sh_path = instance->meta->parsed_sh_script_path; - - FILE* fptr = fopen(sh_path.c_str(), "w"); - if (fptr == nullptr) { - CRANE_ERROR("Failed write the script for task #{}", task_id); - ActivateTaskStatusChangeAsync_( - task_id, crane::grpc::TaskStatus::Failed, - ExitCode::kExitCodeFileNotFound, - fmt::format("Cannot write shell script for batch task #{}", task_id)); - return; - } - - if (instance->task.type() == crane::grpc::Batch) - fputs(instance->task.batch_meta().sh_script().c_str(), fptr); - else // Crun - fputs(instance->task.interactive_meta().sh_script().c_str(), fptr); - - fclose(fptr); - - chmod(sh_path.c_str(), strtol("0755", nullptr, 8)); - auto process = - std::make_unique(sh_path, std::list()); - - // Prepare file output name for batch tasks. - if (instance->task.type() == crane::grpc::Batch) { - /* Perform file name substitutions - * %j - Job ID - * %u - Username - * %x - Job name - */ - process->batch_meta.parsed_output_file_pattern = - ParseFilePathPattern_(instance->task.batch_meta().output_file_pattern(), - instance->task.cwd(), task_id); - absl::StrReplaceAll({{"%j", std::to_string(task_id)}, - {"%u", instance->pwd_entry.Username()}, - {"%x", instance->task.name()}}, - &process->batch_meta.parsed_output_file_pattern); - - // If -e / --error is not defined, leave - // batch_meta.parsed_error_file_pattern empty; - if (!instance->task.batch_meta().error_file_pattern().empty()) { - process->batch_meta.parsed_error_file_pattern = ParseFilePathPattern_( - instance->task.batch_meta().error_file_pattern(), - instance->task.cwd(), task_id); - absl::StrReplaceAll({{"%j", std::to_string(task_id)}, - {"%u", instance->pwd_entry.Username()}, - {"%x", instance->task.name()}}, - &process->batch_meta.parsed_error_file_pattern); - } - } + std::make_unique(instance->task); // err will NOT be kOk ONLY if fork() is not called due to some failure // or fork() fails. @@ -899,36 +760,6 @@ void JobManager::EvCleanGrpcQueryTaskIdFromPidQueueCb_() { } } -void JobManager::EvTaskTimerCb_(task_id_t task_id) { - CRANE_TRACE("Task #{} exceeded its time limit. Terminating it...", task_id); - - // Sometimes, task finishes just before time limit. - // After the execution of SIGCHLD callback where the task has been erased, - // the timer is triggered immediately. - // That's why we need to check the existence of the task again in timer - // callback, otherwise a segmentation fault will occur. - auto task_it = m_task_map_.find(task_id); - if (task_it == m_task_map_.end()) { - CRANE_TRACE("Task #{} has already been removed."); - return; - } - - TaskInstance* task_instance = task_it->second.get(); - DelTerminationTimer_(task_instance); - - if (task_instance->task.type() == crane::grpc::Batch) { - TaskTerminateQueueElem ev_task_terminate{ - .task_id = task_id, - .terminated_by_timeout = true, - }; - m_task_terminate_queue_.enqueue(ev_task_terminate); - m_terminate_task_async_handle_->send(); - } else { - ActivateTaskStatusChangeAsync_( - task_id, crane::grpc::TaskStatus::ExceedTimeLimit, - ExitCode::kExitCodeExceedTimeLimit, std::nullopt); - } -} void JobManager::EvCleanTerminateTaskQueueCb_() { TaskTerminateQueueElem elem; @@ -1008,29 +839,9 @@ void JobManager::EvCleanChangeTaskTimeLimitQueueCb_() { ChangeTaskTimeLimitQueueElem elem; while (m_task_time_limit_change_queue_.try_dequeue(elem)) { - // todo: Forward Rpc to Supervisor auto iter = m_task_map_.find(elem.task_id); if (iter != m_task_map_.end()) { - TaskInstance* task_instance = iter->second.get(); - DelTerminationTimer_(task_instance); - - absl::Time start_time = - absl::FromUnixSeconds(task_instance->task.start_time().seconds()); - absl::Duration const& new_time_limit = elem.time_limit; - - if (now - start_time >= new_time_limit) { - // If the task times out, terminate it. - TaskTerminateQueueElem ev_task_terminate{.task_id = elem.task_id, - .terminated_by_timeout = true}; - m_task_terminate_queue_.enqueue(ev_task_terminate); - m_terminate_task_async_handle_->send(); - - } else { - // If the task haven't timed out, set up a new timer. - AddTerminationTimer_( - task_instance, - ToInt64Seconds((new_time_limit - (absl::Now() - start_time)))); - } + // todo: Forward Rpc to Supervisor elem.ok_prom.set_value(true); } else { CRANE_ERROR("Try to update the time limit of a non-existent task #{}.", diff --git a/src/Craned/Craned/JobManager.h b/src/Craned/Craned/JobManager.h index 75994f5a..088ee029 100644 --- a/src/Craned/Craned/JobManager.h +++ b/src/Craned/Craned/JobManager.h @@ -36,32 +36,10 @@ struct BatchMetaInProcessInstance { class ProcessInstance { public: - ProcessInstance(std::string exec_path, std::list arg_list) - : m_executive_path_(std::move(exec_path)), - m_arguments_(std::move(arg_list)), - m_pid_(0) {} - - ~ProcessInstance() = default; - - [[nodiscard]] const std::string& GetExecPath() const { - return m_executive_path_; - } - [[nodiscard]] const std::list& GetArgList() const { - return m_arguments_; - } - - void SetPid(pid_t pid) { m_pid_ = pid; } - [[nodiscard]] pid_t GetPid() const { return m_pid_; } - - BatchMetaInProcessInstance batch_meta; + ProcessInstance(const crane::grpc::TaskToD& task) : m_task_(task) {}; private: - /* ------------- Fields set by SpawnProcessInInstance_ ---------------- */ - pid_t m_pid_; - - /* ------- Fields set by the caller of SpawnProcessInInstance_ -------- */ - std::string m_executive_path_; - std::list m_arguments_; + crane::grpc::TaskToD m_task_; }; struct MetaInTaskInstance { @@ -80,15 +58,7 @@ struct CrunMetaInTaskInstance : MetaInTaskInstance { // Todo: Task may consists of multiple subtasks struct TaskInstance { - ~TaskInstance() { - if (termination_timer) { - termination_timer->close(); - } - - if (this->IsCrun()) { - close(dynamic_cast(meta.get())->msg_fd); - } - } + ~TaskInstance() = default; bool IsCrun() const; bool IsCalloc() const; @@ -101,7 +71,6 @@ struct TaskInstance { std::string cgroup_path; CgroupInterface* cgroup; - std::shared_ptr termination_timer{nullptr}; // Task execution results bool orphaned{false}; @@ -232,7 +201,7 @@ class JobManager { * if the signal is invalid, kInvalidParam is returned. * otherwise, kGenericFailure is returned. */ - static CraneErr KillProcessInstance_(const ProcessInstance* proc, int signum); + static CraneErr KillPid_(pid_t pid, int signum); // Note: the three maps below are NOT protected by any mutex. // They should be modified in libev callbacks to avoid races.