Skip to content

Commit

Permalink
pthreads specific thread implementation to set stack size (#411)
Browse files Browse the repository at this point in the history
* first implementation of own wrapper around pthreads

fixes github issue #410

* PThread wrapper uses interface similar to std::thread

* also set stack guard size for debug builds

* cleanup

* move function call outside of CHECK()
  • Loading branch information
K-os authored Aug 7, 2024
1 parent c306b2c commit ab3733e
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 10 deletions.
90 changes: 83 additions & 7 deletions source/Lib/Utilities/NoMallocThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ POSSIBILITY OF SUCH DAMAGE.

#include "NoMallocThreadPool.h"


#if __linux
#include <pthread.h>
#ifdef HAVE_PTHREADS
# include <pthread.h>
# define THREAD_MIN_STACK_SIZE 1024 * 1024
#endif


//! \ingroup Utilities
//! \{

Expand All @@ -63,12 +64,15 @@ thread_local std::unique_ptr<TProfiler> ptls;

NoMallocThreadPool::NoMallocThreadPool( int numThreads, const char * threadPoolName, const VVEncCfg* encCfg )
: m_poolName( threadPoolName )
, m_threads ( numThreads < 0 ? std::thread::hardware_concurrency() : numThreads )
{
int tid = 0;
for( auto& t: m_threads )
if( numThreads < 0 )
{
t = std::thread( &NoMallocThreadPool::threadProc, this, tid++, *encCfg );
numThreads = std::thread::hardware_concurrency();
}

for( int i = 0; i < numThreads; ++i )
{
m_threads.emplace_back( &NoMallocThreadPool::threadProc, this, i, *encCfg );
}
}

Expand Down Expand Up @@ -270,6 +274,78 @@ bool NoMallocThreadPool::processTask( int threadId, NoMallocThreadPool::Slot& ta
return true;
}

#ifdef HAVE_PTHREADS

// Start a new POSIX thread that executes func( args... ).
//
// The callable and its arguments are bound into a heap-allocated std::function<void()>.
// Its address is handed to pthread_create() as the start argument; the new thread takes
// ownership of that object and frees it once the call has returned.
//
// Before the thread is created, the stack size in the thread attributes is raised to
// THREAD_MIN_STACK_SIZE if the default reported by pthread_attr_getstacksize() is smaller.
// In _DEBUG builds the guard size is set to 1MB in that same case.
//
// func: callable to run in the new thread
// args: arguments stored alongside func and passed to it when the thread starts
//
// Every failing pthread call is reported through CHECK().
template<class TFunc, class... TArgs>
NoMallocThreadPool::PThread::PThread( TFunc&& func, TArgs&&... args )
{
  using WrappedCall = std::function<void()>;
  // forward to preserve the value category: rvalue arguments are moved into the bound call instead of copied
  std::unique_ptr<WrappedCall> call = std::make_unique<WrappedCall>(
    std::bind( std::forward<TFunc>( func ), std::forward<TArgs>( args )... ) );

  using PThreadsStartFn = void* (*) ( void* );
  PThreadsStartFn threadFn = []( void* p ) -> void*
  {
    // re-wrap the raw pointer so the bound call is freed when the thread function exits
    std::unique_ptr<WrappedCall> call( static_cast<WrappedCall*>( p ) );

    ( *call )();

    return nullptr;
  };

  pthread_attr_t attr;
  int ret = pthread_attr_init( &attr );
  CHECK( ret != 0, "pthread_attr_init() failed" );

  // destroys the attribute object on every exit path, including an exception leaving this constructor
  struct AttrGuard
  {
    pthread_attr_t& attr;
    ~AttrGuard() { pthread_attr_destroy( &attr ); }
  } attrGuard{ attr };

  size_t currStackSize = 0;
  ret = pthread_attr_getstacksize( &attr, &currStackSize );
  CHECK( ret != 0, "pthread_attr_getstacksize() failed" );

  if( currStackSize < THREAD_MIN_STACK_SIZE )
  {
    ret = pthread_attr_setstacksize( &attr, THREAD_MIN_STACK_SIZE );
    CHECK( ret != 0, "pthread_attr_setstacksize() failed" );

# ifdef _DEBUG
    ret = pthread_attr_setguardsize( &attr, 1024 * 1024 ); // set stack guard size to 1MB to more reliably detect stack overflows
    CHECK( ret != 0, "pthread_attr_setguardsize() failed" );
# endif
  }

  m_joinable = 0 == pthread_create( &m_id, &attr, threadFn, call.get() );
  CHECK( !m_joinable, "pthread_create() failed" );

  call.release(); // will now be freed by the thread
}

// Move assignment: take over the thread handle and the joinable flag of other.
// Afterwards other holds a zeroed handle and is not joinable.
// A thread that *this referred to before the assignment is neither joined nor detached here.
NoMallocThreadPool::PThread& NoMallocThreadPool::PThread::operator=( PThread&& other )
{
  // a self-move must keep the handle; without this check the object would reset itself
  if( this != &other )
  {
    m_id             = other.m_id;
    m_joinable       = other.m_joinable;
    other.m_id       = 0;
    other.m_joinable = false;
  }
  return *this;
}

// Wait for the thread to finish. Calling this on an object that is not joinable does nothing.
// The joinable flag is cleared before blocking; the result of pthread_join() is not inspected.
void NoMallocThreadPool::PThread::join()
{
  if( !m_joinable )
  {
    return;
  }

  m_joinable = false;
  pthread_join( m_id, nullptr );
}

#endif // HAVE_PTHREADS

} // namespace vvenc

