Skip to content

Commit

Permalink
Dev/export (#201)
Browse files Browse the repository at this point in the history
* add --get_user_env

* add --export={[ALL,]<environment_variables>|ALL|NIL|NONE}

* change env to type map<string, string> and replace TaskInCtld with TaskInDB.

Update function InsertJobs to use TaskInDB

* Modify --get-user-env and remove CRANE_EXPORT_ENV and CRANE_GET_USER_ENV.

* Refactor.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

* Refactor.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

* Refactor.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

* Fix argv[0] error.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

* Fixed the garbled database output and restored the InsertRecoveredJob function.

* Simplified TaskInDb.

* Refactor.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

---------

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>
Co-authored-by: RileyW <wrllrwwrllrw@gmail.com>
  • Loading branch information
NamelessOIer and RileyWen authored Feb 19, 2024
1 parent 60bf4a4 commit 11b76b7
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 102 deletions.
7 changes: 5 additions & 2 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ message TaskToCtld {
double cpus_per_task = 11;

bool requeue_if_failed = 12;
bool get_user_env = 13;

oneof payload {
BatchTaskAdditionalMeta batch_meta = 21;
Expand All @@ -96,7 +97,7 @@ message TaskToCtld {

string cmd_line = 31;
string cwd = 32; // Current working directory
string env = 33;
map<string, string> env = 33;

string excludes = 34;
string nodelist = 35;
Expand Down Expand Up @@ -146,7 +147,7 @@ message TaskToD {
InteractiveTaskAdditionalMeta interactive_meta = 11;
}

string env = 12;
map<string, string> env = 12;
string cwd = 13;

repeated string allocated_nodes = 14;
Expand All @@ -161,6 +162,8 @@ message TaskToD {
uint32 node_num = 21;
uint32 ntasks_per_node = 22;
double cpus_per_task = 23;

bool get_user_env = 24;
}

message BatchTaskAdditionalMeta {
Expand Down
4 changes: 3 additions & 1 deletion src/CraneCtld/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ CraneErr CranedStub::ExecuteTasks(
mutable_task->set_cpus_per_task(task->cpus_per_task);

mutable_task->set_uid(task->uid);
mutable_task->set_env(task->env);
mutable_task->mutable_env()->insert(task->env.begin(), task->env.end());

mutable_task->set_cwd(task->cwd);
mutable_task->set_get_user_env(task->get_user_env);

for (const auto &hostname : task->CranedIds())
mutable_task->mutable_allocated_nodes()->Add()->assign(hostname);
Expand Down
66 changes: 31 additions & 35 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,47 +352,46 @@ grpc::Status CraneCtldServiceImpl::QueryTasksInfo(

// Query completed tasks in Mongodb
// (only for cacct, which sets `option_include_completed_tasks` to true)
std::vector<std::unique_ptr<TaskInCtld>> db_ended_list;
std::vector<std::unique_ptr<TaskInDb>> db_ended_list;
ok = g_db_client->FetchJobRecords(&db_ended_list,
num_limit - task_list->size(), true);
if (!ok) {
CRANE_ERROR("Failed to call g_db_client->FetchJobRecords");
return grpc::Status::OK;
}

auto db_ended_append_fn = [&](std::unique_ptr<TaskInCtld> const &task) {
auto db_ended_append_fn = [&](std::unique_ptr<TaskInDb> const &task) {
auto *task_it = task_list->Add();

task_it->set_type(task->type);
task_it->set_task_id(task->TaskId());
task_it->set_task_id(task->task_id);
task_it->set_name(task->name);
task_it->set_partition(task->partition_id);
task_it->set_uid(task->uid);

task_it->set_gid(task->Gid());
task_it->set_gid(task->gid);
task_it->mutable_time_limit()->set_seconds(
ToInt64Seconds(task->time_limit));
task_it->mutable_submit_time()->CopyFrom(
task->PersistedPart().submit_time());
task_it->mutable_start_time()->CopyFrom(task->PersistedPart().start_time());
task_it->mutable_end_time()->CopyFrom(task->PersistedPart().end_time());
task_it->mutable_submit_time()->CopyFrom(task->submit_time);
task_it->mutable_start_time()->CopyFrom(task->start_time);
task_it->mutable_end_time()->CopyFrom(task->end_time);
task_it->set_account(task->account);

task_it->set_node_num(task->node_num);
task_it->set_cmd_line(task->cmd_line);
task_it->set_cwd(task->cwd);
task_it->set_username(task->PersistedPart().username());
task_it->set_username(task->username);
task_it->set_qos(task->qos);

task_it->set_alloc_cpu(task->resources.allocatable_resource.cpu_count *
task->node_num);
task_it->set_exit_code(task->ExitCode());
task_it->set_exit_code(task->exit_code);

task_it->set_status(task->Status());
task_it->set_status(task->status);
task_it->set_craned_list(task->allocated_craneds_regex);
};

auto db_task_rng_filter_time = [&](std::unique_ptr<TaskInCtld> const &task) {
auto db_task_rng_filter_time = [&](std::unique_ptr<TaskInDb> const &task) {
bool has_submit_time_interval = request->has_filter_submit_time_interval();
bool has_start_time_interval = request->has_filter_start_time_interval();
bool has_end_time_interval = request->has_filter_end_time_interval();
Expand All @@ -401,61 +400,58 @@ grpc::Status CraneCtldServiceImpl::QueryTasksInfo(
if (has_submit_time_interval) {
const auto &interval = request->filter_submit_time_interval();
valid &= !interval.has_lower_bound() ||
task->PersistedPart().submit_time() >= interval.lower_bound();
task->submit_time >= interval.lower_bound();
valid &= !interval.has_upper_bound() ||
task->PersistedPart().submit_time() <= interval.upper_bound();
task->submit_time <= interval.upper_bound();
}

if (has_start_time_interval) {
const auto &interval = request->filter_start_time_interval();
valid &= !interval.has_lower_bound() ||
task->PersistedPart().start_time() >= interval.lower_bound();
task->start_time >= interval.lower_bound();
valid &= !interval.has_upper_bound() ||
task->PersistedPart().start_time() <= interval.upper_bound();
task->start_time <= interval.upper_bound();
}

if (has_end_time_interval) {
const auto &interval = request->filter_end_time_interval();
valid &= !interval.has_lower_bound() ||
task->PersistedPart().end_time() >= interval.lower_bound();
task->end_time >= interval.lower_bound();
valid &= !interval.has_upper_bound() ||
task->PersistedPart().end_time() <= interval.upper_bound();
task->end_time <= interval.upper_bound();
}

return valid;
};

auto db_task_rng_filter_account =
[&](std::unique_ptr<TaskInCtld> const &task) {
return no_accounts_constraint || req_accounts.contains(task->account);
};
auto db_task_rng_filter_account = [&](std::unique_ptr<TaskInDb> const &task) {
return no_accounts_constraint || req_accounts.contains(task->account);
};

auto db_task_rng_filter_user = [&](std::unique_ptr<TaskInCtld> const &task) {
return no_username_constraint || req_users.contains(task->Username());
auto db_task_rng_filter_user = [&](std::unique_ptr<TaskInDb> const &task) {
return no_username_constraint || req_users.contains(task->username);
};

auto db_task_rng_filter_name = [&](std::unique_ptr<TaskInCtld> const &task) {
return no_task_names_constraint ||
req_task_names.contains(task->TaskToCtld().name());
auto db_task_rng_filter_name = [&](std::unique_ptr<TaskInDb> const &task) {
return no_task_names_constraint || req_task_names.contains(task->name);
};

auto db_task_rng_filter_qos = [&](std::unique_ptr<TaskInCtld> const &task) {
auto db_task_rng_filter_qos = [&](std::unique_ptr<TaskInDb> const &task) {
return no_qos_constraint || req_qos.contains(task->qos);
};

auto db_task_rng_filter_partition =
[&](std::unique_ptr<TaskInCtld> const &task) {
[&](std::unique_ptr<TaskInDb> const &task) {
return no_partitions_constraint ||
req_partitions.contains(task->TaskToCtld().partition_name());
req_partitions.contains(task->partition_id);
};

auto db_task_rng_filter_id = [&](std::unique_ptr<TaskInCtld> const &task) {
return no_task_ids_constraint || req_task_ids.contains(task->TaskId());
auto db_task_rng_filter_id = [&](std::unique_ptr<TaskInDb> const &task) {
return no_task_ids_constraint || req_task_ids.contains(task->task_id);
};

auto db_task_rng_filter_state = [&](std::unique_ptr<TaskInCtld> const &task) {
return no_task_states_constraint ||
req_task_states.contains(task->PersistedPart().status());
auto db_task_rng_filter_state = [&](std::unique_ptr<TaskInDb> const &task) {
return no_task_states_constraint || req_task_states.contains(task->status);
};

auto db_ended_rng = db_ended_list |
Expand Down
40 changes: 38 additions & 2 deletions src/CraneCtld/CtldPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,10 @@ struct TaskInCtld {
std::unordered_set<std::string> excluded_nodes;

bool requeue_if_failed{false};
bool get_user_env{false};

std::string cmd_line;
std::string env;
std::unordered_map<std::string, std::string> env;
std::string cwd;

std::variant<InteractiveMetaInTask, BatchMetaInTask> meta;
Expand Down Expand Up @@ -413,9 +414,13 @@ struct TaskInCtld {
name = val.name();
qos = val.qos();
cmd_line = val.cmd_line();
env = val.env();

for (auto& [k, v] : val.env()) env[k] = v;

cwd = val.cwd();
qos = val.qos();

get_user_env = val.get_user_env();
}

void SetFieldsByPersistedPart(
Expand All @@ -442,6 +447,37 @@ struct TaskInCtld {
}
};

/**
 * Flat, value-only record of a task as read back from the job database.
 *
 * QueryTasksInfo holds these in a std::vector<std::unique_ptr<TaskInDb>>
 * filled by g_db_client->FetchJobRecords and reads the members directly.
 *
 * Every scalar member carries a default initializer so that a freshly
 * constructed record never exposes indeterminate values, even when the
 * loader leaves a field untouched.
 */
struct TaskInDb {
  crane::grpc::TaskType type{};
  task_id_t task_id{};
  std::string name;
  PartitionId partition_id{};
  uid_t uid{};

  gid_t gid{};
  absl::Duration time_limit;
  google::protobuf::Timestamp submit_time;
  google::protobuf::Timestamp start_time;
  google::protobuf::Timestamp end_time;
  std::string account;

  uint32_t node_num{0};
  std::string cmd_line;
  std::string cwd;
  std::string username;
  std::string qos;

  Resources resources;
  uint32_t exit_code{0};
  crane::grpc::TaskStatus status{};
  std::string allocated_craneds_regex;

  // The setters below write only the `seconds` field of the protobuf
  // Timestamp; `nanos` is left as it was. The cast makes the
  // unsigned -> signed conversion expected by set_seconds() explicit.

  /// Sets submit_time from a Unix timestamp expressed in whole seconds.
  void SetSubmitTimeByUnixSecond(uint64_t val) {
    submit_time.set_seconds(static_cast<int64_t>(val));
  }

  /// Sets start_time from a Unix timestamp expressed in whole seconds.
  void SetStartTimeByUnixSecond(uint64_t val) {
    start_time.set_seconds(static_cast<int64_t>(val));
  }

  /// Sets end_time from a Unix timestamp expressed in whole seconds.
  void SetEndTimeByUnixSecond(uint64_t val) {
    end_time.set_seconds(static_cast<int64_t>(val));
  }
};

struct Qos {
bool deleted = false;
std::string name;
Expand Down
Loading

0 comments on commit 11b76b7

Please sign in to comment.