Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Task to make it smaller #72

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions redGrapes/TaskFreeCtx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
#include "redGrapes/memory/hwloc_alloc.hpp"
#include "redGrapes/sync/cv.hpp"

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

namespace redGrapes
{

using WorkerId = unsigned;
using WorkerId = uint8_t;

/** WorkerID of parser to wake it up
* ID 0,1,2... are used for worker threads
Expand All @@ -43,10 +43,11 @@ namespace redGrapes

struct TaskFreeCtx
{
static inline unsigned n_workers;
static inline unsigned n_pus;
static inline HwlocContext hwloc_ctx;
static inline std::shared_ptr<WorkerAllocPool> worker_alloc_pool;
static inline WorkerId n_pus{
static_cast<WorkerId>(hwloc_get_nbobjs_by_type(hwloc_ctx.topology, HWLOC_OBJ_PU))};
static inline WorkerId n_workers;
static inline WorkerAllocPool worker_alloc_pool;
static inline CondVar cv{0};

static inline std::function<void()> idle = []
Expand Down
11 changes: 5 additions & 6 deletions redGrapes/dispatch/thread/worker_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ namespace redGrapes
inline std::optional<T> probe_worker_by_state(
F&& f,
bool expected_worker_state,
unsigned start_worker_idx,
WorkerId start_worker_idx,
bool exclude_start = true)
{
return worker_state.template probe_by_value<T, F>(
Expand All @@ -100,7 +100,7 @@ namespace redGrapes
TTask* steal_new_task(Worker& worker)
{
std::optional<TTask*> task = probe_worker_by_state<TTask*>(
[&worker, this](unsigned idx) -> std::optional<TTask*>
[&worker, this](WorkerId idx) -> std::optional<TTask*>
{
// we have a candidate of a busy worker,
// now check its queue
Expand Down Expand Up @@ -131,7 +131,7 @@ namespace redGrapes
TTask* steal_ready_task(Worker& worker)
{
std::optional<TTask*> task = probe_worker_by_state<TTask*>(
[&worker, this](unsigned idx) -> std::optional<TTask*>
[&worker, this](WorkerId idx) -> std::optional<TTask*>
{
// we have a candidate of a busy worker,
// now check its queue
Expand Down Expand Up @@ -160,7 +160,7 @@ namespace redGrapes
// @return task if a new task was found, nullptr otherwise
TTask* steal_task(Worker& worker)
{
unsigned local_worker_id = worker.id - m_base_id;
WorkerId local_worker_id = worker.id - m_base_id;

SPDLOG_DEBUG("steal task for worker {}", local_worker_id);

Expand Down Expand Up @@ -188,9 +188,8 @@ namespace redGrapes

private:
std::vector<std::shared_ptr<dispatch::thread::WorkerThread<Worker>>> workers;
HwlocContext* hwloc_ctx_p;
AtomicBitfield worker_state;
unsigned int num_workers;
WorkerId num_workers;
WorkerId m_base_id;
};

Expand Down
10 changes: 5 additions & 5 deletions redGrapes/dispatch/thread/worker_pool.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ namespace redGrapes
SPDLOG_DEBUG("populate WorkerPool with {} workers", num_workers);
for(size_t worker_id = base_id; worker_id < base_id + num_workers; ++worker_id)
{
unsigned pu_id = worker_id % TaskFreeCtx::n_pus;
WorkerId pu_id = worker_id % TaskFreeCtx::n_pus;
// allocate worker with id `i` on arena `i`,
hwloc_obj_t obj = hwloc_get_obj_by_type(TaskFreeCtx::hwloc_ctx.topology, HWLOC_OBJ_PU, pu_id);
TaskFreeCtx::worker_alloc_pool->allocs.emplace_back(
TaskFreeCtx::worker_alloc_pool.allocs.emplace_back(
memory::HwlocAlloc(TaskFreeCtx::hwloc_ctx, obj),
REDGRAPES_ALLOC_CHUNKSIZE);

Expand Down Expand Up @@ -82,12 +82,12 @@ namespace redGrapes

SPDLOG_TRACE("find worker...");

unsigned start_idx = 0;
WorkerId start_idx = 0;
if(TaskFreeCtx::current_worker_id)
start_idx = *TaskFreeCtx::current_worker_id - m_base_id;

std::optional<unsigned> idx = this->probe_worker_by_state<unsigned>(
[this](unsigned idx) -> std::optional<unsigned>
std::optional<WorkerId> idx = this->probe_worker_by_state<WorkerId>(
[this](WorkerId idx) -> std::optional<WorkerId>
{
if(set_worker_state(idx, WorkerState::BUSY))
return idx;
Expand Down
4 changes: 2 additions & 2 deletions redGrapes/memory/allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ namespace redGrapes

Block allocate(size_t n_bytes)
{
return TaskFreeCtx::worker_alloc_pool->get_alloc(worker_id).allocate(n_bytes);
return TaskFreeCtx::worker_alloc_pool.get_alloc(worker_id).allocate(n_bytes);
}

void deallocate(Block blk)
{
TaskFreeCtx::worker_alloc_pool->get_alloc(worker_id).deallocate(blk);
TaskFreeCtx::worker_alloc_pool.get_alloc(worker_id).deallocate(blk);
}
};

Expand Down
6 changes: 2 additions & 4 deletions redGrapes/redGrapes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,13 @@ namespace redGrapes
TaskFreeCtx::n_workers
= std::apply([](auto... args) { return (args.scheduler->n_workers + ...); }, execDescTuple);

TaskFreeCtx::n_pus = hwloc_get_nbobjs_by_type(TaskFreeCtx::hwloc_ctx.topology, HWLOC_OBJ_PU);
if(TaskFreeCtx::n_workers > TaskFreeCtx::n_pus)
SPDLOG_WARN(
"{} worker-threads requested, but only {} PUs available!",
TaskFreeCtx::n_workers,
TaskFreeCtx::n_pus);

TaskFreeCtx::worker_alloc_pool = std::make_shared<WorkerAllocPool>();
TaskFreeCtx::worker_alloc_pool->allocs.reserve(TaskFreeCtx::n_workers);
TaskFreeCtx::worker_alloc_pool.allocs.reserve(TaskFreeCtx::n_workers);

TaskCtx<RGTask>::root_space = std::make_shared<TaskSpace<RGTask>>();

Expand All @@ -65,7 +63,7 @@ namespace redGrapes
scheduler->init(base_worker_id);
base_worker_id = base_worker_id + scheduler->n_workers;
};
unsigned base_worker_id = 0;
WorkerId base_worker_id = 0;
std::apply(
[&base_worker_id, initAdd](auto... args) { ((initAdd(args.scheduler, base_worker_id)), ...); },
execDescTuple);
Expand Down
29 changes: 15 additions & 14 deletions redGrapes/resource/resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,37 +34,38 @@
namespace redGrapes
{

using ResourceID = uint16_t;

template<typename TTask, typename AccessPolicy>
class Resource;

template<typename TTask>
class ResourceBase
{
protected:
static unsigned int generateID()
static ResourceID generateID()
{
static std::atomic<unsigned int> id_counter;
static std::atomic<ResourceID> id_counter;
return id_counter.fetch_add(1);
}

public:
unsigned int id;
unsigned int scope_level;

SpinLock users_mutex;
ChunkedList<TTask*, REDGRAPES_RUL_CHUNKSIZE> users;
SpinLock users_mutex;
ResourceID id;
uint8_t scope_level;

/**
* Create a new resource with an unused ID.
*/
ResourceBase()
: id(generateID())
: users(memory::Allocator(get_arena_id()))
, id(generateID())
, scope_level(TaskCtx<TTask>::scope_depth())
, users(memory::Allocator(get_arena_id()))
{
}

unsigned get_arena_id() const
WorkerId get_arena_id() const
{
return id % TaskFreeCtx::n_workers;
}
Expand Down Expand Up @@ -158,7 +159,7 @@ namespace redGrapes
return this->obj->resource->scope_level;
}

