From 5a42881c2eafe7a97e00681c1ea00d928986fd2e Mon Sep 17 00:00:00 2001 From: Junlin Li <70465472+L-Xiafeng@users.noreply.github.com> Date: Fri, 29 Nov 2024 00:14:00 +0800 Subject: [PATCH] Refactor: Replace libevent with libuvw in Craned (#359) * Refactor: remove libevent in craned Signed-off-by: Li Junlin * Refactor: remove libevent in craned Signed-off-by: xiafeng * Refactor: remove libevent in craned Signed-off-by: xiafeng * Refactor: remove libevent ] Signed-off-by: xiafeng * Refactor: rename call backs Signed-off-by: xiafeng * Refactor: remove 'this->',rename call backs Signed-off-by: xiafeng * Bugfix: fix asan reporting use-after-poison Signed-off-by: Li Junlin * Bugfix: fix asan/tsan reported error Signed-off-by: Li Junlin * Bugfix: remove log in child process in case of deadlock in logger after fork. Signed-off-by: Li Junlin * Refactor: Solve comment,use const&. Signed-off-by: Li Junlin * Refactor: Rename private variables Signed-off-by: Li Junlin * chore: Remove redundant include Signed-off-by: Li Junlin * refactor: Remove no used code. Signed-off-by: Li Junlin --------- Signed-off-by: Li Junlin Signed-off-by: xiafeng --- dependencies/cmake/CMakeLists.txt | 2 +- src/CraneCtld/AccountManager.cpp | 4 - src/CraneCtld/CMakeLists.txt | 2 - src/CraneCtld/CraneCtld.cpp | 6 - src/CraneCtld/CranedKeeper.cpp | 2 - src/CraneCtld/CranedMetaContainer.cpp | 1 - src/CraneCtld/CtldGrpcServer.cpp | 3 - src/CraneCtld/DbClient.cpp | 2 - src/CraneCtld/DbClient.h | 1 - src/CraneCtld/TaskScheduler.cpp | 6 - src/Craned/CMakeLists.txt | 3 - src/Craned/CforedClient.cpp | 2 - src/Craned/CgroupManager.cpp | 69 +-- src/Craned/CgroupManager.h | 4 +- src/Craned/Craned.cpp | 10 +- src/Craned/CranedPreCompiledHeader.h | 1 + src/Craned/CranedPublicDefs.h | 7 +- src/Craned/CranedServer.cpp | 15 +- src/Craned/CranedServer.h | 5 - src/Craned/CtldClient.cpp | 8 +- src/Craned/CtldClient.h | 6 +- src/Craned/TaskManager.cpp | 612 ++++++++++---------------- src/Craned/TaskManager.h | 220 ++++----- 23 files changed, 383 insertions(+), 608 deletions(-) diff --git a/dependencies/cmake/CMakeLists.txt b/dependencies/cmake/CMakeLists.txt index f68a9aa25..c1ccd32fe 100644 --- a/dependencies/cmake/CMakeLists.txt +++ b/dependencies/cmake/CMakeLists.txt @@ -18,7 +18,7 @@ add_subdirectory(yaml-cpp) add_subdirectory(fmt) add_subdirectory(googletest) add_subdirectory(spdlog) -add_subdirectory(LibEvent) +#add_subdirectory(LibEvent) add_subdirectory(cxxopts) add_subdirectory(grpc) add_subdirectory(libcgroup) diff --git a/src/CraneCtld/AccountManager.cpp b/src/CraneCtld/AccountManager.cpp index d2e466ce6..30e1564dd 100644 --- a/src/CraneCtld/AccountManager.cpp +++ b/src/CraneCtld/AccountManager.cpp @@ -18,10 +18,6 @@ #include "AccountManager.h" -#include - -#include "CtldPublicDefs.h" -#include "crane/PasswordEntry.h" #include "protos/PublicDefs.pb.h" #include "range/v3/algorithm/contains.hpp" diff --git a/src/CraneCtld/CMakeLists.txt b/src/CraneCtld/CMakeLists.txt index ff2cbe3ab..f030eb8e2 100644 --- a/src/CraneCtld/CMakeLists.txt +++ b/src/CraneCtld/CMakeLists.txt @@ -29,8 +29,6 @@ target_link_libraries(cranectld PRIVATE Utility_PublicHeader Utility_PluginClient - dev_event_core - dev_event_pthreads uvw cxxopts diff --git a/src/CraneCtld/CraneCtld.cpp b/src/CraneCtld/CraneCtld.cpp index fc66208e8..959746ae1 100644 --- a/src/CraneCtld/CraneCtld.cpp +++ b/src/CraneCtld/CraneCtld.cpp @@ -19,7 +19,6 @@ #include "CtldPreCompiledHeader.h" // Precompiled header comes first! -#include #include #include #include @@ -35,9 +34,7 @@ #include "DbClient.h" #include "EmbeddedDbClient.h" #include "TaskScheduler.h" -#include "crane/Logger.h" #include "crane/Network.h" -#include "crane/OS.h" #include "crane/PluginClient.h" void ParseConfig(int argc, char** argv) { @@ -641,9 +638,6 @@ void InitializeCtldGlobalVariables() { crane::InitializeNetworkFunctions(); - // Enable inter-thread custom event notification. - evthread_use_pthreads(); - char hostname[HOST_NAME_MAX + 1]; int err = gethostname(hostname, HOST_NAME_MAX + 1); if (err != 0) { diff --git a/src/CraneCtld/CranedKeeper.cpp b/src/CraneCtld/CranedKeeper.cpp index 73ad643bc..863bfb5dd 100644 --- a/src/CraneCtld/CranedKeeper.cpp +++ b/src/CraneCtld/CranedKeeper.cpp @@ -18,8 +18,6 @@ #include "CranedKeeper.h" -#include - namespace Ctld { using grpc::ClientContext; diff --git a/src/CraneCtld/CranedMetaContainer.cpp b/src/CraneCtld/CranedMetaContainer.cpp index 3023221ca..f2e8a953c 100644 --- a/src/CraneCtld/CranedMetaContainer.cpp +++ b/src/CraneCtld/CranedMetaContainer.cpp @@ -19,7 +19,6 @@ #include "CranedMetaContainer.h" #include "CranedKeeper.h" -#include "crane/String.h" #include "protos/PublicDefs.pb.h" namespace Ctld { diff --git a/src/CraneCtld/CtldGrpcServer.cpp b/src/CraneCtld/CtldGrpcServer.cpp index 724fcf2b3..2449741e3 100644 --- a/src/CraneCtld/CtldGrpcServer.cpp +++ b/src/CraneCtld/CtldGrpcServer.cpp @@ -18,14 +18,11 @@ #include "CtldGrpcServer.h" -#include - #include "AccountManager.h" #include "CranedKeeper.h" #include "CranedMetaContainer.h" #include "EmbeddedDbClient.h" #include "TaskScheduler.h" -#include "crane/String.h" namespace Ctld { diff --git a/src/CraneCtld/DbClient.cpp b/src/CraneCtld/DbClient.cpp index 81a4cbae2..1bef5af95 100644 --- a/src/CraneCtld/DbClient.cpp +++ b/src/CraneCtld/DbClient.cpp @@ -18,8 +18,6 @@ #include "DbClient.h" -#include - #include #include diff --git a/src/CraneCtld/DbClient.h b/src/CraneCtld/DbClient.h index af8d82e79..5b35788de 100644 --- a/src/CraneCtld/DbClient.h +++ b/src/CraneCtld/DbClient.h @@ -25,7 +25,6 @@ #include #include -#include #include #include #include diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 58efa3541..e713fd7dc 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -18,18 +18,12 @@ #include "TaskScheduler.h" -#include -#include - -#include - #include "AccountManager.h" #include "CranedKeeper.h" #include "CranedMetaContainer.h" #include "CtldPublicDefs.h" #include "EmbeddedDbClient.h" #include "crane/PluginClient.h" -#include "crane/PublicHeader.h" #include "protos/PublicDefs.pb.h" namespace Ctld { diff --git a/src/Craned/CMakeLists.txt b/src/Craned/CMakeLists.txt index 8fc7ad074..f8eb17c11 100644 --- a/src/Craned/CMakeLists.txt +++ b/src/Craned/CMakeLists.txt @@ -40,9 +40,6 @@ target_link_libraries(craned crane_proto_lib - dev_event_core - dev_event_pthreads - bs_thread_pool cxxopts diff --git a/src/Craned/CforedClient.cpp b/src/Craned/CforedClient.cpp index 21377149a..274aa630b 100644 --- a/src/Craned/CforedClient.cpp +++ b/src/Craned/CforedClient.cpp @@ -18,8 +18,6 @@ #include "CforedClient.h" -#include - #include "crane/String.h" namespace Craned { diff --git a/src/Craned/CgroupManager.cpp b/src/Craned/CgroupManager.cpp index abb3f934a..75a3ddca7 100644 --- a/src/Craned/CgroupManager.cpp +++ b/src/Craned/CgroupManager.cpp @@ -573,31 +573,6 @@ bool CgroupManager::ReleaseCgroup(uint32_t task_id, uid_t uid) { this->m_task_id_to_cg_spec_map_.Erase(task_id); { - auto uid_task_id_map = this->m_uid_to_task_ids_map_.GetMapExclusivePtr(); - if (!uid_task_id_map->contains(uid)) { - CRANE_DEBUG( - "Trying to release a non-existent cgroup for uid #{}. Ignoring it...", - uid); - return false; - } - - auto task_id_set_ptr = uid_task_id_map->at(uid).RawPtr(); - - task_id_set_ptr->erase(task_id); - if (task_id_set_ptr->empty()) { - uid_task_id_map->erase(uid); - } - // Do not access task_id_set_ptr after erasing form map - } - - if (!this->m_task_id_to_cg_map_.Contains(task_id)) { - CRANE_DEBUG( - "Trying to release a non-existent cgroup for task #{}. Ignoring " - "it...", - task_id); - - return false; - } else { // The termination of all processes in a cgroup is a time-consuming work. // Therefore, once we are sure that the cgroup for this task exists, we // let gRPC call return and put the termination work into the thread pool @@ -605,8 +580,20 @@ bool CgroupManager::ReleaseCgroup(uint32_t task_id, uid_t uid) { // Kind of async behavior. // avoid deadlock by Erase at next line - CgroupInterface *cgroup = this->m_task_id_to_cg_map_[task_id]->release(); - this->m_task_id_to_cg_map_.Erase(task_id); + auto task_id_to_cg_map_ptr = + this->m_task_id_to_cg_map_.GetMapExclusivePtr(); + auto it = task_id_to_cg_map_ptr->find(task_id); + if (it == task_id_to_cg_map_ptr->end()) { + CRANE_DEBUG( + "Trying to release a non-existent cgroup for task #{}. Ignoring " + "it...", + task_id); + + return false; + } + CgroupInterface *cgroup = it->second.GetExclusivePtr()->release(); + + task_id_to_cg_map_ptr->erase(task_id); if (cgroup != nullptr) { g_thread_pool->detach_task([cgroup]() { @@ -632,8 +619,27 @@ bool CgroupManager::ReleaseCgroup(uint32_t task_id, uid_t uid) { delete cgroup; }); } - return true; } + + { + auto uid_task_ids_map_ptr = this->m_uid_to_task_ids_map_.GetMapExclusivePtr(); + auto it = uid_task_ids_map_ptr->find(uid); + if (it == uid_task_ids_map_ptr->end()) { + CRANE_DEBUG( + "Trying to release a non-existent cgroup for uid #{}. Ignoring it...", + uid); + return false; + } + + auto task_id_set_ptr = uid_task_ids_map_ptr->at(uid).RawPtr(); + + task_id_set_ptr->erase(task_id); + if (task_id_set_ptr->empty()) { + uid_task_ids_map_ptr->erase(uid); + } + // Do not access task_id_set_ptr after erasing form map + } + return true; } void CgroupManager::RmAllTaskCgroupsUnderController_( @@ -725,8 +731,11 @@ bool CgroupManager::QueryTaskInfoOfUidAsync(uid_t uid, TaskInfoOfUid *info) { info->job_cnt = 0; info->cgroup_exists = false; - if (this->m_uid_to_task_ids_map_.Contains(uid)) { - auto task_ids = this->m_uid_to_task_ids_map_[uid]; + if (auto task_ids = this->m_uid_to_task_ids_map_[uid]) { + if (!task_ids) { + CRANE_WARN("Uid {} not found in uid_to_task_ids_map", uid); + return false; + } info->job_cnt = task_ids->size(); info->first_task_id = *task_ids->begin(); } diff --git a/src/Craned/CgroupManager.h b/src/Craned/CgroupManager.h index 8df2c05ac..f88d338e4 100644 --- a/src/Craned/CgroupManager.h +++ b/src/Craned/CgroupManager.h @@ -24,12 +24,12 @@ * */ #pragma once +#include "CranedPublicDefs.h" +// Precompiled header comes first. #include -#include "CranedPublicDefs.h" #include "crane/AtomicHashMap.h" -#include "crane/OS.h" #ifdef CRANE_ENABLE_BPF # include diff --git a/src/Craned/Craned.cpp b/src/Craned/Craned.cpp index 956a2cabb..1a27eafad 100644 --- a/src/Craned/Craned.cpp +++ b/src/Craned/Craned.cpp @@ -19,11 +19,8 @@ #include "CranedPublicDefs.h" // Precompiled header comes first. -#include #include #include -#include -#include #include #include @@ -32,10 +29,8 @@ #include "CforedClient.h" #include "CranedServer.h" #include "CtldClient.h" -#include "crane/Network.h" -#include "crane/OS.h" +#include "DeviceManager.h" #include "crane/PluginClient.h" -#include "crane/PublicHeader.h" #include "crane/String.h" using Craned::g_config; @@ -602,9 +597,6 @@ void GlobalVariableInit() { // SIGPIPE while communicating with spawned task processes. signal(SIGPIPE, SIG_IGN); - // Enable inter-thread custom event notification. - evthread_use_pthreads(); - PasswordEntry::InitializeEntrySize(); using Craned::CgroupManager; diff --git a/src/Craned/CranedPreCompiledHeader.h b/src/Craned/CranedPreCompiledHeader.h index 4b835e154..2d318701b 100644 --- a/src/Craned/CranedPreCompiledHeader.h +++ b/src/Craned/CranedPreCompiledHeader.h @@ -84,3 +84,4 @@ // Then include the other Crane headers #include "crane/GrpcHelper.h" #include "crane/Network.h" +#include "crane/PublicHeader.h" diff --git a/src/Craned/CranedPublicDefs.h b/src/Craned/CranedPublicDefs.h index 20017cfb6..a2563cd1d 100644 --- a/src/Craned/CranedPublicDefs.h +++ b/src/Craned/CranedPublicDefs.h @@ -22,16 +22,15 @@ // Precompiled header comes first #include "crane/OS.h" -#include "crane/PublicHeader.h" -#include "protos/Crane.pb.h" + namespace Craned { -inline const uint64_t kEvSigChldResendMs = 500'000; +inline constexpr uint64_t kEvSigChldResendMs = 500; using EnvMap = std::unordered_map; -struct TaskStatusChange { +struct TaskStatusChangeQueueElem { task_id_t task_id{}; crane::grpc::TaskStatus new_status{}; uint32_t exit_code{}; diff --git a/src/Craned/CranedServer.cpp b/src/Craned/CranedServer.cpp index 0c28183c4..2b3406c6d 100644 --- a/src/Craned/CranedServer.cpp +++ b/src/Craned/CranedServer.cpp @@ -18,11 +18,9 @@ #include "CranedServer.h" -#include -#include #include -#include "CtldClient.h" +#include "TaskManager.h" namespace Craned { @@ -173,11 +171,11 @@ grpc::Status CranedServiceImpl::CreateCgroupForTasks( for (int i = 0; i < request->task_id_list_size(); i++) { task_id_t task_id = request->task_id_list(i); uid_t uid = request->uid_list(i); - crane::grpc::ResourceInNode const &res = request->res_list(i); + const crane::grpc::ResourceInNode &res = request->res_list(i); CgroupSpec spec{.uid = uid, .task_id = task_id, - .res_in_node = std::move(res), + .res_in_node = res, .execution_node = request->execution_node(i)}; CRANE_TRACE("Receive CreateCgroup for task #{}, uid {}", task_id, uid); cg_specs.emplace_back(std::move(spec)); @@ -309,7 +307,7 @@ grpc::Status CranedServiceImpl::QueryTaskIdFromPortForward( crane::grpc::QueryTaskIdFromPortRequest request_to_remote_service; crane::grpc::QueryTaskIdFromPortReply reply_from_remote_service; - ClientContext context_of_remote_service; + grpc::ClientContext context_of_remote_service; Status status_remote_service; request_to_remote_service.set_port(request->ssh_remote_port()); @@ -389,8 +387,7 @@ Status CranedServiceImpl::QueryTaskEnvVariables( grpc::ServerContext *context, const ::crane::grpc::QueryTaskEnvVariablesRequest *request, crane::grpc::QueryTaskEnvVariablesReply *response) { - auto task_env_map = - g_task_mgr->QueryTaskEnvMapAsync(request->task_id()); + auto task_env_map = g_task_mgr->QueryTaskEnvMapAsync(request->task_id()); if (task_env_map.has_value()) { for (const auto &[name, value] : task_env_map.value()) response->mutable_env_map()->emplace(name, value); @@ -445,7 +442,7 @@ grpc::Status CranedServiceImpl::QueryTaskEnvVariablesForward( crane::grpc::QueryTaskEnvVariablesRequest request_to_remote_service; crane::grpc::QueryTaskEnvVariablesReply reply_from_remote_service; - ClientContext context_of_remote_service; + grpc::ClientContext context_of_remote_service; Status status_remote_service; request_to_remote_service.set_task_id(request->task_id()); diff --git a/src/Craned/CranedServer.h b/src/Craned/CranedServer.h index d1a22384a..0e89c60e3 100644 --- a/src/Craned/CranedServer.h +++ b/src/Craned/CranedServer.h @@ -21,11 +21,6 @@ #include "CranedPublicDefs.h" // Precompiled header comes first. -#include - -#include "TaskManager.h" -#include "crane/Lock.h" -#include "crane/PublicHeader.h" #include "protos/Crane.grpc.pb.h" #include "protos/Crane.pb.h" diff --git a/src/Craned/CtldClient.cpp b/src/Craned/CtldClient.cpp index 7af81ab03..fbdd1892e 100644 --- a/src/Craned/CtldClient.cpp +++ b/src/Craned/CtldClient.cpp @@ -24,7 +24,7 @@ CtldClient::~CtldClient() { m_thread_stop_ = true; CRANE_TRACE("CtldClient is ending. Waiting for the thread to finish."); - m_async_send_thread_.join(); + if (m_async_send_thread_.joinable()) m_async_send_thread_.join(); } void CtldClient::InitChannelAndStub(const std::string& server_address) { @@ -83,12 +83,12 @@ void CtldClient::OnCraneCtldConnected() { return; } } - } while (retry_time--); + } while (!m_thread_stop_ && retry_time--); CRANE_ERROR("Failed to register actively."); } -void CtldClient::TaskStatusChangeAsync(TaskStatusChange&& task_status_change) { +void CtldClient::TaskStatusChangeAsync(TaskStatusChangeQueueElem&& task_status_change) { absl::MutexLock lock(&m_task_status_change_mtx_); m_task_status_change_list_.emplace_back(std::move(task_status_change)); } @@ -151,7 +151,7 @@ void CtldClient::AsyncSendThread_() { continue; } - std::list changes; + std::list changes; changes.splice(changes.begin(), std::move(m_task_status_change_list_)); m_task_status_change_mtx_.Unlock(); diff --git a/src/Craned/CtldClient.h b/src/Craned/CtldClient.h index 4e9334ff4..338850477 100644 --- a/src/Craned/CtldClient.h +++ b/src/Craned/CtldClient.h @@ -21,9 +21,7 @@ #include "CranedPublicDefs.h" // Precompiled header comes first. -#include "crane/PublicHeader.h" #include "protos/Crane.grpc.pb.h" -#include "protos/Crane.pb.h" namespace Craned { @@ -54,7 +52,7 @@ class CtldClient { void OnCraneCtldConnected(); - void TaskStatusChangeAsync(TaskStatusChange&& task_status_change); + void TaskStatusChangeAsync(TaskStatusChangeQueueElem&& task_status_change); bool CancelTaskStatusChangeByTaskId(task_id_t task_id, crane::grpc::TaskStatus* new_status); @@ -70,7 +68,7 @@ class CtldClient { absl::Mutex m_task_status_change_mtx_; - std::list m_task_status_change_list_ + std::list m_task_status_change_list_ ABSL_GUARDED_BY(m_task_status_change_mtx_); std::thread m_async_send_thread_; diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index 05fd8b556..949e41213 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -21,10 +21,11 @@ #include #include #include +#include #include "CforedClient.h" -#include "CgroupManager.h" -#include "crane/OS.h" +#include "CtldClient.h" +#include "crane/String.h" #include "protos/CraneSubprocess.pb.h" #include "protos/PublicDefs.pb.h" @@ -97,171 +98,101 @@ TaskManager::TaskManager() { // Only called once. Guaranteed by singleton pattern. m_instance_ptr_ = this; - m_ev_base_ = event_base_new(); - if (m_ev_base_ == nullptr) { - CRANE_ERROR("Could not initialize libevent!"); - std::terminate(); - } - { // SIGCHLD - m_ev_sigchld_ = evsignal_new(m_ev_base_, SIGCHLD, EvSigchldCb_, this); - if (!m_ev_sigchld_) { - CRANE_ERROR("Failed to create the SIGCHLD event!"); - std::terminate(); - } + m_uvw_loop_ = uvw::loop::create(); - if (event_add(m_ev_sigchld_, nullptr) < 0) { - CRANE_ERROR("Could not add the SIGCHLD event to base!"); - std::terminate(); - } - } - { - m_ev_process_sigchld_ = event_new(m_ev_base_, -1, EV_PERSIST | EV_READ, - EvProcessSigchldCb_, this); - if (!m_ev_process_sigchld_) { - CRANE_ERROR("Failed to create the Do SIGCHLD event!"); - std::terminate(); - } + m_sigchld_handle_ = m_uvw_loop_->resource(); + m_sigchld_handle_->on( + [this](const uvw::signal_event&, uvw::signal_handle&) { EvSigchldCb_(); }); - if (event_add(m_ev_process_sigchld_, nullptr) < 0) { - CRANE_ERROR("Could not add the Do SIGCHLD event to base!"); - std::terminate(); - } + if (m_sigchld_handle_->start(SIGCLD) != 0) { + CRANE_ERROR("Failed to start the SIGCLD handle"); } - { // SIGINT - m_ev_sigint_ = evsignal_new(m_ev_base_, SIGINT, EvSigintCb_, this); - if (!m_ev_sigint_) { - CRANE_ERROR("Failed to create the SIGCHLD event!"); - std::terminate(); - } - if (event_add(m_ev_sigint_, nullptr) < 0) { - CRANE_ERROR("Could not add the SIGINT event to base!"); - std::terminate(); - } + m_sigint_handle_ = m_uvw_loop_->resource(); + m_sigint_handle_->on( + [this](const uvw::signal_event&, uvw::signal_handle&) { EvSigintCb_(); }); + if (m_sigint_handle_->start(SIGINT) != 0) { + CRANE_ERROR("Failed to start the SIGINT handle"); } - { // gRPC: QueryTaskIdFromPid - m_ev_query_task_id_from_pid_ = - event_new(m_ev_base_, -1, EV_PERSIST | EV_READ, - EvGrpcQueryTaskIdFromPidCb_, this); - if (!m_ev_query_task_id_from_pid_) { - CRANE_ERROR("Failed to create the query task id event!"); - std::terminate(); - } - - if (event_add(m_ev_query_task_id_from_pid_, nullptr) < 0) { - CRANE_ERROR("Could not add the query task id event to base!"); - std::terminate(); - } - } - { // gRPC: QueryTaskEnvironmentVariable - m_ev_query_task_environment_variables_ = - event_new(m_ev_base_, -1, EV_PERSIST | EV_READ, - EvGrpcQueryTaskEnvironmentVariableCb_, this); - if (!m_ev_query_task_environment_variables_) { - CRANE_ERROR("Failed to create the query task env event!"); - std::terminate(); - } - - if (event_add(m_ev_query_task_environment_variables_, nullptr) < 0) { - CRANE_ERROR("Could not add the query task env to base!"); - std::terminate(); - } - } - { // Exit Event - m_ev_exit_event_ = - event_new(m_ev_base_, -1, EV_PERSIST | EV_READ, EvExitEventCb_, this); - if (!m_ev_exit_event_) { - CRANE_ERROR("Failed to create the exit event!"); - std::terminate(); - } - if (event_add(m_ev_exit_event_, nullptr) < 0) { - CRANE_ERROR("Could not add the exit event to base!"); - std::terminate(); + // gRPC: QueryTaskIdFromPid + m_query_task_id_from_pid_async_handle_ = + m_uvw_loop_->resource(); + m_query_task_id_from_pid_async_handle_->on( + [this](const uvw::async_event&, uvw::async_handle&) { + EvCleanGrpcQueryTaskIdFromPidQueueCb_(); + }); + + // gRPC: QueryTaskEnvironmentVariable + m_query_task_environment_variables_async_handle_ = + m_uvw_loop_->resource(); + m_query_task_environment_variables_async_handle_->on( + [this](const uvw::async_event&, uvw::async_handle&) { + EvCleanGrpcQueryTaskEnvQueueCb_(); + }); + + // gRPC Execute Task Event + m_grpc_execute_task_async_handle_ = + m_uvw_loop_->resource(); + m_grpc_execute_task_async_handle_->on( + [this](const uvw::async_event&, uvw::async_handle&) { + EvCleanGrpcExecuteTaskQueueCb_(); + }); + + m_process_sigchld_async_handle_ = m_uvw_loop_->resource(); + m_process_sigchld_async_handle_->on( + [this](const uvw::async_event&, uvw::async_handle&) { + EvCleanSigchldQueueCb_(); + }); + + // Task Status Change Event + m_task_status_change_async_handle_ = + m_uvw_loop_->resource(); + m_task_status_change_async_handle_->on( + [this](const uvw::async_event&, uvw::async_handle&) { + EvCleanTaskStatusChangeQueueCb_(); + }); + + m_change_task_time_limit_async_handle_ = + m_uvw_loop_->resource(); + m_change_task_time_limit_async_handle_->on( + [this](const uvw::async_event&, uvw::async_handle&) { + EvCleanChangeTaskTimeLimitQueueCb_(); + }); + + m_terminate_task_async_handle_ = m_uvw_loop_->resource(); + m_terminate_task_async_handle_->on( + [this](const uvw::async_event&, uvw::async_handle&) { + EvCleanTerminateTaskQueueCb_(); + }); + + m_check_task_status_async_handle_ = + m_uvw_loop_->resource(); + m_check_task_status_async_handle_->on( + [this](const uvw::async_event&, uvw::async_handle&) { + EvCleanCheckTaskStatusQueueCb_(); + }); + + m_uvw_thread_ = std::thread([this]() { + util::SetCurrentThreadName("TaskMgrLoopThr"); + auto idle_handle = m_uvw_loop_->resource(); + idle_handle->on( + [this](const uvw::idle_event&, uvw::idle_handle& h) { + if (m_task_cleared_) { + h.parent().walk([](auto&& h) { h.close(); }); + h.parent().stop(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + }); + if (idle_handle->start() != 0) { + CRANE_ERROR("Failed to start the idle event in TaskManager loop."); } - } - { // Grpc Execute Task Event - m_ev_grpc_execute_task_ = event_new(m_ev_base_, -1, EV_READ | EV_PERSIST, - EvGrpcExecuteTaskCb_, this); - if (!m_ev_grpc_execute_task_) { - CRANE_ERROR("Failed to create the grpc_execute_task event!"); - std::terminate(); - } - if (event_add(m_ev_grpc_execute_task_, nullptr) < 0) { - CRANE_ERROR("Could not add the m_ev_grpc_execute_task_ to base!"); - std::terminate(); - } - } - { // Task Status Change Event - m_ev_task_status_change_ = event_new(m_ev_base_, -1, EV_READ | EV_PERSIST, - EvTaskStatusChangeCb_, this); - if (!m_ev_task_status_change_) { - CRANE_ERROR("Failed to create the task_status_change event!"); - std::terminate(); - } - if (event_add(m_ev_task_status_change_, nullptr) < 0) { - CRANE_ERROR("Could not add the m_ev_task_status_change_event_ to base!"); - std::terminate(); - } - } - { - m_ev_task_time_limit_change_ = event_new( - m_ev_base_, -1, EV_READ | EV_PERSIST, EvChangeTaskTimeLimitCb_, this); - if (!m_ev_task_time_limit_change_) { - CRANE_ERROR("Failed to create the task_time_limit_change event!"); - std::terminate(); - } - if (event_add(m_ev_task_time_limit_change_, nullptr) < 0) { - CRANE_ERROR("Could not add the m_ev_task_time_limit_change_ to base!"); - std::terminate(); - } - } - { - m_ev_task_terminate_ = event_new(m_ev_base_, -1, EV_READ | EV_PERSIST, - EvTerminateTaskCb_, this); - if (!m_ev_task_terminate_) { - CRANE_ERROR("Failed to create the task_terminate event!"); - std::terminate(); - } - if (event_add(m_ev_task_terminate_, nullptr) < 0) { - CRANE_ERROR("Could not add the m_ev_task_terminate_ to base!"); - std::terminate(); - } - } - { - m_ev_check_task_status_ = event_new(m_ev_base_, -1, EV_READ | EV_PERSIST, - EvCheckTaskStatusCb_, this); - if (!m_ev_check_task_status_) { - CRANE_ERROR("Failed to create the check_task_status event!"); - std::terminate(); - } - if (event_add(m_ev_check_task_status_, nullptr) < 0) { - CRANE_ERROR("Could not add the m_ev_check_task_status_ to base!"); - std::terminate(); - } - } - - m_ev_loop_thread_ = - std::thread([this]() { event_base_dispatch(m_ev_base_); }); + m_uvw_loop_->run(); + }); } TaskManager::~TaskManager() { - if (m_ev_loop_thread_.joinable()) m_ev_loop_thread_.join(); - - if (m_ev_sigchld_) event_free(m_ev_sigchld_); - if (m_ev_sigint_) event_free(m_ev_sigint_); - - if (m_ev_query_task_id_from_pid_) event_free(m_ev_query_task_id_from_pid_); - if (m_ev_query_task_environment_variables_) - event_free(m_ev_query_task_environment_variables_); - if (m_ev_grpc_execute_task_) event_free(m_ev_grpc_execute_task_); - if (m_ev_exit_event_) event_free(m_ev_exit_event_); - if (m_ev_task_status_change_) event_free(m_ev_task_status_change_); - if (m_ev_task_time_limit_change_) event_free(m_ev_task_time_limit_change_); - if (m_ev_task_terminate_) event_free(m_ev_task_terminate_); - if (m_ev_check_task_status_) event_free(m_ev_check_task_status_); - - if (m_ev_base_) event_base_free(m_ev_base_); + if (m_uvw_thread_.joinable()) m_uvw_thread_.join(); } const TaskInstance* TaskManager::FindInstanceByTaskId_(uint32_t task_id) { @@ -283,14 +214,15 @@ void TaskManager::TaskStopAndDoStatusChangeAsync(uint32_t task_id) { switch (instance->err_before_exec) { case CraneErr::kProtobufError: - EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Failed, - ExitCode::kExitCodeSpawnProcessFail, - std::nullopt); + ActivateTaskStatusChangeAsync_(task_id, crane::grpc::TaskStatus::Failed, + ExitCode::kExitCodeSpawnProcessFail, + std::nullopt); break; case CraneErr::kCgroupError: - EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Failed, - ExitCode::kExitCodeCgroupError, std::nullopt); + ActivateTaskStatusChangeAsync_(task_id, crane::grpc::TaskStatus::Failed, + ExitCode::kExitCodeCgroupError, + std::nullopt); break; default: @@ -302,40 +234,40 @@ void TaskManager::TaskStopAndDoStatusChangeAsync(uint32_t task_id) { // For a Batch task, the end of the process means it is done. if (sigchld_info.is_terminated_by_signal) { if (instance->cancelled_by_user) - EvActivateTaskStatusChange_( + ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::Cancelled, sigchld_info.value + ExitCode::kTerminationSignalBase, std::nullopt); else if (instance->terminated_by_timeout) - EvActivateTaskStatusChange_( + ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::ExceedTimeLimit, sigchld_info.value + ExitCode::kTerminationSignalBase, std::nullopt); else - EvActivateTaskStatusChange_( + ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::Failed, sigchld_info.value + ExitCode::kTerminationSignalBase, std::nullopt); } else - EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Completed, - sigchld_info.value, std::nullopt); + ActivateTaskStatusChangeAsync_(task_id, + crane::grpc::TaskStatus::Completed, + sigchld_info.value, std::nullopt); } else /* Calloc */ { // For a COMPLETING Calloc task with a process running, // the end of this process means that this task is done. if (sigchld_info.is_terminated_by_signal) - EvActivateTaskStatusChange_( + ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::Completed, sigchld_info.value + ExitCode::kTerminationSignalBase, std::nullopt); else - EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Completed, - sigchld_info.value, std::nullopt); + ActivateTaskStatusChangeAsync_(task_id, + crane::grpc::TaskStatus::Completed, + sigchld_info.value, std::nullopt); } } -void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, - void* user_data) { +void TaskManager::EvSigchldCb_() { assert(m_instance_ptr_->m_instance_ptr_ != nullptr); - auto* this_ = reinterpret_cast(user_data); int status; pid_t pid; @@ -369,15 +301,15 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, } else if (WIFCONTINUED(status)) { printf("continued\n"); } */ - this_->m_sigchld_queue_.enqueue(std::move(sigchld_info)); - event_active(this_->m_ev_process_sigchld_, 0, 0); + m_sigchld_queue_.enqueue(std::move(sigchld_info)); + m_process_sigchld_async_handle_->send(); } else if (pid == 0) { // There's no child that needs reaping. // If Craned is exiting, check if there's any task remaining. // If there's no task running, just stop the loop of TaskManager. - if (this_->m_is_ending_now_) { - if (this_->m_task_map_.empty()) { - this_->EvActivateShutdown_(); + if (m_is_ending_now_) { + if (m_task_map_.empty()) { + ActivateShutdownAsync_(); } } break; @@ -389,41 +321,37 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, } } -void TaskManager::EvProcessSigchldCb_(int sig, short events, void* user_data) { - auto* this_ = reinterpret_cast(user_data); - +void TaskManager::EvCleanSigchldQueueCb_() { std::unique_ptr sigchld_info; - while (this_->m_sigchld_queue_.try_dequeue(sigchld_info)) { + while (m_sigchld_queue_.try_dequeue(sigchld_info)) { auto pid = sigchld_info->pid; if (sigchld_info->resend_timer != nullptr) { - evtimer_del(sigchld_info->resend_timer); - event_free(sigchld_info->resend_timer); - sigchld_info->resend_timer = nullptr; + sigchld_info->resend_timer->close(); + sigchld_info->resend_timer.reset(); } - this_->m_mtx_.Lock(); - auto task_iter = this_->m_pid_task_map_.find(pid); - auto proc_iter = this_->m_pid_proc_map_.find(pid); - - if (task_iter == this_->m_pid_task_map_.end() || - proc_iter == this_->m_pid_proc_map_.end()) { - this_->m_mtx_.Unlock(); - - EvQueueSigchldArg* arg = new EvQueueSigchldArg; - - timeval tv{kEvSigChldResendMs / 1000'000, kEvSigChldResendMs % 1000'000}; - sigchld_info->resend_timer = - event_new(this_->m_ev_base_, -1, 0, EvOnSigchldTimerCb_, arg); - evtimer_add(sigchld_info->resend_timer, &tv); - - CRANE_ASSERT_MSG(sigchld_info->resend_timer != nullptr, - "Failed to create new timer."); + m_mtx_.Lock(); + auto task_iter = m_pid_task_map_.find(pid); + auto proc_iter = m_pid_proc_map_.find(pid); + + if (task_iter == m_pid_task_map_.end() || + proc_iter == m_pid_proc_map_.end()) { + m_mtx_.Unlock(); + + auto* sigchld_info_raw_ptr = sigchld_info.release(); + sigchld_info_raw_ptr->resend_timer = + m_uvw_loop_->resource(); + sigchld_info_raw_ptr->resend_timer->on( + [this, sigchld_info_raw_ptr](const uvw::timer_event&, + uvw::timer_handle&) { + EvSigchldTimerCb_(sigchld_info_raw_ptr); + }); + sigchld_info_raw_ptr->resend_timer->start( + std::chrono::milliseconds(kEvSigChldResendMs), + std::chrono::milliseconds(0)); CRANE_TRACE("Child Process {} exit too early, will do SigchldCb later", - sigchld_info->pid); - arg->task_manager = this_; - arg->sigchld_info = std::move(sigchld_info); - + pid); continue; } @@ -432,10 +360,10 @@ void TaskManager::EvProcessSigchldCb_(int sig, short events, void* user_data) { uint32_t task_id = instance->task.task_id(); // Remove indexes from pid to ProcessInstance* - this_->m_pid_proc_map_.erase(proc_iter); - this_->m_pid_task_map_.erase(task_iter); + m_pid_proc_map_.erase(proc_iter); + m_pid_task_map_.erase(task_iter); - this_->m_mtx_.Unlock(); + m_mtx_.Unlock(); instance->sigchld_info = *sigchld_info; proc->Finish(sigchld_info->is_terminated_by_signal, sigchld_info->value); @@ -453,7 +381,7 @@ void TaskManager::EvProcessSigchldCb_(int sig, short events, void* user_data) { if (sigchld_info->is_terminated_by_signal) { // If a task is terminated by a signal and there are other // running processes belonging to this task, kill them. - this_->TerminateTaskAsync(task_id); + TerminateTaskAsync(task_id); } } else { if (instance->IsCrun()) @@ -466,54 +394,31 @@ void TaskManager::EvProcessSigchldCb_(int sig, short events, void* user_data) { // If the ProcessInstance has no process left, // send TaskStatusChange for this task. // See the comment of EvActivateTaskStatusChange_. - this_->TaskStopAndDoStatusChangeAsync(task_id); + TaskStopAndDoStatusChangeAsync(task_id); } } } } } -void TaskManager::EvOnSigchldTimerCb_(int, short, void* arg_) { - auto* arg = reinterpret_cast(arg_); - auto* this_ = arg->task_manager; - - this_->m_sigchld_queue_.enqueue(std::move(arg->sigchld_info)); - event_active(this_->m_ev_process_sigchld_, 0, 0); - - delete arg; -} - -void TaskManager::EvSubprocessReadCb_(struct bufferevent* bev, void* process) { - auto* proc = reinterpret_cast(process); - - size_t buf_len = evbuffer_get_length(bev->input); - - std::string str; - str.resize(buf_len); - int n_copy = evbuffer_remove(bev->input, str.data(), buf_len); - - CRANE_TRACE("Read {:>4} bytes from subprocess (pid: {}): {}", n_copy, - proc->GetPid(), str); - - proc->Output(std::move(str)); +void TaskManager::EvSigchldTimerCb_(ProcSigchldInfo* sigchld_info) { + m_sigchld_queue_.enqueue(std::unique_ptr(sigchld_info)); + m_process_sigchld_async_handle_->send(); } -void TaskManager::EvSigintCb_(int sig, short events, void* user_data) { - auto* this_ = reinterpret_cast(user_data); - - if (!this_->m_is_ending_now_) { +void TaskManager::EvSigintCb_() { + if (!m_is_ending_now_) { // SIGINT has been sent once. If SIGINT are captured twice, it indicates // the signal sender can't wait to stop Craned and Craned just send SIGTERM // to all tasks to kill them immediately. CRANE_INFO("Caught SIGINT. Send SIGTERM to all running tasks..."); - this_->m_is_ending_now_ = true; + m_is_ending_now_ = true; - if (this_->m_sigint_cb_) this_->m_sigint_cb_(); + if (m_sigint_cb_) m_sigint_cb_(); - for (auto task_it = this_->m_task_map_.begin(); - task_it != this_->m_task_map_.end();) { + for (auto task_it = m_task_map_.begin(); task_it != m_task_map_.end();) { task_id_t task_id = task_it->first; TaskInstance* task_instance = task_it->second.get(); @@ -537,23 +442,23 @@ void TaskManager::EvSigintCb_(int sig, short events, void* user_data) { task_instance->cgroup->KillAllProcesses(); auto to_remove_it = task_it++; - this_->m_task_map_.erase(to_remove_it); + m_task_map_.erase(to_remove_it); } } - if (this_->m_task_map_.empty()) { + if (m_task_map_.empty()) { // If there is not any batch task to wait for, stop the loop directly. - this_->EvActivateShutdown_(); + ActivateShutdownAsync_(); } } else { CRANE_INFO( "SIGINT has been triggered already. Sending SIGKILL to all process " "groups instead."); - if (this_->m_task_map_.empty()) { + if (m_task_map_.empty()) { // If there is no task to kill, stop the loop directly. - this_->EvActivateShutdown_(); + ActivateShutdownAsync_(); } else { - for (auto&& [task_id, task_instance] : this_->m_task_map_) { + for (auto&& [task_id, task_instance] : m_task_map_) { for (auto&& [pid, pr_instance] : task_instance->processes) { CRANE_INFO( "Sending SIGKILL to the process group of task #{} with root " @@ -566,23 +471,14 @@ void TaskManager::EvSigintCb_(int sig, short events, void* user_data) { } } -void TaskManager::EvExitEventCb_(int efd, short events, void* user_data) { - auto* this_ = reinterpret_cast(user_data); - - CRANE_TRACE("Exit event triggered. Stop event loop."); - - struct timeval delay = {0, 0}; - event_base_loopexit(this_->m_ev_base_, &delay); -} - -void TaskManager::EvActivateShutdown_() { +void TaskManager::ActivateShutdownAsync_() { CRANE_TRACE("Triggering exit event..."); - m_is_ending_now_ = true; - event_active(m_ev_exit_event_, 0, 0); + CRANE_ASSERT(m_is_ending_now_ == true); + m_task_cleared_ = true; } void TaskManager::Wait() { - if (m_ev_loop_thread_.joinable()) m_ev_loop_thread_.join(); + if (m_uvw_thread_.joinable()) m_uvw_thread_.join(); } CraneErr TaskManager::KillProcessInstance_(const ProcessInstance* proc, @@ -818,8 +714,8 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, const std::string& cwd = instance->task.cwd(); rc = chdir(cwd.c_str()); if (rc == -1) { - CRANE_ERROR("[Child Process] Error: chdir to {}. {}", cwd.c_str(), - strerror(errno)); + // CRANE_ERROR("[Child Process] Error: chdir to {}. {}", cwd.c_str(), + // strerror(errno)); std::abort(); } @@ -840,13 +736,13 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, ok = ParseDelimitedFromZeroCopyStream(&msg, &istream, nullptr); if (!ok || !msg.ok()) { - if (!ok) { - int err = istream.GetErrno(); - CRANE_ERROR("Failed to read socket from parent: {}", strerror(err)); - } + // if (!ok) { + // int err = istream.GetErrno(); + // CRANE_ERROR("Failed to read socket from parent: {}", strerror(err)); + // } - if (!msg.ok()) - CRANE_ERROR("Parent process ask not to start the subprocess."); + // if (!msg.ok()) + // CRANE_ERROR("Parent process ask not to start the subprocess."); std::abort(); } @@ -862,8 +758,8 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, stdout_fd = open(stdout_file_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644); if (stdout_fd == -1) { - CRANE_ERROR("[Child Process] Error: open {}. {}", stdout_file_path, - strerror(errno)); + // CRANE_ERROR("[Child Process] Error: open {}. {}", stdout_file_path, + // strerror(errno)); std::abort(); } dup2(stdout_fd, 1); @@ -874,8 +770,8 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, stderr_fd = open(stderr_file_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644); if (stderr_fd == -1) { - CRANE_ERROR("[Child Process] Error: open {}. {}", stderr_file_path, - strerror(errno)); + // CRANE_ERROR("[Child Process] Error: open {}. {}", stderr_file_path, + // strerror(errno)); std::abort(); } dup2(stderr_fd, 2); // stderr -> error file @@ -899,7 +795,7 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, ok = SerializeDelimitedToZeroCopyStream(child_process_ready, &ostream); ok &= ostream.Flush(); if (!ok) { - CRANE_ERROR("[Child Process] Error: Failed to flush."); + // CRANE_ERROR("[Child Process] Error: Failed to flush."); std::abort(); } @@ -982,23 +878,21 @@ CraneErr TaskManager::ExecuteTaskAsync(crane::grpc::TaskToD const& task) { instance->meta = std::make_unique(); m_grpc_execute_task_queue_.enqueue(std::move(instance)); - event_active(m_ev_grpc_execute_task_, 0, 0); + m_grpc_execute_task_async_handle_->send(); return CraneErr::kOk; } -void TaskManager::EvGrpcExecuteTaskCb_(int, short events, void* user_data) { - auto* this_ = reinterpret_cast(user_data); +void TaskManager::EvCleanGrpcExecuteTaskQueueCb_() { std::unique_ptr popped_instance; - while (this_->m_grpc_execute_task_queue_.try_dequeue(popped_instance)) { + while (m_grpc_execute_task_queue_.try_dequeue(popped_instance)) { // Once ExecuteTask RPC is processed, the TaskInstance goes into // m_task_map_. TaskInstance* instance = popped_instance.get(); task_id_t task_id = instance->task.task_id(); - auto [iter, ok] = - this_->m_task_map_.emplace(task_id, std::move(popped_instance)); + auto [iter, ok] = m_task_map_.emplace(task_id, std::move(popped_instance)); if (!ok) { CRANE_ERROR("Duplicated ExecuteTask request for task #{}. Ignore it.", task_id); @@ -1009,11 +903,11 @@ void TaskManager::EvGrpcExecuteTaskCb_(int, short events, void* user_data) { // Note: event_new and event_add in this function is not thread safe, // so we move it outside the multithreading part. int64_t sec = instance->task.time_limit().seconds(); - this_->EvAddTerminationTimer_(instance, sec); + AddTerminationTimer_(instance, sec); CRANE_TRACE("Add a timer of {} seconds for task #{}", sec, task_id); g_thread_pool->detach_task( - [this_, instance]() { this_->LaunchTaskInstanceMt_(instance); }); + [this, instance]() { LaunchTaskInstanceMt_(instance); }); } } @@ -1023,7 +917,7 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) { if (!g_cg_mgr->CheckIfCgroupForTasksExists(task_id)) { CRANE_ERROR("Failed to find created cgroup for task #{}", task_id); - EvActivateTaskStatusChange_( + ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::Failed, ExitCode::kExitCodeCgroupError, fmt::format("Failed to find created cgroup for task #{}", task_id)); @@ -1034,7 +928,7 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) { if (!instance->pwd_entry.Valid()) { CRANE_DEBUG("Failed to look up password entry for uid {} of task #{}", instance->task.uid(), task_id); - EvActivateTaskStatusChange_( + ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::Failed, ExitCode::kExitCodePermissionDenied, fmt::format("Failed to look up password entry for uid {} of task #{}", @@ -1046,7 +940,7 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) { bool ok = g_cg_mgr->AllocateAndGetCgroup(task_id, &cg); if (!ok) { CRANE_ERROR("Failed to allocate cgroup for task #{}", task_id); - EvActivateTaskStatusChange_( + ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::Failed, ExitCode::kExitCodeCgroupError, fmt::format("Failed to allocate cgroup for task #{}", task_id)); @@ -1065,7 +959,7 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) { FILE* fptr = fopen(sh_path.c_str(), "w"); if (fptr == nullptr) { CRANE_ERROR("Failed write the script for task #{}", task_id); - EvActivateTaskStatusChange_( + ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::Failed, ExitCode::kExitCodeFileNotFound, fmt::format("Cannot write shell script for batch task #{}", task_id)); @@ -1118,7 +1012,7 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) { // we should send TaskStatusChange manually. CraneErr err = SpawnProcessInInstance_(instance, process.get()); if (err != CraneErr::kOk) { - EvActivateTaskStatusChange_( + ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::Failed, ExitCode::kExitCodeSpawnProcessFail, fmt::format( @@ -1168,14 +1062,11 @@ std::string TaskManager::ParseFilePathPattern_(const std::string& path_pattern, return resolved_path_pattern; } -void TaskManager::EvTaskStatusChangeCb_(int efd, short events, - void* user_data) { - auto* this_ = reinterpret_cast(user_data); - - TaskStatusChange status_change; - while (this_->m_task_status_change_queue_.try_dequeue(status_change)) { - auto iter = this_->m_task_map_.find(status_change.task_id); - if (iter == this_->m_task_map_.end()) { +void TaskManager::EvCleanTaskStatusChangeQueueCb_() { + TaskStatusChangeQueueElem status_change; + while (m_task_status_change_queue_.try_dequeue(status_change)) { + auto iter = m_task_map_.find(status_change.task_id); + if (iter == m_task_map_.end()) { // When Ctrl+C is pressed for Craned, all tasks including just forked // tasks will be terminated. // In some error cases, a double TaskStatusChange might be triggered. @@ -1193,7 +1084,7 @@ void TaskManager::EvTaskStatusChangeCb_(int efd, short events, bool orphaned = instance->orphaned; // Free the TaskInstance structure - this_->m_task_map_.erase(status_change.task_id); + m_task_map_.erase(status_change.task_id); if (!orphaned) g_ctld_client->TaskStatusChangeAsync(std::move(status_change)); @@ -1201,40 +1092,37 @@ void TaskManager::EvTaskStatusChangeCb_(int efd, short events, // Todo: Add additional timer to check periodically whether all children // have exited. - if (this_->m_is_ending_now_ && this_->m_task_map_.empty()) { + if (m_is_ending_now_ && m_task_map_.empty()) { CRANE_TRACE( "Craned is ending and all tasks have been reaped. " "Stop event loop."); - this_->EvActivateShutdown_(); + ActivateShutdownAsync_(); } } -void TaskManager::EvActivateTaskStatusChange_( +void TaskManager::ActivateTaskStatusChangeAsync_( uint32_t task_id, crane::grpc::TaskStatus new_status, uint32_t exit_code, std::optional reason) { - TaskStatusChange status_change{task_id, new_status, exit_code}; + TaskStatusChangeQueueElem status_change{task_id, new_status, exit_code}; if (reason.has_value()) status_change.reason = std::move(reason); m_task_status_change_queue_.enqueue(std::move(status_change)); - event_active(m_ev_task_status_change_, 0, 0); + m_task_status_change_async_handle_->send(); } CraneExpected TaskManager::QueryTaskEnvMapAsync(task_id_t task_id) { EvQueueQueryTaskEnvMap elem{.task_id = task_id}; std::future> env_future = elem.env_prom.get_future(); m_query_task_environment_variables_queue.enqueue(std::move(elem)); - event_active(m_ev_query_task_environment_variables_, 0, 0); + m_query_task_environment_variables_async_handle_->send(); return env_future.get(); } -void TaskManager::EvGrpcQueryTaskEnvironmentVariableCb_(int efd, short events, - void* user_data) { - auto* this_ = reinterpret_cast(user_data); - +void TaskManager::EvCleanGrpcQueryTaskEnvQueueCb_() { EvQueueQueryTaskEnvMap elem; - while (this_->m_query_task_environment_variables_queue.try_dequeue(elem)) { - auto task_iter = this_->m_task_map_.find(elem.task_id); - if (task_iter == this_->m_task_map_.end()) + while (m_query_task_environment_variables_queue.try_dequeue(elem)) { + auto task_iter = m_task_map_.find(elem.task_id); + if (task_iter == m_task_map_.end()) elem.env_prom.set_value(std::unexpected(CraneErr::kSystemErr)); else { auto& instance = task_iter->second; @@ -1253,21 +1141,17 @@ CraneExpected TaskManager::QueryTaskIdFromPidAsync(pid_t pid) { std::future> task_id_opt_future = elem.task_id_prom.get_future(); m_query_task_id_from_pid_queue_.enqueue(std::move(elem)); - event_active(m_ev_query_task_id_from_pid_, 0, 0); - + m_query_task_environment_variables_async_handle_->send(); return task_id_opt_future.get(); } -void TaskManager::EvGrpcQueryTaskIdFromPidCb_(int efd, short events, - void* user_data) { - auto* this_ = reinterpret_cast(user_data); - +void TaskManager::EvCleanGrpcQueryTaskIdFromPidQueueCb_() { EvQueueQueryTaskIdFromPid elem; - while (this_->m_query_task_id_from_pid_queue_.try_dequeue(elem)) { - this_->m_mtx_.Lock(); + while (m_query_task_id_from_pid_queue_.try_dequeue(elem)) { + m_mtx_.Lock(); - auto task_iter = this_->m_pid_task_map_.find(elem.pid); - if (task_iter == this_->m_pid_task_map_.end()) + auto task_iter = m_pid_task_map_.find(elem.pid); + if (task_iter == m_pid_task_map_.end()) elem.task_id_prom.set_value(std::unexpected(CraneErr::kSystemErr)); else { TaskInstance* instance = task_iter->second; @@ -1275,15 +1159,11 @@ void TaskManager::EvGrpcQueryTaskIdFromPidCb_(int efd, short events, elem.task_id_prom.set_value(task_id); } - this_->m_mtx_.Unlock(); + m_mtx_.Unlock(); } } -void TaskManager::EvOnTaskTimerCb_(int, short, void* arg_) { - auto* arg = reinterpret_cast(arg_); - TaskManager* this_ = arg->task_manager; - task_id_t task_id = arg->task_id; - +void TaskManager::EvTaskTimerCb_(task_id_t task_id) { CRANE_TRACE("Task #{} exceeded its time limit. Terminating it...", task_id); // Sometimes, task finishes just before time limit. @@ -1291,41 +1171,39 @@ void TaskManager::EvOnTaskTimerCb_(int, short, void* arg_) { // the timer is triggered immediately. // That's why we need to check the existence of the task again in timer // callback, otherwise a segmentation fault will occur. - auto task_it = this_->m_task_map_.find(task_id); - if (task_it == this_->m_task_map_.end()) { + auto task_it = m_task_map_.find(task_id); + if (task_it == m_task_map_.end()) { CRANE_TRACE("Task #{} has already been removed."); return; } TaskInstance* task_instance = task_it->second.get(); - this_->EvDelTerminationTimer_(task_instance); + DelTerminationTimer_(task_instance); if (task_instance->task.type() == crane::grpc::Batch) { - EvQueueTaskTerminate ev_task_terminate{ + TaskTerminateQueueElem ev_task_terminate{ .task_id = task_id, .terminated_by_timeout = true, }; - this_->m_task_terminate_queue_.enqueue(ev_task_terminate); - event_active(this_->m_ev_task_terminate_, 0, 0); + m_task_terminate_queue_.enqueue(ev_task_terminate); + m_terminate_task_async_handle_->send(); } else { - this_->EvActivateTaskStatusChange_( + ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::ExceedTimeLimit, ExitCode::kExitCodeExceedTimeLimit, std::nullopt); } } -void TaskManager::EvTerminateTaskCb_(int efd, short events, void* user_data) { - auto* this_ = reinterpret_cast(user_data); - - EvQueueTaskTerminate elem; - while (this_->m_task_terminate_queue_.try_dequeue(elem)) { +void TaskManager::EvCleanTerminateTaskQueueCb_() { + TaskTerminateQueueElem elem; + while (m_task_terminate_queue_.try_dequeue(elem)) { CRANE_TRACE( "Receive TerminateRunningTask Request from internal queue. " "Task id: {}", elem.task_id); - auto iter = this_->m_task_map_.find(elem.task_id); - if (iter == this_->m_task_map_.end()) { + auto iter = m_task_map_.find(elem.task_id); + if (iter == m_task_map_.end()) { CRANE_DEBUG("Terminating a non-existent task #{}.", elem.task_id); // Note if Ctld wants to terminate some tasks that are not running, @@ -1368,34 +1246,34 @@ void TaskManager::EvTerminateTaskCb_(int efd, short events, void* user_data) { KillProcessInstance_(pr_instance.get(), sig); } else if (task_instance->task.type() == crane::grpc::Interactive) { // For an Interactive task with no process running, it ends immediately. - this_->EvActivateTaskStatusChange_(elem.task_id, crane::grpc::Completed, - ExitCode::kExitCodeTerminated, - std::nullopt); + ActivateTaskStatusChangeAsync_(elem.task_id, crane::grpc::Completed, + ExitCode::kExitCodeTerminated, + std::nullopt); } } } void TaskManager::TerminateTaskAsync(uint32_t task_id) { - EvQueueTaskTerminate elem{.task_id = task_id, .terminated_by_user = true}; + TaskTerminateQueueElem elem{.task_id = task_id, .terminated_by_user = true}; m_task_terminate_queue_.enqueue(elem); - event_active(m_ev_task_terminate_, 0, 0); + m_terminate_task_async_handle_->send(); } void TaskManager::MarkTaskAsOrphanedAndTerminateAsync(task_id_t task_id) { - EvQueueTaskTerminate elem{.task_id = task_id, .mark_as_orphaned = true}; + TaskTerminateQueueElem elem{.task_id = task_id, .mark_as_orphaned = true}; m_task_terminate_queue_.enqueue(elem); - event_active(m_ev_task_terminate_, 0, 0); + m_terminate_task_async_handle_->send(); } bool TaskManager::CheckTaskStatusAsync(task_id_t task_id, crane::grpc::TaskStatus* status) { - EvQueueCheckTaskStatus elem{.task_id = task_id}; + CheckTaskStatusQueueElem elem{.task_id = task_id}; std::future> res{ elem.status_prom.get_future()}; m_check_task_status_queue_.enqueue(std::move(elem)); - event_active(m_ev_check_task_status_, 0, 0); + m_check_task_status_async_handle_->send(); auto [ok, task_status] = res.get(); if (!ok) return false; @@ -1404,13 +1282,11 @@ bool TaskManager::CheckTaskStatusAsync(task_id_t task_id, return true; } -void TaskManager::EvCheckTaskStatusCb_(int, short events, void* user_data) { - auto* this_ = reinterpret_cast(user_data); - - EvQueueCheckTaskStatus elem; - while (this_->m_check_task_status_queue_.try_dequeue(elem)) { +void TaskManager::EvCleanCheckTaskStatusQueueCb_() { + CheckTaskStatusQueueElem elem; + while (m_check_task_status_queue_.try_dequeue(elem)) { task_id_t task_id = elem.task_id; - if (this_->m_task_map_.contains(task_id)) { + if (m_task_map_.contains(task_id)) { // Found in task map. The task must be running. elem.status_prom.set_value({true, crane::grpc::TaskStatus::Running}); continue; @@ -1434,24 +1310,24 @@ void TaskManager::EvCheckTaskStatusCb_(int, short events, void* user_data) { bool TaskManager::ChangeTaskTimeLimitAsync(task_id_t task_id, absl::Duration time_limit) { - EvQueueChangeTaskTimeLimit elem{.task_id = task_id, .time_limit = time_limit}; + ChangeTaskTimeLimitQueueElem elem{.task_id = task_id, + .time_limit = time_limit}; std::future ok_fut = elem.ok_prom.get_future(); m_task_time_limit_change_queue_.enqueue(std::move(elem)); - event_active(m_ev_task_time_limit_change_, 0, 0); + m_change_task_time_limit_async_handle_->send(); return ok_fut.get(); } -void TaskManager::EvChangeTaskTimeLimitCb_(int, short events, void* user_data) { - auto* this_ = reinterpret_cast(user_data); +void TaskManager::EvCleanChangeTaskTimeLimitQueueCb_() { absl::Time now = absl::Now(); - EvQueueChangeTaskTimeLimit elem; - while (this_->m_task_time_limit_change_queue_.try_dequeue(elem)) { - auto iter = this_->m_task_map_.find(elem.task_id); - if (iter != this_->m_task_map_.end()) { + ChangeTaskTimeLimitQueueElem elem; + while (m_task_time_limit_change_queue_.try_dequeue(elem)) { + auto iter = m_task_map_.find(elem.task_id); + if (iter != m_task_map_.end()) { TaskInstance* task_instance = iter->second.get(); - this_->EvDelTerminationTimer_(task_instance); + DelTerminationTimer_(task_instance); absl::Time start_time = absl::FromUnixSeconds(task_instance->task.start_time().seconds()); @@ -1459,14 +1335,14 @@ void TaskManager::EvChangeTaskTimeLimitCb_(int, short events, void* user_data) { if (now - start_time >= new_time_limit) { // If the task times out, terminate it. - EvQueueTaskTerminate ev_task_terminate{.task_id = elem.task_id, - .terminated_by_timeout = true}; - this_->m_task_terminate_queue_.enqueue(ev_task_terminate); - event_active(this_->m_ev_task_terminate_, 0, 0); + TaskTerminateQueueElem ev_task_terminate{.task_id = elem.task_id, + .terminated_by_timeout = true}; + m_task_terminate_queue_.enqueue(ev_task_terminate); + m_terminate_task_async_handle_->send(); } else { // If the task haven't timed out, set up a new timer. - this_->EvAddTerminationTimer_( + AddTerminationTimer_( task_instance, ToInt64Seconds((new_time_limit - (absl::Now() - start_time)))); } diff --git a/src/Craned/TaskManager.h b/src/Craned/TaskManager.h index 6b54a1cc6..bee386bc8 100644 --- a/src/Craned/TaskManager.h +++ b/src/Craned/TaskManager.h @@ -21,31 +21,16 @@ #include "CranedPublicDefs.h" // Precompiled header comes first. -#include -#include -#include -#include #include -#include -#include + +#include #include "CgroupManager.h" -#include "CtldClient.h" -#include "DeviceManager.h" #include "crane/PasswordEntry.h" -#include "crane/PublicHeader.h" #include "protos/Crane.grpc.pb.h" -#include "protos/Crane.pb.h" namespace Craned { -class TaskManager; - -struct EvTimerCbArg { - TaskManager* task_manager; - task_id_t task_id; -}; - struct BatchMetaInProcessInstance { std::string parsed_output_file_pattern; std::string parsed_error_file_pattern; @@ -57,7 +42,6 @@ class ProcessInstance { : m_executive_path_(std::move(exec_path)), m_arguments_(std::move(arg_list)), m_pid_(0), - m_ev_buf_event_(nullptr), m_user_data_(nullptr) {} ~ProcessInstance() { @@ -69,8 +53,6 @@ class ProcessInstance { CRANE_ERROR( "user_data in ProcessInstance is set, but clean_cb is not set!"); } - - if (m_ev_buf_event_) bufferevent_free(m_ev_buf_event_); } [[nodiscard]] const std::string& GetExecPath() const { @@ -83,14 +65,6 @@ class ProcessInstance { void SetPid(pid_t pid) { m_pid_ = pid; } [[nodiscard]] pid_t GetPid() const { return m_pid_; } - void SetEvBufEvent(struct bufferevent* ev_buf_event) { - m_ev_buf_event_ = ev_buf_event; - } - - void SetOutputCb(std::function cb) { - m_output_cb_ = std::move(cb); - } - void SetFinishCb(std::function cb) { m_finish_cb_ = std::move(cb); } @@ -114,9 +88,6 @@ class ProcessInstance { /* ------------- Fields set by SpawnProcessInInstance_ ---------------- */ pid_t m_pid_; - // The underlying event that handles the output of the task. - struct bufferevent* m_ev_buf_event_; - /* ------- Fields set by the caller of SpawnProcessInInstance_ -------- */ std::string m_executive_path_; std::list m_arguments_; @@ -155,24 +126,20 @@ struct CrunMetaInTaskInstance : MetaInTaskInstance { ~CrunMetaInTaskInstance() override = default; }; -// also arg for EvDoSigChldCb_ +// also arg for EvSigchldTimerCb_ struct ProcSigchldInfo { pid_t pid; bool is_terminated_by_signal; int value; - event* resend_timer{nullptr}; + std::shared_ptr resend_timer{nullptr}; }; // Todo: Task may consists of multiple subtasks struct TaskInstance { ~TaskInstance() { if (termination_timer) { - delete static_cast( - event_get_callback_arg(termination_timer)); - evtimer_del(termination_timer); - event_free(termination_timer); - termination_timer = nullptr; + termination_timer->close(); } if (this->IsCrun()) { @@ -191,7 +158,7 @@ struct TaskInstance { std::string cgroup_path; CgroupInterface* cgroup; - struct event* termination_timer{nullptr}; + std::shared_ptr termination_timer{nullptr}; // Task execution results bool orphaned{false}; @@ -260,13 +227,13 @@ class TaskManager { task_id_t task_id; }; - struct EvQueueChangeTaskTimeLimit { + struct ChangeTaskTimeLimitQueueElem { uint32_t task_id; absl::Duration time_limit; std::promise ok_prom; }; - struct EvQueueTaskTerminate { + struct TaskTerminateQueueElem { uint32_t task_id{0}; bool terminated_by_user{false}; // If the task is canceled by user, // task->status=Cancelled @@ -275,16 +242,11 @@ class TaskManager { bool mark_as_orphaned{false}; }; - struct EvQueueCheckTaskStatus { + struct CheckTaskStatusQueueElem { task_id_t task_id; std::promise> status_prom; }; - struct EvQueueSigchldArg { - TaskManager* task_manager; - std::unique_ptr sigchld_info; - }; - static std::string ParseFilePathPattern_(const std::string& path_pattern, const std::string& cwd, task_id_t task_id); @@ -297,7 +259,7 @@ class TaskManager { const TaskInstance* FindInstanceByTaskId_(uint32_t task_id); // Ask TaskManager to stop its event loop. - void EvActivateShutdown_(); + void ActivateShutdownAsync_(); /** * Inform CraneCtld of the status change of a task. @@ -311,52 +273,41 @@ class TaskManager { * @param release_resource If set to true, CraneCtld will release the * resource (mark the task status as REQUEUE) and requeue the task. */ - void EvActivateTaskStatusChange_(uint32_t task_id, - crane::grpc::TaskStatus new_status, - uint32_t exit_code, - std::optional reason); + void ActivateTaskStatusChangeAsync_(uint32_t task_id, + crane::grpc::TaskStatus new_status, + uint32_t exit_code, + std::optional reason); template - void EvAddTerminationTimer_(TaskInstance* instance, Duration duration) { - std::chrono::seconds const sec = - std::chrono::duration_cast(duration); - - auto* arg = new EvTimerCbArg; - arg->task_manager = this; - arg->task_id = instance->task.task_id(); - - timeval tv{ - sec.count(), - std::chrono::duration_cast(duration - sec) - .count()}; - - struct event* ev = event_new(m_ev_base_, -1, 0, EvOnTaskTimerCb_, arg); - CRANE_ASSERT_MSG(ev != nullptr, "Failed to create new timer."); - evtimer_add(ev, &tv); - - instance->termination_timer = ev; + void AddTerminationTimer_(TaskInstance* instance, Duration duration) { + auto termination_handel = m_uvw_loop_->resource(); + termination_handel->on( + [this, task_id = instance->task.task_id()](const uvw::timer_event&, + uvw::timer_handle& h) { + EvTaskTimerCb_(task_id); + }); + termination_handel->start( + std::chrono::duration_cast(duration), + std::chrono::seconds(0)); + instance->termination_timer = termination_handel; } - void EvAddTerminationTimer_(TaskInstance* instance, int64_t secs) { - auto* arg = new EvTimerCbArg; - arg->task_manager = this; - arg->task_id = instance->task.task_id(); - - timeval tv{static_cast<__time_t>(secs), 0}; - - struct event* ev = event_new(m_ev_base_, -1, 0, EvOnTaskTimerCb_, arg); - CRANE_ASSERT_MSG(ev != nullptr, "Failed to create new timer."); - evtimer_add(ev, &tv); - - instance->termination_timer = ev; + void AddTerminationTimer_(TaskInstance* instance, int64_t secs) { + auto termination_handel = m_uvw_loop_->resource(); + termination_handel->on( + [this, task_id = instance->task.task_id()](const uvw::timer_event&, + uvw::timer_handle& h) { + EvTaskTimerCb_(task_id); + }); + termination_handel->start(std::chrono::seconds(secs), + std::chrono::seconds(0)); + instance->termination_timer = termination_handel; } - static void EvDelTerminationTimer_(TaskInstance* instance) { - delete static_cast( - event_get_callback_arg(instance->termination_timer)); - evtimer_del(instance->termination_timer); - event_free(instance->termination_timer); - instance->termination_timer = nullptr; + static void DelTerminationTimer_(TaskInstance* instance) { + // Close handle before free + instance->termination_timer->close(); + instance->termination_timer.reset(); } /** @@ -402,89 +353,78 @@ class TaskManager { // Critical data region ends // ======================================================================== - static void EvSigchldCb_(evutil_socket_t sig, short events, void* user_data); + void EvSigchldCb_(); - static void EvProcessSigchldCb_(evutil_socket_t sig, short events, - void* user_data); + void EvCleanSigchldQueueCb_(); // Callback function to handle SIGINT sent by Ctrl+C - static void EvSigintCb_(evutil_socket_t sig, short events, void* user_data); - - static void EvGrpcExecuteTaskCb_(evutil_socket_t efd, short events, - void* user_data); + void EvSigintCb_(); - static void EvGrpcQueryTaskIdFromPidCb_(evutil_socket_t efd, short events, - void* user_data); + void EvCleanGrpcExecuteTaskQueueCb_(); - static void EvGrpcQueryTaskEnvironmentVariableCb_(evutil_socket_t efd, - short events, - void* user_data); + void EvCleanGrpcQueryTaskIdFromPidQueueCb_(); - static void EvSubprocessReadCb_(struct bufferevent* bev, void* process); + void EvCleanGrpcQueryTaskEnvQueueCb_(); - static void EvTaskStatusChangeCb_(evutil_socket_t efd, short events, - void* user_data); + void EvCleanTaskStatusChangeQueueCb_(); - static void EvTerminateTaskCb_(evutil_socket_t efd, short events, - void* user_data); + void EvCleanTerminateTaskQueueCb_(); - static void EvCheckTaskStatusCb_(evutil_socket_t, short events, - void* user_data); + void EvCleanCheckTaskStatusQueueCb_(); - static void EvChangeTaskTimeLimitCb_(evutil_socket_t, short events, - void* user_data); + void EvCleanChangeTaskTimeLimitQueueCb_(); - static void EvExitEventCb_(evutil_socket_t, short events, void* user_data); + void EvTaskTimerCb_(task_id_t task_id); - static void EvOnTaskTimerCb_(evutil_socket_t, short, void* arg_); + void EvSigchldTimerCb_(ProcSigchldInfo* sigchld_info); - static void EvOnSigchldTimerCb_(evutil_socket_t, short, void* arg_); + std::shared_ptr m_uvw_loop_; - struct event_base* m_ev_base_{}; - struct event* m_ev_sigchld_{}; + std::shared_ptr m_sigchld_handle_; // When this event is triggered, the TaskManager will not accept // any more new tasks and quit as soon as all existing task end. - struct event* m_ev_sigint_{}; + std::shared_ptr m_sigint_handle_; - // The function which will be called when SIGINT is triggered. - std::function m_sigint_cb_; - - // When SIGINT is triggered or Shutdown() gets called, this variable is set to - // true. Then, AddTaskAsyncMethod will not accept any more new tasks and - // ev_sigchld_cb_ will stop the event loop when there is no task running. - std::atomic_bool m_is_ending_now_{false}; - - struct event* m_ev_process_sigchld_{}; - ConcurrentQueue> m_sigchld_queue_; - - struct event* m_ev_query_task_id_from_pid_{}; + std::shared_ptr m_query_task_id_from_pid_async_handle_; ConcurrentQueue m_query_task_id_from_pid_queue_; - struct event* m_ev_query_task_environment_variables_{}; + std::shared_ptr + m_query_task_environment_variables_async_handle_; ConcurrentQueue m_query_task_environment_variables_queue; + std::shared_ptr m_grpc_execute_task_async_handle_; // A custom event that handles the ExecuteTask RPC. - struct event* m_ev_grpc_execute_task_{}; ConcurrentQueue> m_grpc_execute_task_queue_; - // When this event is triggered, the event loop will exit. - struct event* m_ev_exit_event_{}; + std::shared_ptr m_process_sigchld_async_handle_; + ConcurrentQueue> m_sigchld_queue_; + + std::shared_ptr m_task_status_change_async_handle_; + ConcurrentQueue m_task_status_change_queue_; + + std::shared_ptr m_change_task_time_limit_async_handle_; + ConcurrentQueue m_task_time_limit_change_queue_; - struct event* m_ev_task_status_change_{}; - ConcurrentQueue m_task_status_change_queue_; + std::shared_ptr m_terminate_task_async_handle_; + ConcurrentQueue m_task_terminate_queue_; - struct event* m_ev_task_time_limit_change_{}; - ConcurrentQueue m_task_time_limit_change_queue_; + std::shared_ptr m_check_task_status_async_handle_; + ConcurrentQueue m_check_task_status_queue_; + + // The function which will be called when SIGINT is triggered. + std::function m_sigint_cb_; - struct event* m_ev_task_terminate_{}; - ConcurrentQueue m_task_terminate_queue_; + // When SIGINT is triggered or Shutdown() gets called, this variable is set to + // true. Then, AddTaskAsyncMethod will not accept any more new tasks and + // ev_sigchld_cb_ will stop the event loop when there is no task running. + std::atomic_bool m_is_ending_now_{false}; - struct event* m_ev_check_task_status_{}; - ConcurrentQueue m_check_task_status_queue_; + // After m_is_ending_now_ set to true, when all task are cleared, we can exit. + std::atomic_bool m_task_cleared_{false}; - std::thread m_ev_loop_thread_; + std::thread m_uvw_thread_; static inline TaskManager* m_instance_ptr_; };