Skip to content

Commit

Permalink
Feat: Crun support pty (#362)
Browse files Browse the repository at this point in the history
* Refactor: Crun support pty

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* Refactor: Use original grpc message id order

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* Fix: Fix ci compile error

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* feat: Crun support pty for single node

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* feat<crun>: Support crun --pty with multi node

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* refactor<cfored>: Refactor cfored/crun/calloc task completion code.

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* fix<cfored>: Fix crun/calloc fail when cancel pending task by ctrl c

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* refactor

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* chore: Typo

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* Refactor.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

* Fix compilation error.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

---------

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>
Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>
Co-authored-by: RileyW <wrllrwwrllrw@gmail.com>
  • Loading branch information
L-Xiafeng and RileyWen authored Dec 11, 2024
1 parent acec2d1 commit c4540ed
Show file tree
Hide file tree
Showing 13 changed files with 199 additions and 128 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ set(DEPENDENCIES_PRE_INSTALLED_DIR ${CMAKE_CURRENT_SOURCE_DIR}/dependencies/pre_
add_subdirectory(${DEPENDENCIES_PRE_INSTALLED_DIR})

find_package(Threads REQUIRED)
find_library(LIBUTIL_LIBRARY util)

# New in version cmake3.24:
# Set ZLIB_USE_STATIC_LIBS to ON to look for static libraries. Default is OFF.
Expand Down
1 change: 1 addition & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ message InteractiveTaskAdditionalMeta {
string sh_script = 2;
string term_env = 3;
InteractiveTaskType interactive_type = 4;
bool pty = 5;
}

message TaskInfo {
Expand Down
1 change: 1 addition & 0 deletions src/CraneCtld/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequests(
mutable_meta->set_sh_script(meta_in_ctld.sh_script);
mutable_meta->set_term_env(meta_in_ctld.term_env);
mutable_meta->set_interactive_type(meta_in_ctld.interactive_type);
mutable_meta->set_pty(meta_in_ctld.pty);
}
}

Expand Down
46 changes: 23 additions & 23 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks(
results.emplace_back(std::move(result));
}

for (auto& res : results) {
for (auto &res : results) {
if (res.has_value())
response->mutable_task_id_list()->Add(res.value().get());
else
Expand Down Expand Up @@ -740,21 +740,22 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
CRANE_ERROR("Expect type CFORED_REGISTRATION from peer {}.",
context->peer());
return Status::CANCELLED;
} else {
cfored_name = cfored_request.payload_cfored_reg().cfored_name();
CRANE_INFO("Cfored {} registered.", cfored_name);
}

ok = stream_writer->WriteCforedRegistrationAck({});
if (ok) {
state = StreamState::kWaitMsg;
} else {
CRANE_ERROR(
"Failed to send msg to cfored {}. Connection is broken. "
"Exiting...",
cfored_name);
state = StreamState::kCleanData;
}
cfored_name = cfored_request.payload_cfored_reg().cfored_name();
CRANE_INFO("Cfored {} registered.", cfored_name);

ok = stream_writer->WriteCforedRegistrationAck({});
if (ok) {
state = StreamState::kWaitMsg;
} else {
CRANE_ERROR(
"Failed to send msg to cfored {}. Connection is broken. "
"Exiting...",
cfored_name);
state = StreamState::kCleanData;
}

} else {
state = StreamState::kCleanData;
}
Expand Down Expand Up @@ -784,15 +785,16 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
};

meta.cb_task_cancel = [writer_weak_ptr](task_id_t task_id) {
CRANE_TRACE("Sending TaskCancelRequest in task_cancel", task_id);
if (auto writer = writer_weak_ptr.lock(); writer)
writer->WriteTaskCancelRequest(task_id);
};

meta.cb_task_completed = [this, i_type, cfored_name,
writer_weak_ptr](task_id_t task_id) {
CRANE_TRACE("Sending TaskCompletionAckReply in task_completed",
task_id);
if (auto writer = writer_weak_ptr.lock(); writer)
meta.cb_task_completed = [this, i_type, cfored_name, writer_weak_ptr](
task_id_t task_id,
bool send_completion_ack) {
if (auto writer = writer_weak_ptr.lock();
writer && send_completion_ack)
writer->WriteTaskCompletionAckReply(task_id);
m_ctld_server_->m_mtx_.Lock();

Expand Down Expand Up @@ -838,8 +840,7 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
case StreamCforedRequest::TASK_COMPLETION_REQUEST: {
auto const &payload = cfored_request.payload_task_complete_req();
CRANE_TRACE("Recv TaskCompletionReq of Task #{}", payload.task_id());

if (g_task_scheduler->TerminatePendingOrRunningTask(
if (g_task_scheduler->TerminatePendingOrRunningIaTask(
payload.task_id()) != CraneErr::kOk)
stream_writer->WriteTaskCompletionAckReply(payload.task_id());
} break;
Expand Down Expand Up @@ -973,8 +974,7 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {
task->Username(), task->partition_id, task->account));
}

auto enable_res =
g_account_manager->CheckIfUserOfAccountIsEnabled(
auto enable_res = g_account_manager->CheckIfUserOfAccountIsEnabled(
task->Username(), task->account);
if (!enable_res) {
return std::unexpected(enable_res.error());
Expand Down
6 changes: 2 additions & 4 deletions src/CraneCtld/CtldGrpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ class CforedStreamWriter {

bool WriteTaskResAllocReply(
task_id_t task_id,
std::expected<std::pair<std::string, std::list<std::string>>,
std::string>
std::expected<std::pair<std::string, std::list<CranedId>>, std::string>
res) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;
Expand Down Expand Up @@ -121,8 +120,7 @@ class CforedStreamWriter {
return m_stream_->Write(reply);
}

bool WriteCforedRegistrationAck(
const std::expected<void, std::string> &res) {
bool WriteCforedRegistrationAck(const std::expected<void, std::string> &res) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;

Expand Down
10 changes: 7 additions & 3 deletions src/CraneCtld/CtldPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,14 @@ struct InteractiveMetaInTask {

std::string sh_script;
std::string term_env;
bool pty;
std::function<void(task_id_t, std::string const&,
std::list<std::string> const&)>
cb_task_res_allocated;
std::function<void(task_id_t)> cb_task_completed;

// only for calloc.
std::function<void(task_id_t, bool)> cb_task_completed;

// This will ask front end like crun/calloc to exit
std::function<void(task_id_t)> cb_task_cancel;

// only for crun.
Expand Down Expand Up @@ -493,8 +495,10 @@ struct TaskInCtld {
InteractiveMeta.interactive_type =
val.interactive_meta().interactive_type();
if (InteractiveMeta.interactive_type ==
crane::grpc::InteractiveTaskType::Crun)
crane::grpc::InteractiveTaskType::Crun) {
InteractiveMeta.term_env = val.interactive_meta().term_env();
InteractiveMeta.pty = val.interactive_meta().pty();
}
}

node_num = val.node_num();
Expand Down
92 changes: 56 additions & 36 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,22 +686,37 @@ void TaskScheduler::ScheduleThread_() {
task->allocated_craneds_regex =
util::HostNameListToStr(task->CranedIds());

// Task execute on all node, otherwise on the first node
bool launch_on_all_nodes;
if (task->type == crane::grpc::Batch) {
// For cbatch tasks whose --node > 1,
// only execute the command at the first allocated node.
task->executing_craned_ids.emplace_back(task->CranedIds().front());
launch_on_all_nodes = false;
} else {
const auto& meta = std::get<InteractiveMetaInTask>(task->meta);
if (meta.interactive_type == crane::grpc::Calloc)
// For calloc tasks we still need to execute a dummy empty task to
// set up a timer.
task->executing_craned_ids.emplace_back(task->CranedIds().front());
else
// For crun tasks we need to execute tasks on all allocated nodes.
for (auto const& craned_id : task->CranedIds())
task->executing_craned_ids.emplace_back(craned_id);
launch_on_all_nodes = false;
else {
// For crun tasks we need to execute tasks on all allocated
// nodes.

// Crun task with pty only launch on first node
if (task->TaskToCtld().interactive_meta().pty())
launch_on_all_nodes = false;
else
launch_on_all_nodes = true;
}
}

if (launch_on_all_nodes) {
for (auto const& craned_id : task->CranedIds())
task->executing_craned_ids.emplace_back(craned_id);
} else
task->executing_craned_ids.emplace_back(task->CranedIds().front());
}

end = std::chrono::steady_clock::now();
CRANE_TRACE(
"Set task fields costed {} ms",
Expand Down Expand Up @@ -1273,18 +1288,12 @@ crane::grpc::CancelTaskReply TaskScheduler::CancelPendingOrRunningTask(
reply.add_not_cancelled_tasks(task_id);
reply.add_not_cancelled_reasons("Permission Denied.");
} else {
bool is_calloc = false;
if (task->type == crane::grpc::Interactive) {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
if (meta.interactive_type == crane::grpc::Calloc) is_calloc = true;

if (is_calloc && !meta.has_been_cancelled_on_front_end) {
if (!meta.has_been_cancelled_on_front_end) {
meta.has_been_cancelled_on_front_end = true;
meta.cb_task_cancel(task_id);
}
}

if (is_calloc) {
reply.add_cancelled_tasks(task_id);
} else {
CraneErr err = TerminateRunningTaskNoLock_(task);
Expand Down Expand Up @@ -1457,8 +1466,18 @@ void TaskScheduler::CleanCancelQueueCb_() {

if (task->type == crane::grpc::Interactive) {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
g_thread_pool->detach_task([cb = meta.cb_task_cancel,
task_id = task->TaskId()] { cb(task_id); });
// Cancel request may not come from crun/calloc, ask them to exit
if (!meta.has_been_cancelled_on_front_end) {
meta.has_been_cancelled_on_front_end = true;
g_thread_pool->detach_task([cb = meta.cb_task_cancel,
task_id = task->TaskId()] { cb(task_id); });
} else {
// Cancel request from crun/calloc, reply CompletionAck
g_thread_pool->detach_task(
[cb = meta.cb_task_completed, task_id = task->TaskId()] {
cb(task_id, true);
});
}
}
}

Expand Down Expand Up @@ -1619,34 +1638,35 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() {
task->SetStatus(new_status);
} else {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
if (meta.interactive_type == crane::grpc::Calloc) {
// TaskStatusChange may indicate the time limit has been reached and
// the task has been terminated. No more TerminateTask RPC should be
// sent to the craned node if any further CancelTask or
// TaskCompletionRequest RPC is received.
meta.has_been_terminated_on_craned = true;

if (new_status == crane::grpc::ExceedTimeLimit ||
exit_code == ExitCode::kExitCodeCranedDown) {
meta.has_been_cancelled_on_front_end = true;
meta.cb_task_cancel(task->TaskId());
task->SetStatus(new_status);
} else {
task->SetStatus(crane::grpc::Completed);
}
meta.cb_task_completed(task->TaskId());
} else { // Crun
if (++meta.status_change_cnt < task->node_num) {
if (meta.interactive_type == crane::grpc::Crun) { // Crun
if (++meta.status_change_cnt < task->executing_craned_ids.size()) {
CRANE_TRACE(
"{}/{} TaskStatusChanges of Crun task #{} were received. "
"Keep waiting...",
meta.status_change_cnt, task->node_num, task->TaskId());
meta.status_change_cnt, task->executing_craned_ids.size(),
task->TaskId());
continue;
}
}

task->SetStatus(new_status);
meta.cb_task_completed(task->TaskId());
// TaskStatusChange may indicate the time limit has been reached and
// the task has been terminated. No more TerminateTask RPC should be
// sent to the craned node if any further CancelTask or
// TaskCompletionRequest RPC is received.

// Task end triggered by craned.
if (!meta.has_been_cancelled_on_front_end) {
meta.has_been_cancelled_on_front_end = true;
meta.cb_task_cancel(task->TaskId());
// Completion ack will send in grpc server triggered by task complete
// req
meta.cb_task_completed(task->TaskId(), false);
} else {
// Send Completion Ack to frontend now.
meta.cb_task_completed(task->TaskId(), true);
}

task->SetStatus(new_status);
}

task->SetExitCode(exit_code);
Expand Down
19 changes: 16 additions & 3 deletions src/CraneCtld/TaskScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,21 +254,34 @@ class TaskScheduler {
crane::grpc::CancelTaskReply CancelPendingOrRunningTask(
const crane::grpc::CancelTaskRequest& request);

CraneErr TerminatePendingOrRunningTask(uint32_t task_id) {
CraneErr TerminatePendingOrRunningIaTask(uint32_t task_id) {
LockGuard pending_guard(&m_pending_task_map_mtx_);
LockGuard running_guard(&m_running_task_map_mtx_);

auto pd_it = m_pending_task_map_.find(task_id);
if (pd_it != m_pending_task_map_.end()) {
auto& task = pd_it->second;
if (task->type == crane::grpc::TaskType::Interactive) {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
meta.has_been_cancelled_on_front_end = true;
}
m_cancel_task_queue_.enqueue(
CancelPendingTaskQueueElem{.task = std::move(pd_it->second)});
CancelPendingTaskQueueElem{.task = std::move(task)});
m_cancel_task_async_handle_->send();
m_pending_task_map_.erase(pd_it);
return CraneErr::kOk;
}

auto rn_it = m_running_task_map_.find(task_id);
if (rn_it == m_running_task_map_.end()) return CraneErr::kNonExistent;
if (rn_it == m_running_task_map_.end())
return CraneErr::kNonExistent;
else {
auto& task = rn_it->second;
if (task->type == crane::grpc::TaskType::Interactive) {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
meta.has_been_cancelled_on_front_end = true;
}
}

return TerminateRunningTaskNoLock_(rn_it->second.get());
}
Expand Down
1 change: 1 addition & 0 deletions src/Craned/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ target_link_libraries(craned

cxxopts
Threads::Threads
${LIBUTIL_LIBRARY}
nlohmann_json::nlohmann_json

absl::flat_hash_map
Expand Down
Loading

0 comments on commit c4540ed

Please sign in to comment.