diff --git a/protos/Supervisor.proto b/protos/Supervisor.proto index 48888a04b..6a330a1b7 100644 --- a/protos/Supervisor.proto +++ b/protos/Supervisor.proto @@ -44,6 +44,7 @@ message TaskExecutionRequest{ message TaskExecutionReply{ bool ok = 1; + int32 pid = 2; } message TerminateRequest{ diff --git a/src/Craned/Craned/CMakeLists.txt b/src/Craned/Craned/CMakeLists.txt index f85ac8dea..b7a1290f8 100644 --- a/src/Craned/Craned/CMakeLists.txt +++ b/src/Craned/Craned/CMakeLists.txt @@ -12,7 +12,9 @@ add_executable(craned CranedPreCompiledHeader.h DeviceManager.cpp - DeviceManager.h) + DeviceManager.h + SupervisorKeeper.cpp + SupervisorKeeper.h) target_precompile_headers(craned PRIVATE CranedPreCompiledHeader.h) add_dependencies(craned libcgroup) diff --git a/src/Craned/Craned/JobManager.cpp b/src/Craned/Craned/JobManager.cpp index 44f951d2f..1a60e9600 100644 --- a/src/Craned/Craned/JobManager.cpp +++ b/src/Craned/Craned/JobManager.cpp @@ -141,12 +141,6 @@ JobManager::JobManager() { EvCleanGrpcExecuteTaskQueueCb_(); }); - m_process_sigchld_async_handle_ = m_uvw_loop_->resource(); - m_process_sigchld_async_handle_->on( - [this](const uvw::async_event&, uvw::async_handle&) { - EvCleanSigchldQueueCb_(); - }); - // Task Status Change Event m_task_status_change_async_handle_ = m_uvw_loop_->resource(); @@ -180,7 +174,7 @@ JobManager::JobManager() { auto idle_handle = m_uvw_loop_->resource(); idle_handle->on( [this](const uvw::idle_event&, uvw::idle_handle& h) { - if (m_task_cleared_) { + if (m_is_ending_now_) { h.parent().walk([](auto&& h) { h.close(); }); h.parent().stop(); } @@ -203,71 +197,6 @@ const TaskInstance* JobManager::FindInstanceByTaskId_(uint32_t task_id) { return iter->second.get(); } -void JobManager::TaskStopAndDoStatusChangeAsync(uint32_t task_id) { - auto it = m_task_map_.find(task_id); - if (it == m_task_map_.end()) { - CRANE_ERROR("Task #{} not found in TaskStopAndDoStatusChangeAsync.", - task_id); - return; - } - TaskInstance* instance = it->second.get(); - - CRANE_INFO("Task #{} stopped and is doing TaskStatusChange...", task_id); - - switch (instance->err_before_exec) { - case CraneErr::kProtobufError: - ActivateTaskStatusChangeAsync_(task_id, crane::grpc::TaskStatus::Failed, - ExitCode::kExitCodeSpawnProcessFail, - std::nullopt); - break; - - case CraneErr::kCgroupError: - ActivateTaskStatusChangeAsync_(task_id, crane::grpc::TaskStatus::Failed, - ExitCode::kExitCodeCgroupError, - std::nullopt); - break; - - default: - break; - } - - ProcSigchldInfo& sigchld_info = instance->sigchld_info; - if (instance->task.type() == crane::grpc::Batch || instance->IsCrun()) { - // For a Batch task, the end of the process means it is done. - if (sigchld_info.is_terminated_by_signal) { - if (instance->cancelled_by_user) - ActivateTaskStatusChangeAsync_( - task_id, crane::grpc::TaskStatus::Cancelled, - sigchld_info.value + ExitCode::kTerminationSignalBase, - std::nullopt); - else if (instance->terminated_by_timeout) - ActivateTaskStatusChangeAsync_( - task_id, crane::grpc::TaskStatus::ExceedTimeLimit, - sigchld_info.value + ExitCode::kTerminationSignalBase, - std::nullopt); - else - ActivateTaskStatusChangeAsync_( - task_id, crane::grpc::TaskStatus::Failed, - sigchld_info.value + ExitCode::kTerminationSignalBase, - std::nullopt); - } else - ActivateTaskStatusChangeAsync_(task_id, - crane::grpc::TaskStatus::Completed, - sigchld_info.value, std::nullopt); - } else /* Calloc */ { - // For a COMPLETING Calloc task with a process running, - // the end of this process means that this task is done. - if (sigchld_info.is_terminated_by_signal) - ActivateTaskStatusChangeAsync_( - task_id, crane::grpc::TaskStatus::Completed, - sigchld_info.value + ExitCode::kTerminationSignalBase, std::nullopt); - else - ActivateTaskStatusChangeAsync_(task_id, - crane::grpc::TaskStatus::Completed, - sigchld_info.value, std::nullopt); - } -} - void JobManager::EvSigchldCb_() { assert(m_instance_ptr_->m_instance_ptr_ != nullptr); @@ -278,42 +207,9 @@ void JobManager::EvSigchldCb_() { /* TODO(More status tracing): | WUNTRACED | WCONTINUED */); if (pid > 0) { - auto sigchld_info = std::make_unique(); - - if (WIFEXITED(status)) { - // Exited with status WEXITSTATUS(status) - sigchld_info->pid = pid; - sigchld_info->is_terminated_by_signal = false; - sigchld_info->value = WEXITSTATUS(status); - - CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: false, Status: {}", - pid, WEXITSTATUS(status)); - } else if (WIFSIGNALED(status)) { - // Killed by signal WTERMSIG(status) - sigchld_info->pid = pid; - sigchld_info->is_terminated_by_signal = true; - sigchld_info->value = WTERMSIG(status); - - CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: true, Signal: {}", - pid, WTERMSIG(status)); - } - /* Todo(More status tracing): - else if (WIFSTOPPED(status)) { - printf("stopped by signal %d\n", WSTOPSIG(status)); - } else if (WIFCONTINUED(status)) { - printf("continued\n"); - } */ - m_sigchld_queue_.enqueue(std::move(sigchld_info)); - m_process_sigchld_async_handle_->send(); + // We do nothing now } else if (pid == 0) { // There's no child that needs reaping. - // If Craned is exiting, check if there's any task remaining. - // If there's no task running, just stop the loop of JobManager. - if (m_is_ending_now_) { - if (m_task_map_.empty()) { - ActivateShutdownAsync_(); - } - } break; } else if (pid < 0) { if (errno != ECHILD) @@ -323,160 +219,7 @@ void JobManager::EvSigchldCb_() { } } -void JobManager::EvCleanSigchldQueueCb_() { - std::unique_ptr sigchld_info; - while (m_sigchld_queue_.try_dequeue(sigchld_info)) { - auto pid = sigchld_info->pid; - - if (sigchld_info->resend_timer != nullptr) { - sigchld_info->resend_timer->close(); - sigchld_info->resend_timer.reset(); - } - - m_mtx_.Lock(); - auto task_iter = m_pid_task_map_.find(pid); - auto proc_iter = m_pid_proc_map_.find(pid); - - if (task_iter == m_pid_task_map_.end() || - proc_iter == m_pid_proc_map_.end()) { - m_mtx_.Unlock(); - - auto* sigchld_info_raw_ptr = sigchld_info.release(); - sigchld_info_raw_ptr->resend_timer = - m_uvw_loop_->resource(); - sigchld_info_raw_ptr->resend_timer->on( - [this, sigchld_info_raw_ptr](const uvw::timer_event&, - uvw::timer_handle&) { - EvSigchldTimerCb_(sigchld_info_raw_ptr); - }); - sigchld_info_raw_ptr->resend_timer->start( - std::chrono::milliseconds(kEvSigChldResendMs), - std::chrono::milliseconds(0)); - CRANE_TRACE("Child Process {} exit too early, will do SigchldCb later", - pid); - continue; - } - - TaskInstance* instance = task_iter->second; - ProcessInstance* proc = proc_iter->second; - uint32_t task_id = instance->task.task_id(); - - // Remove indexes from pid to ProcessInstance* - m_pid_proc_map_.erase(proc_iter); - m_pid_task_map_.erase(task_iter); - - m_mtx_.Unlock(); - - instance->sigchld_info = *sigchld_info; - - // Free the ProcessInstance. ITask struct is not freed here because - // the ITask for an Interactive task can have no ProcessInstance. - auto pr_it = instance->processes.find(pid); - if (pr_it == instance->processes.end()) { - CRANE_ERROR("Failed to find pid {} in task #{}'s ProcessInstances", pid, - task_id); - } else { - instance->processes.erase(pr_it); - - if (!instance->processes.empty()) { - if (sigchld_info->is_terminated_by_signal) { - // If a task is terminated by a signal and there are other - // running processes belonging to this task, kill them. - TerminateTaskAsync(task_id); - } - } else { - if (instance->IsCrun()) - // TaskStatusChange of a crun task is triggered in - // CforedManager. - g_cfored_manager->TaskProcOnCforedStopped( - instance->task.interactive_meta().cfored_name(), - instance->task.task_id()); - else /* Batch / Calloc */ { - // If the ProcessInstance has no process left, - // send TaskStatusChange for this task. - // See the comment of EvActivateTaskStatusChange_. - TaskStopAndDoStatusChangeAsync(task_id); - } - } - } - } -} - -void JobManager::EvSigchldTimerCb_(ProcSigchldInfo* sigchld_info) { - m_sigchld_queue_.enqueue(std::unique_ptr(sigchld_info)); - m_process_sigchld_async_handle_->send(); -} - -void JobManager::EvSigintCb_() { - if (!m_is_ending_now_) { - // SIGINT has been sent once. If SIGINT are captured twice, it indicates - // the signal sender can't wait to stop Craned and Craned just send SIGTERM - // to all tasks to kill them immediately. - - CRANE_INFO("Caught SIGINT. Send SIGTERM to all running tasks..."); - - m_is_ending_now_ = true; - - if (m_sigint_cb_) m_sigint_cb_(); - - for (auto task_it = m_task_map_.begin(); task_it != m_task_map_.end();) { - task_id_t task_id = task_it->first; - TaskInstance* task_instance = task_it->second.get(); - - if (task_instance->task.type() == crane::grpc::Batch || - task_instance->IsCrun()) { - for (auto&& [pid, pr_instance] : task_instance->processes) { - CRANE_INFO( - "Sending SIGINT to the process group of task #{} with root " - "process pid {}", - task_id, pr_instance->GetPid()); - KillProcessInstance_(pr_instance.get(), SIGKILL); - } - task_it++; - } else { - // Kill all process of a calloc task and just remove it from the - // task map. - CRANE_DEBUG("Cleaning Calloc task #{}...", - task_instance->task.task_id()); - - // Todo: Performance issue! - task_instance->cgroup->KillAllProcesses(); - - auto to_remove_it = task_it++; - m_task_map_.erase(to_remove_it); - } - } - - if (m_task_map_.empty()) { - // If there is not any batch task to wait for, stop the loop directly. - ActivateShutdownAsync_(); - } - } else { - CRANE_INFO( - "SIGINT has been triggered already. Sending SIGKILL to all process " - "groups instead."); - if (m_task_map_.empty()) { - // If there is no task to kill, stop the loop directly. - ActivateShutdownAsync_(); - } else { - for (auto&& [task_id, task_instance] : m_task_map_) { - for (auto&& [pid, pr_instance] : task_instance->processes) { - CRANE_INFO( - "Sending SIGKILL to the process group of task #{} with root " - "process pid {}", - task_id, pr_instance->GetPid()); - KillProcessInstance_(pr_instance.get(), SIGKILL); - } - } - } - } -} - -void JobManager::ActivateShutdownAsync_() { - CRANE_TRACE("Triggering exit event..."); - CRANE_ASSERT(m_is_ending_now_ == true); - m_task_cleared_ = true; -} +void JobManager::EvSigintCb_() { m_is_ending_now_ = true; } void JobManager::Wait() { if (m_uvw_thread_.joinable()) m_uvw_thread_.join(); @@ -1091,15 +834,6 @@ void JobManager::EvCleanTaskStatusChangeQueueCb_() { if (!orphaned) g_ctld_client->TaskStatusChangeAsync(std::move(status_change)); } - - // Todo: Add additional timer to check periodically whether all children - // have exited. - if (m_is_ending_now_ && m_task_map_.empty()) { - CRANE_TRACE( - "Craned is ending and all tasks have been reaped. " - "Stop event loop."); - ActivateShutdownAsync_(); - } } void JobManager::ActivateTaskStatusChangeAsync_( diff --git a/src/Craned/Craned/JobManager.h b/src/Craned/Craned/JobManager.h index a438db8c0..686210dde 100644 --- a/src/Craned/Craned/JobManager.h +++ b/src/Craned/Craned/JobManager.h @@ -78,15 +78,6 @@ struct CrunMetaInTaskInstance : MetaInTaskInstance { ~CrunMetaInTaskInstance() override = default; }; -// also arg for EvSigchldTimerCb_ -struct ProcSigchldInfo { - pid_t pid; - bool is_terminated_by_signal; - int value; - - std::shared_ptr resend_timer{nullptr}; -}; - // Todo: Task may consists of multiple subtasks struct TaskInstance { ~TaskInstance() { @@ -117,7 +108,6 @@ struct TaskInstance { CraneErr err_before_exec{CraneErr::kOk}; bool cancelled_by_user{false}; bool terminated_by_timeout{false}; - ProcSigchldInfo sigchld_info{}; absl::flat_hash_map> processes; }; @@ -140,16 +130,18 @@ class JobManager { CraneExpected QueryTaskEnvMapAsync(task_id_t task_id); + // todo: Send Rpc to Supervisor void TerminateTaskAsync(uint32_t task_id); + // todo: Send Rpc to Supervisor void MarkTaskAsOrphanedAndTerminateAsync(task_id_t task_id); + // todo: Send Rpc to Supervisor bool CheckTaskStatusAsync(task_id_t task_id, crane::grpc::TaskStatus* status); + // todo: Send Rpc to Supervisor bool ChangeTaskTimeLimitAsync(task_id_t task_id, absl::Duration time_limit); - void TaskStopAndDoStatusChangeAsync(uint32_t task_id); - // Wait internal libevent base loop to exit... void Wait(); @@ -199,20 +191,20 @@ class JobManager { std::promise> status_prom; }; + // todo: Move to Supervisor static std::string ParseFilePathPattern_(const std::string& path_pattern, const std::string& cwd, task_id_t task_id); void LaunchTaskInstanceMt_(TaskInstance* instance); + // todo: Move to Supervisor CraneErr SpawnProcessInInstance_(TaskInstance* instance, ProcessInstance* process); const TaskInstance* FindInstanceByTaskId_(uint32_t task_id); - // Ask JobManager to stop its event loop. - void ActivateShutdownAsync_(); - + // todo: use grpc struct for params /** * Inform CraneCtld of the status change of a task. * This method is called when the status of a task is changed: @@ -230,6 +222,7 @@ class JobManager { uint32_t exit_code, std::optional reason); + // todo: Move timer to Supervisor template void AddTerminationTimer_(TaskInstance* instance, Duration duration) { auto termination_handel = m_uvw_loop_->resource(); @@ -262,6 +255,7 @@ class JobManager { instance->termination_timer.reset(); } + // todo: Refactor this, send rpc to supervisor /** * Send a signal to the process group to which the processes in * ProcessInstance belongs. @@ -279,8 +273,7 @@ class JobManager { // They should be modified in libev callbacks to avoid races. // Contains all the task that is running on this Craned node. - absl::flat_hash_map> - m_task_map_; + absl::flat_hash_map> m_task_map_; // ================================================================== // Critical data region starts @@ -295,9 +288,9 @@ class JobManager { // The two following maps are used as indexes // and doesn't have the ownership of underlying objects. // A TaskInstance may contain more than one ProcessInstance. - absl::flat_hash_map m_pid_task_map_ + absl::flat_hash_map m_pid_task_map_ ABSL_GUARDED_BY(m_mtx_); - absl::flat_hash_map m_pid_proc_map_ + absl::flat_hash_map m_pid_proc_map_ ABSL_GUARDED_BY(m_mtx_); absl::Mutex m_mtx_; @@ -307,8 +300,6 @@ class JobManager { void EvSigchldCb_(); - void EvCleanSigchldQueueCb_(); - // Callback function to handle SIGINT sent by Ctrl+C void EvSigintCb_(); @@ -328,8 +319,6 @@ class JobManager { void EvTaskTimerCb_(task_id_t task_id); - void EvSigchldTimerCb_(ProcSigchldInfo* sigchld_info); - std::shared_ptr m_uvw_loop_; std::shared_ptr m_sigchld_handle_; @@ -350,9 +339,6 @@ class JobManager { // A custom event that handles the ExecuteTask RPC. ConcurrentQueue> m_grpc_execute_task_queue_; - std::shared_ptr m_process_sigchld_async_handle_; - ConcurrentQueue> m_sigchld_queue_; - std::shared_ptr m_task_status_change_async_handle_; ConcurrentQueue m_task_status_change_queue_; diff --git a/src/Craned/Craned/SupervisorKeeper.cpp b/src/Craned/Craned/SupervisorKeeper.cpp new file mode 100644 index 000000000..4dea89f52 --- /dev/null +++ b/src/Craned/Craned/SupervisorKeeper.cpp @@ -0,0 +1,19 @@ +/** +* Copyright (c) 2024 Peking University and Peking University + * Changsha Institute for Computing and Digital Economy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "SupervisorKeeper.h" diff --git a/src/Craned/Craned/SupervisorKeeper.h b/src/Craned/Craned/SupervisorKeeper.h new file mode 100644 index 000000000..4726a9601 --- /dev/null +++ b/src/Craned/Craned/SupervisorKeeper.h @@ -0,0 +1,39 @@ +/** + * Copyright (c) 2024 Peking University and Peking University + * Changsha Institute for Computing and Digital Economy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#pragma once + +#include "CranedPublicDefs.h" +// Precompiled header comes first. + +#include "crane/AtomicHashMap.h" +namespace Craned { +class SupervisorClient { + + //todo: Rpc wrap func +}; + +class SupervisorKeeper { + private: + util::AtomicHashMap> + m_supervisor_map; + + +}; +} // namespace Craned \ No newline at end of file diff --git a/src/Craned/Supervisor/CranedClient.cpp b/src/Craned/Supervisor/CranedClient.cpp index 08daa258d..1d85e0d7e 100644 --- a/src/Craned/Supervisor/CranedClient.cpp +++ b/src/Craned/Supervisor/CranedClient.cpp @@ -31,8 +31,7 @@ void CranedClient::InitChannelAndStub(const std::string& endpoint) { m_stub_ = crane::grpc::Craned::NewStub(m_channel_); } -void CranedClient::TaskStatusChange(uint32_t task_id, - crane::grpc::TaskStatus new_status, +void CranedClient::TaskStatusChange(crane::grpc::TaskStatus new_status, uint32_t exit_code, std::optional reason) { ClientContext context; @@ -43,7 +42,7 @@ void CranedClient::TaskStatusChange(uint32_t task_id, crane::grpc::TaskStatusChangeReply reply; request.set_reason(reason.value_or("")); request.set_exit_code(exit_code); - request.set_task_id(task_id); + request.set_task_id(g_config.TaskId); request.set_new_status(new_status); m_stub_->TaskStatusChange(&context, request, &reply); diff --git a/src/Craned/Supervisor/CranedClient.h b/src/Craned/Supervisor/CranedClient.h index e6bd7e6f3..39a0f3d55 100644 --- a/src/Craned/Supervisor/CranedClient.h +++ b/src/Craned/Supervisor/CranedClient.h @@ -27,8 +27,8 @@ namespace Supervisor { class CranedClient { public: void InitChannelAndStub(const std::string& endpoint); - void TaskStatusChange(uint32_t task_id, crane::grpc::TaskStatus new_status, - uint32_t exit_code, std::optional reason); + void TaskStatusChange(crane::grpc::TaskStatus new_status, uint32_t exit_code, + std::optional reason); private: std::shared_ptr m_channel_; diff --git a/src/Craned/Supervisor/Supervisor.cpp b/src/Craned/Supervisor/Supervisor.cpp index cc13fdfed..39943c227 100644 --- a/src/Craned/Supervisor/Supervisor.cpp +++ b/src/Craned/Supervisor/Supervisor.cpp @@ -95,7 +95,7 @@ void InitFromStdin(int argc, char** argv) { } } -bool CreatePidFile() { +void CreatePidFile() { pid_t pid = getpid(); auto pid_file_path = Supervisor::kSupervisorPidFileDir / @@ -140,9 +140,18 @@ void CreateRequiredDirectories() { void GlobalVariableInit() { CreateRequiredDirectories(); + // Ignore following sig + signal(SIGINT, SIG_IGN); + signal(SIGTERM, SIG_IGN); + signal(SIGTSTP, SIG_IGN); + signal(SIGQUIT, SIG_IGN); // Mask SIGPIPE to prevent Supervisor from crushing due to // SIGPIPE while communicating with spawned task processes. signal(SIGPIPE, SIG_IGN); + signal(SIGUSR1, SIG_IGN); + signal(SIGUSR2, SIG_IGN); + signal(SIGALRM, SIG_IGN); + signal(SIGHUP, SIG_IGN); CreatePidFile(); @@ -154,6 +163,7 @@ void GlobalVariableInit() { g_craned_client = std::make_unique(); g_craned_client->InitChannelAndStub(g_config.CranedUnixSocketPath); + if (g_config.Plugin.Enabled) { CRANE_INFO("[Plugin] Plugin module is enabled."); g_plugin_client = std::make_unique(); @@ -188,7 +198,6 @@ void StartServer() { // Set FD_CLOEXEC on stdin, stdout, stderr util::os::SetCloseOnExecOnFdRange(STDIN_FILENO, STDERR_FILENO + 1); - util::os::CheckProxyEnvironmentVariable(); g_server->Wait(); g_task_mgr->Wait(); diff --git a/src/Craned/Supervisor/TaskManager.cpp b/src/Craned/Supervisor/TaskManager.cpp index 2f4b74a3e..8b14e0073 100644 --- a/src/Craned/Supervisor/TaskManager.cpp +++ b/src/Craned/Supervisor/TaskManager.cpp @@ -18,11 +18,16 @@ #include "TaskManager.h" +#include +#include +#include #include +#include "../Craned/JobManager.h" #include "CforedClient.h" #include "CranedClient.h" #include "crane/String.h" +#include "protos/Supervisor.grpc.pb.h" namespace Supervisor { @@ -142,8 +147,7 @@ void TaskManager::ActivateTaskStatusChange_(crane::grpc::TaskStatus new_status, // Free the TaskInstance structure m_process_.release(); if (!orphaned) - g_craned_client->TaskStatusChange(m_process_->task.task_id(), new_status, - exit_code, reason); + g_craned_client->TaskStatusChange(new_status, exit_code, reason); } void TaskManager::EvSigchldCb_() { @@ -205,4 +209,304 @@ void TaskManager::EvSigchldCb_() { } } +CraneErr TaskManager::SpawnTaskInstance_() { + using google::protobuf::io::FileInputStream; + using google::protobuf::io::FileOutputStream; + using google::protobuf::util::ParseDelimitedFromZeroCopyStream; + using google::protobuf::util::SerializeDelimitedToZeroCopyStream; + + using crane::grpc::CanStartMessage; + using crane::grpc::ChildProcessReady; + + auto* instance = m_process_.get(); + + int ctrl_sock_pair[2]; // Socket pair for passing control messages. + + // Socket pair for forwarding IO of crun tasks. Craned read from index 0. + int crun_io_sock_pair[2]; + + // The ResourceInNode structure should be copied here for being accessed in + // the child process. + // Note that CgroupManager acquires a lock for this. + // If the lock is held in the parent process during fork, the forked thread in + // the child proc will block forever. + // That's why we should copy it here and the child proc should not hold any + // lock. + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, ctrl_sock_pair) != 0) { + CRANE_ERROR("Failed to create socket pair: {}", strerror(errno)); + return CraneErr::kSystemErr; + } + + // save the current uid/gid + SavedPrivilege saved_priv{getuid(), getgid()}; + + int rc = setegid(instance->pwd_entry.Gid()); + if (rc == -1) { + CRANE_ERROR("error: setegid. {}", strerror(errno)); + return CraneErr::kSystemErr; + } + __gid_t gid_a[1] = {instance->pwd_entry.Gid()}; + setgroups(1, gid_a); + rc = seteuid(instance->pwd_entry.Uid()); + if (rc == -1) { + CRANE_ERROR("error: seteuid. {}", strerror(errno)); + return CraneErr::kSystemErr; + } + + pid_t child_pid; + bool launch_pty{false}; + + if (instance->IsCrun()) { + auto* crun_meta = + dynamic_cast(instance->meta.get()); + launch_pty = instance->task.interactive_meta().pty(); + CRANE_DEBUG("Launch crun task #{} pty:{}", instance->task.task_id(), + launch_pty); + + if (launch_pty) { + child_pid = forkpty(&crun_meta->msg_fd, nullptr, nullptr, nullptr); + } else { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, crun_io_sock_pair) != 0) { + CRANE_ERROR("Failed to create socket pair for task io forward: {}", + strerror(errno)); + return CraneErr::kSystemErr; + } + crun_meta->msg_fd = crun_io_sock_pair[0]; + child_pid = fork(); + } + } else { + child_pid = fork(); + } + + if (child_pid == -1) { + CRANE_ERROR("fork() failed for task #{}: {}", instance->task.task_id(), + strerror(errno)); + return CraneErr::kSystemErr; + } + + if (child_pid > 0) { // Parent proc + instance->SetPid(child_pid); + CRANE_DEBUG("Subprocess was created for task #{} pid: {}", + instance->task.task_id(), child_pid); + + if (instance->IsCrun()) { + auto* meta = dynamic_cast(instance->meta.get()); + g_cfored_manager->RegisterIOForward( + instance->task.interactive_meta().cfored_name(), meta->msg_fd, + launch_pty); + } + + int ctrl_fd = ctrl_sock_pair[0]; + close(ctrl_sock_pair[1]); + if (instance->IsCrun() && !launch_pty) { + close(crun_io_sock_pair[1]); + } + + setegid(saved_priv.gid); + seteuid(saved_priv.uid); + setgroups(0, nullptr); + + bool ok; + FileInputStream istream(ctrl_fd); + FileOutputStream ostream(ctrl_fd); + CanStartMessage msg; + ChildProcessReady child_process_ready; + + // Migrate the new subprocess to newly created cgroup + + CRANE_TRACE("New task #{} is ready. Asking subprocess to execv...", + instance->task.task_id()); + + // Tell subprocess that the parent process is ready. Then the + // subprocess should continue to exec(). + msg.set_ok(true); + ok = SerializeDelimitedToZeroCopyStream(msg, &ostream); + if (!ok) { + CRANE_ERROR("Failed to serialize msg to ostream: {}", + strerror(ostream.GetErrno())); + } + + if (ok) ok &= ostream.Flush(); + if (!ok) { + CRANE_ERROR("Failed to send ok=true to subprocess {} for task #{}: {}", + child_pid, instance->task.task_id(), + strerror(ostream.GetErrno())); + close(ctrl_fd); + + // Communication failure caused by process crash or grpc error. + // Since now the parent cannot ask the child + // process to commit suicide, kill child process here and just return. + // The child process will be reaped in SIGCHLD handler and + // thus only ONE TaskStatusChange will be triggered! + instance->err_before_exec = CraneErr::kProtobufError; + KillTaskInstance_(SIGKILL); + return CraneErr::kOk; + } + + ok = ParseDelimitedFromZeroCopyStream(&child_process_ready, &istream, + nullptr); + if (!ok || !msg.ok()) { + if (!ok) + CRANE_ERROR("Socket child endpoint failed: {}", + strerror(istream.GetErrno())); + if (!msg.ok()) + CRANE_ERROR("False from subprocess {} of task #{}", child_pid, + instance->task.task_id()); + close(ctrl_fd); + + // See comments above. + instance->err_before_exec = CraneErr::kProtobufError; + KillTaskInstance_(SIGKILL); + return CraneErr::kOk; + } + + close(ctrl_fd); + return CraneErr::kOk; + + } else { // Child proc + // Disable SIGABRT backtrace from child processes. + signal(SIGABRT, SIG_DFL); + + const std::string& cwd = instance->task.cwd(); + rc = chdir(cwd.c_str()); + if (rc == -1) { + // CRANE_ERROR("[Child Process] Error: chdir to {}. {}", cwd.c_str(), + // strerror(errno)); + std::abort(); + } + + setreuid(instance->pwd_entry.Uid(), instance->pwd_entry.Uid()); + setregid(instance->pwd_entry.Gid(), instance->pwd_entry.Gid()); + + // Set pgid to the pid of task root process. + setpgid(0, 0); + + close(ctrl_sock_pair[0]); + int ctrl_fd = ctrl_sock_pair[1]; + + FileInputStream istream(ctrl_fd); + FileOutputStream ostream(ctrl_fd); + CanStartMessage msg; + ChildProcessReady child_process_ready; + bool ok; + + ok = ParseDelimitedFromZeroCopyStream(&msg, &istream, nullptr); + if (!ok || !msg.ok()) { + // if (!ok) { + // int err = istream.GetErrno(); + // CRANE_ERROR("Failed to read socket from parent: {}", strerror(err)); + // } + + // if (!msg.ok()) + // CRANE_ERROR("Parent process ask not to start the subprocess."); + + std::abort(); + } + + if (instance->task.type() == crane::grpc::Batch) { + int stdout_fd, stderr_fd; + + auto* meta = dynamic_cast(instance->meta.get()); + const std::string& stdout_file_path = meta->parsed_output_file_pattern; + const std::string& stderr_file_path = meta->parsed_error_file_pattern; + + stdout_fd = + open(stdout_file_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644); + if (stdout_fd == -1) { + // CRANE_ERROR("[Child Process] Error: open {}. {}", stdout_file_path, + // strerror(errno)); + std::abort(); + } + dup2(stdout_fd, 1); + + if (stderr_file_path.empty()) { + dup2(stdout_fd, 2); + } else { + stderr_fd = + open(stderr_file_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644); + if (stderr_fd == -1) { + // CRANE_ERROR("[Child Process] Error: open {}. {}", stderr_file_path, + // strerror(errno)); + std::abort(); + } + dup2(stderr_fd, 2); // stderr -> error file + close(stderr_fd); + } + close(stdout_fd); + + } else if (instance->IsCrun() && !launch_pty) { + close(crun_io_sock_pair[0]); + + dup2(crun_io_sock_pair[1], 0); + dup2(crun_io_sock_pair[1], 1); + dup2(crun_io_sock_pair[1], 2); + close(crun_io_sock_pair[1]); + } + + child_process_ready.set_ok(true); + ok = SerializeDelimitedToZeroCopyStream(child_process_ready, &ostream); + ok &= ostream.Flush(); + if (!ok) { + // CRANE_ERROR("[Child Process] Error: Failed to flush."); + std::abort(); + } + + close(ctrl_fd); + + // Close stdin for batch tasks. + // If these file descriptors are not closed, a program like mpirun may + // keep waiting for the input from stdin or other fds and will never end. + if (instance->task.type() == crane::grpc::Batch) close(0); + util::os::CloseFdFrom(3); + + // Prepare the command line arguments. + std::vector argv; + + // Argv[0] is the program name which can be anything. + argv.emplace_back("CraneScript"); + + if (instance->task.get_user_env()) { + // If --get-user-env is specified, + // we need to use --login option of bash to load settings from the user's + // settings. + argv.emplace_back("--login"); + } + + argv.emplace_back(instance->GetExecPath().c_str()); + for (auto&& arg : instance->GetArgList()) { + argv.push_back(arg.c_str()); + } + argv.push_back(nullptr); + + execv("/bin/bash", const_cast(argv.data())); + + // Error occurred since execv returned. At this point, errno is set. + // Ctld use SIGABRT to inform the client of this failure. + fmt::print(stderr, "[Craned Subprocess Error] Failed to execv. Error: {}\n", + strerror(errno)); + // Todo: See https://tldp.org/LDP/abs/html/exitcodes.html, return standard + // exit codes + abort(); + } +} + +CraneErr TaskManager::KillTaskInstance_(int signum) { + if (m_process_) { + CRANE_TRACE("Killing pid {} with signal {}", m_process_->GetPid(), signum); + + // Send the signal to the whole process group. + int err = kill(-m_process_->GetPid(), signum); + + if (err == 0) + return CraneErr::kOk; + else { + CRANE_TRACE("kill failed. error: {}", strerror(errno)); + return CraneErr::kGenericFailure; + } + } + + return CraneErr::kNonExistent; +} + } // namespace Supervisor \ No newline at end of file diff --git a/src/Craned/Supervisor/TaskManager.h b/src/Craned/Supervisor/TaskManager.h index 8b93a98ed..2f9c325c6 100644 --- a/src/Craned/Supervisor/TaskManager.h +++ b/src/Craned/Supervisor/TaskManager.h @@ -18,10 +18,16 @@ #pragma once #include "SupervisorPublicDefs.h" +#include "crane/PasswordEntry.h" // Precompiled header comes first. namespace Supervisor { +struct SavedPrivilege { + uid_t uid; + gid_t gid; +}; + struct MetaInTaskInstance { std::string parsed_sh_script_path; virtual ~MetaInTaskInstance() = default; @@ -69,6 +75,7 @@ struct TaskInstance { crane::grpc::ProcToD task; + PasswordEntry pwd_entry; std::unique_ptr meta; bool orphaned{false}; @@ -94,6 +101,9 @@ class TaskManager { void TaskStopAndDoStatusChange(); + CraneErr SpawnTaskInstance_(); + CraneErr KillTaskInstance_(int signum); + void ActivateTaskStatusChange_(crane::grpc::TaskStatus new_status, uint32_t exit_code, std::optional reason);