Merge pull request #1478 from PietroGhg/pietro/vecz_threadpool
[NATIVECPU] Initial threadpool implementation for Native CPU
kbenzie committed Apr 11, 2024
2 parents b582fb8 + c594cdc commit 38e9478
Showing 9 changed files with 452 additions and 58 deletions.
24 changes: 17 additions & 7 deletions source/adapters/native_cpu/device.cpp
@@ -98,7 +98,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice,
case UR_DEVICE_INFO_LINKER_AVAILABLE:
return ReturnValue(bool{false});
case UR_DEVICE_INFO_MAX_COMPUTE_UNITS:
return ReturnValue(uint32_t{256});
return ReturnValue(static_cast<uint32_t>(hDevice->tp.num_threads()));
case UR_DEVICE_INFO_PARTITION_MAX_SUB_DEVICES:
return ReturnValue(uint32_t{0});
case UR_DEVICE_INFO_SUPPORTED_PARTITIONS:
@@ -138,7 +138,10 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice,
case UR_DEVICE_INFO_MAX_WORK_ITEM_DIMENSIONS:
return ReturnValue(uint32_t{3});
case UR_DEVICE_INFO_PARTITION_TYPE:
return ReturnValue(ur_device_partition_property_t{});
if (pPropSizeRet) {
*pPropSizeRet = 0;
}
return UR_RESULT_SUCCESS;
case UR_EXT_DEVICE_INFO_OPENCL_C_VERSION:
return ReturnValue("");
case UR_DEVICE_INFO_QUEUE_PROPERTIES:
@@ -158,15 +161,22 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice,
case UR_DEVICE_INFO_PREFERRED_VECTOR_WIDTH_FLOAT:
case UR_DEVICE_INFO_PREFERRED_VECTOR_WIDTH_DOUBLE:
case UR_DEVICE_INFO_PREFERRED_VECTOR_WIDTH_HALF:
// TODO: How can we query vector width in a platform
// independent way?
case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_CHAR:
return ReturnValue(uint32_t{32});
case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_SHORT:
return ReturnValue(uint32_t{16});
case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_INT:
return ReturnValue(uint32_t{8});
case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_LONG:
return ReturnValue(uint32_t{4});
case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_FLOAT:
return ReturnValue(uint32_t{8});
case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_DOUBLE:
return ReturnValue(uint32_t{4});
case UR_DEVICE_INFO_NATIVE_VECTOR_WIDTH_HALF:
return ReturnValue(uint32_t{1});

return ReturnValue(uint32_t{16});
// Imported from level_zero
case UR_DEVICE_INFO_USM_HOST_SUPPORT:
case UR_DEVICE_INFO_USM_DEVICE_SUPPORT:
@@ -213,10 +223,10 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice,
return ReturnValue(uint64_t{0});
case UR_DEVICE_INFO_GLOBAL_MEM_SIZE:
// TODO : CHECK
return ReturnValue(uint64_t{0});
return ReturnValue(uint64_t{32768});
case UR_DEVICE_INFO_LOCAL_MEM_SIZE:
// TODO : CHECK
return ReturnValue(uint64_t{0});
return ReturnValue(uint64_t{32768});
case UR_DEVICE_INFO_MAX_CONSTANT_BUFFER_SIZE:
// TODO : CHECK
return ReturnValue(uint64_t{0});
@@ -256,7 +266,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urDeviceGetInfo(ur_device_handle_t hDevice,
case UR_DEVICE_INFO_BUILD_ON_SUBDEVICE:
return ReturnValue(bool{0});
case UR_DEVICE_INFO_ATOMIC_64:
return ReturnValue(bool{0});
return ReturnValue(bool{1});
case UR_DEVICE_INFO_BFLOAT16:
return ReturnValue(bool{0});
case UR_DEVICE_INFO_MEM_CHANNEL_SUPPORT:
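
With this change the Native CPU device reports its compute-unit count from the threadpool instead of the hard-coded 256. For illustration only, a caller could observe the value through the existing urDeviceGetInfo entry point; the helper below is hypothetical and assumes a valid device handle obtained through the usual urDeviceGet flow.

```cpp
// Illustrative only: how a caller could observe the new compute-unit count,
// which this patch ties to the threadpool size. Not part of the change.
#include <cstdint>
#include <cstdio>

#include "ur_api.h"

uint32_t queryComputeUnits(ur_device_handle_t hDevice) {
  uint32_t numCUs = 0;
  ur_result_t res = urDeviceGetInfo(hDevice, UR_DEVICE_INFO_MAX_COMPUTE_UNITS,
                                    sizeof(numCUs), &numCUs, nullptr);
  if (res != UR_RESULT_SUCCESS) {
    std::fprintf(stderr, "urDeviceGetInfo failed: %d\n", static_cast<int>(res));
    return 0;
  }
  // With this patch, the Native CPU adapter fills this with tp.num_threads().
  return numCUs;
}
```
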
2 changes: 2 additions & 0 deletions source/adapters/native_cpu/device.hpp
@@ -10,9 +10,11 @@

#pragma once

#include "threadpool.hpp"
#include <ur/ur.hpp>

struct ur_device_handle_t_ {
native_cpu::threadpool_t tp;
ur_device_handle_t_(ur_platform_handle_t ArgPlt) : Platform(ArgPlt) {}

ur_platform_handle_t Platform;
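
device.hpp now embeds a native_cpu::threadpool_t per device. threadpool.hpp is among the nine changed files but is not reproduced on this page, so the sketch below is a hypothetical stand-in that only mirrors the interface the enqueue.cpp hunks rely on: num_threads(), and schedule_task() returning a std::future<void> and handing the worker's thread id to the task. It is not the implementation from the PR.

```cpp
// Hypothetical stand-in for the threadpool interface used by enqueue.cpp.
// Assumption: the real threadpool.hpp may differ in details.
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace native_cpu_sketch {

class threadpool_t {
public:
  explicit threadpool_t(size_t n = std::thread::hardware_concurrency()) {
    for (size_t id = 0; id < n; id++)
      workers.emplace_back([this, id] { run(id); });
  }

  ~threadpool_t() {
    {
      std::lock_guard<std::mutex> lock(m);
      stop = true;
    }
    cv.notify_all();
    for (auto &w : workers)
      w.join();
  }

  size_t num_threads() const { return workers.size(); }

  // Queues a task; the worker invokes it with its own thread id.
  std::future<void> schedule_task(std::function<void(size_t)> task) {
    auto p = std::make_shared<std::promise<void>>();
    auto fut = p->get_future();
    {
      std::lock_guard<std::mutex> lock(m);
      tasks.push([task = std::move(task), p](size_t threadId) {
        task(threadId);
        p->set_value();
      });
    }
    cv.notify_one();
    return fut;
  }

private:
  void run(size_t threadId) {
    for (;;) {
      std::function<void(size_t)> task;
      {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return stop || !tasks.empty(); });
        if (stop && tasks.empty())
          return;
        task = std::move(tasks.front());
        tasks.pop();
      }
      task(threadId);
    }
  }

  std::vector<std::thread> workers;
  std::queue<std::function<void(size_t)>> tasks;
  std::mutex m;
  std::condition_variable cv;
  bool stop = false;
};

} // namespace native_cpu_sketch
```
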
160 changes: 145 additions & 15 deletions source/adapters/native_cpu/enqueue.cpp
@@ -1,20 +1,22 @@
//===----------- enqueue.cpp - NATIVE CPU Adapter -------------------------===//
//
// Copyright (C) 2023 Intel Corporation
//
// Part of the Unified-Runtime Project, under the Apache License v2.0 with LLVM
// Exceptions. See LICENSE.TXT
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
#include <array>
#include <cstddef>
#include <cstdint>
#include <vector>

#include "ur_api.h"

#include "common.hpp"
#include "kernel.hpp"
#include "memory.hpp"
#include "queue.hpp"
#include "threadpool.hpp"

namespace native_cpu {
struct NDRDescT {
@@ -37,9 +39,29 @@ struct NDRDescT {
GlobalOffset[I] = 0;
}
}

void dump(std::ostream &os) const {
os << "GlobalSize: " << GlobalSize[0] << " " << GlobalSize[1] << " "
<< GlobalSize[2] << "\n";
os << "LocalSize: " << LocalSize[0] << " " << LocalSize[1] << " "
<< LocalSize[2] << "\n";
os << "GlobalOffset: " << GlobalOffset[0] << " " << GlobalOffset[1] << " "
<< GlobalOffset[2] << "\n";
}
};
} // namespace native_cpu

#ifdef NATIVECPU_USE_OCK
static native_cpu::state getResizedState(const native_cpu::NDRDescT &ndr,
size_t itemsPerThread) {
native_cpu::state resized_state(
ndr.GlobalSize[0], ndr.GlobalSize[1], ndr.GlobalSize[2], itemsPerThread,
ndr.LocalSize[1], ndr.LocalSize[2], ndr.GlobalOffset[0],
ndr.GlobalOffset[1], ndr.GlobalOffset[2]);
return resized_state;
}
#endif

UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch(
ur_queue_handle_t hQueue, ur_kernel_handle_t hKernel, uint32_t workDim,
const size_t *pGlobalWorkOffset, const size_t *pGlobalWorkSize,
@@ -63,23 +85,23 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch(
// TODO: add proper event dep management
native_cpu::NDRDescT ndr(workDim, pGlobalWorkOffset, pGlobalWorkSize,
pLocalWorkSize);
hKernel->handleLocalArgs();

auto &tp = hQueue->device->tp;
const size_t numParallelThreads = tp.num_threads();
hKernel->updateMemPool(numParallelThreads);
std::vector<std::future<void>> futures;
std::vector<std::function<void(size_t, ur_kernel_handle_t_)>> groups;
auto numWG0 = ndr.GlobalSize[0] / ndr.LocalSize[0];
auto numWG1 = ndr.GlobalSize[1] / ndr.LocalSize[1];
auto numWG2 = ndr.GlobalSize[2] / ndr.LocalSize[2];
native_cpu::state state(ndr.GlobalSize[0], ndr.GlobalSize[1],
ndr.GlobalSize[2], ndr.LocalSize[0], ndr.LocalSize[1],
ndr.LocalSize[2], ndr.GlobalOffset[0],
ndr.GlobalOffset[1], ndr.GlobalOffset[2]);

auto numWG0 = ndr.GlobalSize[0] / ndr.LocalSize[0];
auto numWG1 = ndr.GlobalSize[1] / ndr.LocalSize[1];
auto numWG2 = ndr.GlobalSize[2] / ndr.LocalSize[2];
#ifndef NATIVECPU_USE_OCK
hKernel->handleLocalArgs(1, 0);
for (unsigned g2 = 0; g2 < numWG2; g2++) {
for (unsigned g1 = 0; g1 < numWG1; g1++) {
for (unsigned g0 = 0; g0 < numWG0; g0++) {
#ifdef NATIVECPU_USE_OCK
state.update(g0, g1, g2);
hKernel->_subhandler(hKernel->_args.data(), &state);
#else
for (unsigned local2 = 0; local2 < ndr.LocalSize[2]; local2++) {
for (unsigned local1 = 0; local1 < ndr.LocalSize[1]; local1++) {
for (unsigned local0 = 0; local0 < ndr.LocalSize[0]; local0++) {
@@ -88,10 +110,118 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch(
}
}
}
#endif
}
}
}
#else
bool isLocalSizeOne =
ndr.LocalSize[0] == 1 && ndr.LocalSize[1] == 1 && ndr.LocalSize[2] == 1;
if (isLocalSizeOne && ndr.GlobalSize[0] > numParallelThreads) {
// If the local size is one, we make the assumption that we are running a
// parallel_for over a sycl::range.
// Todo: we could add compiler checks and
// kernel properties for this (e.g. check that no barriers are called, no
// local memory args).

// Todo: this assumes that dim 0 is the best dimension over which we want to
// parallelize

// Since we also vectorize the kernel, and vectorization happens within the
// work group loop, it's better to have a large-ish local size. We can
// divide the global range by the number of threads, set that as the local
// size and peel everything else.

size_t new_num_work_groups_0 = numParallelThreads;
size_t itemsPerThread = ndr.GlobalSize[0] / numParallelThreads;

for (unsigned g2 = 0; g2 < numWG2; g2++) {
for (unsigned g1 = 0; g1 < numWG1; g1++) {
for (unsigned g0 = 0; g0 < new_num_work_groups_0; g0 += 1) {
futures.emplace_back(
tp.schedule_task([&ndr = std::as_const(ndr), itemsPerThread,
hKernel, g0, g1, g2](size_t) {
native_cpu::state resized_state =
getResizedState(ndr, itemsPerThread);
resized_state.update(g0, g1, g2);
hKernel->_subhandler(hKernel->_args.data(), &resized_state);
}));
}
// Peel the remaining work items. Since the local size is 1, we iterate
// over the work groups.
for (unsigned g0 = new_num_work_groups_0 * itemsPerThread; g0 < numWG0;
g0++) {
state.update(g0, g1, g2);
hKernel->_subhandler(hKernel->_args.data(), &state);
}
}
}

} else {
// We are running a parallel_for over an nd_range

if (numWG1 * numWG2 >= numParallelThreads) {
// Dimensions 1 and 2 have enough work, split them across the threadpool
for (unsigned g2 = 0; g2 < numWG2; g2++) {
for (unsigned g1 = 0; g1 < numWG1; g1++) {
futures.emplace_back(
tp.schedule_task([state, kernel = *hKernel, numWG0, g1, g2,
numParallelThreads](size_t threadId) mutable {
for (unsigned g0 = 0; g0 < numWG0; g0++) {
kernel.handleLocalArgs(numParallelThreads, threadId);
state.update(g0, g1, g2);
kernel._subhandler(kernel._args.data(), &state);
}
}));
}
}
} else {
// Split dimension 0 across the threadpool
// Here we try to create groups of workgroups in order to reduce
// synchronization overhead
for (unsigned g2 = 0; g2 < numWG2; g2++) {
for (unsigned g1 = 0; g1 < numWG1; g1++) {
for (unsigned g0 = 0; g0 < numWG0; g0++) {
groups.push_back(
[state, g0, g1, g2, numParallelThreads](
size_t threadId, ur_kernel_handle_t_ kernel) mutable {
kernel.handleLocalArgs(numParallelThreads, threadId);
state.update(g0, g1, g2);
kernel._subhandler(kernel._args.data(), &state);
});
}
}
}
auto numGroups = groups.size();
auto groupsPerThread = numGroups / numParallelThreads;
auto remainder = numGroups % numParallelThreads;
for (unsigned thread = 0; thread < numParallelThreads; thread++) {
futures.emplace_back(tp.schedule_task(
[&groups, thread, groupsPerThread, hKernel](size_t threadId) {
for (unsigned i = 0; i < groupsPerThread; i++) {
auto index = thread * groupsPerThread + i;
groups[index](threadId, *hKernel);
}
}));
}

// schedule the remaining tasks
if (remainder) {
futures.emplace_back(
tp.schedule_task([&groups, remainder,
scheduled = numParallelThreads * groupsPerThread,
hKernel](size_t threadId) {
for (unsigned i = 0; i < remainder; i++) {
auto index = scheduled + i;
groups[index](threadId, *hKernel);
}
}));
}
}
}

for (auto &f : futures)
f.get();
#endif // NATIVECPU_USE_OCK
// TODO: we should avoid calling clear here by avoiding using push_back
// in setKernelArgs.
hKernel->_args.clear();
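
In the branch above where the local size is 1 and GlobalSize[0] exceeds the thread count, dimension 0 is split into numParallelThreads chunks of itemsPerThread work-groups each, and whatever does not divide evenly is peeled and run inline on the enqueuing thread. A standalone sketch of that arithmetic, with made-up sizes, for illustration only:

```cpp
// Standalone illustration of the dimension-0 chunking used in the
// local-size-one branch above. The sizes are assumptions for the example.
#include <cstddef>
#include <cstdio>

int main() {
  const size_t globalSize0 = 1030;     // ndr.GlobalSize[0], local size is 1
  const size_t numParallelThreads = 8; // tp.num_threads()

  // Each scheduled task covers itemsPerThread consecutive work-groups.
  const size_t itemsPerThread = globalSize0 / numParallelThreads; // 128
  const size_t newNumWorkGroups0 = numParallelThreads;            // 8 tasks

  // Work-groups past the last full chunk are "peeled" and run inline.
  const size_t firstPeeled = newNumWorkGroups0 * itemsPerThread;  // 1024
  std::printf("%zu tasks of %zu work-groups, %zu peeled (%zu..%zu)\n",
              newNumWorkGroups0, itemsPerThread, globalSize0 - firstPeeled,
              firstPeeled, globalSize0 - 1);
  return 0;
}
```
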
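In the nd_range path, when dimensions 1 and 2 alone cannot occupy the pool, every work-group is first collected into the groups vector and then handed out in batches of groupsPerThread, with one extra task picking up the remainder. The sketch below mirrors that partitioning under assumed sizes:

```cpp
// Standalone illustration of the work-group batching in the nd_range branch
// above (groupsPerThread batches plus one remainder task). Sizes are assumed.
#include <cstddef>
#include <cstdio>

int main() {
  const size_t numWG0 = 7, numWG1 = 3, numWG2 = 2; // work-group counts
  const size_t numParallelThreads = 8;             // tp.num_threads()

  const size_t numGroups = numWG0 * numWG1 * numWG2;             // 42
  const size_t groupsPerThread = numGroups / numParallelThreads; // 5
  const size_t remainder = numGroups % numParallelThreads;       // 2

  // numParallelThreads tasks each run groupsPerThread work-groups ...
  for (size_t thread = 0; thread < numParallelThreads; thread++)
    std::printf("task %zu -> groups [%zu, %zu)\n", thread,
                thread * groupsPerThread, (thread + 1) * groupsPerThread);
  // ... and a single extra task runs whatever is left over.
  if (remainder)
    std::printf("remainder task -> groups [%zu, %zu)\n",
                numParallelThreads * groupsPerThread, numGroups);
  return 0;
}
```
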
64 changes: 32 additions & 32 deletions source/adapters/native_cpu/kernel.hpp
@@ -1,9 +1,7 @@
//===--------------- kernel.hpp - Native CPU Adapter ----------------------===//
//
// Copyright (C) 2023 Intel Corporation
//
// Part of the Unified-Runtime Project, under the Apache License v2.0 with LLVM
// Exceptions. See LICENSE.TXT
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
@@ -42,50 +40,52 @@ struct ur_kernel_handle_t_ : RefCounted {
ur_kernel_handle_t_(const char *name, nativecpu_task_t subhandler)
: _name{name}, _subhandler{std::move(subhandler)} {}

const char *_name;
nativecpu_task_t _subhandler;
std::vector<native_cpu::NativeCPUArgDesc> _args;
std::vector<local_arg_info_t> _localArgInfo;

// To be called before enqueueing the kernel.
void handleLocalArgs() {
updateMemPool();
size_t offset = 0;
for (auto &entry : _localArgInfo) {
_args[entry.argIndex].MPtr =
reinterpret_cast<char *>(_localMemPool) + offset;
// update offset in the memory pool
// Todo: update this offset computation when we have work-group
// level parallelism.
offset += entry.argSize;
}
ur_kernel_handle_t_(const ur_kernel_handle_t_ &other)
: _name(other._name), _subhandler(other._subhandler), _args(other._args),
_localArgInfo(other._localArgInfo), _localMemPool(other._localMemPool),
_localMemPoolSize(other._localMemPoolSize) {
incrementReferenceCount();
}

~ur_kernel_handle_t_() {
if (_localMemPool) {
if (decrementReferenceCount() == 0) {
free(_localMemPool);
}
}

private:
void updateMemPool() {
const char *_name;
nativecpu_task_t _subhandler;
std::vector<native_cpu::NativeCPUArgDesc> _args;
std::vector<local_arg_info_t> _localArgInfo;

// To be called before enqueueing the kernel.
void updateMemPool(size_t numParallelThreads) {
// compute requested size.
// Todo: currently we execute only one work-group at a time, so for each
// local arg we can allocate just 1 * argSize local arg. When we implement
// work-group level parallelism we should allocate N * argSize where N is
// the number of work groups being executed in parallel (e.g. number of
// threads in the thread pool).
size_t reqSize = 0;
for (auto &entry : _localArgInfo) {
reqSize += entry.argSize;
reqSize += entry.argSize * numParallelThreads;
}
if (reqSize == 0 || reqSize == _localMemPoolSize) {
return;
}
// realloc handles nullptr case
_localMemPool = realloc(_localMemPool, reqSize);
_localMemPool = (char *)realloc(_localMemPool, reqSize);
_localMemPoolSize = reqSize;
}
void *_localMemPool = nullptr;

// To be called before executing a work group
void handleLocalArgs(size_t numParallelThread, size_t threadId) {
// Each local argument occupies argSize * numParallelThread bytes in the
// pool; this thread's slice within it starts at argSize * threadId.
size_t offset = 0;
for (auto &entry : _localArgInfo) {
_args[entry.argIndex].MPtr =
_localMemPool + offset + (entry.argSize * threadId);
// update offset in the memory pool
offset += entry.argSize * numParallelThread;
}
}

private:
char *_localMemPool = nullptr;
size_t _localMemPoolSize = 0;
};
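
The reworked kernel handle sizes the local-memory pool at argSize * numParallelThreads bytes per local argument (updateMemPool) and points each thread at its own slice inside that region (handleLocalArgs). A standalone sketch of the layout arithmetic, using assumed argument sizes:

```cpp
// Standalone illustration of the per-thread local-memory layout implied by
// updateMemPool/handleLocalArgs above. Two local arguments are assumed.
#include <cstddef>
#include <cstdio>

int main() {
  const size_t numParallelThreads = 4;
  const size_t argSizes[] = {256, 64}; // assumed _localArgInfo[i].argSize

  // updateMemPool: each local arg reserves argSize * numParallelThreads bytes.
  size_t poolSize = 0;
  for (size_t s : argSizes)
    poolSize += s * numParallelThreads; // 256*4 + 64*4 = 1280

  // handleLocalArgs: thread threadId uses the slice at
  // offset + argSize * threadId inside each argument's region.
  const size_t threadId = 2;
  size_t offset = 0;
  for (size_t i = 0; i < 2; i++) {
    std::printf("arg %zu: thread %zu slice starts at byte %zu\n", i, threadId,
                offset + argSizes[i] * threadId);
    offset += argSizes[i] * numParallelThreads;
  }
  std::printf("total pool size: %zu bytes\n", poolSize);
  return 0;
}
```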