Skip to content

Commit

Permalink
fixes for threading
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandr-Solovev committed Oct 22, 2024
1 parent b07b027 commit 94aab86
Showing 1 changed file with 91 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,39 +61,72 @@ using namespace kdtree_knn_classification::internal;
// Queue<T, cpu>: FIFO of T values kept in a single heap buffer (_data) that is addressed
// circularly through the _first / _last indexes. Copying is disabled.
// NOTE(review): this listing is a rendered diff without +/- markers, so the removed and the
// added form of a statement appear back to back (e.g. the two constructors, the two allocation
// statements in init(), the duplicated member declarations). Presumably only one line of each
// pair exists in the committed file - confirm against the repository before relying on it.
template <typename T, CpuType cpu>
class Queue
{
// Capacity that grow() selects when the current capacity is 0.
static const size_t defaultSize = 4;
public:
Queue() : _data(nullptr) {}
Queue() : _data(nullptr), _first(0), _last(0), _count(0), _size(0), _capacity(0) {}

// Releases the buffer.
~Queue()
{
services::daal_free(_data);
_data = nullptr;
clear();
}

Queue(const Queue &) = delete;
Queue & operator=(const Queue &) = delete;

// Drops any previous buffer and allocates a new one for `size` elements.
// Returns false when `size` is 0 or the allocation yields nullptr, true otherwise.
// NOTE(review): push() wraps with `& _sizeMinus1`, which behaves as a modulo only when `size`
// is a power of two - presumably every caller guarantees that; confirm.
bool init(size_t size)
{
clear();
if (size == 0) // Check for valid size
{
return false;
}

_first = _count = 0;
// _last starts at size - 1 so that the first push() wraps around and writes slot 0.
_last = _sizeMinus1 = (_size = size) - 1;
// NOTE(review): the two statements below pass different arguments to service_malloc
// (size * sizeof(T) versus size); whether the argument is a byte count or an element count
// is not visible here - TODO confirm against the helper's declaration.
return ((_data = static_cast<T *>(service_malloc<T, cpu>(size * sizeof(T)))) != nullptr);
_data = static_cast<T *>(service_malloc<T, cpu>(size));

if (!_data) // Check if memory allocation was successful
{
return false;
}

_capacity = _size; // Initialize capacity
return true;
}

// Frees the buffer (if any) and zeroes every index, counter and size field.
void clear()
{
daal_free(_data);
_data = nullptr;
if (_data)
{
daal::services::internal::service_free<T, cpu>(_data); // Free allocated memory if it exists
_data = nullptr;
}
_first = _last = _count = _size = _sizeMinus1 = _capacity = 0; // Reset state
}

// Empties the queue without touching the buffer or the size fields.
// NOTE(review): this sets _last to 0 whereas init() sets it to size - 1, so the first push()
// after reset() writes slot 1 while pop() reads from slot 0 - confirm this is intended.
void reset() { _first = _last = _count = 0; }

// Appends `value` after the current last element, calling grow() first when the element
// count has reached the capacity.
DAAL_FORCEINLINE void push(const T & value)
{
_data[_last = (_last + 1) & _sizeMinus1] = value;
if (_count >= _capacity) // Check if capacity is exceeded
{
// NOTE(review): the Status returned by grow() is stored but never inspected (the check
// below is commented out), so a failed grow() is not reported to the caller.
services::Status status = grow(); // Grow if necessary
//DAAL_CHECK_STATUS_VAR(status);
}

_data[_last = (_last + 1) & _sizeMinus1] = value; // Add element to queue
++_count;
}

// Removes and returns the element at _first. No emptiness check is performed (the guard
// below is commented out), so the caller has to make sure the queue is not empty.
DAAL_FORCEINLINE T pop()
{
const T value = _data[_first++];
_first *= (_first != _size);
// if (empty()) // Check if queue is empty
// {
// throw std::underflow_error("Queue underflow: no elements to pop.");
// }

const T value = _data[_first++]; // Retrieve element
_first *= (_first != _size); // Reset first index if it reaches the end
--_count;
return value;
}
Expand All @@ -102,13 +135,36 @@ public:

// Number of elements currently stored.
size_t size() const { return _count; }


private:
// Doubles the capacity (or sets it to defaultSize when it is 0), allocates a new buffer,
// copies data from the old buffer and frees it.
// Returns a default Status when the copy reports 0, ErrorMemoryCopyFailedInternal otherwise.
// NOTE(review): _capacity is updated before the allocation is checked, so it already holds
// the doubled value if DAAL_CHECK_MALLOC bails out - presumably that macro returns an error
// Status on nullptr; confirm.
services::Status grow()
{
int result = 0;
_capacity = (_capacity == 0 ? defaultSize : _capacity * 2); // Double capacity or set to default

T * const newData = daal::services::internal::service_malloc<T, cpu>(_capacity);
DAAL_CHECK_MALLOC(newData);

if (_data != nullptr)
{
// NOTE(review): only _last * sizeof(T) bytes, i.e. slots [0, _last), are copied. The
// element stored at index _last and any elements at higher indexes (a wrapped-around
// queue with _first > 0) do not appear to be carried over - confirm.
result = services::internal::daal_memcpy_s(newData, _last * sizeof(T), _data, _last * sizeof(T));
daal::services::internal::service_free<T, cpu>(_data); // Free old data
_data = nullptr;
}

_data = newData; // Assign new expanded memory
_size = _capacity; // Adjust size to new capacity
_sizeMinus1 = _capacity - 1; // Update size minus 1 for wrapping
return (!result) ? services::Status() : services::Status(services::ErrorMemoryCopyFailedInternal);
}

// Heap buffer holding the elements; nullptr when nothing is allocated.
T * _data;
size_t _first;
size_t _last;
size_t _count;
size_t _size;
size_t _sizeMinus1;
size_t _first; // Index of the first element
size_t _last; // Index of the last element
size_t _count; // Current number of elements
size_t _size; // Current size of the queue
size_t _sizeMinus1; // Helper for wrap-around logic
size_t _capacity; // Maximum capacity of the queue
};

