Skip to content

Commit

Permalink
add submittask fail reason
Browse files Browse the repository at this point in the history
  • Loading branch information
1daidai1 committed Nov 26, 2024
1 parent 58a0115 commit a31b5d6
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 53 deletions.
56 changes: 24 additions & 32 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ CtldServer::CtldServer(const Config::CraneCtldListenConf &listen_conf) {

result::result<std::future<task_id_t>, std::string>
CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {
CraneErr err;
Result result;

if (!task->password_entry->Valid()) {
return result::fail(
Expand Down Expand Up @@ -969,46 +969,38 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {
return result::fail(enable_res.error());
}

err = g_task_scheduler->AcquireTaskAttributes(task.get());
result = g_task_scheduler->AcquireTaskAttributes(task.get());

if (err == CraneErr::kOk)
err = g_task_scheduler->CheckTaskValidity(task.get());
if (result.err == CraneErr::kOk)
result = g_task_scheduler->CheckTaskValidity(task.get());

if (err == CraneErr::kOk) {
if (result.err == CraneErr::kOk) {
task->SetSubmitTime(absl::Now());
std::future<task_id_t> future =
g_task_scheduler->SubmitTaskAsync(std::move(task));
return {std::move(future)};
}

if (err == CraneErr::kNonExistent) {
CRANE_DEBUG("Task submission failed. Reason: Partition doesn't exist!");
return result::fail("Partition doesn't exist!");
} else if (err == CraneErr::kInvalidNodeNum) {
CRANE_DEBUG(
"Task submission failed. Reason: --node is either invalid or greater "
"than the number of nodes in its partition.");
return result::fail(
"--node is either invalid or greater than the number of nodes in its "
"partition.");
} else if (err == CraneErr::kNoResource) {
CRANE_DEBUG(
"Task submission failed. "
"Reason: The resources of the partition are insufficient.");
return result::fail("The resources of the partition are insufficient");
} else if (err == CraneErr::kNoAvailNode) {
if (result.err == CraneErr::kNonExistent) {
CRANE_DEBUG("Task submission failed. Reason: {}", result.reason.c_str());
return result::fail( result.reason);
} else if (result.err == CraneErr::kInvalidNodeNum) {
CRANE_DEBUG("Task submission failed. Reason: {}.", result.reason);
return result::fail( result.reason);
} else if (result.err == CraneErr::kNoResource) {
CRANE_DEBUG(
"Task submission failed. "
"Reason: Nodes satisfying the requirements of task are insufficient");
return result::fail(
"Nodes satisfying the requirements of task are insufficient.");
} else if (err == CraneErr::kInvalidParam) {
CRANE_DEBUG(
"Task submission failed. "
"Reason: The param of task is invalid.");
return result::fail("The param of task is invalid.");
}
return result::fail(CraneErrStr(err));
"Task submission failed. Reason: {}", result.reason.c_str());
return result::fail( result.reason);
} else if (result.err == CraneErr::kNoAvailNode) {
CRANE_DEBUG("Task submission failed. Reason: {}", result.reason);
return result::fail( result.reason);
} else if (result.err == CraneErr::kInvalidParam) {
CRANE_DEBUG("Task submission failed. Reason: {}.", result.reason);
return result::fail( result.reason);
}

std::string errorMessage = std::string(CraneErrStr(result.err)) + ": " + result.reason;
return result::fail(errorMessage.c_str());
}

} // namespace Ctld
77 changes: 58 additions & 19 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ bool TaskScheduler::Init() {

bool ok;
CraneErr err;
Result result;

EmbeddedDbClient::DbSnapshot snapshot;
ok = g_embedded_db_client->RetrieveLastSnapshot(&snapshot);
Expand All @@ -87,8 +88,8 @@ bool TaskScheduler::Init() {
CRANE_TRACE("Restore task #{} from embedded running queue.",
task->TaskId());

err = AcquireTaskAttributes(task.get());
if (err != CraneErr::kOk || task->type == crane::grpc::Interactive) {
result = AcquireTaskAttributes(task.get());
if (result.err != CraneErr::kOk || task->type == crane::grpc::Interactive) {
task->SetStatus(crane::grpc::Failed);
ok = g_embedded_db_client->UpdateRuntimeAttrOfTask(0, task_db_id,
task->RuntimeAttr());
Expand All @@ -98,13 +99,13 @@ bool TaskScheduler::Init() {
"mark the task as FAILED.",
task_id);
}
if (err != CraneErr::kOk)
if (result.err != CraneErr::kOk)
CRANE_INFO(
"Failed to acquire task attributes for restored running task "
"#{}. "
"Error Code: {}. "
"Mark it as FAILED and move it to the ended queue.",
task_id, CraneErrStr(err));
task_id, CraneErrStr(result.err));
else {
CRANE_INFO("Mark running interactive task {} as FAILED.", task_id);
for (const CranedId& craned_id : task->CranedIds()) {
Expand Down Expand Up @@ -324,13 +325,13 @@ bool TaskScheduler::Init() {
}

if (!mark_task_as_failed &&
AcquireTaskAttributes(task.get()) != CraneErr::kOk) {
(AcquireTaskAttributes(task.get()).err != CraneErr::kOk)) {
CRANE_ERROR("AcquireTaskAttributes failed for task #{}", task_id);
mark_task_as_failed = true;
}

if (!mark_task_as_failed &&
CheckTaskValidity(task.get()) != CraneErr::kOk) {
(CheckTaskValidity(task.get()).err != CraneErr::kOk)) {
CRANE_ERROR("CheckTaskValidity failed for task #{}", task_id);
mark_task_as_failed = true;
}
Expand Down Expand Up @@ -2724,9 +2725,13 @@ void TaskScheduler::PersistAndTransferTasksToMongodb_(
}
}

CraneErr TaskScheduler::AcquireTaskAttributes(TaskInCtld* task) {
Result TaskScheduler::AcquireTaskAttributes(TaskInCtld* task) {
auto part_it = g_config.Partitions.find(task->partition_id);
if (part_it == g_config.Partitions.end()) return CraneErr::kInvalidParam;
if (part_it == g_config.Partitions.end()) {
return Result{
CraneErr::kInvalidParam,
fmt::format("Partition {} doesn't exist", task->partition_id)};
}

task->partition_priority = part_it->second.priority;

Expand Down Expand Up @@ -2757,31 +2762,38 @@ CraneErr TaskScheduler::AcquireTaskAttributes(TaskInCtld* task) {
if (check_qos_result.has_error()) {
CRANE_ERROR("Failed to call CheckAndApplyQosLimitOnTask: {}",
check_qos_result.error());
return CraneErr::kInvalidParam;
return Result{CraneErr::kInvalidParam, check_qos_result.error()};
}

if (!task->TaskToCtld().nodelist().empty() && task->included_nodes.empty()) {
std::list<std::string> nodes;
bool ok = util::ParseHostList(task->TaskToCtld().nodelist(), &nodes);
if (!ok) return CraneErr::kInvalidParam;
if (!ok)
return Result{CraneErr::kInvalidParam,
fmt::format("nodelist {} error",
task->TaskToCtld().nodelist())};

for (auto&& node : nodes) task->included_nodes.emplace(std::move(node));
}

if (!task->TaskToCtld().excludes().empty() && task->excluded_nodes.empty()) {
std::list<std::string> nodes;
bool ok = util::ParseHostList(task->TaskToCtld().excludes(), &nodes);
if (!ok) return CraneErr::kInvalidParam;
if (!ok)
Result{CraneErr::kInvalidParam,
fmt::format("excludes nodelist {} error",
task->TaskToCtld().excludes())};

for (auto&& node : nodes) task->excluded_nodes.emplace(std::move(node));
}

return CraneErr::kOk;
return Result{CraneErr::kOk, ""};
}

CraneErr TaskScheduler::CheckTaskValidity(TaskInCtld* task) {
Result TaskScheduler::CheckTaskValidity(TaskInCtld* task) {
if (!CheckIfTimeLimitIsValid(task->time_limit))
return CraneErr::kInvalidParam;
return Result{CraneErr::kInvalidParam,
fmt::format("task time_limit is invalid")};

// Check whether the selected partition is able to run this task.
std::unordered_set<std::string> avail_nodes;
Expand All @@ -2792,7 +2804,7 @@ CraneErr TaskScheduler::CheckTaskValidity(TaskInCtld* task) {
// Since we do not access the elements in partition_metas_m

// Check whether the selected partition is able to run this task.
if (!(task->requested_node_res_view * task->node_num <=
if (!(task->requested_node_res_view * task->node_num <=
metas_ptr->partition_global_meta.res_total_inc_dead)) {
CRANE_TRACE(
"Resource not enough for task #{}. "
Expand All @@ -2811,15 +2823,37 @@ CraneErr TaskScheduler::CheckTaskValidity(TaskInCtld* task) {
.memory_sw_bytes),
util::ReadableTypedDeviceMap(
metas_ptr->partition_global_meta.res_total.GetDeviceMap()));
return CraneErr::kNoResource;
return Result{
CraneErr::kNoResource,
fmt::format(
"Resource not enough for task #{}. "
"Partition total: cpu {}, mem: {}, mem+sw: {}, gres: {}",
task->TaskId(),
metas_ptr->partition_global_meta.res_total_inc_dead
.GetAllocatableRes()
.cpu_count,
util::ReadableMemory(
metas_ptr->partition_global_meta.res_total_inc_dead
.GetAllocatableRes()
.memory_bytes),
util::ReadableMemory(
metas_ptr->partition_global_meta.res_total_inc_dead
.GetAllocatableRes()
.memory_sw_bytes),
util::ReadableTypedDeviceMap(
metas_ptr->partition_global_meta.res_total.GetDeviceMap()))};
}

if (task->node_num > metas_ptr->craned_ids.size()) {
CRANE_TRACE(
"Nodes not enough for task #{}. "
"Partition total Nodes: {}",
task->TaskId(), metas_ptr->craned_ids.size());
return CraneErr::kInvalidNodeNum;
return Result{CraneErr::kInvalidNodeNum,
fmt::format(
"Nodes not enough for task #{}. "
"Partition total Nodes: {}",
task->TaskId(), metas_ptr->craned_ids.size())};
}

auto craned_meta_map = g_meta_container->GetCranedMetaMapConstPtr();
Expand All @@ -2841,10 +2875,15 @@ CraneErr TaskScheduler::CheckTaskValidity(TaskInCtld* task) {
"Resource not enough. Task #{} needs {} nodes, while only {} "
"nodes satisfy its requirement.",
task->TaskId(), task->node_num, avail_nodes.size());
return CraneErr::kNoAvailNode;
return Result{
CraneErr::kNoAvailNode,
fmt::format(
"Resource not enough. Task #{} needs {} nodes, while only {} "
"nodes satisfy its requirement",
task->TaskId(), task->node_num, avail_nodes.size())};
}

return CraneErr::kOk;
return Result{CraneErr::kOk, ""};
}

void TaskScheduler::TerminateTasksOnCraned(const CranedId& craned_id,
Expand Down
9 changes: 7 additions & 2 deletions src/CraneCtld/TaskScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ using OrderedTaskMap = absl::btree_map<task_id_t, std::unique_ptr<TaskInCtld>>;
using UnorderedTaskMap =
absl::flat_hash_map<task_id_t, std::unique_ptr<TaskInCtld>>;

struct Result {
CraneErr err{};
std::string reason;
};

class IPrioritySorter {
public:
virtual std::vector<task_id_t> GetOrderedTaskIdList(
Expand Down Expand Up @@ -282,9 +287,9 @@ class TaskScheduler {
return TerminateRunningTaskNoLock_(iter->second.get());
}

static CraneErr AcquireTaskAttributes(TaskInCtld* task);
static Result AcquireTaskAttributes(TaskInCtld* task);

static CraneErr CheckTaskValidity(TaskInCtld* task);
static Result CheckTaskValidity(TaskInCtld* task);

private:
template <class... Ts>
Expand Down

0 comments on commit a31b5d6

Please sign in to comment.