unsigned int resource_id() const
ResourceID resource_id() const
{
return this->obj->resource->id;
}
Expand Down Expand Up @@ -267,7 +268,7 @@ namespace redGrapes

Access(Access&& other)
: ResourceAccess<TTask>::AccessBase(
std::move(std::forward<ResourceAccess<TTask>::AccessBase>(other))) // TODO check this
std::move(std::forward<ResourceAccess<TTask>::AccessBase>(other))) // TODO check this
, policy(std::move(other.policy))
{
}
Expand Down Expand Up @@ -322,10 +323,10 @@ namespace redGrapes
public:
Resource()
{
static unsigned i = 0;
static ResourceID i = 0;

WorkerId worker_id = i++ % TaskFreeCtx::n_workers;
base = redGrapes::memory::alloc_shared_bind<ResourceBase<TTask>>(worker_id);
i = i++ % TaskFreeCtx::n_workers;
base = redGrapes::memory::alloc_shared_bind<ResourceBase<TTask>>(i);
}

/**
Expand Down
3 changes: 1 addition & 2 deletions redGrapes/resource/resource_user.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,9 @@ namespace redGrapes
return false;
}

uint8_t scope_level;

ChunkedList<ResourceAccess<TTask>, 8> access_list;
ChunkedList<ResourceUsageEntry<TTask>, 8> unique_resources;
uint8_t scope_level;
}; // struct ResourceUser

} // namespace redGrapes
Expand Down
4 changes: 2 additions & 2 deletions redGrapes/scheduler/cuda_thread_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ namespace redGrapes
// TODO check if it was already initalized
if(!this->m_worker_thread)
{
unsigned pu_id = base_id % TaskFreeCtx::n_pus;
WorkerId pu_id = base_id % TaskFreeCtx::n_pus;
// allocate worker with id `i` on arena `i`,
hwloc_obj_t obj = hwloc_get_obj_by_type(TaskFreeCtx::hwloc_ctx.topology, HWLOC_OBJ_PU, pu_id);
TaskFreeCtx::worker_alloc_pool->allocs.emplace_back(
TaskFreeCtx::worker_alloc_pool.allocs.emplace_back(
memory::HwlocAlloc(TaskFreeCtx::hwloc_ctx, obj),
REDGRAPES_ALLOC_CHUNKSIZE);

Expand Down
12 changes: 6 additions & 6 deletions redGrapes/scheduler/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ namespace redGrapes
template<typename TTask>
struct EventPtr
{
enum EventPtrTag tag;
TTask* task;
std::shared_ptr<Event<TTask>> external_event;
TTask* task;
enum EventPtrTag tag = T_UNINITIALIZED;

inline bool operator==(EventPtr<TTask> const& other) const
{
Expand Down Expand Up @@ -84,16 +84,16 @@ namespace redGrapes
template<typename TTask>
struct Event
{
//! the set of subsequent events
ChunkedList<EventPtr<TTask>, REDGRAPES_EVENT_FOLLOWER_LIST_CHUNKSIZE> followers;

/*! number of incoming edges
* state == 0: event is reached and can be removed
*/
std::atomic_uint16_t state;

std::atomic<uint16_t> state;
//! waker that is waiting for this event
WakerId waker_id;

//! the set of subsequent events
ChunkedList<EventPtr<TTask>, REDGRAPES_EVENT_FOLLOWER_LIST_CHUNKSIZE> followers;

Event();
Event(Event&);
Expand Down
10 changes: 5 additions & 5 deletions redGrapes/scheduler/pool_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ namespace redGrapes
{
using TTask = Worker::task_type;
WorkerId m_base_id;
unsigned n_workers;
std::shared_ptr<dispatch::thread::WorkerPool<Worker>> m_worker_pool;
WorkerId n_workers;
dispatch::thread::WorkerPool<Worker> m_worker_pool;

PoolScheduler(unsigned num_workers);
PoolScheduler(std::shared_ptr<dispatch::thread::WorkerPool<Worker>> workerPool);
PoolScheduler(WorkerId num_workers);
PoolScheduler(dispatch::thread::WorkerPool<Worker> workerPool);

/* send the new task to a worker
*/
Expand All @@ -53,7 +53,7 @@ namespace redGrapes
*/
void wake_all();

unsigned getNextWorkerID();
WorkerId getNextWorkerID();

void init(WorkerId base_id);

Expand Down
Loading
Loading