Skip to content

Commit

Permalink
Refactor: Replace libevent with libuvw in Craned (#359)
Browse files Browse the repository at this point in the history
* Refactor: remove libevent in craned

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* Refactor: remove libevent in craned

Signed-off-by: xiafeng <xiafeng.li@foxmail.com>

* Refactor: remove libevent in craned

Signed-off-by: xiafeng <xiafeng.li@foxmail.com>

* Refactor: remove libevent

Signed-off-by: xiafeng <xiafeng.li@foxmail.com>

* Refactor: rename callbacks

Signed-off-by: xiafeng <xiafeng.li@foxmail.com>

* Refactor: remove 'this->', rename callbacks

Signed-off-by: xiafeng <xiafeng.li@foxmail.com>

* Bugfix: fix asan reporting use-after-poison

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* Bugfix: fix asan/tsan reported error

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* Bugfix: remove logging in the child process to avoid a deadlock in the logger after fork.

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* Refactor: Address review comments, use const&.

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* Refactor: Rename private variables

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* chore: Remove redundant include

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

* refactor<Craned>: Remove unused code.

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>

---------

Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>
Signed-off-by: xiafeng <xiafeng.li@foxmail.com>
  • Loading branch information
L-Xiafeng authored Nov 28, 2024
1 parent 58a0115 commit 5a42881
Show file tree
Hide file tree
Showing 23 changed files with 383 additions and 608 deletions.
2 changes: 1 addition & 1 deletion dependencies/cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ add_subdirectory(yaml-cpp)
add_subdirectory(fmt)
add_subdirectory(googletest)
add_subdirectory(spdlog)
add_subdirectory(LibEvent)
#add_subdirectory(LibEvent)
add_subdirectory(cxxopts)
add_subdirectory(grpc)
add_subdirectory(libcgroup)
Expand Down
4 changes: 0 additions & 4 deletions src/CraneCtld/AccountManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@

#include "AccountManager.h"

#include <string>

#include "CtldPublicDefs.h"
#include "crane/PasswordEntry.h"
#include "protos/PublicDefs.pb.h"
#include "range/v3/algorithm/contains.hpp"

Expand Down
2 changes: 0 additions & 2 deletions src/CraneCtld/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ target_link_libraries(cranectld PRIVATE
Utility_PublicHeader
Utility_PluginClient

dev_event_core
dev_event_pthreads
uvw

cxxopts
Expand Down
6 changes: 0 additions & 6 deletions src/CraneCtld/CraneCtld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "CtldPreCompiledHeader.h"
// Precompiled header comes first!

#include <event2/thread.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <yaml-cpp/yaml.h>
Expand All @@ -35,9 +34,7 @@
#include "DbClient.h"
#include "EmbeddedDbClient.h"
#include "TaskScheduler.h"
#include "crane/Logger.h"
#include "crane/Network.h"
#include "crane/OS.h"
#include "crane/PluginClient.h"

void ParseConfig(int argc, char** argv) {
Expand Down Expand Up @@ -641,9 +638,6 @@ void InitializeCtldGlobalVariables() {

crane::InitializeNetworkFunctions();

// Enable inter-thread custom event notification.
evthread_use_pthreads();

char hostname[HOST_NAME_MAX + 1];
int err = gethostname(hostname, HOST_NAME_MAX + 1);
if (err != 0) {
Expand Down
2 changes: 0 additions & 2 deletions src/CraneCtld/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

#include "CranedKeeper.h"

#include <google/protobuf/util/time_util.h>

namespace Ctld {

using grpc::ClientContext;
Expand Down
1 change: 0 additions & 1 deletion src/CraneCtld/CranedMetaContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "CranedMetaContainer.h"

#include "CranedKeeper.h"
#include "crane/String.h"
#include "protos/PublicDefs.pb.h"

namespace Ctld {
Expand Down
3 changes: 0 additions & 3 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@

#include "CtldGrpcServer.h"

#include <google/protobuf/util/time_util.h>

#include "AccountManager.h"
#include "CranedKeeper.h"
#include "CranedMetaContainer.h"
#include "EmbeddedDbClient.h"
#include "TaskScheduler.h"
#include "crane/String.h"

namespace Ctld {

Expand Down
2 changes: 0 additions & 2 deletions src/CraneCtld/DbClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

#include "DbClient.h"

#include <google/protobuf/util/time_util.h>

#include <bsoncxx/exception/exception.hpp>
#include <mongocxx/exception/exception.hpp>

Expand Down
1 change: 0 additions & 1 deletion src/CraneCtld/DbClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

#include <bsoncxx/builder/stream/document.hpp>
#include <bsoncxx/json.hpp>
#include <concepts>
#include <mongocxx/client.hpp>
#include <mongocxx/cursor.hpp>
#include <mongocxx/instance.hpp>
Expand Down
6 changes: 0 additions & 6 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,12 @@

#include "TaskScheduler.h"

#include <absl/time/time.h>
#include <google/protobuf/util/time_util.h>

#include <future>

#include "AccountManager.h"
#include "CranedKeeper.h"
#include "CranedMetaContainer.h"
#include "CtldPublicDefs.h"
#include "EmbeddedDbClient.h"
#include "crane/PluginClient.h"
#include "crane/PublicHeader.h"
#include "protos/PublicDefs.pb.h"

namespace Ctld {
Expand Down
3 changes: 0 additions & 3 deletions src/Craned/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ target_link_libraries(craned

crane_proto_lib

dev_event_core
dev_event_pthreads

bs_thread_pool

cxxopts
Expand Down
2 changes: 0 additions & 2 deletions src/Craned/CforedClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

#include "CforedClient.h"

#include <utility>

#include "crane/String.h"
namespace Craned {

Expand Down
69 changes: 39 additions & 30 deletions src/Craned/CgroupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,40 +573,27 @@ bool CgroupManager::ReleaseCgroup(uint32_t task_id, uid_t uid) {
this->m_task_id_to_cg_spec_map_.Erase(task_id);

{
auto uid_task_id_map = this->m_uid_to_task_ids_map_.GetMapExclusivePtr();
if (!uid_task_id_map->contains(uid)) {
CRANE_DEBUG(
"Trying to release a non-existent cgroup for uid #{}. Ignoring it...",
uid);
return false;
}

auto task_id_set_ptr = uid_task_id_map->at(uid).RawPtr();

task_id_set_ptr->erase(task_id);
if (task_id_set_ptr->empty()) {
uid_task_id_map->erase(uid);
}
// Do not access task_id_set_ptr after erasing form map
}

if (!this->m_task_id_to_cg_map_.Contains(task_id)) {
CRANE_DEBUG(
"Trying to release a non-existent cgroup for task #{}. Ignoring "
"it...",
task_id);

return false;
} else {
// The termination of all processes in a cgroup is a time-consuming work.
// Therefore, once we are sure that the cgroup for this task exists, we
// let gRPC call return and put the termination work into the thread pool
// to avoid blocking the event loop of TaskManager.
// Kind of async behavior.

// avoid deadlock by Erase at next line
CgroupInterface *cgroup = this->m_task_id_to_cg_map_[task_id]->release();
this->m_task_id_to_cg_map_.Erase(task_id);
auto task_id_to_cg_map_ptr =
this->m_task_id_to_cg_map_.GetMapExclusivePtr();
auto it = task_id_to_cg_map_ptr->find(task_id);
if (it == task_id_to_cg_map_ptr->end()) {
CRANE_DEBUG(
"Trying to release a non-existent cgroup for task #{}. Ignoring "
"it...",
task_id);

return false;
}
CgroupInterface *cgroup = it->second.GetExclusivePtr()->release();

task_id_to_cg_map_ptr->erase(task_id);

if (cgroup != nullptr) {
g_thread_pool->detach_task([cgroup]() {
Expand All @@ -632,8 +619,27 @@ bool CgroupManager::ReleaseCgroup(uint32_t task_id, uid_t uid) {
delete cgroup;
});
}
return true;
}

{
auto uid_task_ids_map_ptr = this->m_uid_to_task_ids_map_.GetMapExclusivePtr();
auto it = uid_task_ids_map_ptr->find(uid);
if (it == uid_task_ids_map_ptr->end()) {
CRANE_DEBUG(
"Trying to release a non-existent cgroup for uid #{}. Ignoring it...",
uid);
return false;
}

auto task_id_set_ptr = uid_task_ids_map_ptr->at(uid).RawPtr();

task_id_set_ptr->erase(task_id);
if (task_id_set_ptr->empty()) {
uid_task_ids_map_ptr->erase(uid);
}
// Do not access task_id_set_ptr after erasing form map
}
return true;
}

void CgroupManager::RmAllTaskCgroupsUnderController_(
Expand Down Expand Up @@ -725,8 +731,11 @@ bool CgroupManager::QueryTaskInfoOfUidAsync(uid_t uid, TaskInfoOfUid *info) {
info->job_cnt = 0;
info->cgroup_exists = false;

if (this->m_uid_to_task_ids_map_.Contains(uid)) {
auto task_ids = this->m_uid_to_task_ids_map_[uid];
if (auto task_ids = this->m_uid_to_task_ids_map_[uid]) {
if (!task_ids) {
CRANE_WARN("Uid {} not found in uid_to_task_ids_map", uid);
return false;
}
info->job_cnt = task_ids->size();
info->first_task_id = *task_ids->begin();
}
Expand Down
4 changes: 2 additions & 2 deletions src/Craned/CgroupManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
*
*/
#pragma once
#include "CranedPublicDefs.h"
// Precompiled header comes first.

#include <libcgroup.h>

#include "CranedPublicDefs.h"
#include "crane/AtomicHashMap.h"
#include "crane/OS.h"

#ifdef CRANE_ENABLE_BPF
# include <bpf/libbpf.h>
Expand Down
10 changes: 1 addition & 9 deletions src/Craned/Craned.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
#include "CranedPublicDefs.h"
// Precompiled header comes first.

#include <event2/thread.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <sys/sysinfo.h>
#include <sys/utsname.h>
#include <yaml-cpp/yaml.h>

#include <ctime>
Expand All @@ -32,10 +29,8 @@
#include "CforedClient.h"
#include "CranedServer.h"
#include "CtldClient.h"
#include "crane/Network.h"
#include "crane/OS.h"
#include "DeviceManager.h"
#include "crane/PluginClient.h"
#include "crane/PublicHeader.h"
#include "crane/String.h"

using Craned::g_config;
Expand Down Expand Up @@ -602,9 +597,6 @@ void GlobalVariableInit() {
// SIGPIPE while communicating with spawned task processes.
signal(SIGPIPE, SIG_IGN);

// Enable inter-thread custom event notification.
evthread_use_pthreads();

PasswordEntry::InitializeEntrySize();

using Craned::CgroupManager;
Expand Down
1 change: 1 addition & 0 deletions src/Craned/CranedPreCompiledHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,4 @@
// Then include the other Crane headers
#include "crane/GrpcHelper.h"
#include "crane/Network.h"
#include "crane/PublicHeader.h"
7 changes: 3 additions & 4 deletions src/Craned/CranedPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@
// Precompiled header comes first

#include "crane/OS.h"
#include "crane/PublicHeader.h"
#include "protos/Crane.pb.h"


namespace Craned {

inline const uint64_t kEvSigChldResendMs = 500'000;
inline constexpr uint64_t kEvSigChldResendMs = 500;

using EnvMap = std::unordered_map<std::string, std::string>;

struct TaskStatusChange {
struct TaskStatusChangeQueueElem {
task_id_t task_id{};
crane::grpc::TaskStatus new_status{};
uint32_t exit_code{};
Expand Down
15 changes: 6 additions & 9 deletions src/Craned/CranedServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@

#include "CranedServer.h"

#include <arpa/inet.h>
#include <sys/stat.h>
#include <yaml-cpp/yaml.h>

#include "CtldClient.h"
#include "TaskManager.h"

namespace Craned {

Expand Down Expand Up @@ -173,11 +171,11 @@ grpc::Status CranedServiceImpl::CreateCgroupForTasks(
for (int i = 0; i < request->task_id_list_size(); i++) {
task_id_t task_id = request->task_id_list(i);
uid_t uid = request->uid_list(i);
crane::grpc::ResourceInNode const &res = request->res_list(i);
const crane::grpc::ResourceInNode &res = request->res_list(i);

CgroupSpec spec{.uid = uid,
.task_id = task_id,
.res_in_node = std::move(res),
.res_in_node = res,
.execution_node = request->execution_node(i)};
CRANE_TRACE("Receive CreateCgroup for task #{}, uid {}", task_id, uid);
cg_specs.emplace_back(std::move(spec));
Expand Down Expand Up @@ -309,7 +307,7 @@ grpc::Status CranedServiceImpl::QueryTaskIdFromPortForward(

crane::grpc::QueryTaskIdFromPortRequest request_to_remote_service;
crane::grpc::QueryTaskIdFromPortReply reply_from_remote_service;
ClientContext context_of_remote_service;
grpc::ClientContext context_of_remote_service;
Status status_remote_service;

request_to_remote_service.set_port(request->ssh_remote_port());
Expand Down Expand Up @@ -389,8 +387,7 @@ Status CranedServiceImpl::QueryTaskEnvVariables(
grpc::ServerContext *context,
const ::crane::grpc::QueryTaskEnvVariablesRequest *request,
crane::grpc::QueryTaskEnvVariablesReply *response) {
auto task_env_map =
g_task_mgr->QueryTaskEnvMapAsync(request->task_id());
auto task_env_map = g_task_mgr->QueryTaskEnvMapAsync(request->task_id());
if (task_env_map.has_value()) {
for (const auto &[name, value] : task_env_map.value())
response->mutable_env_map()->emplace(name, value);
Expand Down Expand Up @@ -445,7 +442,7 @@ grpc::Status CranedServiceImpl::QueryTaskEnvVariablesForward(

crane::grpc::QueryTaskEnvVariablesRequest request_to_remote_service;
crane::grpc::QueryTaskEnvVariablesReply reply_from_remote_service;
ClientContext context_of_remote_service;
grpc::ClientContext context_of_remote_service;
Status status_remote_service;

request_to_remote_service.set_task_id(request->task_id());
Expand Down
5 changes: 0 additions & 5 deletions src/Craned/CranedServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@
#include "CranedPublicDefs.h"
// Precompiled header comes first.

#include <google/protobuf/util/time_util.h>

#include "TaskManager.h"
#include "crane/Lock.h"
#include "crane/PublicHeader.h"
#include "protos/Crane.grpc.pb.h"
#include "protos/Crane.pb.h"

Expand Down
Loading

0 comments on commit 5a42881

Please sign in to comment.