Skip to content

Commit

Permalink
refactor: Move code from Craned to Supervisor
Browse files Browse the repository at this point in the history
Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>
  • Loading branch information
L-Xiafeng committed Dec 22, 2024
1 parent 9ae93dd commit a8394a7
Show file tree
Hide file tree
Showing 11 changed files with 408 additions and 305 deletions.
1 change: 1 addition & 0 deletions protos/Supervisor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ message TaskExecutionRequest{

message TaskExecutionReply{
bool ok = 1;
int32 pid = 2;
}

message TerminateRequest{
Expand Down
4 changes: 3 additions & 1 deletion src/Craned/Craned/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ add_executable(craned

CranedPreCompiledHeader.h
DeviceManager.cpp
DeviceManager.h)
DeviceManager.h
SupervisorKeeper.cpp
SupervisorKeeper.h)

target_precompile_headers(craned PRIVATE CranedPreCompiledHeader.h)
add_dependencies(craned libcgroup)
Expand Down
272 changes: 3 additions & 269 deletions src/Craned/Craned/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,6 @@ JobManager::JobManager() {
EvCleanGrpcExecuteTaskQueueCb_();
});

m_process_sigchld_async_handle_ = m_uvw_loop_->resource<uvw::async_handle>();
m_process_sigchld_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) {
EvCleanSigchldQueueCb_();
});

