Skip to content

Commit

Permalink
Merge pull request #72 from ikbuibui/test_smaller_task
Browse files Browse the repository at this point in the history
Refactor Task to make it smaller
  • Loading branch information
ikbuibui authored May 24, 2024
2 parents 5227f14 + a782775 commit 20679a5
Show file tree
Hide file tree
Showing 18 changed files with 95 additions and 97 deletions.
11 changes: 6 additions & 5 deletions redGrapes/TaskFreeCtx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
#include "redGrapes/memory/hwloc_alloc.hpp"
#include "redGrapes/sync/cv.hpp"

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

namespace redGrapes
{

using WorkerId = unsigned;
using WorkerId = uint8_t;

/** WorkerID of parser to wake it up
* ID 0,1,2... are used for worker threads
Expand All @@ -43,10 +43,11 @@ namespace redGrapes

struct TaskFreeCtx
{
static inline unsigned n_workers;
static inline unsigned n_pus;
static inline HwlocContext hwloc_ctx;
static inline std::shared_ptr<WorkerAllocPool> worker_alloc_pool;
static inline WorkerId n_pus{
static_cast<WorkerId>(hwloc_get_nbobjs_by_type(hwloc_ctx.topology, HWLOC_OBJ_PU))};
static inline WorkerId n_workers;
static inline WorkerAllocPool worker_alloc_pool;
static inline CondVar cv{0};

static inline std::function<void()> idle = []
Expand Down
11 changes: 5 additions & 6 deletions redGrapes/dispatch/thread/worker_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ namespace redGrapes
inline std::optional<T> probe_worker_by_state(
F&& f,
bool expected_worker_state,
unsigned start_worker_idx,
WorkerId start_worker_idx,
bool exclude_start = true)
{
return worker_state.template probe_by_value<T, F>(
Expand All @@ -100,7 +100,7 @@ namespace redGrapes
TTask* steal_new_task(Worker& worker)
{
std::optional<TTask*> task = probe_worker_by_state<TTask*>(
[&worker, this](unsigned idx) -> std::optional<TTask*>
[&worker, this](WorkerId idx) -> std::optional<TTask*>
{
// we have a candidate of a busy worker,
// now check its queue
Expand Down Expand Up @@ -131,7 +131,7 @@ namespace redGrapes
TTask* steal_ready_task(Worker& worker)
{
std::optional<TTask*> task = probe_worker_by_state<TTask*>(
[&worker, this](unsigned idx) -> std::optional<TTask*>
[&worker, this](WorkerId idx) -> std::optional<TTask*>
{
// we have a candidate of a busy worker,
// now check its queue
Expand Down Expand Up @@ -160,7 +160,7 @@ namespace redGrapes
// @return task if a new task was found, nullptr otherwise
TTask* steal_task(Worker& worker)
{
unsigned local_worker_id = worker.id - m_base_id;
WorkerId local_worker_id = worker.id - m_base_id;

SPDLOG_DEBUG("steal task for worker {}", local_worker_id);

Expand Down Expand Up @@ -188,9 +188,8 @@ namespace redGrapes

private:
std::vector<std::shared_ptr<dispatch::thread::WorkerThread<Worker>>> workers;
HwlocContext* hwloc_ctx_p;
AtomicBitfield worker_state;
unsigned int num_workers;
WorkerId num_workers;
WorkerId m_base_id;
};

Expand Down
10 changes: 5 additions & 5 deletions redGrapes/dispatch/thread/worker_pool.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ namespace redGrapes
SPDLOG_DEBUG("populate WorkerPool with {} workers", num_workers);
for(size_t worker_id = base_id; worker_id < base_id + num_workers; ++worker_id)
{
unsigned pu_id = worker_id % TaskFreeCtx::n_pus;
WorkerId pu_id = worker_id % TaskFreeCtx::n_pus;
// allocate worker with id `i` on arena `i`,
hwloc_obj_t obj = hwloc_get_obj_by_type(TaskFreeCtx::hwloc_ctx.topology, HWLOC_OBJ_PU, pu_id);
TaskFreeCtx::worker_alloc_pool->allocs.emplace_back(
TaskFreeCtx::worker_alloc_pool.allocs.emplace_back(
memory::HwlocAlloc(TaskFreeCtx::hwloc_ctx, obj),
REDGRAPES_ALLOC_CHUNKSIZE);

Expand Down Expand Up @@ -82,12 +82,12 @@ namespace redGrapes

SPDLOG_TRACE("find worker...");

unsigned start_idx = 0;
WorkerId start_idx = 0;
if(TaskFreeCtx::current_worker_id)
start_idx = *TaskFreeCtx::current_worker_id - m_base_id;

std::optional<unsigned> idx = this->probe_worker_by_state<unsigned>(
[this](unsigned idx) -> std::optional<unsigned>
std::optional<WorkerId> idx = this->probe_worker_by_state<WorkerId>(
[this](WorkerId idx) -> std::optional<WorkerId>
{
if(set_worker_state(idx, WorkerState::BUSY))
return idx;
Expand Down
4 changes: 2 additions & 2 deletions redGrapes/memory/allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ namespace redGrapes

Block allocate(size_t n_bytes)
{
return TaskFreeCtx::worker_alloc_pool->get_alloc(worker_id).allocate(n_bytes);
return TaskFreeCtx::worker_alloc_pool.get_alloc(worker_id).allocate(n_bytes);
}

void deallocate(Block blk)
{
TaskFreeCtx::worker_alloc_pool->get_alloc(worker_id).deallocate(blk);
TaskFreeCtx::worker_alloc_pool.get_alloc(worker_id).deallocate(blk);
}
};

Expand Down
6 changes: 2 additions & 4 deletions redGrapes/redGrapes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,13 @@ namespace redGrapes
TaskFreeCtx::n_workers
= std::apply([](auto... args) { return (args.scheduler->n_workers + ...); }, execDescTuple);

TaskFreeCtx::n_pus = hwloc_get_nbobjs_by_type(TaskFreeCtx::hwloc_ctx.topology, HWLOC_OBJ_PU);
if(TaskFreeCtx::n_workers > TaskFreeCtx::n_pus)
SPDLOG_WARN(
"{} worker-threads requested, but only {} PUs available!",
TaskFreeCtx::n_workers,
TaskFreeCtx::n_pus);

TaskFreeCtx::worker_alloc_pool = std::make_shared<WorkerAllocPool>();
TaskFreeCtx::worker_alloc_pool->allocs.reserve(TaskFreeCtx::n_workers);
TaskFreeCtx::worker_alloc_pool.allocs.reserve(TaskFreeCtx::n_workers);

TaskCtx<RGTask>::root_space = std::make_shared<TaskSpace<RGTask>>();

Expand All @@ -65,7 +63,7 @@ namespace redGrapes
scheduler->init(base_worker_id);
base_worker_id = base_worker_id + scheduler->n_workers;
};
unsigned base_worker_id = 0;
WorkerId base_worker_id = 0;
std::apply(
[&base_worker_id, initAdd](auto... args) { ((initAdd(args.scheduler, base_worker_id)), ...); },
execDescTuple);
Expand Down
29 changes: 15 additions & 14 deletions redGrapes/resource/resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,37 +34,38 @@
namespace redGrapes
{

using ResourceID = uint16_t;

template<typename TTask, typename AccessPolicy>
class Resource;

template<typename TTask>
class ResourceBase
{
protected:
static unsigned int generateID()
static ResourceID generateID()
{
static std::atomic<unsigned int> id_counter;
static std::atomic<ResourceID> id_counter;
return id_counter.fetch_add(1);
}

public:
unsigned int id;
unsigned int scope_level;

SpinLock users_mutex;
ChunkedList<TTask*, REDGRAPES_RUL_CHUNKSIZE> users;
SpinLock users_mutex;
ResourceID id;
uint8_t scope_level;

/**
* Create a new resource with an unused ID.
*/
ResourceBase()
: id(generateID())
: users(memory::Allocator(get_arena_id()))
, id(generateID())
, scope_level(TaskCtx<TTask>::scope_depth())
, users(memory::Allocator(get_arena_id()))
{
}

unsigned get_arena_id() const
WorkerId get_arena_id() const
{
return id % TaskFreeCtx::n_workers;
}
Expand Down Expand Up @@ -158,7 +159,7 @@ namespace redGrapes
return this->obj->resource->scope_level;
}