struct BuildNode
Expand Down Expand Up @@ -161,15 +217,15 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
Queue<BuildNode, cpu> q;
BBox * bboxQ = nullptr;
auto oldThreads = services::Environment::getInstance()->getNumberOfThreads();
services::Environment::getInstance()->setNumberOfThreads(1);
DAAL_CHECK_STATUS(status, buildFirstPartOfKDTree(q, bboxQ, *x, *r, indexes, engine));
services::Environment::getInstance()->setNumberOfThreads(1);
DAAL_CHECK_STATUS(status, buildSecondPartOfKDTree(q, bboxQ, *x, *r, indexes, engine));
services::Environment::getInstance()->setNumberOfThreads(oldThreads);
DAAL_CHECK_STATUS(status, rearrangePoints(*x, indexes));
if (y)
{
DAAL_CHECK_STATUS(status, rearrangePoints(*y, indexes));
}
services::Environment::getInstance()->setNumberOfThreads(oldThreads);
daal_free(bboxQ);
bboxQ = nullptr;
return status;
Expand All @@ -185,10 +241,9 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
typedef daal::internal::MathInst<algorithmFpType, cpu> Math;
typedef BoundingBox<algorithmFpType> BBox;

const auto maxThreads = threader_get_threads_number();
const algorithmFpType base = 2.0;
const size_t queueSize =
2 * Math::sPowx(base, Math::sCeil(Math::sLog(__KDTREE_FIRST_PART_LEAF_NODES_PER_THREAD * maxThreads) / Math::sLog(base)));
2 * Math::sPowx(base, Math::sCeil(Math::sLog(__KDTREE_FIRST_PART_LEAF_NODES_PER_THREAD) / Math::sLog(base)));
const size_t firstPartLeafNodeCount = queueSize / 2;
q.init(queueSize);
const size_t xColumnCount = x.getNumberOfColumns();
Expand All @@ -198,7 +253,7 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
DAAL_OVERFLOW_CHECK_BY_MULTIPLICATION(size_t, queueSize, xColumnCount);
DAAL_OVERFLOW_CHECK_BY_MULTIPLICATION(size_t, bboxSize, sizeof(BBox));

bboxQ = static_cast<BBox *>(service_malloc<BBox, cpu>(bboxSize * sizeof(BBox), sizeof(BBox)));
bboxQ = static_cast<BBox *>(service_malloc<BBox, cpu>(bboxSize));

DAAL_CHECK_MALLOC(bboxQ)
r.impl()->setLastNodeIndex(0);
Expand All @@ -223,7 +278,7 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
size_t sophisticatedSampleIndexes[__KDTREE_DIMENSION_SELECTION_SIZE];
algorithmFpType sophisticatedSampleValues[__KDTREE_DIMENSION_SELECTION_SIZE];
const size_t subSampleCount = xRowCount / __KDTREE_SEARCH_SKIP + 1;
algorithmFpType * subSamples = static_cast<algorithmFpType *>(service_malloc<algorithmFpType, cpu>(subSampleCount * sizeof(algorithmFpType)));
algorithmFpType * subSamples = static_cast<algorithmFpType *>(service_malloc<algorithmFpType, cpu>(subSampleCount));
DAAL_CHECK_MALLOC(subSamples)

