Skip to content

Commit

Permalink
dev/ccancel_optimization (#194)
Browse files Browse the repository at this point in the history
* Bulk and asynchronous multithreading optimization were performed on task cancellation

* Add State Machine

* Fix compilation errors.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

* Refactor.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

* Fix logical errors

* Refactor.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

* Refactor.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

---------

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>
Co-authored-by: RileyW <wrllrwwrllrw@gmail.com>
  • Loading branch information
MCKZX-llx and RileyWen authored Oct 17, 2023
1 parent 9d2910b commit 232ac38
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 68 deletions.
8 changes: 4 additions & 4 deletions protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ message ReleaseCgroupForTaskRequest{

message ReleaseCgroupForTaskReply{}

message TerminateTaskRequest {
uint32 task_id = 1;
message TerminateTasksRequest {
repeated uint32 task_id_list = 1;
}

message TerminateTaskReply {
message TerminateTasksReply {
bool ok = 1;
string reason = 2;
}
Expand Down Expand Up @@ -636,7 +636,7 @@ service Craned {
If there are processes in this interactive task, kill all the processes and deallocate resources.
If the task is a batch task, just kill it.
*/
rpc TerminateTask(TerminateTaskRequest) returns (TerminateTaskReply);
rpc TerminateTasks(TerminateTasksRequest) returns (TerminateTasksReply);
rpc TerminateOrphanedTask(TerminateOrphanedTaskRequest) returns (TerminateOrphanedTaskReply);
rpc ChangeTaskTimeLimit(ChangeTaskTimeLimitRequest) returns (ChangeTaskTimeLimitReply);

Expand Down
28 changes: 28 additions & 0 deletions src/CraneCtld/AccountManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,34 @@ AccountManager::Result AccountManager::FindUserLevelAccountsOfUid(
return Result{true};
}

result::result<void, std::string> AccountManager::CheckUidIsAdmin(
uint32_t uid) {
PasswordEntry entry(uid);
if (!entry.Valid()) {
return result::failure(fmt::format("Uid {} not existed", uid));
}

util::read_lock_guard user_guard(m_rw_user_mutex_);
const User* ptr = GetExistedUserInfoNoLock_(entry.Username());
if (!ptr) {
return result::failure(fmt::format(
"Parameter error: User '{}' is not a crane user", entry.Username()));
}

if (ptr->admin_level == User::Operator || ptr->admin_level == User::Admin)
return {};

return result::failure(fmt::format(
"Permission error: User '{}' is an ordinary user.", entry.Username()));
}

/**
* @param uid
* @param account(when the parameter account is empty, this function can be used
* as a permission query function)
* @param level_of_uid
* @return
*/
AccountManager::Result AccountManager::HasPermissionToAccount(
uint32_t uid, const std::string& account, User::AdminLevel* level_of_uid) {
PasswordEntry entry(uid);
Expand Down
14 changes: 8 additions & 6 deletions src/CraneCtld/AccountManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,15 @@ class AccountManager {
Result FindUserLevelAccountsOfUid(uint32_t uid, User::AdminLevel* level,
std::list<std::string>* accounts);

AccountManager::Result HasPermissionToAccount(uint32_t uid,
const std::string& account,
User::AdminLevel* level_of_uid);
result::result<void, std::string> CheckUidIsAdmin(uint32_t uid);

AccountManager::Result HasPermissionToUser(uint32_t uid,
const std::string& user,
User::AdminLevel* level_of_uid);
AccountManager::Result HasPermissionToAccount(
uint32_t uid, const std::string& account,
User::AdminLevel* level_of_uid = nullptr);

AccountManager::Result HasPermissionToUser(
uint32_t uid, const std::string& user,
User::AdminLevel* level_of_uid = nullptr);

private:
void InitDataMap_();
Expand Down
1 change: 1 addition & 0 deletions src/CraneCtld/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ target_link_libraries(cranectld PRIVATE

event_core
event_pthreads
uvw::uvw-static

cxxopts
Threads::Threads
Expand Down
19 changes: 8 additions & 11 deletions src/CraneCtld/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,29 +112,26 @@ CraneErr CranedStub::ExecuteTasks(
return CraneErr::kOk;
}

CraneErr CranedStub::TerminateTask(uint32_t task_id) {
using crane::grpc::TerminateTaskReply;
using crane::grpc::TerminateTaskRequest;
CraneErr CranedStub::TerminateTasks(const std::vector<task_id_t> &task_ids) {
using crane::grpc::TerminateTasksReply;
using crane::grpc::TerminateTasksRequest;

ClientContext context;
Status status;
TerminateTaskRequest request;
TerminateTaskReply reply;
TerminateTasksRequest request;
TerminateTasksReply reply;

request.set_task_id(task_id);
for (const auto &id : task_ids) request.add_task_id_list(id);

status = m_stub_->TerminateTask(&context, request, &reply);
status = m_stub_->TerminateTasks(&context, request, &reply);
if (!status.ok()) {
CRANE_DEBUG(
"TerminateRunningTask RPC for Node {} returned with status not ok: {}",
m_craned_id_, status.error_message());
return CraneErr::kRpcFailure;
}

if (reply.ok())
return CraneErr::kOk;
else
return CraneErr::kGenericFailure;
return CraneErr::kOk;
}

CraneErr CranedStub::TerminateOrphanedTask(task_id_t task_id) {
Expand Down
4 changes: 2 additions & 2 deletions src/CraneCtld/CranedKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ class CranedStub {

CraneErr ReleaseCgroupForTask(uint32_t task_id, uid_t uid);

CraneErr TerminateTask(uint32_t task_id);
CraneErr TerminateTasks(const std::vector<task_id_t> &task_ids);

CraneErr TerminateOrphanedTask(task_id_t task_id);

CraneErr CheckTaskStatus(task_id_t task_id, crane::grpc::TaskStatus *status);

CraneErr ChangeTaskTimeLimit(uint32_t task_id, uint64_t seconds);

bool Invalid() { return m_invalid_; }
bool Invalid() const { return m_invalid_; }

private:
CranedKeeper *m_craned_keeper_;
Expand Down
45 changes: 22 additions & 23 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ grpc::Status CraneCtldServiceImpl::AddAccount(
grpc::ServerContext *context, const crane::grpc::AddAccountRequest *request,
crane::grpc::AddAccountReply *response) {
AccountManager::Result judge_res = g_account_manager->HasPermissionToAccount(
request->uid(), request->account().parent_account(), nullptr);
request->uid(), request->account().parent_account());

if (!judge_res.ok) {
response->set_ok(false);
Expand Down Expand Up @@ -551,12 +551,11 @@ grpc::Status CraneCtldServiceImpl::AddUser(
grpc::Status CraneCtldServiceImpl::AddQos(
grpc::ServerContext *context, const crane::grpc::AddQosRequest *request,
crane::grpc::AddQosReply *response) {
AccountManager::Result judge_res =
g_account_manager->HasPermissionToAccount(request->uid(), "", nullptr);
auto judge_res = g_account_manager->CheckUidIsAdmin(request->uid());

if (!judge_res.ok) {
if (judge_res.has_error()) {
response->set_ok(false);
response->set_reason(judge_res.reason);
response->set_reason(judge_res.error());
return grpc::Status::OK;
}

Expand Down Expand Up @@ -591,8 +590,8 @@ grpc::Status CraneCtldServiceImpl::ModifyEntity(

switch (request->entity_type()) {
case crane::grpc::Account:
judge_res = g_account_manager->HasPermissionToAccount(
request->uid(), request->name(), nullptr);
judge_res = g_account_manager->HasPermissionToAccount(request->uid(),
request->name());

if (!judge_res.ok) {
response->set_ok(false);
Expand Down Expand Up @@ -652,18 +651,20 @@ grpc::Status CraneCtldServiceImpl::ModifyEntity(
request->account(), request->item(), request->value(),
request->force());
break;
case crane::grpc::Qos:
judge_res = g_account_manager->HasPermissionToAccount(request->uid(), "",
nullptr);

if (!judge_res.ok) {
case crane::grpc::Qos: {
auto res = g_account_manager->CheckUidIsAdmin(request->uid());

if (res.has_error()) {
response->set_ok(false);
response->set_reason(judge_res.reason);
response->set_reason(res.error());
return grpc::Status::OK;
}

modify_res = g_account_manager->ModifyQos(
request->name(), request->item(), request->value());
break;
} break;

default:
break;
}
Expand Down Expand Up @@ -750,7 +751,7 @@ grpc::Status CraneCtldServiceImpl::QueryEntityInfo(

AccountManager::Result judge_res =
g_account_manager->HasPermissionToAccount(request->uid(),
request->name(), nullptr);
request->name());

if (!judge_res.ok) {
response->set_ok(false);
Expand Down Expand Up @@ -841,7 +842,7 @@ grpc::Status CraneCtldServiceImpl::QueryEntityInfo(
if (user_shared_ptr) {
AccountManager::Result judge_res =
g_account_manager->HasPermissionToUser(request->uid(),
request->name(), nullptr);
request->name());

if (!judge_res.ok) {
response->set_ok(false);
Expand Down Expand Up @@ -1003,7 +1004,7 @@ grpc::Status CraneCtldServiceImpl::DeleteEntity(
case crane::grpc::Account: {
AccountManager::Result judge_res =
g_account_manager->HasPermissionToAccount(request->uid(),
request->name(), nullptr);
request->name());

if (!judge_res.ok) {
response->set_ok(false);
Expand All @@ -1014,13 +1015,11 @@ grpc::Status CraneCtldServiceImpl::DeleteEntity(
res = g_account_manager->DeleteAccount(request->name());
break;
case crane::grpc::Qos: {
AccountManager::Result judge_res =
g_account_manager->HasPermissionToAccount(request->uid(), "",
nullptr);
auto judge_res = g_account_manager->CheckUidIsAdmin(request->uid());

if (!judge_res.ok) {
if (judge_res.has_error()) {
response->set_ok(false);
response->set_reason(judge_res.reason);
response->set_reason(judge_res.error());
return grpc::Status::OK;
}
}
Expand Down Expand Up @@ -1048,7 +1047,7 @@ grpc::Status CraneCtldServiceImpl::BlockAccountOrUser(
switch (request->entity_type()) {
case crane::grpc::Account:
res = g_account_manager->HasPermissionToAccount(request->uid(),
request->name(), nullptr);
request->name());

if (!res.ok) {
response->set_ok(false);
Expand All @@ -1061,7 +1060,7 @@ grpc::Status CraneCtldServiceImpl::BlockAccountOrUser(
break;
case crane::grpc::User:
res = g_account_manager->HasPermissionToUser(request->uid(),
request->name(), nullptr);
request->name());

if (!res.ok) {
response->set_ok(false);
Expand Down
3 changes: 3 additions & 0 deletions src/CraneCtld/CtldPreCompiledHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@
// Concurrent queue
#include <concurrentqueue/concurrentqueue.h>

// UVW
#include <uvw.hpp>

// result
#include <result/result.hpp>

Expand Down
2 changes: 2 additions & 0 deletions src/CraneCtld/CtldPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ inline const char* kDefaultDbPath = "/tmp/cranectld/embedded.db";

constexpr uint64_t kTaskScheduleIntervalMs = 1000;
constexpr uint16_t kCompletionQueueDelaySeconds = 15;
constexpr uint32_t kCancelTaskTimeoutMs = 500;
constexpr uint64_t kCancelTaskBatchNum = 1000;

struct Config {
struct Node {
Expand Down
Loading

0 comments on commit 232ac38

Please sign in to comment.