diff --git a/source/adapters/native_cpu/device.cpp b/source/adapters/native_cpu/device.cpp
index 1c9b2e20eb..1babdb0f10 100644
--- a/source/adapters/native_cpu/device.cpp
+++ b/source/adapters/native_cpu/device.cpp
@@ -98,7 +98,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice,
   case UR_DEVICE_INFO_LINKER_AVAILABLE:
     return ReturnValue(bool{false});
   case UR_DEVICE_INFO_MAX_COMPUTE_UNITS:
-    return ReturnValue(uint32_t{256});
+    return ReturnValue(static_cast<uint32_t>(hDevice->tp.num_threads()));
   case UR_DEVICE_INFO_PARTITION_MAX_SUB_DEVICES:
     return ReturnValue(uint32_t{0});
   case UR_DEVICE_INFO_SUPPORTED_PARTITIONS:
@@ -138,7 +138,10 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice,
   case UR_DEVICE_INFO_MAX_WORK_ITEM_DIMENSIONS:
     return ReturnValue(uint32_t{3});
   case UR_DEVICE_INFO_PARTITION_TYPE:
-    return ReturnValue(ur_device_partition_property_t{});
+    if (pPropSizeRet) {
+      *pPropSizeRet = 0;
+    }
+    return UR_RESULT_SUCCESS;
   case UR_EXT_DEVICE_INFO_OPENCL_C_VERSION:
     return ReturnValue("");
   case UR_DEVICE_INFO_QUEUE_PROPERTIES:
@@ -158,15 +161,22 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice,
   case UR_DEVICE_INFO_PREFERRED_VECTOR_WIDTH_FLOAT:
   case UR_DEVICE_INFO_PREFERRED_VECTOR_WIDTH_DOUBLE:
   case UR_DEVICE_INFO_PREFERRED_VECTOR_WIDTH_HALF:
+    // TODO: How can we query vector width in a platform
+    // independent way?
   case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_CHAR:
+    return ReturnValue(uint32_t{32});
   case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_SHORT:
+    return ReturnValue(uint32_t{16});
   case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_INT:
+    return ReturnValue(uint32_t{8});
   case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_LONG:
+    return ReturnValue(uint32_t{4});
   case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_FLOAT:
+    return ReturnValue(uint32_t{8});
   case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_DOUBLE:
+    return ReturnValue(uint32_t{4});
   case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_HALF:
-    return ReturnValue(uint32_t{1});
-
+    return ReturnValue(uint32_t{16});
   // Imported from level_zero
   case UR_DEVICE_INFO_USM_HOST_SUPPORT:
   case UR_DEVICE_INFO_USM_DEVICE_SUPPORT:
@@ -213,10 +223,10 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice,
     return ReturnValue(uint64_t{0});
   case UR_DEVICE_INFO_GLOBAL_MEM_SIZE:
     // TODO : CHECK
-    return ReturnValue(uint64_t{0});
+    return ReturnValue(uint64_t{32768});
   case UR_DEVICE_INFO_LOCAL_MEM_SIZE:
     // TODO : CHECK
-    return ReturnValue(uint64_t{0});
+    return ReturnValue(uint64_t{32768});
   case UR_DEVICE_INFO_MAX_CONSTANT_BUFFER_SIZE:
     // TODO : CHECK
     return ReturnValue(uint64_t{0});
@@ -256,7 +266,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice,
   case UR_DEVICE_INFO_BUILD_ON_SUBDEVICE:
     return ReturnValue(bool{0});
   case UR_DEVICE_INFO_ATOMIC_64:
-    return ReturnValue(bool{0});
+    return ReturnValue(bool{1});
   case UR_DEVICE_INFO_BFLOAT16:
     return ReturnValue(bool{0});
   case UR_DEVICE_INFO_MEM_CHANNEL_SUPPORT:
diff --git a/source/adapters/native_cpu/device.hpp b/source/adapters/native_cpu/device.hpp
index 60dacde5bc..01245410c9 100644
--- a/source/adapters/native_cpu/device.hpp
+++ b/source/adapters/native_cpu/device.hpp
@@ -10,9 +10,11 @@
 #pragma once
 