//! \}
Expand Down
33 changes: 30 additions & 3 deletions source/Lib/Utilities/NoMallocThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include <condition_variable>
#include <atomic>
#include <chrono>
#include <iostream>
#include <array>

#include "CommonLib/CommonDef.h"
Expand Down Expand Up @@ -366,7 +365,6 @@ class NoMallocThreadPool
Iterator grow()
{
std::unique_lock<std::mutex> l( m_resizeMutex ); // prevent concurrent growth of the queue. Read access while growing is no problem
// std::cerr << __PRETTY_FUNCTION__ << std::endl;

m_lastChunk->m_next = new Chunk( &m_firstChunk );
m_lastChunk = m_lastChunk->m_next;
Expand Down Expand Up @@ -485,13 +483,42 @@ class NoMallocThreadPool
#endif

private:
#ifdef HAVE_PTHREADS
// Minimal move-only thread handle built directly on pthreads. It offers the part of the
// std::thread interface that is declared below: construction from a callable, move,
// joinable() and join().
// The destructor is defaulted: it neither joins nor detaches a thread that is still joinable.
struct PThread
{
  PThread()  = default;
  ~PThread() = default;

  PThread( const PThread& )            = delete;
  PThread& operator=( const PThread& ) = delete;

  // Take over the handle of other and leave other empty. Declared noexcept and written
  // without calling operator= so that moving never depends on a potentially-throwing function.
  PThread( PThread&& other ) noexcept
    : m_id      ( other.m_id )
    , m_joinable( other.m_joinable )
  {
    other.m_id       = 0;
    other.m_joinable = false;
  }
  PThread& operator=( PThread&& other );

  // Start a new thread that executes func( args... ).
  template<class TFunc, class... TArgs>
  PThread( TFunc&& func, TArgs&&... args );

  // true while this object refers to a started thread that has not been joined or moved away;
  // const so it can be called in the same places as std::thread::joinable()
  bool joinable() const { return m_joinable; }
  // wait for the thread to finish
  void join();

private:
  pthread_t m_id       = 0;
  bool      m_joinable = false;
};
#endif // HAVE_PTHREADS

// Thread type used for the pool's worker threads: the pthreads-based wrapper when
// HAVE_PTHREADS is defined, std::thread otherwise.
// Tested with #ifdef, like the guard around the PThread declaration, so the selection
// does not depend on the value the macro was defined with.
#ifdef HAVE_PTHREADS
using ThreadImpl = PThread;
#else
using ThreadImpl = std::thread;
#endif

using TaskIterator = ChunkedTaskQueue::Iterator;

// members
std::string m_poolName;
std::atomic_bool m_exitThreads{ false };
std::vector<std::thread> m_threads;
std::vector<ThreadImpl> m_threads;
ChunkedTaskQueue m_tasks;
TaskIterator m_nextFillSlot = m_tasks.begin();
#if ADD_TASK_THREAD_SAFE
Expand Down
4 changes: 4 additions & 0 deletions source/Lib/vvenc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ else()
target_link_libraries( ${LIB_NAME} PRIVATE Threads::Threads )
endif()

# Define HAVE_PTHREADS for the library's own sources when CMAKE_USE_PTHREADS_INIT is set.
# NOTE(review): CMAKE_USE_PTHREADS_INIT is presumably provided by find_package( Threads ) - confirm it runs before this point.
if( CMAKE_USE_PTHREADS_INIT )
target_compile_definitions( ${LIB_NAME} PRIVATE HAVE_PTHREADS )
endif()

# set the folder where to place the projects
set_target_properties( ${LIB_NAME} PROPERTIES
VERSION ${PROJECT_VERSION}
Expand Down

0 comments on commit ab3733e

Please sign in to comment.