From c594cdc0a428a2bf6410bcda1287b52313ca5ab3 Mon Sep 17 00:00:00 2001 From: PietroGhg Date: Mon, 20 Nov 2023 11:07:14 +0000 Subject: [PATCH] Handle subdevice partition correctly --- source/adapters/native_cpu/device.cpp | 14 +- source/adapters/native_cpu/device.hpp | 10 +- source/adapters/native_cpu/enqueue.cpp | 157 +++++++++++++++--- source/adapters/native_cpu/kernel.hpp | 17 +- .../adapters/native_cpu/nativecpu_state.hpp | 5 + source/adapters/native_cpu/queue.hpp | 3 +- source/adapters/native_cpu/threadpool.hpp | 82 ++++----- source/adapters/native_cpu/usm.cpp | 11 +- 8 files changed, 203 insertions(+), 96 deletions(-) mode change 100644 => 100755 source/adapters/native_cpu/nativecpu_state.hpp diff --git a/source/adapters/native_cpu/device.cpp b/source/adapters/native_cpu/device.cpp index 5bfd04eaac..1babdb0f10 100644 --- a/source/adapters/native_cpu/device.cpp +++ b/source/adapters/native_cpu/device.cpp @@ -98,8 +98,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice, case UR_DEVICE_INFO_LINKER_AVAILABLE: return ReturnValue(bool{false}); case UR_DEVICE_INFO_MAX_COMPUTE_UNITS: - return ReturnValue(static_cast( - hDevice->tp.num_threads())); + return ReturnValue(static_cast(hDevice->tp.num_threads())); case UR_DEVICE_INFO_PARTITION_MAX_SUB_DEVICES: return ReturnValue(uint32_t{0}); case UR_DEVICE_INFO_SUPPORTED_PARTITIONS: @@ -139,7 +138,10 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice, case UR_DEVICE_INFO_MAX_WORK_ITEM_DIMENSIONS: return ReturnValue(uint32_t{3}); case UR_DEVICE_INFO_PARTITION_TYPE: - return ReturnValue(ur_device_partition_property_t{}); + if (pPropSizeRet) { + *pPropSizeRet = 0; + } + return UR_RESULT_SUCCESS; case UR_EXT_DEVICE_INFO_OPENCL_C_VERSION: return ReturnValue(""); case UR_DEVICE_INFO_QUEUE_PROPERTIES: @@ -159,8 +161,8 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice, case UR_DEVICE_INFO_PREFERRED_VECTOR_WIDTH_FLOAT: case UR_DEVICE_INFO_PREFERRED_VECTOR_WIDTH_DOUBLE: case UR_DEVICE_INFO_PREFERRED_VECTOR_WIDTH_HALF: - // todo: how can we query vector width in a platform - // indipendent way? + // TODO: How can we query vector width in a platform + // independent way? 
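A minimal client-side sketch of how the revised UR_DEVICE_INFO_PARTITION_TYPE behaviour above is expected to be consumed: an unpartitioned root device now reports a property size of zero rather than a zero-initialised ur_device_partition_property_t, so the usual two-call query pattern naturally yields an empty result. The helper name queryPartitionType and the <ur_api.h> include path are illustrative assumptions, not code from this commit.

#include <ur_api.h>
#include <vector>

// Hypothetical helper: returns the partition properties of a device, or an
// empty vector when the device is not a sub-device.
std::vector<ur_device_partition_property_t>
queryPartitionType(ur_device_handle_t hDevice) {
  size_t size = 0;
  // First call: ask only for the property size.
  if (urDeviceGetInfo(hDevice, UR_DEVICE_INFO_PARTITION_TYPE, 0, nullptr,
                      &size) != UR_RESULT_SUCCESS ||
      size == 0)
    return {}; // Not partitioned: nothing to report.
  // Second call: fetch the actual partition properties.
  std::vector<ur_device_partition_property_t> props(
      size / sizeof(ur_device_partition_property_t));
  urDeviceGetInfo(hDevice, UR_DEVICE_INFO_PARTITION_TYPE, size, props.data(),
                  nullptr);
  return props;
}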
case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_CHAR: return ReturnValue(uint32_t{32}); case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_SHORT: @@ -266,7 +268,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice, case UR_DEVICE_INFO_ATOMIC_64: return ReturnValue(bool{1}); case UR_DEVICE_INFO_BFLOAT16: - return ReturnValue(bool{1}); + return ReturnValue(bool{0}); case UR_DEVICE_INFO_MEM_CHANNEL_SUPPORT: return ReturnValue(bool{0}); case UR_DEVICE_INFO_IMAGE_SRGB: diff --git a/source/adapters/native_cpu/device.hpp b/source/adapters/native_cpu/device.hpp index 653e04d714..01245410c9 100644 --- a/source/adapters/native_cpu/device.hpp +++ b/source/adapters/native_cpu/device.hpp @@ -10,18 +10,12 @@ #pragma once -#include #include "threadpool.hpp" +#include struct ur_device_handle_t_ { native_cpu::threadpool_t tp; - ur_device_handle_t_(ur_platform_handle_t ArgPlt) : Platform(ArgPlt) { - tp.start(); - } - - ~ur_device_handle_t_() { - tp.stop(); - } + ur_device_handle_t_(ur_platform_handle_t ArgPlt) : Platform(ArgPlt) {} ur_platform_handle_t Platform; }; diff --git a/source/adapters/native_cpu/enqueue.cpp b/source/adapters/native_cpu/enqueue.cpp index 27ab5a58c8..836d6e3cfc 100644 --- a/source/adapters/native_cpu/enqueue.cpp +++ b/source/adapters/native_cpu/enqueue.cpp @@ -6,15 +6,17 @@ // //===----------------------------------------------------------------------===// #include +#include #include +#include #include "ur_api.h" #include "common.hpp" #include "kernel.hpp" #include "memory.hpp" -#include "threadpool.hpp" #include "queue.hpp" +#include "threadpool.hpp" namespace native_cpu { struct NDRDescT { @@ -37,9 +39,29 @@ struct NDRDescT { GlobalOffset[I] = 0; } } + + void dump(std::ostream &os) const { + os << "GlobalSize: " << GlobalSize[0] << " " << GlobalSize[1] << " " + << GlobalSize[2] << "\n"; + os << "LocalSize: " << LocalSize[0] << " " << LocalSize[1] << " " + << LocalSize[2] << "\n"; + os << "GlobalOffset: " << GlobalOffset[0] << " " << GlobalOffset[1] << " " + << GlobalOffset[2] << "\n"; + } }; } // namespace native_cpu +#ifdef NATIVECPU_USE_OCK +static native_cpu::state getResizedState(const native_cpu::NDRDescT &ndr, + size_t itemsPerThread) { + native_cpu::state resized_state( + ndr.GlobalSize[0], ndr.GlobalSize[1], ndr.GlobalSize[2], itemsPerThread, + ndr.LocalSize[1], ndr.LocalSize[2], ndr.GlobalOffset[0], + ndr.GlobalOffset[1], ndr.GlobalOffset[2]); + return resized_state; +} +#endif + UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch( ur_queue_handle_t hQueue, ur_kernel_handle_t hKernel, uint32_t workDim, const size_t *pGlobalWorkOffset, const size_t *pGlobalWorkSize, @@ -61,38 +83,25 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch( // TODO: add proper error checking // TODO: add proper event dep management - native_cpu::NDRDescT ndr(workDim, pGlobalWorkOffset, pGlobalWorkSize, pLocalWorkSize); - auto& tp = hQueue->device->tp; + native_cpu::NDRDescT ndr(workDim, pGlobalWorkOffset, pGlobalWorkSize, + pLocalWorkSize); + auto &tp = hQueue->device->tp; const size_t numParallelThreads = tp.num_threads(); hKernel->updateMemPool(numParallelThreads); std::vector> futures; + std::vector> groups; auto numWG0 = ndr.GlobalSize[0] / ndr.LocalSize[0]; auto numWG1 = ndr.GlobalSize[1] / ndr.LocalSize[1]; auto numWG2 = ndr.GlobalSize[2] / ndr.LocalSize[2]; - bool isLocalSizeOne = - ndr.LocalSize[0] == 1 && ndr.LocalSize[1] == 1 && ndr.LocalSize[2] == 1; - - native_cpu::state state(ndr.GlobalSize[0], ndr.GlobalSize[1], ndr.GlobalSize[2], ndr.LocalSize[0], 
ndr.LocalSize[1], ndr.LocalSize[2], ndr.GlobalOffset[0], ndr.GlobalOffset[1], ndr.GlobalOffset[2]); - if (isLocalSizeOne) { - // If the local size is one, we make the assumption that we are running a - // parallel_for over a sycl::range Todo: we could add compiler checks and - // kernel properties for this (e.g. check that no barriers are called, no - // local memory args). - - auto numWG0 = ndr.GlobalSize[0] / ndr.LocalSize[0]; - auto numWG1 = ndr.GlobalSize[1] / ndr.LocalSize[1]; - auto numWG2 = ndr.GlobalSize[2] / ndr.LocalSize[2]; +#ifndef NATIVECPU_USE_OCK + hKernel->handleLocalArgs(1, 0); for (unsigned g2 = 0; g2 < numWG2; g2++) { for (unsigned g1 = 0; g1 < numWG1; g1++) { for (unsigned g0 = 0; g0 < numWG0; g0++) { -#ifdef NATIVECPU_USE_OCK - state.update(g0, g1, g2); - hKernel->_subhandler(hKernel->_args.data(), &state); -#else for (unsigned local2 = 0; local2 < ndr.LocalSize[2]; local2++) { for (unsigned local1 = 0; local1 < ndr.LocalSize[1]; local1++) { for (unsigned local0 = 0; local0 < ndr.LocalSize[0]; local0++) { @@ -101,13 +110,118 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch( } } } -#endif + } + } + } +#else + bool isLocalSizeOne = + ndr.LocalSize[0] == 1 && ndr.LocalSize[1] == 1 && ndr.LocalSize[2] == 1; + if (isLocalSizeOne && ndr.GlobalSize[0] > numParallelThreads) { + // If the local size is one, we make the assumption that we are running a + // parallel_for over a sycl::range. + // Todo: we could add compiler checks and + // kernel properties for this (e.g. check that no barriers are called, no + // local memory args). + + // Todo: this assumes that dim 0 is the best dimension over which we want to + // parallelize + + // Since we also vectorize the kernel, and vectorization happens within the + // work group loop, it's better to have a large-ish local size. We can + // divide the global range by the number of threads, set that as the local + // size and peel everything else. + + size_t new_num_work_groups_0 = numParallelThreads; + size_t itemsPerThread = ndr.GlobalSize[0] / numParallelThreads; + + for (unsigned g2 = 0; g2 < numWG2; g2++) { + for (unsigned g1 = 0; g1 < numWG1; g1++) { + for (unsigned g0 = 0; g0 < new_num_work_groups_0; g0 += 1) { + futures.emplace_back( + tp.schedule_task([&ndr = std::as_const(ndr), itemsPerThread, + hKernel, g0, g1, g2](size_t) { + native_cpu::state resized_state = + getResizedState(ndr, itemsPerThread); + resized_state.update(g0, g1, g2); + hKernel->_subhandler(hKernel->_args.data(), &resized_state); + })); + } + // Peel the remaining work items. Since the local size is 1, we iterate + // over the work groups. 
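As a standalone illustration of the split computed above (explanatory only, not adapter code): dimension 0 is assumed to carry the bulk of the work, the range is divided into numParallelThreads chunks of itemsPerThread work-groups each, and the peel loop that follows below covers whatever does not divide evenly. The concrete numbers here (a 1-D range of 1030 work-items on 8 threads) are hypothetical.

#include <cstddef>
#include <cstdio>

int main() {
  const size_t globalSize0 = 1030, numParallelThreads = 8;

  const size_t itemsPerThread = globalSize0 / numParallelThreads; // 128
  const size_t newNumWorkGroups0 = numParallelThreads;            // 8

  // Chunks handed to the thread pool: one resized work-group per thread.
  std::printf("scheduled: %zu chunks of %zu work-items each\n",
              newNumWorkGroups0, itemsPerThread);

  // Work-items peeled on the enqueueing thread. Local size is 1, so each
  // remaining work-group is a single work-item.
  const size_t peelStart = newNumWorkGroups0 * itemsPerThread; // 1024
  std::printf("peeled: work-groups [%zu, %zu)\n", peelStart, globalSize0);
  return 0;
}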
+ for (unsigned g0 = new_num_work_groups_0 * itemsPerThread; g0 < numWG0; + g0++) { + state.update(g0, g1, g2); + hKernel->_subhandler(hKernel->_args.data(), &state); + } + } + } + + } else { + // We are running a parallel_for over an nd_range + + if (numWG1 * numWG2 >= numParallelThreads) { + // Dimensions 1 and 2 have enough work, split them across the threadpool + for (unsigned g2 = 0; g2 < numWG2; g2++) { + for (unsigned g1 = 0; g1 < numWG1; g1++) { + futures.emplace_back( + tp.schedule_task([state, kernel = *hKernel, numWG0, g1, g2, + numParallelThreads](size_t threadId) mutable { + for (unsigned g0 = 0; g0 < numWG0; g0++) { + kernel.handleLocalArgs(numParallelThreads, threadId); + state.update(g0, g1, g2); + kernel._subhandler(kernel._args.data(), &state); + } + })); + } + } + } else { + // Split dimension 0 across the threadpool + // Here we try to create groups of workgroups in order to reduce + // synchronization overhead + for (unsigned g2 = 0; g2 < numWG2; g2++) { + for (unsigned g1 = 0; g1 < numWG1; g1++) { + for (unsigned g0 = 0; g0 < numWG0; g0++) { + groups.push_back( + [state, g0, g1, g2, numParallelThreads]( + size_t threadId, ur_kernel_handle_t_ kernel) mutable { + kernel.handleLocalArgs(numParallelThreads, threadId); + state.update(g0, g1, g2); + kernel._subhandler(kernel._args.data(), &state); + }); + } + } + } + auto numGroups = groups.size(); + auto groupsPerThread = numGroups / numParallelThreads; + auto remainder = numGroups % numParallelThreads; + for (unsigned thread = 0; thread < numParallelThreads; thread++) { + futures.emplace_back(tp.schedule_task( + [&groups, thread, groupsPerThread, hKernel](size_t threadId) { + for (unsigned i = 0; i < groupsPerThread; i++) { + auto index = thread * groupsPerThread + i; + groups[index](threadId, *hKernel); + } + })); + } + + // schedule the remaining tasks + if (remainder) { + futures.emplace_back( + tp.schedule_task([&groups, remainder, + scheduled = numParallelThreads * groupsPerThread, + hKernel](size_t threadId) { + for (unsigned i = 0; i < remainder; i++) { + auto index = scheduled + i; + groups[index](threadId, *hKernel); + } + })); } } } for (auto &f : futures) f.get(); +#endif // NATIVECPU_USE_OCK // TODO: we should avoid calling clear here by avoiding using push_back // in setKernelArgs. 
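The batching scheme above (the groups vector, groupsPerThread and the remainder task) is a static round-down split of work-groups over threads; here is a self-contained sketch of the same bookkeeping with hypothetical counts (11 work-group closures on 4 worker threads), run serially for clarity.

#include <cstddef>
#include <cstdio>
#include <functional>
#include <vector>

int main() {
  std::vector<std::function<void()>> groups;
  for (int i = 0; i < 11; i++)
    groups.push_back([i] { std::printf("work-group %d\n", i); });

  const size_t numParallelThreads = 4;
  const size_t groupsPerThread = groups.size() / numParallelThreads; // 2
  const size_t remainder = groups.size() % numParallelThreads;       // 3

  // Each thread receives one contiguous batch, so scheduling overhead is one
  // task submission per thread rather than one per work-group.
  for (size_t thread = 0; thread < numParallelThreads; thread++)
    for (size_t i = 0; i < groupsPerThread; i++)
      groups[thread * groupsPerThread + i]();

  // A single extra task mops up what the round-down split left over.
  for (size_t i = 0; i < remainder; i++)
    groups[numParallelThreads * groupsPerThread + i]();
  return 0;
}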
hKernel->_args.clear(); @@ -553,4 +667,3 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueWriteHostPipe( DIE_NO_IMPLEMENTATION; } - diff --git a/source/adapters/native_cpu/kernel.hpp b/source/adapters/native_cpu/kernel.hpp index dd3600263b..9023a23fb2 100644 --- a/source/adapters/native_cpu/kernel.hpp +++ b/source/adapters/native_cpu/kernel.hpp @@ -40,17 +40,17 @@ struct ur_kernel_handle_t_ : RefCounted { ur_kernel_handle_t_(const char *name, nativecpu_task_t subhandler) : _name{name}, _subhandler{std::move(subhandler)} {} - ur_kernel_handle_t_(const ur_kernel_handle_t_& other) : _name(other._name), _subhandler(other._subhandler), - _args(other._args), _localArgInfo(other._localArgInfo), _localMemPool(other._localMemPool), _localMemPoolSize(other._localMemPoolSize) { + ur_kernel_handle_t_(const ur_kernel_handle_t_ &other) + : _name(other._name), _subhandler(other._subhandler), _args(other._args), + _localArgInfo(other._localArgInfo), _localMemPool(other._localMemPool), + _localMemPoolSize(other._localMemPoolSize) { incrementReferenceCount(); } ~ur_kernel_handle_t_() { - decrementReferenceCount(); - if (_refCount == 0) { + if (decrementReferenceCount() == 0) { free(_localMemPool); } - } const char *_name; @@ -58,7 +58,7 @@ struct ur_kernel_handle_t_ : RefCounted { std::vector _args; std::vector _localArgInfo; - // To be called before enqueing the kernel. + // To be called before enqueueing the kernel. void updateMemPool(size_t numParallelThreads) { // compute requested size. size_t reqSize = 0; @@ -69,7 +69,7 @@ struct ur_kernel_handle_t_ : RefCounted { return; } // realloc handles nullptr case - _localMemPool = (char*)realloc(_localMemPool, reqSize); + _localMemPool = (char *)realloc(_localMemPool, reqSize); _localMemPoolSize = reqSize; } @@ -86,7 +86,6 @@ struct ur_kernel_handle_t_ : RefCounted { } private: - char* _localMemPool = nullptr; + char *_localMemPool = nullptr; size_t _localMemPoolSize = 0; }; - diff --git a/source/adapters/native_cpu/nativecpu_state.hpp b/source/adapters/native_cpu/nativecpu_state.hpp old mode 100644 new mode 100755 index c5016fc5aa..bb798b22e6 --- a/source/adapters/native_cpu/nativecpu_state.hpp +++ b/source/adapters/native_cpu/nativecpu_state.hpp @@ -19,6 +19,7 @@ struct state { size_t MLocal_id[3]; size_t MNumGroups[3]; size_t MGlobalOffset[3]; + uint32_t NumSubGroups, SubGroup_id, SubGroup_local_id, SubGroup_size; state(size_t globalR0, size_t globalR1, size_t globalR2, size_t localR0, size_t localR1, size_t localR2, size_t globalO0, size_t globalO1, size_t globalO2) @@ -36,6 +37,10 @@ struct state { MLocal_id[0] = 0; MLocal_id[1] = 0; MLocal_id[2] = 0; + NumSubGroups = 32; + SubGroup_id = 0; + SubGroup_local_id = 0; + SubGroup_size = 1; } void update(size_t group0, size_t group1, size_t group2, size_t local0, diff --git a/source/adapters/native_cpu/queue.hpp b/source/adapters/native_cpu/queue.hpp index b553db2818..8c34af6327 100644 --- a/source/adapters/native_cpu/queue.hpp +++ b/source/adapters/native_cpu/queue.hpp @@ -12,8 +12,7 @@ #include "device.hpp" struct ur_queue_handle_t_ : RefCounted { - ur_device_handle_t_ *device; + ur_device_handle_t_ *const device; ur_queue_handle_t_(ur_device_handle_t_ *device) : device(device) {} - }; diff --git a/source/adapters/native_cpu/threadpool.hpp b/source/adapters/native_cpu/threadpool.hpp index 92822bb100..03763224e8 100644 --- a/source/adapters/native_cpu/threadpool.hpp +++ b/source/adapters/native_cpu/threadpool.hpp @@ -1,39 +1,37 @@ -//===----------- threadpool.hpp - Native CPU Threadpool 
--------------------===// +//===----------- threadpool.hpp - Native CPU Threadpool +//--------------------===// // // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. // See https://llvm.org/LICENSE.txt for license information. // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception // //===----------------------------------------------------------------------===// -#pragma once +#pragma once #include #include #include #include #include #include +#include #include #include #include #include #include #include -#include namespace native_cpu { - using worker_task_t = std::function; namespace detail { class worker_thread { - public: - +public: // Initializes state, but does not start the worker thread worker_thread() noexcept : m_isRunning(false), m_numTasks(0) {} - // Creates and launches the worker thread inline void start(size_t threadId) { std::lock_guard lock(m_workMutex); @@ -43,7 +41,7 @@ class worker_thread { m_threadId = threadId; m_worker = std::thread([this]() { while (true) { - // pin the thread to the cpu + // pin the thread to the cpu std::unique_lock lock(m_workMutex); // Wait until there's work available m_startWorkCondition.wait( @@ -65,11 +63,10 @@ class worker_thread { } }); - m_isRunning = true; } - inline void schedule(const worker_task_t& task) { + inline void schedule(const worker_task_t &task) { { std::lock_guard lock(m_workMutex); // Add the task to the queue @@ -85,7 +82,6 @@ class worker_thread { return m_numTasks.load(); } - // Waits for all tasks to finish and destroys the worker thread inline void stop() { { @@ -100,11 +96,10 @@ class worker_thread { } } - // Checks whether the thread pool is currently running threads inline bool is_running() const noexcept { return m_isRunning; } - private: +private: // Unique ID identifying the thread in the threadpool size_t m_threadId; std::thread m_worker; @@ -120,44 +115,44 @@ class worker_thread { std::atomic m_numTasks; }; - // Implementation of a thread pool. The worker threads are created and -// ready at construction. This class mainly holds the interface for -// scheduling a task to the most appropriate thread and handling input -// parameters and futures. +// ready at construction. This class mainly holds the interface for +// scheduling a task to the most appropriate thread and handling input +// parameters and futures. 
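For readers unfamiliar with the pattern, a condensed, self-contained sketch of the worker_thread loop shown above: a mutex-protected task queue drained under a condition variable, with stop() letting pending work finish before joining. The names (toy_worker, m_tasks, ...) are simplified placeholders, and the thread pinning and per-thread ID of the real class are omitted.

#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

class toy_worker {
  std::thread m_worker;
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::queue<std::function<void()>> m_tasks;
  bool m_stop = false;

public:
  void start() {
    m_worker = std::thread([this] {
      while (true) {
        std::unique_lock<std::mutex> lock(m_mutex);
        // Sleep until there is work or the pool is shutting down.
        m_cv.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
        if (m_stop && m_tasks.empty())
          return;
        auto task = std::move(m_tasks.front());
        m_tasks.pop();
        lock.unlock(); // Run the task without holding the queue lock.
        task();
      }
    });
  }

  void schedule(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_tasks.push(std::move(task));
    }
    m_cv.notify_one();
  }

  void stop() {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_stop = true;
    }
    m_cv.notify_one();
    if (m_worker.joinable())
      m_worker.join();
  }
};

int main() {
  toy_worker w;
  w.start();
  w.schedule([] { std::printf("hello from the worker\n"); });
  w.stop(); // Drains already-queued tasks before joining.
  return 0;
}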
class simple_thread_pool { - public: +public: simple_thread_pool(size_t numThreads = 0) noexcept : m_isRunning(false) { this->resize(numThreads); + this->start(); } - + ~simple_thread_pool() { this->stop(); } + // Creates and launches the worker threads inline void start() { if (this->is_running()) { return; } size_t threadId = 0; - for (auto& t : m_workers) { + for (auto &t : m_workers) { t.start(threadId); threadId++; } m_isRunning.store(true, std::memory_order_release); } - // Waits for all tasks to finish and destroys the worker threads inline void stop() { - for (auto& t : m_workers) { + for (auto &t : m_workers) { t.stop(); } m_isRunning.store(false, std::memory_order_release); } inline void resize(size_t numThreads) { - char* envVar = std::getenv("SYCL_NATIVE_CPU_HOST_THREADS"); - if(envVar){ - numThreads = std::stoul(envVar); + char *envVar = std::getenv("SYCL_NATIVE_CPU_HOST_THREADS"); + if (envVar) { + numThreads = std::stoul(envVar); } if (numThreads == 0) { numThreads = std::thread::hardware_concurrency(); @@ -167,7 +162,7 @@ class simple_thread_pool { } } - inline void schedule(const worker_task_t& task) { + inline void schedule(const worker_task_t &task) { // Schedule the task on the best available worker thread this->best_worker().schedule(task); } @@ -181,7 +176,7 @@ class simple_thread_pool { inline size_t num_pending_tasks() const noexcept { return std::accumulate(std::begin(m_workers), std::end(m_workers), size_t(0), - [](size_t numTasks, const worker_thread& t) { + [](size_t numTasks, const worker_thread &t) { return (numTasks + t.num_pending_tasks()); }); } @@ -192,51 +187,43 @@ class simple_thread_pool { } } - protected: +protected: // Determines which thread is the most appropriate for having work // scheduled - worker_thread& best_worker() noexcept { + worker_thread &best_worker() noexcept { return *std::min_element( std::begin(m_workers), std::end(m_workers), - [](const worker_thread& w1, const worker_thread& w2) { + [](const worker_thread &w1, const worker_thread &w2) { // Prefer threads whose task queues are shorter // This is just an approximation, it doesn't need to be exact return (w1.num_pending_tasks() < w2.num_pending_tasks()); }); } - private: +private: std::vector m_workers; std::atomic m_isRunning; }; } // namespace detail - -template -class threadpool_interface { +template class threadpool_interface { ThreadPoolT threadpool; + public: - void start() { - threadpool.start(); - } + void start() { threadpool.start(); } - void stop() { - threadpool.stop(); - } + void stop() { threadpool.stop(); } - size_t num_threads() const noexcept { - return threadpool.num_threads(); - } + size_t num_threads() const noexcept { return threadpool.num_threads(); } threadpool_interface(size_t numThreads) : threadpool(numThreads) {} threadpool_interface() : threadpool(0) {} - std::future schedule_task(worker_task_t &&task) { + auto schedule_task(worker_task_t &&task) { auto workerTask = std::make_shared>( - [task](auto&& PH1) { return task(std::forward(PH1)); }); - threadpool.schedule( - [=](size_t threadId) { (*workerTask)(threadId); }); + [task](auto &&PH1) { return task(std::forward(PH1)); }); + threadpool.schedule([=](size_t threadId) { (*workerTask)(threadId); }); return workerTask->get_future(); } }; @@ -244,4 +231,3 @@ class threadpool_interface { using threadpool_t = threadpool_interface; } // namespace native_cpu - diff --git a/source/adapters/native_cpu/usm.cpp b/source/adapters/native_cpu/usm.cpp index 7cdac0cd8f..45ac0596f3 100644 --- 
a/source/adapters/native_cpu/usm.cpp
+++ b/source/adapters/native_cpu/usm.cpp
@@ -86,7 +86,16 @@ urUSMGetMemAllocInfo(ur_context_handle_t hContext, const void *pMem,
   std::ignore = pPropValue;
   std::ignore = pPropSizeRet;
 
-  DIE_NO_IMPLEMENTATION;
+  UrReturnHelper ReturnValue(propSize, pPropValue, pPropSizeRet);
+
+  switch (propName) {
+  case UR_USM_ALLOC_INFO_TYPE:
+    // Todo implement this in context
+    return ReturnValue(UR_USM_TYPE_DEVICE);
+  default:
+    DIE_NO_IMPLEMENTATION;
+  }
+  return UR_RESULT_ERROR_INVALID_VALUE;
 }
 
 UR_APIEXPORT ur_result_t UR_APICALL
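A client-side sketch (not part of the patch) of the query this change enables: with the Native CPU adapter every USM allocation currently reports UR_USM_TYPE_DEVICE, whatever its origin, until the TODO above is resolved by tracking allocation kinds in the context. The helper name getAllocType is hypothetical.

#include <ur_api.h>

ur_usm_type_t getAllocType(ur_context_handle_t hContext, const void *ptr) {
  ur_usm_type_t type = UR_USM_TYPE_UNKNOWN;
  urUSMGetMemAllocInfo(hContext, ptr, UR_USM_ALLOC_INFO_TYPE, sizeof(type),
                       &type, nullptr);
  return type;
}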