+#include "threadpool.hpp"
 #include <ur/ur.hpp>
 
 struct ur_device_handle_t_ {
+  native_cpu::threadpool_t tp;
   ur_device_handle_t_(ur_platform_handle_t ArgPlt) : Platform(ArgPlt) {}
 
   ur_platform_handle_t Platform;
diff --git a/source/adapters/native_cpu/enqueue.cpp b/source/adapters/native_cpu/enqueue.cpp
index 75c2caeac0..836d6e3cfc 100644
--- a/source/adapters/native_cpu/enqueue.cpp
+++ b/source/adapters/native_cpu/enqueue.cpp
@@ -1,20 +1,22 @@
 //===----------- enqueue.cpp - NATIVE CPU Adapter -------------------------===//
 //
-// Copyright (C) 2023 Intel Corporation
-//
-// Part of the Unified-Runtime Project, under the Apache License v2.0 with LLVM
-// Exceptions. See LICENSE.TXT
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
 //
 //===----------------------------------------------------------------------===//
 #include <array>
+#include <cstddef>
 #include <cstdint>
+#include <vector>
 
 #include "ur_api.h"
 
 #include "common.hpp"
 #include "kernel.hpp"
 #include "memory.hpp"
+#include "queue.hpp"
+#include "threadpool.hpp"
 
 namespace native_cpu {
 struct NDRDescT {
@@ -37,9 +39,29 @@
       GlobalOffset[I] = 0;
     }
   }
+
+  void dump(std::ostream &os) const {
+    os << "GlobalSize: " << GlobalSize[0] << " " << GlobalSize[1] << " "
+       << GlobalSize[2] << "\n";
+    os << "LocalSize: " << LocalSize[0] << " " << LocalSize[1] << " "
+       << LocalSize[2] << "\n";
+    os << "GlobalOffset: " << GlobalOffset[0] << " " << GlobalOffset[1] << " "
+       << GlobalOffset[2] << "\n";
+  }
 };
 
 } // namespace native_cpu
 