// Task Status Change Event
m_task_status_change_async_handle_ =
m_uvw_loop_->resource<uvw::async_handle>();
Expand Down Expand Up @@ -180,7 +174,7 @@ JobManager::JobManager() {
auto idle_handle = m_uvw_loop_->resource<uvw::idle_handle>();
idle_handle->on<uvw::idle_event>(
[this](const uvw::idle_event&, uvw::idle_handle& h) {
if (m_task_cleared_) {
if (m_is_ending_now_) {
h.parent().walk([](auto&& h) { h.close(); });
h.parent().stop();
}
Expand All @@ -203,71 +197,6 @@ const TaskInstance* JobManager::FindInstanceByTaskId_(uint32_t task_id) {
return iter->second.get();
}

void JobManager::TaskStopAndDoStatusChangeAsync(uint32_t task_id) {
auto it = m_task_map_.find(task_id);
if (it == m_task_map_.end()) {
CRANE_ERROR("Task #{} not found in TaskStopAndDoStatusChangeAsync.",
task_id);
return;
}
TaskInstance* instance = it->second.get();

CRANE_INFO("Task #{} stopped and is doing TaskStatusChange...", task_id);

switch (instance->err_before_exec) {
case CraneErr::kProtobufError:
ActivateTaskStatusChangeAsync_(task_id, crane::grpc::TaskStatus::Failed,
ExitCode::kExitCodeSpawnProcessFail,
std::nullopt);
break;

case CraneErr::kCgroupError:
ActivateTaskStatusChangeAsync_(task_id, crane::grpc::TaskStatus::Failed,
ExitCode::kExitCodeCgroupError,
std::nullopt);
break;

default:
break;
}

ProcSigchldInfo& sigchld_info = instance->sigchld_info;
if (instance->task.type() == crane::grpc::Batch || instance->IsCrun()) {
// For a Batch task, the end of the process means it is done.
if (sigchld_info.is_terminated_by_signal) {
if (instance->cancelled_by_user)
ActivateTaskStatusChangeAsync_(
task_id, crane::grpc::TaskStatus::Cancelled,
sigchld_info.value + ExitCode::kTerminationSignalBase,
std::nullopt);
else if (instance->terminated_by_timeout)
ActivateTaskStatusChangeAsync_(
task_id, crane::grpc::TaskStatus::ExceedTimeLimit,
sigchld_info.value + ExitCode::kTerminationSignalBase,
std::nullopt);
else
ActivateTaskStatusChangeAsync_(
task_id, crane::grpc::TaskStatus::Failed,
sigchld_info.value + ExitCode::kTerminationSignalBase,
std::nullopt);
} else
ActivateTaskStatusChangeAsync_(task_id,
crane::grpc::TaskStatus::Completed,
sigchld_info.value, std::nullopt);
} else /* Calloc */ {
// For a COMPLETING Calloc task with a process running,
// the end of this process means that this task is done.
if (sigchld_info.is_terminated_by_signal)
ActivateTaskStatusChangeAsync_(
task_id, crane::grpc::TaskStatus::Completed,
sigchld_info.value + ExitCode::kTerminationSignalBase, std::nullopt);
else
ActivateTaskStatusChangeAsync_(task_id,
crane::grpc::TaskStatus::Completed,
sigchld_info.value, std::nullopt);
}
}

void JobManager::EvSigchldCb_() {
assert(m_instance_ptr_->m_instance_ptr_ != nullptr);

Expand All @@ -278,42 +207,9 @@ void JobManager::EvSigchldCb_() {
/* TODO(More status tracing): | WUNTRACED | WCONTINUED */);

if (pid > 0) {
auto sigchld_info = std::make_unique<ProcSigchldInfo>();

if (WIFEXITED(status)) {
// Exited with status WEXITSTATUS(status)
sigchld_info->pid = pid;
sigchld_info->is_terminated_by_signal = false;
sigchld_info->value = WEXITSTATUS(status);

CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: false, Status: {}",
pid, WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
// Killed by signal WTERMSIG(status)
sigchld_info->pid = pid;
sigchld_info->is_terminated_by_signal = true;
sigchld_info->value = WTERMSIG(status);

CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: true, Signal: {}",
pid, WTERMSIG(status));
}
/* Todo(More status tracing):
else if (WIFSTOPPED(status)) {
printf("stopped by signal %d\n", WSTOPSIG(status));
} else if (WIFCONTINUED(status)) {
printf("continued\n");
} */
m_sigchld_queue_.enqueue(std::move(sigchld_info));
m_process_sigchld_async_handle_->send();
// We do nothing now
} else if (pid == 0) {
// There's no child that needs reaping.
// If Craned is exiting, check if there's any task remaining.
// If there's no task running, just stop the loop of JobManager.
if (m_is_ending_now_) {
if (m_task_map_.empty()) {
ActivateShutdownAsync_();
}
}
break;
} else if (pid < 0) {
if (errno != ECHILD)
Expand All @@ -323,160 +219,7 @@ void JobManager::EvSigchldCb_() {
}
}

// Drains m_sigchld_queue_. For every dequeued child-exit record it looks up
// the owning task and process by pid, removes the pid indexes, stores the
// exit information on the task, erases the process from the task and then
// triggers the follow-up action (terminate the remaining processes, notify
// the cfored manager, or start the task status change).
// If the pid is not registered in the pid maps yet, the record is re-queued
// later through a timer instead of being processed.
void JobManager::EvCleanSigchldQueueCb_() {
std::unique_ptr<ProcSigchldInfo> sigchld_info;
while (m_sigchld_queue_.try_dequeue(sigchld_info)) {
auto pid = sigchld_info->pid;

// A record that carries a resend timer has been through the retry path
// below; close and drop that timer before handling the record again.
if (sigchld_info->resend_timer != nullptr) {
sigchld_info->resend_timer->close();
sigchld_info->resend_timer.reset();
}

// m_mtx_ is held while the two pid maps are searched and modified.
m_mtx_.Lock();
auto task_iter = m_pid_task_map_.find(pid);
auto proc_iter = m_pid_proc_map_.find(pid);

if (task_iter == m_pid_task_map_.end() ||
proc_iter == m_pid_proc_map_.end()) {
m_mtx_.Unlock();

// The pid is missing from at least one map. Hand the record over to a
// timer that fires after kEvSigChldResendMs with a repeat interval of 0;
// its callback passes the record to EvSigchldTimerCb_().
// NOTE(review): from release() until the timer fires, the record is owned
// only by the raw pointer captured in the lambda; if the timer never fires
// the record is presumably leaked -- confirm.
auto* sigchld_info_raw_ptr = sigchld_info.release();
sigchld_info_raw_ptr->resend_timer =
m_uvw_loop_->resource<uvw::timer_handle>();
sigchld_info_raw_ptr->resend_timer->on<uvw::timer_event>(
[this, sigchld_info_raw_ptr](const uvw::timer_event&,
uvw::timer_handle&) {
EvSigchldTimerCb_(sigchld_info_raw_ptr);
});
sigchld_info_raw_ptr->resend_timer->start(
std::chrono::milliseconds(kEvSigChldResendMs),
std::chrono::milliseconds(0));
CRANE_TRACE("Child Process {} exit too early, will do SigchldCb later",
pid);
continue;
}

TaskInstance* instance = task_iter->second;
// NOTE(review): `proc` is not used in the rest of this function.
ProcessInstance* proc = proc_iter->second;
uint32_t task_id = instance->task.task_id();

// Remove indexes from pid to ProcessInstance*
m_pid_proc_map_.erase(proc_iter);
m_pid_task_map_.erase(task_iter);

m_mtx_.Unlock();

// Keep a copy of the exit information on the task; it is read later by
// TaskStopAndDoStatusChangeAsync().
instance->sigchld_info = *sigchld_info;

// Free the ProcessInstance. ITask struct is not freed here because
// the ITask for an Interactive task can have no ProcessInstance.
auto pr_it = instance->processes.find(pid);
if (pr_it == instance->processes.end()) {
CRANE_ERROR("Failed to find pid {} in task #{}'s ProcessInstances", pid,
task_id);
} else {
instance->processes.erase(pr_it);

if (!instance->processes.empty()) {
// Other processes of this task are still registered.
if (sigchld_info->is_terminated_by_signal) {
// If a task is terminated by a signal and there are other
// running processes belonging to this task, kill them.
TerminateTaskAsync(task_id);
}
} else {
// This was the last process of the task.
if (instance->IsCrun())
// TaskStatusChange of a crun task is triggered in
// CforedManager.
g_cfored_manager->TaskProcOnCforedStopped(
instance->task.interactive_meta().cfored_name(),
instance->task.task_id());
else /* Batch / Calloc */ {
// If the ProcessInstance has no process left,
// send TaskStatusChange for this task.
// See the comment of EvActivateTaskStatusChange_.
TaskStopAndDoStatusChangeAsync(task_id);
}
}
}
}
}

// Timer callback for a deferred child-exit record: takes ownership of the
// record again, puts it back into m_sigchld_queue_ and signals
// m_process_sigchld_async_handle_.
//
// sigchld_info: raw pointer to the record; ownership is assumed by this
//               function.
void JobManager::EvSigchldTimerCb_(ProcSigchldInfo* sigchld_info) {
  std::unique_ptr<ProcSigchldInfo> owned_info(sigchld_info);
  m_sigchld_queue_.enqueue(std::move(owned_info));
  m_process_sigchld_async_handle_->send();
}

// Handles a SIGINT delivered to Craned.
//
// First SIGINT (m_is_ending_now_ is still false): sets m_is_ending_now_,
// invokes the optional m_sigint_cb_ hook, sends SIGKILL to every process of
// each Batch / Crun task, and for every other (Calloc) task kills all
// processes in the task's cgroup and removes the task from m_task_map_.
// If no task is left afterwards, the shutdown is triggered immediately.
//
// Any further SIGINT: sends SIGKILL to every process of every remaining task,
// or triggers the shutdown directly when m_task_map_ is already empty.
void JobManager::EvSigintCb_() {
  if (!m_is_ending_now_) {
    // This is the first SIGINT.
    // NOTE(review): the previous log texts announced SIGTERM / SIGINT here
    // while the code has always sent SIGKILL. The messages now state what is
    // actually sent; if a graceful first pass was intended, the signal passed
    // to KillProcessInstance_() should be changed instead.
    CRANE_INFO("Caught SIGINT. Killing all running tasks...");

    m_is_ending_now_ = true;

    if (m_sigint_cb_) m_sigint_cb_();

    for (auto task_it = m_task_map_.begin(); task_it != m_task_map_.end();) {
      task_id_t task_id = task_it->first;
      TaskInstance* task_instance = task_it->second.get();

      if (task_instance->task.type() == crane::grpc::Batch ||
          task_instance->IsCrun()) {
        for (auto&& [pid, pr_instance] : task_instance->processes) {
          CRANE_INFO(
              "Sending SIGKILL to the process group of task #{} with root "
              "process pid {}",
              task_id, pr_instance->GetPid());
          KillProcessInstance_(pr_instance.get(), SIGKILL);
        }
        task_it++;
      } else {
        // Kill all process of a calloc task and just remove it from the
        // task map.
        CRANE_DEBUG("Cleaning Calloc task #{}...",
                    task_instance->task.task_id());

        // Todo: Performance issue!
        task_instance->cgroup->KillAllProcesses();

        // Advance the loop iterator before erasing so that it never refers
        // to the erased element.
        auto to_remove_it = task_it++;
        m_task_map_.erase(to_remove_it);
      }
    }

    if (m_task_map_.empty()) {
      // If there is not any batch task to wait for, stop the loop directly.
      ActivateShutdownAsync_();
    }
  } else {
    // SIGINT has been captured before. A repeated SIGINT indicates that the
    // signal sender can't wait for Craned to stop, so every remaining
    // process is killed immediately.
    CRANE_INFO(
        "SIGINT has been triggered already. Sending SIGKILL to all process "
        "groups instead.");
    if (m_task_map_.empty()) {
      // If there is no task to kill, stop the loop directly.
      ActivateShutdownAsync_();
    } else {
      for (auto&& [task_id, task_instance] : m_task_map_) {
        for (auto&& [pid, pr_instance] : task_instance->processes) {
          CRANE_INFO(
              "Sending SIGKILL to the process group of task #{} with root "
              "process pid {}",
              task_id, pr_instance->GetPid());
          KillProcessInstance_(pr_instance.get(), SIGKILL);
        }
      }
    }
  }
}

// Marks all tasks as cleared by setting m_task_cleared_.
// Must only be called once shutdown has begun: the assertion below requires
// m_is_ending_now_ to be true.
void JobManager::ActivateShutdownAsync_() {
CRANE_TRACE("Triggering exit event...");
CRANE_ASSERT(m_is_ending_now_ == true);
m_task_cleared_ = true;
}
// SIGINT handler: only records that shutdown has been requested by setting
// m_is_ending_now_. No task or process is touched here.
void JobManager::EvSigintCb_() {
  m_is_ending_now_ = true;
}

void JobManager::Wait() {
if (m_uvw_thread_.joinable()) m_uvw_thread_.join();
Expand Down Expand Up @@ -1091,15 +834,6 @@ void JobManager::EvCleanTaskStatusChangeQueueCb_() {
if (!orphaned)
g_ctld_client->TaskStatusChangeAsync(std::move(status_change));
}

// Todo: Add additional timer to check periodically whether all children
// have exited.
if (m_is_ending_now_ && m_task_map_.empty()) {
CRANE_TRACE(
"Craned is ending and all tasks have been reaped. "
"Stop event loop.");
ActivateShutdownAsync_();
}
}

void JobManager::ActivateTaskStatusChangeAsync_(
Expand Down
Loading

0 comments on commit a8394a7

Please sign in to comment.