unsigned int resource_id() const
ResourceID resource_id() const
{
return this->obj->resource->id;
}
Expand Down Expand Up @@ -267,7 +268,7 @@ namespace redGrapes

Access(Access&& other)
: ResourceAccess<TTask>::AccessBase(
std::move(std::forward<ResourceAccess<TTask>::AccessBase>(other))) // TODO check this
std::move(std::forward<ResourceAccess<TTask>::AccessBase>(other))) // TODO check this
, policy(std::move(other.policy))
{
}
Expand Down Expand Up @@ -322,10 +323,10 @@ namespace redGrapes
public:
Resource()
{
static unsigned i = 0;
static ResourceID i = 0;

WorkerId worker_id = i++ % TaskFreeCtx::n_workers;
base = redGrapes::memory::alloc_shared_bind<ResourceBase<TTask>>(worker_id);
i = i++ % TaskFreeCtx::n_workers;
base = redGrapes::memory::alloc_shared_bind<ResourceBase<TTask>>(i);
}

/**
Expand Down
3 changes: 1 addition & 2 deletions redGrapes/resource/resource_user.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,9 @@ namespace redGrapes
return false;
}

uint8_t scope_level;

ChunkedList<ResourceAccess<TTask>, 8> access_list;
ChunkedList<ResourceUsageEntry<TTask>, 8> unique_resources;
uint8_t scope_level;
}; // struct ResourceUser

} // namespace redGrapes
Expand Down
4 changes: 2 additions & 2 deletions redGrapes/scheduler/cuda_thread_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ namespace redGrapes
// TODO check if it was already initalized
if(!this->m_worker_thread)
{
unsigned pu_id = base_id % TaskFreeCtx::n_pus;
WorkerId pu_id = base_id % TaskFreeCtx::n_pus;
// allocate worker with id `i` on arena `i`,
hwloc_obj_t obj = hwloc_get_obj_by_type(TaskFreeCtx::hwloc_ctx.topology, HWLOC_OBJ_PU, pu_id);
TaskFreeCtx::worker_alloc_pool->allocs.emplace_back(
TaskFreeCtx::worker_alloc_pool.allocs.emplace_back(
memory::HwlocAlloc(TaskFreeCtx::hwloc_ctx, obj),
REDGRAPES_ALLOC_CHUNKSIZE);

Expand Down
12 changes: 6 additions & 6 deletions redGrapes/scheduler/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ namespace redGrapes
template<typename TTask>
struct EventPtr
{
enum EventPtrTag tag;
TTask* task;
std::shared_ptr<Event<TTask>> external_event;
TTask* task;
enum EventPtrTag tag = T_UNINITIALIZED;

inline bool operator==(EventPtr<TTask> const& other) const
{
Expand Down Expand Up @@ -84,16 +84,16 @@ namespace redGrapes
template<typename TTask>
struct Event
{
//! the set of subsequent events
ChunkedList<EventPtr<TTask>, REDGRAPES_EVENT_FOLLOWER_LIST_CHUNKSIZE> followers;

/*! number of incoming edges
* state == 0: event is reached and can be removed
*/
std::atomic_uint16_t state;

