Skip to content

Commit

Permalink
dev/bulk_optimization (#199)
Browse files Browse the repository at this point in the history
* Bulk optimization for submitting and canceling tasks

* Add std::promise/future to get the result after submitting a task asynchronously

* Sleep uvw loop

* Refactor

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

---------

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>
Co-authored-by: RileyW <wrllrwwrllrw@gmail.com>
  • Loading branch information
MCKZX-llx and RileyWen authored Nov 15, 2023
1 parent 8cc29bb commit 86835eb
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 104 deletions.
48 changes: 29 additions & 19 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTask(
auto result = m_ctld_server_->SubmitTaskToScheduler(std::move(task));
if (result.has_value()) {
response->set_ok(true);
response->set_task_id(result.value());
response->set_task_id(result.value().get());
} else {
response->set_ok(false);
response->set_reason(result.error());
Expand All @@ -49,16 +49,21 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks(
grpc::ServerContext *context,
const crane::grpc::SubmitBatchTasksRequest *request,
crane::grpc::SubmitBatchTasksReply *response) {
std::vector<result::result<std::future<task_id_t>, std::string>> results;

for (auto const &task_to_ctld : request->tasks()) {
auto task = std::make_unique<TaskInCtld>();
task->SetFieldsByTaskToCtld(task_to_ctld);

auto result = m_ctld_server_->SubmitTaskToScheduler(std::move(task));
if (result.has_value()) {
response->mutable_task_id_list()->Add(result.value());
} else {
response->mutable_reason_list()->Add(result.error());
}
results.emplace_back(std::move(result));
}

for (auto &res : results) {
if (res.has_value())
response->mutable_task_id_list()->Add(res.value().get());
else
response->mutable_reason_list()->Add(res.error());
}

return grpc::Status::OK;
Expand Down Expand Up @@ -1197,7 +1202,8 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
auto result =
m_ctld_server_->SubmitTaskToScheduler(std::move(task));

ok = stream_writer.WriteTaskIdReply(payload.pid(), result);
ok = stream_writer.WriteTaskIdReply(payload.pid(),
std::move(result));
if (!ok) {
CRANE_ERROR(
"Failed to send msg to cfored {}. Connection is broken. "
Expand All @@ -1208,7 +1214,7 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
if (result.has_value()) {
m_ctld_server_->m_mtx_.Lock();
m_ctld_server_->m_cfored_running_tasks_[cfored_name].emplace(
result.value());
result.value().get());
m_ctld_server_->m_mtx_.Unlock();
}
}
Expand Down Expand Up @@ -1324,8 +1330,8 @@ CtldServer::CtldServer(const Config::CraneCtldListenConf &listen_conf) {
signal(SIGINT, &CtldServer::signal_handler_func);
}

result::result<task_id_t, std::string> CtldServer::SubmitTaskToScheduler(
std::unique_ptr<TaskInCtld> task) {
result::result<std::future<task_id_t>, std::string>
CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {
CraneErr err;

if (!task->password_entry->Valid()) {
Expand Down Expand Up @@ -1367,12 +1373,19 @@ result::result<task_id_t, std::string> CtldServer::SubmitTaskToScheduler(
return result::fail(enable_res.error());
}

uint32_t task_id;
err = g_task_scheduler->SubmitTask(std::move(task), &task_id);
err = g_task_scheduler->AcquireTaskAttributes(task.get());

if (err == CraneErr::kOk)
err = g_task_scheduler->CheckTaskValidity(task.get());

if (err == CraneErr::kOk) {
return {task_id};
CRANE_DEBUG("Received an task request. Task id allocated: {}", task_id);
} else if (err == CraneErr::kNonExistent) {
task->SetSubmitTime(absl::Now());
std::future<task_id_t> future =
g_task_scheduler->SubmitTaskAsync(std::move(task));
return {std::move(future)};
}

if (err == CraneErr::kNonExistent) {
CRANE_DEBUG("Task submission failed. Reason: Partition doesn't exist!");
return result::fail("Partition doesn't exist!");
} else if (err == CraneErr::kInvalidNodeNum) {
Expand All @@ -1392,11 +1405,8 @@ result::result<task_id_t, std::string> CtldServer::SubmitTaskToScheduler(
"Task submission failed. "
"Reason: The param of task is invalid.");
return result::fail("The param of task is invalid.");
} else {
return result::fail(CraneErrStr(err));
}

return {task_id};
return result::fail(CraneErrStr(err));
}

} // namespace Ctld
10 changes: 6 additions & 4 deletions src/CraneCtld/CtldGrpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ class CforedStreamWriter {
crane::grpc::StreamCforedRequest> *stream)
: m_stream_(stream), m_valid_(true) {}

bool WriteTaskIdReply(pid_t calloc_pid,
result::result<task_id_t, std::string> res) {
bool WriteTaskIdReply(
pid_t calloc_pid,
result::result<std::future<task_id_t>, std::string> res) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;

Expand All @@ -54,7 +55,7 @@ class CforedStreamWriter {
if (res.has_value()) {
task_id_reply->set_ok(true);
task_id_reply->set_pid(calloc_pid);
task_id_reply->set_task_id(res.value());
task_id_reply->set_task_id(res.value().get());
} else {
task_id_reply->set_ok(false);
task_id_reply->set_pid(calloc_pid);
Expand Down Expand Up @@ -175,6 +176,7 @@ class CraneCtldServiceImpl final : public crane::grpc::CraneCtld::Service {
const crane::grpc::SubmitBatchTaskRequest *request,
crane::grpc::SubmitBatchTaskReply *response) override;

// This gRPC is for testing purposes only
grpc::Status SubmitBatchTasks(
grpc::ServerContext *context,
const crane::grpc::SubmitBatchTasksRequest *request,
Expand Down Expand Up @@ -259,7 +261,7 @@ class CtldServer {

inline void Wait() { m_server_->Wait(); }

result::result<task_id_t, std::string> SubmitTaskToScheduler(
result::result<std::future<task_id_t>, std::string> SubmitTaskToScheduler(
std::unique_ptr<TaskInCtld> task);

private:
Expand Down
2 changes: 2 additions & 0 deletions src/CraneCtld/CtldPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ constexpr uint64_t kTaskScheduleIntervalMs = 1000;
// Timing and batching constants. Units follow the name suffixes (Seconds, Ms).
// NOTE(review): the code consuming these values is not visible here; confirm
// the exact semantics at the use sites.
constexpr uint16_t kCompletionQueueDelaySeconds = 15;
constexpr uint32_t kCancelTaskTimeoutMs = 500;
constexpr uint64_t kCancelTaskBatchNum = 1000;
// Submit-side counterparts of the two cancel constants above (same values).
// Presumably a flush timeout and a maximum batch size for bulk task
// submission -- TODO confirm against the scheduler implementation.
constexpr uint32_t kSubmitTaskTimeoutMs = 500;
constexpr uint64_t kSubmitTaskBatchNum = 1000;

struct Config {
struct Node {
Expand Down
23 changes: 23 additions & 0 deletions src/CraneCtld/DbClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,29 @@ bool MongodbClient::InsertJob(TaskInCtld* task) {
return false;
}

// Inserts a batch of tasks into the task collection with one insert_many call.
//
// tasks: tasks to persist; each one is converted with TaskInCtldToDocument_.
//
// Returns true only when the driver yields a result whose inserted count
// equals tasks.size(). Returns false for an empty batch, and false (after
// logging through PrintError_) when the result is missing or the count differs.
bool MongodbClient::InsertJobs(const std::vector<TaskInCtld*>& tasks) {
  // Nothing to insert: callers get false for an empty batch.
  if (tasks.empty()) return false;

  std::vector<bsoncxx::document::value> documents;
  // The final size is known up front, so allocate once instead of regrowing.
  documents.reserve(tasks.size());

  for (TaskInCtld* task : tasks) {
    document doc = TaskInCtldToDocument_(task);
    documents.push_back(doc.extract());
  }

  mongocxx::options::insert insert_options;
  insert_options.ordered(false);  // unordered to speed up the operation

  bsoncxx::stdx::optional<mongocxx::result::insert_many> ret =
      (*GetClient_())[m_db_name_][m_task_collection_name_].insert_many(
          *GetSession_(), documents, insert_options);

  // inserted_count() is a signed 32-bit value; convert explicitly so the
  // comparison with the unsigned container size is intentional, not implicit.
  if (ret != bsoncxx::stdx::nullopt &&
      static_cast<size_t>(ret->inserted_count()) == tasks.size())
    return true;

  PrintError_("Failed to insert in-memory TaskInCtld.");
  return false;
}

bool MongodbClient::FetchJobRecords(
std::vector<std::unique_ptr<Ctld::TaskInCtld>>* task_list, size_t limit,
bool reverse) {
Expand Down
1 change: 1 addition & 0 deletions src/CraneCtld/DbClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class MongodbClient {
bool InsertRecoveredJob(
crane::grpc::TaskInEmbeddedDb const& task_in_embedded_db);
bool InsertJob(TaskInCtld* task);
bool InsertJobs(const std::vector<TaskInCtld*>& tasks);

// Todo: Ugly interface! Since the task is fetched from DB, TaskInCtld is
// not a good type choice here!
Expand Down
Loading

0 comments on commit 86835eb

Please sign in to comment.