Skip to content

Commit

Permalink
19133: Reduces contention for concurrency, fixes rare slow memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
howsohazard authored Jan 29, 2024
1 parent 2588ee4 commit 7cf40dc
Show file tree
Hide file tree
Showing 13 changed files with 304 additions and 151 deletions.
9 changes: 4 additions & 5 deletions src/Amalgam/KnnCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class KnnNonZeroDistanceQuerySBFCache
#ifdef MULTITHREAD_SUPPORT
if(run_concurrently && relevantIndices->size() > 1)
{
if(Concurrency::threadPool.AreThreadsAvailable())
auto enqueue_task_lock = Concurrency::threadPool.BeginEnqueueBatchTask();
if(enqueue_task_lock.AreThreadsAvailable())
{
std::vector<std::future<void>> indices_completed;
indices_completed.reserve(relevantIndices->size());
Expand All @@ -55,7 +56,7 @@ class KnnNonZeroDistanceQuerySBFCache
if(top_k > cachedNeighbors[index].size())
{
indices_completed.emplace_back(
Concurrency::threadPool.EnqueueSingleTask(
Concurrency::threadPool.EnqueueBatchTask(
[this, index, top_k]
{
// could have knn cache constructor take in dist params and just get top_k from there, so don't need to pass it in everywhere
Expand All @@ -67,13 +68,11 @@ class KnnNonZeroDistanceQuerySBFCache
}
}

Concurrency::threadPool.CountCurrentThreadAsPaused();
enqueue_task_lock.Unlock();

for(auto &future : indices_completed)
future.wait();

Concurrency::threadPool.CountCurrentThreadAsResumed();

return;
}
}
Expand Down
9 changes: 4 additions & 5 deletions src/Amalgam/SeparableBoxFilterDataStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,25 +144,24 @@ class SeparableBoxFilterDataStore
//if big enough (enough entities and/or enough columns), try to use multithreading
if(num_columns_added > 1 && (numEntities > 10000 || (numEntities > 200 && num_columns_added > 10)))
{
if(Concurrency::threadPool.AreThreadsAvailable())
auto enqueue_task_lock = Concurrency::threadPool.BeginEnqueueBatchTask();
if(enqueue_task_lock.AreThreadsAvailable())
{
std::vector<std::future<void>> columns_completed;
columns_completed.reserve(num_columns);

for(size_t i = num_previous_columns; i < num_columns; i++)
{
columns_completed.emplace_back(
Concurrency::threadPool.EnqueueSingleTask([this, &entities, i]() { BuildLabel(i, entities); })
Concurrency::threadPool.EnqueueBatchTask([this, &entities, i]() { BuildLabel(i, entities); })
);
}

Concurrency::threadPool.CountCurrentThreadAsPaused();
enqueue_task_lock.Unlock();

for(auto &future : columns_completed)
future.wait();

Concurrency::threadPool.CountCurrentThreadAsResumed();

return;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/Amalgam/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ ThreadPool::ThreadPool(size_t max_num_threads)
shutdownThreads = false;
ChangeThreadPoolSize(max_num_threads);

//there must be one active thread
numActiveThreads = 1;
//no active threads in the pool to start
numActiveThreads = 0;

mainThreadId = std::this_thread::get_id();
}
Expand Down
14 changes: 0 additions & 14 deletions src/Amalgam/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,20 +196,6 @@ class ThreadPool
return result;
}

//marks the calling thread as inactive so the pool can account for it while it blocks
// waiting on other tasks; pair with CountCurrentThreadAsResumed once it is ready to run again
// NOTE(review): assumes numActiveThreads is safe to modify concurrently (atomic or lock-guarded) — confirm in ThreadPool.h
inline void CountCurrentThreadAsPaused()
{
	--numActiveThreads;
}

//marks the calling thread as active again after a prior CountCurrentThreadAsPaused call,
// i.e. once it has finished waiting on other threads and will consume a pool slot again
// NOTE(review): assumes numActiveThreads is safe to modify concurrently (atomic or lock-guarded) — confirm in ThreadPool.h
inline void CountCurrentThreadAsResumed()
{
	++numActiveThreads;
}

private:
//waits for all threads to complete, then shuts them down
void ShutdownAllThreads();
Expand Down
55 changes: 45 additions & 10 deletions src/Amalgam/evaluablenode/EvaluableNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -573,19 +573,55 @@ class EvaluableNode
}
}

//returns the last garbage collection iteration of this node, 0 if it has not been set before
constexpr uint8_t GetGarbageCollectionIteration()
//returns whether this node has been marked as known to be currently in use
constexpr bool GetKnownToBeInUse()
{
return attributes.individualAttribs.garbageCollectionIteration;
return attributes.individualAttribs.knownToBeInUse;
}

//sets the garbage collection iteration of this node, which defaults to 0
// values 1, 2, 3 are valid values
constexpr void SetGarbageCollectionIteration(uint8_t gc_collect_iteration)
//sets whether this node is currently known to be in use
constexpr void SetKnownToBeInUse(bool in_use)
{
attributes.individualAttribs.garbageCollectionIteration = gc_collect_iteration;
attributes.individualAttribs.knownToBeInUse = in_use;
}

#ifdef MULTITHREAD_SUPPORT
//returns whether this node has been marked as known to be currently in use,
// loading the attribute byte atomically so it is safe to call concurrently
// with SetKnownToBeInUseAtomic
__forceinline bool GetKnownToBeInUseAtomic()
{
//build a mask value with only the knownToBeInUse bit set
EvaluableNodeAttributesType attrib_with_known_true;
attrib_with_known_true.allAttributes = 0;
attrib_with_known_true.individualAttribs.knownToBeInUse = true;

//atomic load of the whole attribute byte (default seq_cst ordering)
// NOTE(review): reinterpret_cast assumes std::atomic<uint8_t> has the same size/representation
// as the raw byte — true on mainstream toolchains, but confirm; atomic_ref removes the assumption
//TODO 15993: once C++20 is widely supported, change type to atomic_ref
uint8_t all_attributes = reinterpret_cast<std::atomic<uint8_t>&>(attributes.allAttributes);
return (all_attributes & attrib_with_known_true.allAttributes);
}

//sets whether this node is currently known to be in use, updating only the
// knownToBeInUse bit via an atomic read-modify-write so concurrent updates to
// the other attribute bits are not clobbered
__forceinline void SetKnownToBeInUseAtomic(bool in_use)
{
if(in_use)
{
//mask with only the knownToBeInUse bit set; OR it in atomically to set the bit
EvaluableNodeAttributesType attrib_with_known_true;
attrib_with_known_true.allAttributes = 0;
attrib_with_known_true.individualAttribs.knownToBeInUse = true;

//TODO 15993: once C++20 is widely supported, change type to atomic_ref
reinterpret_cast<std::atomic<uint8_t>&>(attributes.allAttributes).fetch_or(attrib_with_known_true.allAttributes);
}
else
{
//mask with every bit set except knownToBeInUse; AND it in atomically to clear the bit
EvaluableNodeAttributesType attrib_with_known_false;
attrib_with_known_false.allAttributes = 0xFF;
attrib_with_known_false.individualAttribs.knownToBeInUse = false;

//TODO 15993: once C++20 is widely supported, change type to atomic_ref
reinterpret_cast<std::atomic<uint8_t>&>(attributes.allAttributes).fetch_and(attrib_with_known_false.allAttributes);
}
}
#endif

//returns the number of child nodes regardless of mapped or ordered
size_t GetNumChildNodes();

Expand Down Expand Up @@ -889,9 +925,8 @@ class EvaluableNode
bool isIdempotent : 1;
//if true, then the node is marked for concurrency
bool concurrent : 1;
//the iteration used for garbage collection; an EvaluableNode should be initialized to 0,
// and values 1-3 are reserved for garbage collection cycles
uint8_t garbageCollectionIteration : 2;
//if true, then known to be in use with regard to garbage collection
bool knownToBeInUse : 1;
} individualAttribs;
};
#pragma pack(pop)
Expand Down
Loading

0 comments on commit 7cf40dc

Please sign in to comment.