std::atomic<uint16_t> state;
//! waker that is waiting for this event
WakerId waker_id;

//! the set of subsequent events
ChunkedList<EventPtr<TTask>, REDGRAPES_EVENT_FOLLOWER_LIST_CHUNKSIZE> followers;

Event();
Event(Event&);
Expand Down
10 changes: 5 additions & 5 deletions redGrapes/scheduler/pool_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ namespace redGrapes
{
using TTask = Worker::task_type;
WorkerId m_base_id;
unsigned n_workers;
std::shared_ptr<dispatch::thread::WorkerPool<Worker>> m_worker_pool;
WorkerId n_workers;
dispatch::thread::WorkerPool<Worker> m_worker_pool;

PoolScheduler(unsigned num_workers);
PoolScheduler(std::shared_ptr<dispatch::thread::WorkerPool<Worker>> workerPool);
PoolScheduler(WorkerId num_workers);
PoolScheduler(dispatch::thread::WorkerPool<Worker> workerPool);

/* send the new task to a worker
*/
Expand All @@ -53,7 +53,7 @@ namespace redGrapes
*/
void wake_all();

unsigned getNextWorkerID();
WorkerId getNextWorkerID();

void init(WorkerId base_id);

Expand Down
Loading

0 comments on commit 20679a5

Please sign in to comment.