Skip to content

Commit

Permalink
use codeErr type
Browse files Browse the repository at this point in the history
  • Loading branch information
1daidai1 committed Dec 11, 2024
1 parent 5dabede commit bf52d55
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 80 deletions.
4 changes: 2 additions & 2 deletions protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ message SubmitBatchTaskReply {
bool ok = 1;
oneof payload{
uint32 task_id = 2;
string reason = 3;
ErrCode reason = 3;
}
}

Expand All @@ -72,7 +72,7 @@ message SubmitBatchTasksRequest {

// Reply message filled in by the SubmitBatchTasks RPC handler.
// task_id_list holds task ids and reason_list holds failure reasons.
// NOTE(review): how the entries of the two lists line up with the submitted
// tasks is not visible in this view — confirm against the handler.
message SubmitBatchTasksReply {
repeated uint32 task_id_list = 1;
repeated string reason_list = 2;
repeated ErrCode reason_list = 2;
}

message ExecuteTasksRequest {
Expand Down
123 changes: 67 additions & 56 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -319,65 +319,76 @@ message TrimmedPartitionInfo {
// Error codes shared by Crane gRPC replies; Crane.proto uses this type for
// SubmitBatchTaskReply.reason and SubmitBatchTasksReply.reason_list.
// NOTE(review): enum entries are transmitted as their numeric value, so
// changing the number of an existing entry is incompatible with peers built
// against the earlier numbering — confirm all clients are rebuilt together.
enum ErrCode {
SUCCESS = 0; // Success

ERR_INVALID_UID = 10001;
ERR_INVALID_OP_USER = 10002;
ERR_INVALID_USER = 10003;
ERR_PERMISSION_USER = 10004;
ERR_USER_DUPLICATE_ACCOUNT = 10005;
ERR_USER_ALLOWED_ACCOUNT = 10006;
ERR_INVALID_ADMIN_LEVEL = 10007;
ERR_USER_ACCOUNT_MISMATCH = 10008;
ERR_NO_ACCOUNT_SPECIFIED = 10009;

ERR_INVALID_ACCOUNT = 10010;
ERR_DUPLICATE_ACCOUNT = 10011;
ERR_INVALID_PARENTACCOUNT = 10012;
ERR_DELETE_ACCOUNT = 10013;

ERR_INVALID_PARTITION = 10014;
ERR_ALLOWED_PARTITION = 10015;
ERR_DUPLICATE_PARTITION = 10016;
ERR_PARENT_ALLOWED_PARTITION = 10017;
ERR_USER_EMPTY_PARTITION = 10018;
ERR_CHILD_HAS_PARTITION = 10019;

ERR_INVALID_QOS = 10020;
ERR_DB_DUPLICATE_QOS = 10021;
ERR_DELETE_QOS = 10022;
ERR_CONVERT_TO_INTERGER = 10023;
ERR_TIME_LIMIT = 10024;
ERR_ALLOWED_QOS = 10025;
ERR_DUPLICATE_QOS = 10026;
ERR_PARENT_ALLOWED_QOS = 10027;
ERR_SET_ALLOWED_QOS = 10028;
ERR_ALLOWED_DEFAULT_QOS = 10029;
ERR_DUPLICATE_DEFAULT_QOS = 10030;
ERR_CHILD_HAS_DEFAULT_QOS = 10031;
ERR_SET_ACCOUNT_QOS = 10032;
ERR_SET_DEFAULT_QOS = 10033;
ERR_IS_DEFAULT_QOS = 10034;

ERR_UPDATE_DATABASE = 10035;

ERR_GENERIC_FAILURE = 10100;
ERR_INVALID_UID = 10001; // Invalid UID passed
ERR_INVALID_OP_USER = 10002; // Invalid operation user
ERR_INVALID_USER = 10003; // Invalid user
ERR_PERMISSION_USER = 10004; // User permissions too low, no permission to operate
ERR_BLOCKED_USER = 10005; // User is blocked in the given account
ERR_USER_DUPLICATE_ACCOUNT= 10006; // User duplicate account insertion
ERR_USER_ALLOWED_ACCOUNT = 10007; // User does not have permission for the account
ERR_INVALID_ADMIN_LEVEL = 10008; // Invalid permission level
ERR_USER_ACCOUNT_MISMATCH = 10009; // User does not belong to the account
ERR_NO_ACCOUNT_SPECIFIED = 10010;

ERR_INVALID_ACCOUNT = 10011; // Invalid account
ERR_DUPLICATE_ACCOUNT = 10012; // Duplicate account insertion
ERR_INVALID_PARENTACCOUNT = 10013; // Invalid parent account
ERR_DELETE_ACCOUNT = 10014; // Account has child nodes
ERR_BLOCKED_ACCOUNT = 10015; // An account on the parent-account chain is blocked

ERR_INVALID_PARTITION = 10016; // Invalid partition, partition does not exist
ERR_ALLOWED_PARTITION = 10017; // Account/user does not include this partition
ERR_DUPLICATE_PARTITION = 10018; // Account/user duplicate insertion
ERR_PARENT_ALLOWED_PARTITION = 10019; // Parent account does not include this partition
ERR_USER_EMPTY_PARTITION = 10020; // Cannot add QoS when user has no partition
ERR_CHILD_HAS_PARTITION = 10021; // Partition '{}' is used by some descendant node of the account '{}'. Ignoring this constraint with forced operation.
ERR_HAS_NO_QOS_IN_PARTITION = 10022;
ERR_HAS_ALLOWED_QOS_IN_PARTITION = 10023;

ERR_INVALID_QOS = 10024; // Invalid QoS, QoS does not exist
ERR_DB_DUPLICATE_QOS = 10025; // Duplicate QoS insertion in the database.
ERR_DELETE_QOS = 10026; // QoS reference count is not zero.
ERR_CONVERT_TO_INTERGER = 10027; // String to integer conversion failed (NOTE(review): INTERGER is a misspelling of INTEGER)
ERR_TIME_LIMIT = 10028; // Invalid time value
ERR_ALLOWED_QOS = 10029; // Account/user does not include this QoS.
ERR_DUPLICATE_QOS = 10030; // Account/user duplicate insertion.
ERR_PARENT_ALLOWED_QOS = 10031; // Parent account does not include this QoS.
ERR_SET_ALLOWED_QOS = 10032; // QoS '{}' is the default QoS of partition '{}', but not found in the new QoS list.
ERR_ALLOWED_DEFAULT_QOS = 10033; // Default QoS is not in the allowed QoS list
ERR_DUPLICATE_DEFAULT_QOS = 10034; // Duplicate default QoS setting
ERR_CHILD_HAS_DEFAULT_QOS = 10035; // Someone is using QoS '{}' as default QoS. Ignoring this constraint with forced deletion, the deleted default QoS is randomly replaced with one of the remaining items in the QoS list.
ERR_SET_ACCOUNT_QOS = 10036; // QoS '{}' is used by some descendant node or itself of the account '{}'. Ignoring this constraint with forced operation.
ERR_SET_DEFAULT_QOS = 10037; // QoS '{}' is not in the allowed QoS list or is already the default QoS
ERR_IS_DEFAULT_QOS = 10038;

ERR_UPDATE_DATABASE = 10039; // Database update failed

ERR_GENERIC_FAILURE = 10100;
ERR_NO_RESOURCE = 10101; // NOTE(review): returned by CheckTaskValidity after it logs the partition's total resources; presumably the request exceeds them — confirm
ERR_NON_EXISTENT = 10102;
ERR_INVALID_NODE_NUM = 10103; // Task requests more nodes than the partition contains
ERR_SYSTEM_ERR = 10104;
ERR_EXISTING_TASK = 10105;
ERR_INVALID_PARAM = 10106;
ERR_STOP = 10107;
ERR_PERMISSION_DENIED = 10108;
ERR_CONNECTION_TIMEOUT = 10109;
ERR_CONNECTION_ABORTED = 10110;
ERR_RPC_FAILURE = 10111;
ERR_TOKEN_REQUEST_FAILURE = 10112;
ERR_STREAM_BROKEN = 10113;
ERR_INVALID_STUB = 10114;
ERR_CGROUP = 10115;
ERR_PROTOBUF = 10116;
ERR_LIB_EVENT = 10117;
ERR_NO_AVAIL_NODE = 10118;
ERR_INVAILD_NODE_LIST = 10104; // Task's nodelist could not be parsed as a host list (NOTE(review): INVAILD is a misspelling of INVALID)
ERR_INVAILD_EX_NODE_LIST = 10105; // Task's excludes list could not be parsed as a host list (NOTE(review): INVAILD is a misspelling of INVALID)
ERR_TIME_TIMIT_BEYOND = 10106; // Task time limit failed the validity check (NOTE(review): TIMIT is a misspelling of LIMIT)
ERR_CPUS_PER_TASK_BEYOND = 10107;
ERR_NO_ENOUGH_NODE = 10108; // Fewer nodes satisfy the task's requirements than the task needs

ERR_SYSTEM_ERR = 10109;
ERR_EXISTING_TASK = 10110;
ERR_BEYOND_TASK_ID = 10111; // System error occurred or the number of pending tasks exceeded the maximum value
ERR_INVALID_PARAM = 10112;
ERR_STOP = 10113;
ERR_PERMISSION_DENIED = 10114;
ERR_CONNECTION_TIMEOUT = 10115;
ERR_CONNECTION_ABORTED = 10116;
ERR_RPC_FAILURE = 10117;
ERR_TOKEN_REQUEST_FAILURE = 10118;
ERR_STREAM_BROKEN = 10119;
ERR_INVALID_STUB = 10120;
ERR_CGROUP = 10121;
ERR_PROTOBUF = 10122;
ERR_LIB_EVENT = 10123;
ERR_NO_AVAIL_NODE = 10124;
}

enum EntityType {
Expand Down
16 changes: 16 additions & 0 deletions src/CraneCtld/AccountManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,11 @@ bool AccountManager::CheckUserPermissionToPartition(
return false;
}

<<<<<<< HEAD
std::expected<void, std::string> AccountManager::CheckIfUserOfAccountIsEnabled(
=======
CraneErrCodeExpected<void> AccountManager::CheckIfUserOfAccountIsEnabled(
>>>>>>> 2d88fb3 (use codeErr type)
const std::string& user, const std::string& account) {
util::read_lock_guard user_guard(m_rw_user_mutex_);
util::read_lock_guard account_guard(m_rw_account_mutex_);
Expand All @@ -907,15 +911,23 @@ std::expected<void, std::string> AccountManager::CheckIfUserOfAccountIsEnabled(
do {
const Account* account_ptr = GetExistedAccountInfoNoLock_(account_name);
if (account_ptr->blocked) {
<<<<<<< HEAD
return std::unexpected(
fmt::format("Ancestor account '{}' is blocked", account_ptr->name));
=======
return std::unexpected(crane::grpc::ErrCode::ERR_BLOCKED_ACCOUNT);
>>>>>>> 2d88fb3 (use codeErr type)
}
account_name = account_ptr->parent_account;
} while (!account_name.empty());

const User* user_ptr = GetExistedUserInfoNoLock_(user);
if (user_ptr->account_to_attrs_map.at(account).blocked) {
<<<<<<< HEAD
return std::unexpected(fmt::format("User '{}' is blocked", user_ptr->name));
=======
return std::unexpected(crane::grpc::ErrCode::ERR_BLOCKED_USER);
>>>>>>> 2d88fb3 (use codeErr type)
}
return {};
}
Expand Down Expand Up @@ -989,7 +1001,11 @@ AccountManager::CraneExpected<void> AccountManager::CheckAndApplyQosLimitOnTask(
}

return {};
<<<<<<< HEAD
}}
=======
}
>>>>>>> 2d88fb3 (use codeErr type)

std::expected<void, std::string> AccountManager::CheckUidIsAdmin(
uint32_t uid) {
Expand Down
4 changes: 4 additions & 0 deletions src/CraneCtld/AccountManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ class AccountManager {
const std::string& account,
const std::string& partition);

// Checks whether `user` of `account` is enabled. The definition in
// AccountManager.cpp fails when an account reached by following
// parent_account links is blocked, or when the user's entry for `account` is
// blocked; otherwise it returns an empty (success) value.
// NOTE(review): unresolved merge-conflict markers are committed below, so this
// header cannot compile as written. The two sides differ only in the error
// type (message string vs. error code); pick one and resolve the definition
// in AccountManager.cpp the same way.
<<<<<<< HEAD
std::expected<void, std::string> CheckIfUserOfAccountIsEnabled(
=======
CraneErrCodeExpected<void> CheckIfUserOfAccountIsEnabled(
>>>>>>> 2d88fb3 (use codeErr type)
const std::string& user, const std::string& account);

CraneExpected<void> CheckAndApplyQosLimitOnTask(const std::string& user,
Expand Down
17 changes: 14 additions & 3 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTask(
response->set_task_id(id);
} else {
response->set_ok(false);
response->set_reason(
"System error occurred or "
"the number of pending tasks exceeded maximum value.");
response->set_reason(crane::grpc::ErrCode::ERR_BEYOND_TASK_ID);
}
} else {
response->set_ok(false);
Expand All @@ -57,7 +55,11 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks(
grpc::ServerContext *context,
const crane::grpc::SubmitBatchTasksRequest *request,
crane::grpc::SubmitBatchTasksReply *response) {
<<<<<<< HEAD
std::vector<std::expected<std::future<task_id_t>, std::string>> results;
=======
std::vector<CraneErrCodeExpected<std::future<task_id_t>>> results;
>>>>>>> 2d88fb3 (use codeErr type)

uint32_t task_count = request->count();
const auto &task_to_ctld = request->task();
Expand Down Expand Up @@ -815,7 +817,11 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
result = std::expected<task_id_t, std::string>{
submit_result.value().get()};
} else {
<<<<<<< HEAD
result = std::unexpected(submit_result.error());
=======
result = result::fail(CraneErrCodeStr(submit_result.error()));
>>>>>>> 2d88fb3 (use codeErr type)
}
ok = stream_writer->WriteTaskIdReply(payload.pid(), result);

Expand Down Expand Up @@ -984,6 +990,11 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {
return {std::move(future)};
}

<<<<<<< HEAD
return std::unexpected(result.error());}
=======
return std::unexpected(result.error());
}
>>>>>>> 2d88fb3 (use codeErr type)

} // namespace Ctld
4 changes: 4 additions & 0 deletions src/CraneCtld/CtldGrpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,11 @@ class CtldServer {

inline void Wait() { m_server_->Wait(); }

// Takes ownership of `task` and submits it to the scheduler. On success the
// result holds a std::future for the task id; otherwise it holds the error of
// the step that failed (only the tail of the definition is visible here).
// NOTE(review): unresolved merge-conflict markers are committed below, so this
// header cannot compile as written. The two sides differ only in the error
// type (message string vs. error code); pick one and resolve the definition
// and its callers in CtldGrpcServer.cpp the same way.
<<<<<<< HEAD
std::expected<std::future<task_id_t>, std::string> SubmitTaskToScheduler(
=======
CraneErrCodeExpected <std::future<task_id_t>> SubmitTaskToScheduler(
>>>>>>> 2d88fb3 (use codeErr type)
std::unique_ptr<TaskInCtld> task);

private:
Expand Down
35 changes: 18 additions & 17 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ bool TaskScheduler::Init() {
CRANE_TRACE("Restore task #{} from embedded running queue.",
task->TaskId());

err = AcquireTaskAttributes(task.get());
if (err != CraneErr::kOk || task->type == crane::grpc::Interactive) {
auto result = AcquireTaskAttributes(task.get());
if (!result || task->type == crane::grpc::Interactive) {
task->SetStatus(crane::grpc::Failed);
ok = g_embedded_db_client->UpdateRuntimeAttrOfTask(0, task_db_id,
task->RuntimeAttr());
Expand All @@ -92,13 +92,13 @@ bool TaskScheduler::Init() {
"mark the task as FAILED.",
task_id);
}
if (err != CraneErr::kOk)
if (!result)
CRANE_INFO(
"Failed to acquire task attributes for restored running task "
"#{}. "
"Error Code: {}. "
"Mark it as FAILED and move it to the ended queue.",
task_id, CraneErrStr(err));
task_id, CraneErrCodeStr(result.error()));
else {
CRANE_INFO("Mark running interactive task {} as FAILED.", task_id);
for (const CranedId& craned_id : task->CranedIds()) {
Expand Down Expand Up @@ -318,13 +318,13 @@ bool TaskScheduler::Init() {
}

if (!mark_task_as_failed &&
AcquireTaskAttributes(task.get()) != CraneErr::kOk) {
!(AcquireTaskAttributes(task.get()))) {
CRANE_ERROR("AcquireTaskAttributes failed for task #{}", task_id);
mark_task_as_failed = true;
}

if (!mark_task_as_failed &&
CheckTaskValidity(task.get()) != CraneErr::kOk) {
!(CheckTaskValidity(task.get()))) {
CRANE_ERROR("CheckTaskValidity failed for task #{}", task_id);
mark_task_as_failed = true;
}
Expand Down Expand Up @@ -2718,9 +2718,10 @@ void TaskScheduler::PersistAndTransferTasksToMongodb_(
}
}

CraneErr TaskScheduler::AcquireTaskAttributes(TaskInCtld* task) {
CraneErrCodeExpected<void> TaskScheduler::AcquireTaskAttributes(TaskInCtld* task) {
auto part_it = g_config.Partitions.find(task->partition_id);
if (part_it == g_config.Partitions.end()) return CraneErr::kNonExistent;
if (part_it == g_config.Partitions.end())
return std::unexpected(crane::grpc::ErrCode::ERR_INVALID_PARTITION);

task->partition_priority = part_it->second.priority;

Expand Down Expand Up @@ -2753,25 +2754,25 @@ CraneErr TaskScheduler::AcquireTaskAttributes(TaskInCtld* task) {
if (!task->TaskToCtld().nodelist().empty() && task->included_nodes.empty()) {
std::list<std::string> nodes;
bool ok = util::ParseHostList(task->TaskToCtld().nodelist(), &nodes);
if (!ok) return CraneErr::kInvaildNodeList;
if (!ok) return std::unexpected(crane::grpc::ErrCode::ERR_INVAILD_NODE_LIST);

for (auto&& node : nodes) task->included_nodes.emplace(std::move(node));
}

if (!task->TaskToCtld().excludes().empty() && task->excluded_nodes.empty()) {
std::list<std::string> nodes;
bool ok = util::ParseHostList(task->TaskToCtld().excludes(), &nodes);
if (!ok) return CraneErr::kInvalidExNodeList;
if (!ok) return std::unexpected(crane::grpc::ErrCode::ERR_INVAILD_EX_NODE_LIST);

for (auto&& node : nodes) task->excluded_nodes.emplace(std::move(node));
}

return CraneErr::kOk;
return {};
}

CraneErr TaskScheduler::CheckTaskValidity(TaskInCtld* task) {
CraneErrCodeExpected<void> TaskScheduler::CheckTaskValidity(TaskInCtld* task) {
if (!CheckIfTimeLimitIsValid(task->time_limit))
return CraneErr::kInvalidTimeLimit;
return std::unexpected(crane::grpc::ErrCode::ERR_TIME_TIMIT_BEYOND) ;

// Check whether the selected partition is able to run this task.
std::unordered_set<std::string> avail_nodes;
Expand Down Expand Up @@ -2801,15 +2802,15 @@ CraneErr TaskScheduler::CheckTaskValidity(TaskInCtld* task) {
.memory_sw_bytes),
util::ReadableTypedDeviceMap(
metas_ptr->partition_global_meta.res_total.GetDeviceMap()));
return CraneErr::kNoResource;
return std::unexpected(crane::grpc::ErrCode::ERR_NO_RESOURCE) ;
}

if (task->node_num > metas_ptr->craned_ids.size()) {
CRANE_TRACE(
"Nodes not enough for task #{}. "
"Partition total Nodes: {}",
task->TaskId(), metas_ptr->craned_ids.size());
return CraneErr::kInvalidNodeNum;
return std::unexpected(crane::grpc::ErrCode::ERR_INVALID_NODE_NUM);
}

auto craned_meta_map = g_meta_container->GetCranedMetaMapConstPtr();
Expand All @@ -2831,10 +2832,10 @@ CraneErr TaskScheduler::CheckTaskValidity(TaskInCtld* task) {
"Resource not enough. Task #{} needs {} nodes, while only {} "
"nodes satisfy its requirement.",
task->TaskId(), task->node_num, avail_nodes.size());
return CraneErr::kNoAvailNode;
return std::unexpected(crane::grpc::ErrCode::ERR_NO_ENOUGH_NODE);
}

return CraneErr::kOk;
return {};
}

void TaskScheduler::TerminateTasksOnCraned(const CranedId& craned_id,
Expand Down
4 changes: 2 additions & 2 deletions src/CraneCtld/TaskScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,9 @@ class TaskScheduler {
return TerminateRunningTaskNoLock_(iter->second.get());
}

// Fills in attributes of `task` derived from configuration and the submitted
// request: looks up the task's partition in g_config.Partitions and copies
// its priority, and expands the request's nodelist / excludes host lists into
// task->included_nodes / task->excluded_nodes when those sets are still
// empty. Fails if the partition is unknown or either host list cannot be
// parsed. (Other steps of the definition are not visible in this view.)
static CraneErr AcquireTaskAttributes(TaskInCtld* task);
static CraneErrCodeExpected<void> AcquireTaskAttributes(TaskInCtld* task);

// Validates `task`: fails when the time limit is rejected by
// CheckIfTimeLimitIsValid, when task->node_num exceeds the number of craned
// nodes in the partition, or when fewer nodes than task->node_num satisfy the
// task's requirements. A further resource check returns a no-resource error;
// its condition is not visible in this view.
static CraneErr CheckTaskValidity(TaskInCtld* task);
static CraneErrCodeExpected<void> CheckTaskValidity(TaskInCtld* task);

private:
template <class... Ts>
Expand Down

0 comments on commit bf52d55

Please sign in to comment.