while (maxNodeCountForCurrentDepth < firstPartLeafNodeCount)
Expand Down Expand Up @@ -716,8 +771,8 @@ size_t KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
const auto blockCount = (end - start + rowsPerBlock - 1) / rowsPerBlock;
const auto idxMultiplier = 16; // For cache line separation.

size_t * leftSegmentStartPerBlock = static_cast<size_t *>(service_malloc<size_t, cpu>(idxMultiplier * (blockCount + 1) * sizeof(size_t)));
size_t * rightSegmentStartPerBlock = static_cast<size_t *>(service_malloc<size_t, cpu>(idxMultiplier * blockCount * sizeof(size_t)));
size_t * leftSegmentStartPerBlock = static_cast<size_t *>(service_malloc<size_t, cpu>(idxMultiplier * (blockCount + 1)));
size_t * rightSegmentStartPerBlock = static_cast<size_t *>(service_malloc<size_t, cpu>(idxMultiplier * blockCount));

if (!leftSegmentStartPerBlock || !rightSegmentStartPerBlock)
{
Expand Down Expand Up @@ -849,7 +904,7 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
(rx != wx) ?
wx :
(buffer ? buffer :
(buffer = static_cast<algorithmFpType *>(service_malloc<algorithmFpType, cpu>(xRowCount * sizeof(algorithmFpType)))));
(buffer = static_cast<algorithmFpType *>(service_malloc<algorithmFpType, cpu>(xRowCount))));
if (!awx)
{
status.add(services::ErrorMemoryAllocationFailed);
Expand Down Expand Up @@ -931,10 +986,10 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
const size_t xColumnCount = x.getNumberOfColumns();

const algorithmFpType base = 2.0;
const size_t expectedMaxDepth = (Math::sLog(xRowCount) / Math::sLog(base) + 1) * __KDTREE_DEPTH_MULTIPLICATION_FACTOR;
const size_t stackSize = Math::sPowx(base, Math::sCeil(Math::sLog(expectedMaxDepth) / Math::sLog(base)));
const size_t expectedMaxDepth = (Math::xsLog(xRowCount) / Math::xsLog(base) + 1) * __KDTREE_DEPTH_MULTIPLICATION_FACTOR;
const size_t stackSize = Math::xsPowx(base, Math::xsCeil(Math::xsLog(expectedMaxDepth) / Math::xsLog(base)));

BuildNode * bnQ = static_cast<BuildNode *>(service_malloc<BuildNode, cpu>(q.size() * sizeof(BuildNode)));
BuildNode * bnQ = static_cast<BuildNode *>(service_malloc<BuildNode, cpu>(q.size()));
DAAL_CHECK_MALLOC(bnQ)
size_t posQ = 0;
while (q.size() > 0)
Expand Down Expand Up @@ -972,7 +1027,7 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
const size_t maxNodeCount = kdTreeTable.getNumberOfRows();
const size_t emptyNodeCount = maxNodeCount - lastNodeIndex;
const size_t segment = (emptyNodeCount + maxThreads - 1) / maxThreads;
size_t * firstNodeIndex = static_cast<size_t *>(service_malloc<size_t, cpu>((maxThreads + 1) * sizeof(*firstNodeIndex)));
size_t * firstNodeIndex = static_cast<size_t *>(service_malloc<size_t, cpu>((maxThreads + 1)));
DAAL_CHECK_MALLOC(firstNodeIndex)
size_t nodeIndex = lastNodeIndex;
for (size_t i = 0; i < maxThreads; ++i)
Expand All @@ -991,7 +1046,7 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
if (!(((ptr->bboxes = service_scalable_calloc<BBox, cpu>(ptr->bboxesCapacity * xColumnCount)) != nullptr)
&& ((ptr->inSortValues = service_scalable_calloc<IdxValue, cpu>(__KDTREE_INDEX_VALUE_PAIRS_PER_THREAD)) != nullptr)
&& ((ptr->outSortValues = service_scalable_calloc<IdxValue, cpu>(__KDTREE_INDEX_VALUE_PAIRS_PER_THREAD)) != nullptr)
&& ((ptr->fixupQueue = static_cast<size_t *>(service_malloc<size_t, cpu>(ptr->fixupQueueCapacity * sizeof(size_t)))) != nullptr)
&& ((ptr->fixupQueue = static_cast<size_t *>(service_malloc<size_t, cpu>(ptr->fixupQueueCapacity))) != nullptr)
&& ptr->buildStack.init(stackSize)))
{
status.add(services::ErrorMemoryAllocationFailed);
Expand Down Expand Up @@ -1090,7 +1145,7 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
if (local->fixupQueueIndex >= local->fixupQueueCapacity)
{
const size_t newCapacity = local->fixupQueueCapacity * 2;
size_t * const newQueue = static_cast<size_t *>(service_malloc<size_t, cpu>(newCapacity * sizeof(size_t)));
size_t * const newQueue = static_cast<size_t *>(service_malloc<size_t, cpu>(newCapacity));
DAAL_CHECK_THR(newQueue, services::ErrorMemoryAllocationFailed);
result |= daal::services::internal::daal_memcpy_s(newQueue, newCapacity * sizeof(size_t), local->fixupQueue,
local->fixupQueueIndex * sizeof(size_t));
Expand Down Expand Up @@ -1129,13 +1184,13 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
local->extraKDTreeNodesCapacity > 0 ? local->extraKDTreeNodesCapacity * 2 : static_cast<size_t>(1024),
extraIndex + 1);
KDTreeNode * const newNodes =
static_cast<KDTreeNode *>(service_malloc<KDTreeNode, cpu>(newCapacity * sizeof(KDTreeNode)));
static_cast<KDTreeNode *>(service_malloc<KDTreeNode, cpu>(newCapacity));

DAAL_CHECK_THR(newNodes, services::ErrorMemoryAllocationFailed);

result |= daal::services::internal::daal_memcpy_s(newNodes, newCapacity * sizeof(KDTreeNode),
result |= daal::services::internal::daal_memcpy_s(newNodes, newCapacity,
local->extraKDTreeNodes,
local->extraKDTreeNodesCapacity * sizeof(KDTreeNode));
local->extraKDTreeNodesCapacity);
KDTreeNode * oldNodes = local->extraKDTreeNodes;
local->extraKDTreeNodes = newNodes;
local->extraKDTreeNodesCapacity = newCapacity;
Expand All @@ -1147,7 +1202,7 @@ Status KNNClassificationTrainBatchKernel<algorithmFpType, training::defaultDense
{
local->extraKDTreeNodesCapacity = max<cpu>(extraIndex + 1, static_cast<size_t>(1024));
local->extraKDTreeNodes = static_cast<KDTreeNode *>(
service_malloc<KDTreeNode, cpu>(local->extraKDTreeNodesCapacity * sizeof(KDTreeNode)));
service_malloc<KDTreeNode, cpu>(local->extraKDTreeNodesCapacity));

DAAL_CHECK_THR(local->extraKDTreeNodes, services::ErrorMemoryAllocationFailed);
}
Expand Down Expand Up @@ -1358,7 +1413,7 @@ algorithmFpType KNNClassificationTrainBatchKernel<algorithmFpType, training::def
sampleCount = __KDTREE_MIN_SAMPLES + 1;
}

