Skip to content

Commit

Permalink
refactor: Move timer to Supervisor
Browse files Browse the repository at this point in the history
Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>
  • Loading branch information
L-Xiafeng committed Dec 23, 2024
1 parent a8394a7 commit acf50b1
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 150 deletions.
55 changes: 2 additions & 53 deletions src/Craned/Craned/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -933,59 +933,7 @@ void JobManager::EvTaskTimerCb_(task_id_t task_id) {
void JobManager::EvCleanTerminateTaskQueueCb_() {
TaskTerminateQueueElem elem;
while (m_task_terminate_queue_.try_dequeue(elem)) {
CRANE_TRACE(
"Receive TerminateRunningTask Request from internal queue. "
"Task id: {}",
elem.task_id);

auto iter = m_task_map_.find(elem.task_id);
if (iter == m_task_map_.end()) {
CRANE_DEBUG("Terminating a non-existent task #{}.", elem.task_id);

// Note if Ctld wants to terminate some tasks that are not running,
// it might indicate other nodes allocated to the task might have
// crashed. We should mark the task as kind of not runnable by removing
// its cgroup.
//
// Considering such a situation:
// In Task Scheduler of Ctld,
// the task index from node id to task id have just been added and
// Ctld are sending CreateCgroupForTasks.
// Right at the moment, one Craned allocated to this task and
// designated as the executing node crashes,
// but it has been sent a CreateCgroupForTasks and replied.
// Then the CranedKeeper search the task index and
// send TerminateTasksOnCraned to all Craned allocated to this task
// including this node.
// In order to give Ctld kind of feedback without adding complicated
// synchronizing mechanism in ScheduleThread_(),
// we just remove the cgroup for such task, Ctld will fail in the
// following ExecuteTasks and the task will go to the right place as
// well as the completed queue.
g_cg_mgr->ReleaseCgroupByTaskIdOnly(elem.task_id);
continue;
}

TaskInstance* task_instance = iter->second.get();

if (elem.terminated_by_user) task_instance->cancelled_by_user = true;
if (elem.mark_as_orphaned) task_instance->orphaned = true;
if (elem.terminated_by_timeout) task_instance->terminated_by_timeout = true;

int sig = SIGTERM; // For BatchTask
if (task_instance->IsCrun()) sig = SIGHUP;

if (!task_instance->processes.empty()) {
// For an Interactive task with a process running or a Batch task, we
// just send a kill signal here.
for (auto&& [pid, pr_instance] : task_instance->processes)
KillProcessInstance_(pr_instance.get(), sig);
} else if (task_instance->task.type() == crane::grpc::Interactive) {
// For an Interactive task with no process running, it ends immediately.
ActivateTaskStatusChangeAsync_(elem.task_id, crane::grpc::Completed,
ExitCode::kExitCodeTerminated,
std::nullopt);
}
// todo:Just forward to Supervisor
}
}

Expand Down Expand Up @@ -1060,6 +1008,7 @@ void JobManager::EvCleanChangeTaskTimeLimitQueueCb_() {

ChangeTaskTimeLimitQueueElem elem;
while (m_task_time_limit_change_queue_.try_dequeue(elem)) {
// todo: Forward Rpc to Supervisor
auto iter = m_task_map_.find(elem.task_id);
if (iter != m_task_map_.end()) {
TaskInstance* task_instance = iter->second.get();
Expand Down
2 changes: 0 additions & 2 deletions src/Craned/Craned/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ class JobManager {
uint32_t task_id{0};
bool terminated_by_user{false}; // If the task is canceled by user,
// task->status=Cancelled
bool terminated_by_timeout{false}; // If the task is canceled by user,
// task->status=Timeout
bool mark_as_orphaned{false};
};

Expand Down
229 changes: 155 additions & 74 deletions src/Craned/Supervisor/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ TaskManager::TaskManager() : m_supervisor_exit_(false) {
CRANE_ERROR("Failed to start the SIGCHLD handle");
}

m_terminate_task_async_handle_ = m_uvw_loop_->resource<uvw::async_handle>();
m_terminate_task_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) {
EvCleanTerminateTaskQueueCb_();
});

m_uvw_thread_ = std::thread([this]() {
util::SetCurrentThreadName("TaskMgrLoopThr");
auto idle_handle = m_uvw_loop_->resource<uvw::idle_handle>();
Expand Down Expand Up @@ -80,9 +86,9 @@ void TaskManager::Wait() {

void TaskManager::TaskStopAndDoStatusChange() {
CRANE_INFO("Task #{} stopped and is doing TaskStatusChange...",
m_process_->task.task_id());
m_task_->task.task_id());

switch (m_process_->err_before_exec) {
switch (m_task_->err_before_exec) {
case CraneErr::kProtobufError:
ActivateTaskStatusChange_(crane::grpc::TaskStatus::Failed,
ExitCode::kExitCodeSpawnProcessFail,
Expand All @@ -98,16 +104,16 @@ void TaskManager::TaskStopAndDoStatusChange() {
break;
}

ProcSigchldInfo& sigchld_info = m_process_->sigchld_info;
if (m_process_->task.type() == crane::grpc::Batch || m_process_->IsCrun()) {
ProcSigchldInfo& sigchld_info = m_task_->sigchld_info;
if (m_task_->task.type() == crane::grpc::Batch || m_task_->IsCrun()) {
// For a Batch task, the end of the process means it is done.
if (sigchld_info.is_terminated_by_signal) {
if (m_process_->cancelled_by_user)
if (m_task_->cancelled_by_user)
ActivateTaskStatusChange_(
crane::grpc::TaskStatus::Cancelled,
sigchld_info.value + ExitCode::kTerminationSignalBase,
std::nullopt);
else if (m_process_->terminated_by_timeout)
else if (m_task_->terminated_by_timeout)
ActivateTaskStatusChange_(
crane::grpc::TaskStatus::ExceedTimeLimit,
sigchld_info.value + ExitCode::kTerminationSignalBase,
Expand Down Expand Up @@ -136,7 +142,7 @@ void TaskManager::TaskStopAndDoStatusChange() {
void TaskManager::ActivateTaskStatusChange_(crane::grpc::TaskStatus new_status,
uint32_t exit_code,
std::optional<std::string> reason) {
TaskInstance* instance = m_process_.get();
TaskInstance* instance = m_task_.get();
if (instance->task.type() == crane::grpc::Batch || instance->IsCrun()) {
const std::string& path = instance->meta->parsed_sh_script_path;
if (!path.empty())
Expand All @@ -145,70 +151,11 @@ void TaskManager::ActivateTaskStatusChange_(crane::grpc::TaskStatus new_status,

bool orphaned = instance->orphaned;
// Free the TaskInstance structure
m_process_.release();
m_task_.release();
if (!orphaned)
g_craned_client->TaskStatusChange(new_status, exit_code, reason);
}

void TaskManager::EvSigchldCb_() {
int status;
pid_t pid;
while (true) {
pid = waitpid(-1, &status, WNOHANG
/* TODO(More status tracing): | WUNTRACED | WCONTINUED */);

if (pid > 0) {
auto sigchld_info = std::make_unique<ProcSigchldInfo>();

if (WIFEXITED(status)) {
// Exited with status WEXITSTATUS(status)
sigchld_info->pid = pid;
sigchld_info->is_terminated_by_signal = false;
sigchld_info->value = WEXITSTATUS(status);

CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: false, Status: {}",
pid, WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
// Killed by signal WTERMSIG(status)
sigchld_info->pid = pid;
sigchld_info->is_terminated_by_signal = true;
sigchld_info->value = WTERMSIG(status);

CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: true, Signal: {}",
pid, WTERMSIG(status));
}
/* Todo(More status tracing):
else if (WIFSTOPPED(status)) {
printf("stopped by signal %d\n", WSTOPSIG(status));
} else if (WIFCONTINUED(status)) {
printf("continued\n");
} */

m_mtx_.Lock();
ABSL_ASSERT(m_process_);
m_process_->sigchld_info = *sigchld_info;

if (m_process_->IsCrun())
// TaskStatusChange of a crun task is triggered in
// CforedManager.
g_cfored_manager->TaskProcStopped();
else /* Batch / Calloc */ {
// If the TaskInstance has no process left,
// send TaskStatusChange for this task.
// See the comment of EvActivateTaskStatusChange_.
TaskStopAndDoStatusChange();
}
m_mtx_.Unlock();
} else if (pid == 0) {
break;
} else if (pid < 0) {
if (errno != ECHILD)
CRANE_DEBUG("waitpid() error: {}, {}", errno, strerror(errno));
break;
}
}
}

CraneErr TaskManager::SpawnTaskInstance_() {
using google::protobuf::io::FileInputStream;
using google::protobuf::io::FileOutputStream;
Expand All @@ -218,7 +165,7 @@ CraneErr TaskManager::SpawnTaskInstance_() {
using crane::grpc::CanStartMessage;
using crane::grpc::ChildProcessReady;

auto* instance = m_process_.get();
auto* instance = m_task_.get();

int ctrl_sock_pair[2]; // Socket pair for passing control messages.

Expand Down Expand Up @@ -286,7 +233,8 @@ CraneErr TaskManager::SpawnTaskInstance_() {
}

if (child_pid > 0) { // Parent proc
instance->SetPid(child_pid);
auto process = instance->process;
process->SetPid(child_pid);
CRANE_DEBUG("Subprocess was created for task #{} pid: {}",
instance->task.task_id(), child_pid);

Expand Down Expand Up @@ -473,8 +421,8 @@ CraneErr TaskManager::SpawnTaskInstance_() {
argv.emplace_back("--login");
}

argv.emplace_back(instance->GetExecPath().c_str());
for (auto&& arg : instance->GetArgList()) {
argv.emplace_back(instance->process->GetExecPath().c_str());
for (auto&& arg : instance->process->GetArgList()) {
argv.push_back(arg.c_str());
}
argv.push_back(nullptr);
Expand All @@ -492,11 +440,12 @@ CraneErr TaskManager::SpawnTaskInstance_() {
}

CraneErr TaskManager::KillTaskInstance_(int signum) {
if (m_process_) {
CRANE_TRACE("Killing pid {} with signal {}", m_process_->GetPid(), signum);
if (m_task_ && m_task_->process) {
CRANE_TRACE("Killing pid {} with signal {}", m_task_->process->GetPid(),
signum);

// Send the signal to the whole process group.
int err = kill(-m_process_->GetPid(), signum);
int err = kill(m_task_->process->GetPid(), signum);

if (err == 0)
return CraneErr::kOk;
Expand All @@ -509,4 +458,136 @@ CraneErr TaskManager::KillTaskInstance_(int signum) {
return CraneErr::kNonExistent;
}

void TaskManager::EvSigchldCb_() {
int status;
pid_t pid;
while (true) {
pid = waitpid(-1, &status, WNOHANG
/* TODO(More status tracing): | WUNTRACED | WCONTINUED */);

if (pid > 0) {
auto sigchld_info = std::make_unique<ProcSigchldInfo>();

if (WIFEXITED(status)) {
// Exited with status WEXITSTATUS(status)
sigchld_info->pid = pid;
sigchld_info->is_terminated_by_signal = false;
sigchld_info->value = WEXITSTATUS(status);

CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: false, Status: {}",
pid, WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
// Killed by signal WTERMSIG(status)
sigchld_info->pid = pid;
sigchld_info->is_terminated_by_signal = true;
sigchld_info->value = WTERMSIG(status);

CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: true, Signal: {}",
pid, WTERMSIG(status));
}
/* Todo(More status tracing):
else if (WIFSTOPPED(status)) {
printf("stopped by signal %d\n", WSTOPSIG(status));
} else if (WIFCONTINUED(status)) {
printf("continued\n");
} */

m_task_->sigchld_info = *sigchld_info;

if (m_task_->IsCrun())
// TaskStatusChange of a crun task is triggered in
// CforedManager.
g_cfored_manager->TaskProcStopped();
else /* Batch / Calloc */ {
// If the TaskInstance has no process left,
// send TaskStatusChange for this task.
// See the comment of EvActivateTaskStatusChange_.
TaskStopAndDoStatusChange();
}
} else if (pid == 0) {
break;
} else if (pid < 0) {
if (errno != ECHILD)
CRANE_DEBUG("waitpid() error: {}, {}", errno, strerror(errno));
break;
}
}
}
void TaskManager::EvTaskTimerCb_() {
CRANE_TRACE("Task #{} exceeded its time limit. Terminating it...",
g_config.TaskId);

// Sometimes, task finishes just before time limit.
// After the execution of SIGCHLD callback where the task has been erased,
// the timer is triggered immediately.
// That's why we need to check the existence of the task again in timer
// callback, otherwise a segmentation fault will occur.
if (!m_task_) {
CRANE_TRACE("Task #{} has already been removed.", g_config.TaskId);
return;
}

TaskInstance* task_instance = m_task_.get();
DelTerminationTimer_(task_instance);

if (task_instance->task.type() == crane::grpc::Batch) {
m_task_terminate_elem.store(
TaskTerminateElem{.terminated_by_timeout = true},
std::memory_order::release);
m_terminate_task_async_handle_->send();
} else {
ActivateTaskStatusChange_(crane::grpc::TaskStatus::ExceedTimeLimit,
ExitCode::kExitCodeExceedTimeLimit, std::nullopt);
}
}
void TaskManager::EvCleanTerminateTaskQueueCb_() {
if (!m_task_) {
CRANE_DEBUG("Terminating a non-existent task #{}.", g_config.TaskId);

// Note if Ctld wants to terminate some tasks that are not running,
// it might indicate other nodes allocated to the task might have
// crashed. We should mark the task as kind of not runnable by removing
// its cgroup.
//
// Considering such a situation:
// In Task Scheduler of Ctld,
// the task index from node id to task id have just been added and
// Ctld are sending CreateCgroupForTasks.
// Right at the moment, one Craned allocated to this task and
// designated as the executing node crashes,
// but it has been sent a CreateCgroupForTasks and replied.
// Then the CranedKeeper search the task index and
// send TerminateTasksOnCraned to all Craned allocated to this task
// including this node.
// In order to give Ctld kind of feedback without adding complicated
// synchronizing mechanism in ScheduleThread_(),
// we just remove the cgroup for such task, Ctld will fail in the
// following ExecuteTasks and the task will go to the right place as
// well as the completed queue.
}
CRANE_TRACE(
"Receive TerminateRunningTask Request from internal queue. "
"Task id: {}",
g_config.TaskId);
TaskInstance* task_instance = m_task_.get();
auto elem = m_task_terminate_elem.load(std::memory_order_acquire);

if (elem.terminated_by_user) task_instance->cancelled_by_user = true;
if (elem.mark_as_orphaned) task_instance->orphaned = true;
if (elem.terminated_by_timeout) task_instance->terminated_by_timeout = true;

int sig = SIGTERM; // For BatchTask
if (task_instance->IsCrun()) sig = SIGHUP;

if (task_instance->process) {
// For an Interactive task with a process running or a Batch task, we
// just send a kill signal here.
KillTaskInstance_(sig);
} else if (task_instance->task.type() == crane::grpc::Interactive) {
// For an Interactive task with no process running, it ends immediately.
ActivateTaskStatusChange_(crane::grpc::Completed,
ExitCode::kExitCodeTerminated, std::nullopt);
}
}

} // namespace Supervisor
Loading

0 comments on commit acf50b1

Please sign in to comment.