Skip to content

Commit

Permalink
feat: Add function for task launch in Supervisor.
Browse files Browse the repository at this point in the history
Signed-off-by: Li Junlin <xiafeng.li@foxmail.com>
  • Loading branch information
L-Xiafeng committed Dec 23, 2024
1 parent acf50b1 commit 6b31364
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 153 deletions.
34 changes: 0 additions & 34 deletions src/Craned/Craned/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,39 +220,6 @@ class JobManager {
uint32_t exit_code,
std::optional<std::string> reason);

// todo: Move timer to Supervisor
template <typename Duration>
void AddTerminationTimer_(TaskInstance* instance, Duration duration) {
auto termination_handel = m_uvw_loop_->resource<uvw::timer_handle>();
termination_handel->on<uvw::timer_event>(
[this, task_id = instance->task.task_id()](const uvw::timer_event&,
uvw::timer_handle& h) {
EvTaskTimerCb_(task_id);
});
termination_handel->start(
std::chrono::duration_cast<std::chrono::milliseconds>(duration),
std::chrono::seconds(0));
instance->termination_timer = termination_handel;
}

void AddTerminationTimer_(TaskInstance* instance, int64_t secs) {
auto termination_handel = m_uvw_loop_->resource<uvw::timer_handle>();
termination_handel->on<uvw::timer_event>(
[this, task_id = instance->task.task_id()](const uvw::timer_event&,
uvw::timer_handle& h) {
EvTaskTimerCb_(task_id);
});
termination_handel->start(std::chrono::seconds(secs),
std::chrono::seconds(0));
instance->termination_timer = termination_handel;
}

static void DelTerminationTimer_(TaskInstance* instance) {
// Close handle before free
instance->termination_timer->close();
instance->termination_timer.reset();
}

// todo: Refactor this, send rpc to supervisor
/**
* Send a signal to the process group to which the processes in
Expand Down Expand Up @@ -315,7 +282,6 @@ class JobManager {

void EvCleanChangeTaskTimeLimitQueueCb_();

void EvTaskTimerCb_(task_id_t task_id);

std::shared_ptr<uvw::loop> m_uvw_loop_;

Expand Down
14 changes: 0 additions & 14 deletions src/Craned/Supervisor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,9 @@ add_executable(supervisor

target_precompile_headers(supervisor PRIVATE SupervisorPreCompiledHeader.h)

add_dependencies(supervisor libcgroup)
target_include_directories(
supervisor PRIVATE ${LIBCGROUP_PATH}/include/
)

if (ENABLE_BPF)
add_dependencies(supervisor cgroup_dev_bpf_object)
target_compile_definitions(supervisor PRIVATE CRANE_ENABLE_BPF)
endif ()

target_link_libraries(supervisor
$<$<BOOL:${CRANE_USE_MIMALLOC}>:dev_mimalloc>

${LIBCGROUP_BUILD_PRODUCTS}

Utility_PublicHeader
Utility_PluginClient

Expand All @@ -45,8 +33,6 @@ target_link_libraries(supervisor

uvw

yaml-cpp

Backward::Interface

$<$<BOOL:${ENABLE_BPF}>:bpf>
Expand Down
6 changes: 3 additions & 3 deletions src/Craned/Supervisor/CforedClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void CforedClient::CleanOutputQueueAndWriteToStreamThread_(
StreamCforedTaskIOReply>* stream,
std::atomic<bool>* write_pending) {
CRANE_TRACE("CleanOutputQueueThread started.");
std::pair<task_id_t, std::string> output;
std::string output;
bool ok = m_output_queue_.try_dequeue(output);

// Make sure before exit all output has been drained.
Expand All @@ -76,7 +76,7 @@ void CforedClient::CleanOutputQueueAndWriteToStreamThread_(
request.set_type(StreamCforedTaskIORequest::CRANED_TASK_OUTPUT);

auto* payload = request.mutable_payload_task_output_req();
payload->set_msg(output.second), payload->set_task_id(output.first);
payload->set_msg(output), payload->set_task_id(g_config.TaskId);

while (write_pending->load(std::memory_order::acquire))
std::this_thread::sleep_for(std::chrono::milliseconds(25));
Expand Down Expand Up @@ -460,7 +460,7 @@ void CforedManager::UnregisterIOForward_() { m_unregister_handle_->send(); }

void CforedManager::UnregisterCb_() {
m_cfored_client_.reset();
g_job_mgr->TaskStopAndDoStatusChange();
g_task_mgr->TaskStopAndDoStatusChange();
}

} // namespace Supervisor
33 changes: 0 additions & 33 deletions src/Craned/Supervisor/Supervisor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,42 +95,11 @@ void InitFromStdin(int argc, char** argv) {
}
}

void CreatePidFile() {
pid_t pid = getpid();
auto pid_file_path =
Supervisor::kSupervisorPidFileDir /
std::filesystem::path(fmt::format("supervisor_{}.pid", g_config.TaskId));
if (std::filesystem::exists(pid_file_path)) {
std::ifstream pid_file(pid_file_path);
pid_t existing_pid;
pid_file >> existing_pid;

if (kill(existing_pid, 0) == 0) {
CRANE_TRACE("Supervisor is already running with PID: {}", existing_pid);
std::exit(1);
} else {
CRANE_TRACE("Stale PID file detected. Cleaning up.");
std::filesystem::remove(pid_file_path);
}
}
std::ofstream pid_file(pid_file_path, std::ios::out | std::ios::trunc);
if (!pid_file) {
CRANE_TRACE("Failed to create PID file: {}", pid_file_path);
std::exit(1);
}
pid_file << pid << std::endl;
pid_file.flush();
pid_file.close();
}

void CreateRequiredDirectories() {
bool ok;
ok = util::os::CreateFolders(g_config.CraneScriptDir);
if (!ok) std::exit(1);

ok = util::os::CreateFolders(Supervisor::kSupervisorPidFileDir);
if (!ok) std::exit(1);

if (g_config.SupervisorDebugLevel != "off") {
ok = util::os::CreateFoldersForFile(g_config.SupervisorLogFile);
if (!ok) std::exit(1);
Expand All @@ -153,8 +122,6 @@ void GlobalVariableInit() {
signal(SIGALRM, SIG_IGN);
signal(SIGHUP, SIG_IGN);

CreatePidFile();

PasswordEntry::InitializeEntrySize();

g_thread_pool =
Expand Down
Loading

0 comments on commit 6b31364

Please sign in to comment.