+#ifdef NATIVECPU_USE_OCK
+static native_cpu::state getResizedState(const native_cpu::NDRDescT &ndr,
+                                         size_t itemsPerThread) {
+  native_cpu::state resized_state(
+      ndr.GlobalSize[0], ndr.GlobalSize[1], ndr.GlobalSize[2], itemsPerThread,
+      ndr.LocalSize[1], ndr.LocalSize[2], ndr.GlobalOffset[0],
+      ndr.GlobalOffset[1], ndr.GlobalOffset[2]);
+  return resized_state;
+}
+#endif
+
 UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch(
     ur_queue_handle_t hQueue, ur_kernel_handle_t hKernel, uint32_t workDim,
     const size_t *pGlobalWorkOffset, const size_t *pGlobalWorkSize,
@@ -63,23 +85,23 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch(
   // TODO: add proper event dep management
   native_cpu::NDRDescT ndr(workDim, pGlobalWorkOffset, pGlobalWorkSize,
                            pLocalWorkSize);
-  hKernel->handleLocalArgs();
-
+  auto &tp = hQueue->device->tp;
+  const size_t numParallelThreads = tp.num_threads();
+  hKernel->updateMemPool(numParallelThreads);
+  std::vector<std::future<void>> futures;
+  std::vector<std::function<void(size_t, ur_kernel_handle_t_)>> groups;
+  auto numWG0 = ndr.GlobalSize[0] / ndr.LocalSize[0];
+  auto numWG1 = ndr.GlobalSize[1] / ndr.LocalSize[1];
+  auto numWG2 = ndr.GlobalSize[2] / ndr.LocalSize[2];
   native_cpu::state state(ndr.GlobalSize[0], ndr.GlobalSize[1],
                           ndr.GlobalSize[2], ndr.LocalSize[0], ndr.LocalSize[1],
                           ndr.LocalSize[2], ndr.GlobalOffset[0],
                           ndr.GlobalOffset[1], ndr.GlobalOffset[2]);
-
-  auto numWG0 = ndr.GlobalSize[0] / ndr.LocalSize[0];
-  auto numWG1 = ndr.GlobalSize[1] / ndr.LocalSize[1];
-  auto numWG2 = ndr.GlobalSize[2] / ndr.LocalSize[2];
+#ifndef NATIVECPU_USE_OCK
+  hKernel->handleLocalArgs(1, 0);
   for (unsigned g2 = 0; g2 < numWG2; g2++) {
     for (unsigned g1 = 0; g1 < numWG1; g1++) {
       for (unsigned g0 = 0; g0 < numWG0; g0++) {
-#ifdef NATIVECPU_USE_OCK
-        state.update(g0, g1, g2);
-        hKernel->_subhandler(hKernel->_args.data(), &state);
-#else
         for (unsigned local2 = 0; local2 < ndr.LocalSize[2]; local2++) {
           for (unsigned local1 = 0; local1 < ndr.LocalSize[1]; local1++) {
             for (unsigned local0 = 0; local0 < ndr.LocalSize[0]; local0++) {
@@ -88,10 +110,118 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch(
             }
           }
         }
-#endif
       }
     }
   }
+#else
+  bool isLocalSizeOne =
+      ndr.LocalSize[0] == 1 && ndr.LocalSize[1] == 1 && ndr.LocalSize[2] == 1;
+  if (isLocalSizeOne && ndr.GlobalSize[0] > numParallelThreads) {
+    // If the local size is one, we make the assumption that we are running a
+    // parallel_for over a sycl::range.
+    // Todo: we could add compiler checks and
+    // kernel properties for this (e.g. check that no barriers are called, no
+    // local memory args).
+
+    // Todo: this assumes that dim 0 is the best dimension over which we want to
+    // parallelize
+
+    // Since we also vectorize the kernel, and vectorization happens within the
+    // work group loop, it's better to have a large-ish local size. We can
+    // divide the global range by the number of threads, set that as the local
+    // size and peel everything else.
+
+    size_t new_num_work_groups_0 = numParallelThreads;
+    size_t itemsPerThread = ndr.GlobalSize[0] / numParallelThreads;
+
+    for (unsigned g2 = 0; g2 < numWG2; g2++) {
+      for (unsigned g1 = 0; g1 < numWG1; g1++) {
+        for (unsigned g0 = 0; g0 < new_num_work_groups_0; g0 += 1) {
+          futures.emplace_back(
+              tp.schedule_task([&ndr = std::as_const(ndr), itemsPerThread,
+                                hKernel, g0, g1, g2](size_t) {
+                native_cpu::state resized_state =
+                    getResizedState(ndr, itemsPerThread);
+                resized_state.update(g0, g1, g2);
+                hKernel->_subhandler(hKernel->_args.data(), &resized_state);
+              }));
+        }
+        // Peel the remaining work items. Since the local size is 1, we iterate
+        // over the work groups.
+        for (unsigned g0 = new_num_work_groups_0 * itemsPerThread; g0 < numWG0;
+             g0++) {
+          state.update(g0, g1, g2);
+          hKernel->_subhandler(hKernel->_args.data(), &state);
+        }
+      }
+    }
+
+  } else {
+    // We are running a parallel_for over an nd_range
+
+    if (numWG1 * numWG2 >= numParallelThreads) {
+      // Dimensions 1 and 2 have enough work, split them across the threadpool
+      for (unsigned g2 = 0; g2 < numWG2; g2++) {
+        for (unsigned g1 = 0; g1 < numWG1; g1++) {
+          futures.emplace_back(
+              tp.schedule_task([state, kernel = *hKernel, numWG0, g1, g2,
+                                numParallelThreads](size_t threadId) mutable {
+                for (unsigned g0 = 0; g0 < numWG0; g0++) {
+                  kernel.handleLocalArgs(numParallelThreads, threadId);
+                  state.update(g0, g1, g2);
+                  kernel._subhandler(kernel._args.data(), &state);
+                }
+              }));
+        }
+      }
+    } else {
+      // Split dimension 0 across the threadpool
+      // Here we try to create groups of workgroups in order to reduce
+      // synchronization overhead
+      for (unsigned g2 = 0; g2 < numWG2; g2++) {
+        for (unsigned g1 = 0; g1 < numWG1; g1++) {
+          for (unsigned g0 = 0; g0 < numWG0; g0++) {
+            groups.push_back(
+                [state, g0, g1, g2, numParallelThreads](
+                    size_t threadId, ur_kernel_handle_t_ kernel) mutable {
+                  kernel.handleLocalArgs(numParallelThreads, threadId);
+                  state.update(g0, g1, g2);
+                  kernel._subhandler(kernel._args.data(), &state);
+                });
+          }
+        }
+      }
+      auto numGroups = groups.size();
+      auto groupsPerThread = numGroups / numParallelThreads;
+      auto remainder = numGroups % numParallelThreads;
+      for (unsigned thread = 0; thread < numParallelThreads; thread++) {
+        futures.emplace_back(tp.schedule_task(
+            [&groups, thread, groupsPerThread, hKernel](size_t threadId) {
+              for (unsigned i = 0; i < groupsPerThread; i++) {
+                auto index = thread * groupsPerThread + i;
+                groups[index](threadId, *hKernel);
+              }
+            }));
+      }
+
+      // schedule the remaining tasks
+      if (remainder) {
+        futures.emplace_back(
+            tp.schedule_task([&groups, remainder,
+                              scheduled = numParallelThreads * groupsPerThread,
+                              hKernel](size_t threadId) {
+              for (unsigned i = 0; i < remainder; i++) {
+                auto index = scheduled + i;
+                groups[index](threadId, *hKernel);
+              }
+            }));
+      }
+    }
+  }
+
+  for (auto &f : futures)
+    f.get();
+#endif // NATIVECPU_USE_OCK
   // TODO: we should avoid calling clear here by avoiding using push_back
   // in setKernelArgs.
   hKernel->_args.clear();
diff --git a/source/adapters/native_cpu/kernel.hpp b/source/adapters/native_cpu/kernel.hpp
index d608bdcbd3..9023a23fb2 100644
--- a/source/adapters/native_cpu/kernel.hpp
+++ b/source/adapters/native_cpu/kernel.hpp
@@ -1,9 +1,7 @@
 //===--------------- kernel.hpp - Native CPU Adapter ----------------------===//
 //
-// Copyright (C) 2023 Intel Corporation
-//
-// Part of the Unified-Runtime Project, under the Apache License v2.0 with LLVM
-// Exceptions. See LICENSE.TXT
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
 //
 //===----------------------------------------------------------------------===//
@@ -42,50 +40,52 @@ struct ur_kernel_handle_t_ : RefCounted {
   ur_kernel_handle_t_(const char *name, nativecpu_task_t subhandler)
       : _name{name}, _subhandler{std::move(subhandler)} {}
 
-  const char *_name;
-  nativecpu_task_t _subhandler;
-  std::vector<native_cpu::NativeCPUArgDesc> _args;
-  std::vector<local_arg_info_t> _localArgInfo;
-
-  // To be called before enqueing the kernel.
-  void handleLocalArgs() {
-    updateMemPool();
-    size_t offset = 0;
-    for (auto &entry : _localArgInfo) {
-      _args[entry.argIndex].MPtr =
-          reinterpret_cast<char *>(_localMemPool) + offset;
-      // update offset in the memory pool
-      // Todo: update this offset computation when we have work-group
-      // level parallelism.
-      offset += entry.argSize;
-    }
+  ur_kernel_handle_t_(const ur_kernel_handle_t_ &other)
+      : _name(other._name), _subhandler(other._subhandler), _args(other._args),
+        _localArgInfo(other._localArgInfo), _localMemPool(other._localMemPool),
+        _localMemPoolSize(other._localMemPoolSize) {
+    incrementReferenceCount();
   }
 
   ~ur_kernel_handle_t_() {
-    if (_localMemPool) {
+    if (decrementReferenceCount() == 0) {
       free(_localMemPool);
     }
   }
 
-private:
-  void updateMemPool() {
+  const char *_name;
+  nativecpu_task_t _subhandler;
+  std::vector<native_cpu::NativeCPUArgDesc> _args;
+  std::vector<local_arg_info_t> _localArgInfo;
+
+  // To be called before enqueueing the kernel.
+  void updateMemPool(size_t numParallelThreads) {
     // compute requested size.
-    // Todo: currently we execute only one work-group at a time, so for each
-    // local arg we can allocate just 1 * argSize local arg. When we implement
-    // work-group level parallelism we should allocate N * argSize where N is
-    // the number of work groups being executed in parallel (e.g. number of
-    // threads in the thread pool).
     size_t reqSize = 0;
     for (auto &entry : _localArgInfo) {
-      reqSize += entry.argSize;
+      reqSize += entry.argSize * numParallelThreads;
     }
     if (reqSize == 0 || reqSize == _localMemPoolSize) {
       return;
     }
     // realloc handles nullptr case
-    _localMemPool = realloc(_localMemPool, reqSize);
+    _localMemPool = (char *)realloc(_localMemPool, reqSize);
     _localMemPoolSize = reqSize;
   }
-  void *_localMemPool = nullptr;
+
+  // To be called before executing a work group
+  void handleLocalArgs(size_t numParallelThread, size_t threadId) {
+    // For each local argument we have size*numthreads
+    size_t offset = 0;
+    for (auto &entry : _localArgInfo) {
+      _args[entry.argIndex].MPtr =
+          _localMemPool + offset + (entry.argSize * threadId);
+      // update offset in the memory pool
+      offset += entry.argSize * numParallelThread;
+    }
+  }
+
+private:
+  char *_localMemPool = nullptr;
   size_t _localMemPoolSize = 0;
 };
diff --git a/source/adapters/native_cpu/nativecpu_state.hpp b/source/adapters/native_cpu/nativecpu_state.hpp
old mode 100644
new mode 100755
index c5016fc5aa..bb798b22e6
--- a/source/adapters/native_cpu/nativecpu_state.hpp
+++ b/source/adapters/native_cpu/nativecpu_state.hpp
@@ -19,6 +19,7 @@ struct state {
   size_t MLocal_id[3];
   size_t MNumGroups[3];
   size_t MGlobalOffset[3];
+  uint32_t NumSubGroups, SubGroup_id, SubGroup_local_id, SubGroup_size;
   state(size_t globalR0, size_t globalR1, size_t globalR2, size_t localR0,
         size_t localR1, size_t localR2, size_t globalO0, size_t globalO1,
         size_t globalO2)
@@ -36,6 +37,10 @@
     MLocal_id[0] = 0;
     MLocal_id[1] = 0;
     MLocal_id[2] = 0;
+    NumSubGroups = 32;
+    SubGroup_id = 0;
+    SubGroup_local_id = 0;
+    SubGroup_size = 1;
   }
 
   void update(size_t group0, size_t group1, size_t group2, size_t local0,
diff --git a/source/adapters/native_cpu/queue.cpp b/source/adapters/native_cpu/queue.cpp
index 516e66db64..7ee1fdf04c 100644
--- a/source/adapters/native_cpu/queue.cpp
+++ b/source/adapters/native_cpu/queue.cpp
@@ -35,10 +35,10 @@ UR_APIEXPORT ur_result_t UR_APICALL urQueueCreate(
   std::ignore = hDevice;
   std::ignore = pProperties;
 
-  auto Queue = new ur_queue_handle_t_();
+  auto Queue = new ur_queue_handle_t_(hDevice);
   *phQueue = Queue;
 
-  CONTINUE_NO_IMPLEMENTATION;
+  return UR_RESULT_SUCCESS;
 }
 
 UR_APIEXPORT ur_result_t UR_APICALL urQueueRetain(ur_queue_handle_t hQueue) {
diff --git a/source/adapters/native_cpu/queue.hpp b/source/adapters/native_cpu/queue.hpp
index 5e9039dd24..8c34af6327 100644
--- a/source/adapters/native_cpu/queue.hpp
+++ b/source/adapters/native_cpu/queue.hpp
@@ -9,5 +9,10 @@
 //===----------------------------------------------------------------------===//
 #pragma once
 #include "common.hpp"
+#include "device.hpp"
 
-struct ur_queue_handle_t_ : RefCounted {};
+struct ur_queue_handle_t_ : RefCounted {
+  ur_device_handle_t_ *const device;
+
+  ur_queue_handle_t_(ur_device_handle_t_ *device) : device(device) {}
+};
diff --git a/source/adapters/native_cpu/threadpool.hpp b/source/adapters/native_cpu/threadpool.hpp
new file mode 100644
index 0000000000..03763224e8
--- /dev/null
+++ b/source/adapters/native_cpu/threadpool.hpp
@@ -0,0 +1,233 @@
+//===----------- threadpool.hpp - Native CPU Threadpool
+//--------------------===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+#pragma once
+#include <algorithm>
+#include <atomic>
+#include <condition_variable>
+#include <cstdlib>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <numeric>
+#include <queue>
+#include <string>
+#include <thread>
+#include <vector>
+
+namespace native_cpu {
+
+using worker_task_t = std::function<void(size_t)>;
+
+namespace detail {
+
+class worker_thread {
+public:
+  // Initializes state, but does not start the worker thread
+  worker_thread() noexcept : m_isRunning(false), m_numTasks(0) {}
+
+  // Creates and launches the worker thread
+  inline void start(size_t threadId) {
+    std::lock_guard lock(m_workMutex);
+    if (this->is_running()) {
+      return;
+    }
+    m_threadId = threadId;
+    m_worker = std::thread([this]() {
+      while (true) {
+        // pin the thread to the cpu
+        std::unique_lock lock(m_workMutex);
+        // Wait until there's work available
+        m_startWorkCondition.wait(
+            lock, [this]() { return !this->is_running() || !m_tasks.empty(); });
+        if (!this->is_running() && m_tasks.empty()) {
+          // Can only break if there is no more work to be done
+          break;
+        }
+        // Retrieve a task from the queue
+        auto task = m_tasks.front();
+        m_tasks.pop();
+
+        // Not modifying internal state anymore, can release the mutex
+        lock.unlock();
+
+        // Execute the task
+        task(m_threadId);
+        --m_numTasks;
+      }
+    });
+
+    m_isRunning = true;
+  }
+
+  inline void schedule(const worker_task_t &task) {
+    {
+      std::lock_guard lock(m_workMutex);
+      // Add the task to the queue
+      m_tasks.push(task);
+      ++m_numTasks;
+    }
+    m_startWorkCondition.notify_one();
+  }
+
+  size_t num_pending_tasks() const noexcept {
+    // m_numTasks is an atomic counter because we don't want to lock the mutex
+    // here, num_pending_tasks is only used for heuristics
+    return m_numTasks.load();
+  }
+
+  // Waits for all tasks to finish and destroys the worker thread
+  inline void stop() {
+    {
+      // Notify the worker thread to stop executing
+      std::lock_guard lock(m_workMutex);
+      m_isRunning = false;
+    }
+    m_startWorkCondition.notify_all();
+    if (m_worker.joinable()) {
+      // Wait for the worker thread to finish handling the task queue
+      m_worker.join();
+    }
+  }
+
+  // Checks whether the thread pool is currently running threads
+  inline bool is_running() const noexcept { return m_isRunning; }
+
+private:
+  // Unique ID identifying the thread in the threadpool
+  size_t m_threadId;
+  std::thread m_worker;
+
+  std::mutex m_workMutex;
+
+  std::condition_variable m_startWorkCondition;
+
+  bool m_isRunning;
+
+  std::queue<worker_task_t> m_tasks;
+
+  std::atomic<size_t> m_numTasks;
+};
+
+// Implementation of a thread pool. The worker threads are created and
+// ready at construction. This class mainly holds the interface for
+// scheduling a task to the most appropriate thread and handling input
+// parameters and futures.
+class simple_thread_pool {
+public:
+  simple_thread_pool(size_t numThreads = 0) noexcept : m_isRunning(false) {
+    this->resize(numThreads);
+    this->start();
+  }
+
+  ~simple_thread_pool() { this->stop(); }
+
+  // Creates and launches the worker threads
+  inline void start() {
+    if (this->is_running()) {
+      return;
+    }
+    size_t threadId = 0;
+    for (auto &t : m_workers) {
+      t.start(threadId);
+      threadId++;
+    }
+    m_isRunning.store(true, std::memory_order_release);
+  }
+
+  // Waits for all tasks to finish and destroys the worker threads
+  inline void stop() {
+    for (auto &t : m_workers) {
+      t.stop();
+    }
+    m_isRunning.store(false, std::memory_order_release);
+  }
+
+  inline void resize(size_t numThreads) {
+    char *envVar = std::getenv("SYCL_NATIVE_CPU_HOST_THREADS");
+    if (envVar) {
+      numThreads = std::stoul(envVar);
+    }
+    if (numThreads == 0) {
+      numThreads = std::thread::hardware_concurrency();
+    }
+    if (!this->is_running() && (numThreads != this->num_threads())) {
+      m_workers = decltype(m_workers)(numThreads);
+    }
+  }
+
+  inline void schedule(const worker_task_t &task) {
+    // Schedule the task on the best available worker thread
+    this->best_worker().schedule(task);
+  }
+
+  inline bool is_running() const noexcept {
+    return m_isRunning.load(std::memory_order_acquire);
+  }
+
+  inline size_t num_threads() const noexcept { return m_workers.size(); }
+
+  inline size_t num_pending_tasks() const noexcept {
+    return std::accumulate(std::begin(m_workers), std::end(m_workers),
+                           size_t(0),
+                           [](size_t numTasks, const worker_thread &t) {
+                             return (numTasks + t.num_pending_tasks());
+                           });
+  }
+
+  void wait_for_all_pending_tasks() {
+    while (num_pending_tasks() > 0) {
+      std::this_thread::yield();
+    }
+  }
+
+protected:
+  // Determines which thread is the most appropriate for having work
+  // scheduled
+  worker_thread &best_worker() noexcept {
+    return *std::min_element(
+        std::begin(m_workers), std::end(m_workers),
+        [](const worker_thread &w1, const worker_thread &w2) {
+          // Prefer threads whose task queues are shorter
+          // This is just an approximation, it doesn't need to be exact
+          return (w1.num_pending_tasks() < w2.num_pending_tasks());
+        });
+  }
+
+private:
+  std::vector<worker_thread> m_workers;
+
+  std::atomic<bool> m_isRunning;
+};
+} // namespace detail
+
+template <typename ThreadPoolT> class threadpool_interface {
+  ThreadPoolT threadpool;
+
+public:
+  void start() { threadpool.start(); }
+
+  void stop() { threadpool.stop(); }
+
+  size_t num_threads() const noexcept { return threadpool.num_threads(); }
+
+  threadpool_interface(size_t numThreads) : threadpool(numThreads) {}
+  threadpool_interface() : threadpool(0) {}
+
+  auto schedule_task(worker_task_t &&task) {
+    auto workerTask = std::make_shared<std::packaged_task<void(size_t)>>(
+        [task](auto &&PH1) { return task(std::forward<decltype(PH1)>(PH1)); });
+    threadpool.schedule([=](size_t threadId) { (*workerTask)(threadId); });
+    return workerTask->get_future();
+  }
+};
+
+using threadpool_t = threadpool_interface<detail::simple_thread_pool>;
+
+} // namespace native_cpu
diff --git a/source/adapters/native_cpu/usm.cpp b/source/adapters/native_cpu/usm.cpp
index 7cdac0cd8f..45ac0596f3 100644
--- a/source/adapters/native_cpu/usm.cpp
+++ b/source/adapters/native_cpu/usm.cpp
@@ -86,7 +86,16 @@ urUSMGetMemAllocInfo(ur_context_handle_t hContext, const void *pMem,
   std::ignore = pPropValue;
   std::ignore = pPropSizeRet;
 
-  DIE_NO_IMPLEMENTATION;
+  UrReturnHelper ReturnValue(propSize, pPropValue, pPropSizeRet);
+
+  switch (propName) {
+  case UR_USM_ALLOC_INFO_TYPE:
+    // Todo implement this in context
+    return ReturnValue(UR_USM_TYPE_DEVICE);
+  default:
+    DIE_NO_IMPLEMENTATION;
+  }
+  return UR_RESULT_ERROR_INVALID_VALUE;
 }
 
 UR_APIEXPORT ur_result_t UR_APICALL