Skip to content

Commit

Permalink
Merge pull request #1518 from PietroGhg/pietro/coverity
Browse files Browse the repository at this point in the history
[NATIVECPU] Address coverity warnings in threadpool implementation
  • Loading branch information
kbenzie authored Apr 25, 2024
2 parents 48facdd + 56ffd29 commit 37242e3
Showing 1 changed file with 35 additions and 57 deletions.
92 changes: 35 additions & 57 deletions source/adapters/native_cpu/threadpool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <forward_list>
#include <functional>
#include <future>
#include <iostream>
#include <iterator>
#include <mutex>
#include <numeric>
#include <queue>
Expand All @@ -30,18 +31,14 @@ namespace detail {
class worker_thread {
public:
// Initializes state, but does not start the worker thread
worker_thread() noexcept : m_isRunning(false), m_numTasks(0) {}

// Creates and launches the worker thread
inline void start(size_t threadId) {
worker_thread(size_t threadId) noexcept
: m_threadId(threadId), m_isRunning(false), m_numTasks(0) {
std::lock_guard<std::mutex> lock(m_workMutex);
if (this->is_running()) {
return;
}
m_threadId = threadId;
m_worker = std::thread([this]() {
while (true) {
// pin the thread to the cpu
std::unique_lock<std::mutex> lock(m_workMutex);
// Wait until there's work available
m_startWorkCondition.wait(
Expand All @@ -51,7 +48,7 @@ class worker_thread {
break;
}
// Retrieve a task from the queue
auto task = m_tasks.front();
worker_task_t task = std::move(m_tasks.front());
m_tasks.pop();

// Not modifying internal state anymore, can release the mutex
Expand All @@ -63,7 +60,7 @@ class worker_thread {
}
});

m_isRunning = true;
m_isRunning.store(true, std::memory_order_release);
}

inline void schedule(const worker_task_t &task) {
Expand All @@ -79,16 +76,12 @@ class worker_thread {
size_t num_pending_tasks() const noexcept {
// m_numTasks is an atomic counter because we don't want to lock the mutex
// here, num_pending_tasks is only used for heuristics
return m_numTasks.load();
return m_numTasks.load(std::memory_order_acquire);
}

// Waits for all tasks to finish and destroys the worker thread
inline void stop() {
{
// Notify the worker thread to stop executing
std::lock_guard<std::mutex> lock(m_workMutex);
m_isRunning = false;
}
m_isRunning.store(false, std::memory_order_release);
m_startWorkCondition.notify_all();
if (m_worker.joinable()) {
// Wait for the worker thread to finish handling the task queue
Expand All @@ -97,18 +90,21 @@ class worker_thread {
}

// Checks whether the thread pool is currently running threads
inline bool is_running() const noexcept { return m_isRunning; }
inline bool is_running() const noexcept {
return m_isRunning.load(std::memory_order_acquire);
}

private:
// Unique ID identifying the thread in the threadpool
size_t m_threadId;
const size_t m_threadId;

std::thread m_worker;

std::mutex m_workMutex;

std::condition_variable m_startWorkCondition;

bool m_isRunning;
std::atomic<bool> m_isRunning;

std::queue<worker_task_t> m_tasks;

Expand All @@ -121,47 +117,21 @@ class worker_thread {
// parameters and futures.
class simple_thread_pool {
public:
simple_thread_pool(size_t numThreads = 0) noexcept : m_isRunning(false) {
this->resize(numThreads);
this->start();
}

~simple_thread_pool() { this->stop(); }

// Creates and launches the worker threads
inline void start() {
if (this->is_running()) {
return;
}
size_t threadId = 0;
for (auto &t : m_workers) {
t.start(threadId);
threadId++;
simple_thread_pool() noexcept
: m_isRunning(false), m_numThreads(get_num_threads()) {
for (size_t i = 0; i < m_numThreads; i++) {
m_workers.emplace_front(i);
}
m_isRunning.store(true, std::memory_order_release);
}

// Waits for all tasks to finish and destroys the worker threads
inline void stop() {
~simple_thread_pool() {
for (auto &t : m_workers) {
t.stop();
}
m_isRunning.store(false, std::memory_order_release);
}

inline void resize(size_t numThreads) {
char *envVar = std::getenv("SYCL_NATIVE_CPU_HOST_THREADS");
if (envVar) {
numThreads = std::stoul(envVar);
}
if (numThreads == 0) {
numThreads = std::thread::hardware_concurrency();
}
if (!this->is_running() && (numThreads != this->num_threads())) {
m_workers = decltype(m_workers)(numThreads);
}
}

inline void schedule(const worker_task_t &task) {
// Schedule the task on the best available worker thread
this->best_worker().schedule(task);
Expand All @@ -171,7 +141,7 @@ class simple_thread_pool {
return m_isRunning.load(std::memory_order_acquire);
}

inline size_t num_threads() const noexcept { return m_workers.size(); }
inline size_t num_threads() const noexcept { return m_numThreads; }

inline size_t num_pending_tasks() const noexcept {
return std::accumulate(std::begin(m_workers), std::end(m_workers),
Expand Down Expand Up @@ -201,24 +171,32 @@ class simple_thread_pool {
}

private:
std::vector<worker_thread> m_workers;
static size_t get_num_threads() {
size_t numThreads;
char *envVar = std::getenv("SYCL_NATIVE_CPU_HOST_THREADS");
if (envVar) {
numThreads = std::stoul(envVar);
} else {
numThreads = std::thread::hardware_concurrency();
}
return numThreads;
}

std::forward_list<worker_thread> m_workers;

std::atomic<bool> m_isRunning;

const size_t m_numThreads;
};
} // namespace detail

template <typename ThreadPoolT> class threadpool_interface {
ThreadPoolT threadpool;

public:
void start() { threadpool.start(); }

void stop() { threadpool.stop(); }

size_t num_threads() const noexcept { return threadpool.num_threads(); }

threadpool_interface(size_t numThreads) : threadpool(numThreads) {}
threadpool_interface() : threadpool(0) {}
threadpool_interface() : threadpool() {}

auto schedule_task(worker_task_t &&task) {
auto workerTask = std::make_shared<std::packaged_task<void(size_t)>>(
Expand Down

0 comments on commit 37242e3

Please sign in to comment.