algorithmFpType * samples = static_cast<algorithmFpType *>(service_malloc<algorithmFpType, cpu>(sampleCount * sizeof(*samples)));
algorithmFpType * samples = static_cast<algorithmFpType *>(service_malloc<algorithmFpType, cpu>(sampleCount));
if (!samples)
{
status = services::ErrorMemoryAllocationFailed;
Expand All @@ -1383,7 +1438,7 @@ algorithmFpType KNNClassificationTrainBatchKernel<algorithmFpType, training::def
samples[i] = upper;
daal::algorithms::internal::qSort<algorithmFpType, cpu>(sampleCount, samples);

size_t * hist = static_cast<size_t *>(service_malloc<size_t, cpu>(sampleCount * sizeof(*hist)));
size_t * hist = static_cast<size_t *>(service_malloc<size_t, cpu>(sampleCount));
if (!hist)
{
status = services::ErrorMemoryAllocationFailed;
Expand All @@ -1396,7 +1451,7 @@ algorithmFpType KNNClassificationTrainBatchKernel<algorithmFpType, training::def
}

size_t subSampleCount = (end - start) / __KDTREE_SEARCH_SKIP + 1;
algorithmFpType * subSamples = static_cast<algorithmFpType *>(service_malloc<algorithmFpType, cpu>(subSampleCount * sizeof(*subSamples)));
algorithmFpType * subSamples = static_cast<algorithmFpType *>(service_malloc<algorithmFpType, cpu>(subSampleCount));
if (!subSamples)
{
status = services::ErrorMemoryAllocationFailed;
Expand Down

0 comments on commit 94aab86

Please sign in to comment.