Skip to content

Commit

Permalink
Added methods for working with external async task systems
Browse files Browse the repository at this point in the history
  • Loading branch information
gsnook committed May 24, 2023
1 parent 2bcb38a commit d4461b9
Show file tree
Hide file tree
Showing 18 changed files with 3,061 additions and 80 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ bazel-genfiles
bazel-out
bazel-testlogs
**/CMakeFiles/*
.vs
out
18 changes: 18 additions & 0 deletions docs/Systems.md
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,24 @@ By default systems are created as single threaded. Single threaded systems are a

The way the scheduler ensures that the same entities are processed by the same threads is by slicing up the entities in a table into N slices, where N is the number of threads. For a table that has 1000 entities, the first thread will process entities 0..249, thread 2 250..499, thread 3 500..749 and thread 4 entities 750..999. For more details on this behavior, see `ecs_worker_iter`/`flecs::iterable::worker_iter`.

### Threading with Async Tasks
Systems in Flecs can also be multithreaded using an external asynchronous task system. Instead of creating regular worker threads using `set_threads`, use the `set_task_threads` function and provide the API callbacks to create and wait for task completion using your job system.
This can be helpful when using Flecs within an application which already has a job queue system to handle multithreaded tasks.

```c
ecs_set_tasks_threads(world, 4, <create_task_callback>, <wait_task_callback>); // Create 4 worker task threads for the duration of each ecs_progress update
```
```cpp
world.set_task_threads(4, <create_task_callback>, <wait_task_callback>);
```

For simplicity, these task callbacks use the same format as Flecs `ecs_os_api_t` thread APIs. In fact, you could provide your regular os thread api functions to create short-duration threads for multithreaded system processing.
Create multithreaded systems using the `multi_threaded` flag as with `ecs_set_threads` above.

When `ecs_progress` is called, your `<create_task_callback>` will be called once for every task thread needed to create task threads on demand. When `ecs_progress` is complete, your `<wait_task_callback>` function will be called to clean up each task thread.
By providing callback functions which create and remove tasks for your specific asynchronous task system, you can use Flecs with any kind of async task management scheme.
The only limitation is that your async task manager must be able to create and execute the number of simultaneous tasks specified in `ecs_set_task_threads`.

## Timers
When running a pipeline, systems are ran each time `progress()` is called. The `FLECS_TIMER` addon makes it possible to run systems at a specific time interval or rate.

Expand Down
164 changes: 132 additions & 32 deletions flecs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,8 @@ struct ecs_world_t {
ecs_os_mutex_t sync_mutex; /* Mutex for job_cond */
int32_t workers_running; /* Number of threads running */
int32_t workers_waiting; /* Number of workers waiting on sync */
ecs_os_api_thread_new_t task_worker_new; /* override to create task workers. 0= unused */
ecs_os_api_thread_join_t task_worker_join; /* override to join task workers. 0= unused */

/* -- Time management -- */
ecs_time_t world_start_time; /* Timestamp of simulation start */
Expand Down Expand Up @@ -11054,6 +11056,7 @@ void ecs_set_stage_count(
/* Set thread_ctx to stage, as this stage might be used in a
* multithreaded context */
stage->thread_ctx = (ecs_world_t*)stage;
stage->thread = 0;
}
} else {
/* Set to NULL to prevent double frees */
Expand Down Expand Up @@ -16970,6 +16973,12 @@ void flecs_workers_progress(
ecs_pipeline_state_t *pq,
ecs_ftime_t delta_time);

void flecs_create_worker_threads(
ecs_world_t *world);

bool flecs_join_worker_threads(
ecs_world_t *world);

#endif


Expand Down Expand Up @@ -17041,6 +17050,42 @@ bool flecs_is_main_thread(
}

/* Start threads */
void flecs_create_worker_threads(
ecs_world_t *world)
{
ecs_poly_assert(world, ecs_world_t);
int32_t stages = ecs_get_stage_count(world);

for (int32_t i = 1; i < stages; i++) {
ecs_stage_t *stage = (ecs_stage_t*)ecs_get_stage(world, i);
ecs_assert(stage != NULL, ECS_INTERNAL_ERROR, NULL);
ecs_poly_assert(stage, ecs_stage_t);

ecs_entity_t pipeline = world->pipeline;
ecs_assert(pipeline != 0, ECS_INVALID_OPERATION, NULL);
const EcsPipeline *pqc = ecs_get(world, pipeline, EcsPipeline);
ecs_assert(pqc != NULL, ECS_INVALID_OPERATION, NULL);
ecs_pipeline_state_t *pq = pqc->state;
ecs_assert(pq != NULL, ECS_INTERNAL_ERROR, NULL);

ecs_worker_state_t *state = ecs_os_calloc_t(ecs_worker_state_t);
state->stage = stage;
state->pq = pq;

if (ecs_using_task_threads(world))
{
/* task worker creation is deferred until flecs_worker_begin */
stage->thread = world->task_worker_new(flecs_worker, state);
}
else
{
/* workers are using long-running os threads */
stage->thread = ecs_os_thread_new(flecs_worker, state);
}
ecs_assert(stage->thread != 0, ECS_OPERATION_FAILED, NULL);
}
}

static
void flecs_start_workers(
ecs_world_t *world,
Expand All @@ -17050,25 +17095,10 @@ void flecs_start_workers(

ecs_assert(ecs_get_stage_count(world) == threads, ECS_INTERNAL_ERROR, NULL);

int32_t i;
for (i = 0; i < threads - 1; i ++) {
ecs_stage_t *stage = (ecs_stage_t*)ecs_get_stage(world, i + 1);
ecs_assert(stage != NULL, ECS_INTERNAL_ERROR, NULL);
ecs_poly_assert(stage, ecs_stage_t);

ecs_entity_t pipeline = world->pipeline;
ecs_assert(pipeline != 0, ECS_INVALID_OPERATION, NULL);
const EcsPipeline *pqc = ecs_get(world, pipeline, EcsPipeline);
ecs_assert(pqc != NULL, ECS_INVALID_OPERATION, NULL);
ecs_pipeline_state_t *pq = pqc->state;
ecs_assert(pq != NULL, ECS_INTERNAL_ERROR, NULL);

ecs_worker_state_t *state = ecs_os_calloc_t(ecs_worker_state_t);
state->stage = stage;
state->pq = pq;
stage->thread = ecs_os_thread_new(flecs_worker, state);
ecs_assert(stage->thread != 0, ECS_OPERATION_FAILED, NULL);
}
if (!ecs_using_task_threads(world))
{
flecs_create_worker_threads(world);
}
}

/* Wait until all workers are running */
Expand Down Expand Up @@ -17158,11 +17188,10 @@ void flecs_signal_workers(
ecs_os_mutex_unlock(world->sync_mutex);
}

/** Stop workers */
static
bool ecs_stop_threads(
bool flecs_join_worker_threads(
ecs_world_t *world)
{
ecs_poly_assert(world, ecs_world_t);
bool threads_active = false;

/* Test if threads are created. Cannot use workers_running, since this is
Expand Down Expand Up @@ -17192,19 +17221,41 @@ bool ecs_stop_threads(

/* Join all threads with main */
for (i = 1; i < count; i ++) {
ecs_os_thread_join(stages[i].thread);
if (world->task_worker_join != 0)
{
/* Join using the override provided */
world->task_worker_join(stages[i].thread);
}
else
{
ecs_os_thread_join(stages[i].thread);
}
stages[i].thread = 0;
}

world->flags &= ~EcsWorldQuitWorkers;
ecs_assert(world->workers_running == 0, ECS_INTERNAL_ERROR, NULL);

/* Deinitialize stages */
ecs_set_stage_count(world, 1);

return true;
}

/** Stop workers */
static
bool ecs_stop_threads(
ecs_world_t *world)
{
/* Join all existing worker threads */
if (flecs_join_worker_threads(world))
{
/* Deinitialize stages */
ecs_set_stage_count(world, 1);

return true;
}

return false;
}

/* -- Private functions -- */
bool flecs_worker_begin(
ecs_world_t *world,
Expand Down Expand Up @@ -17315,17 +17366,22 @@ void flecs_workers_progress(
ecs_set_scope((ecs_world_t*)stage, old_scope);
}

/* -- Public functions -- */

void ecs_set_threads(
static
void flecs_set_threads_internal(
ecs_world_t *world,
int32_t threads)
int32_t threads,
ecs_os_api_thread_new_t task_worker_new,
ecs_os_api_thread_join_t task_worker_join)
{
ecs_assert(threads <= 1 || ecs_os_has_threading(), ECS_MISSING_OS_API, NULL);

/* both overrides must either be set or clear */
ecs_assert((world->task_worker_new == 0) == (world->task_worker_join == 0), ECS_INTERNAL_ERROR, NULL);

int32_t stage_count = ecs_get_stage_count(world);

if (stage_count != threads) {
bool overrides_changed = (task_worker_new != world->task_worker_new) || (task_worker_join != world->task_worker_join);

if (stage_count != threads || overrides_changed) {
/* Stop existing threads */
if (stage_count > 1) {
if (ecs_stop_threads(world)) {
Expand All @@ -17335,6 +17391,10 @@ void ecs_set_threads(
}
}

/* Adopt the desired overrides for worker creation & join (if any) */
world->task_worker_new = task_worker_new;
world->task_worker_join = task_worker_join;

/* Start threads if number of threads > 1 */
if (threads > 1) {
world->worker_cond = ecs_os_cond_new();
Expand All @@ -17345,6 +17405,34 @@ void ecs_set_threads(
}
}

/* -- Public functions -- */

void ecs_set_threads(
ecs_world_t *world,
int32_t threads)
{
flecs_set_threads_internal(world, threads, 0, 0);
}

void ecs_set_task_threads(
ecs_world_t *world,
int32_t task_threads,
ecs_os_api_thread_new_t task_worker_new,
ecs_os_api_thread_join_t task_worker_join)
{
/* both overrides must be provided */
ecs_assert(task_worker_new != 0, ECS_MISSING_OS_API, NULL);
ecs_assert(task_worker_join != 0, ECS_MISSING_OS_API, NULL);

flecs_set_threads_internal(world, task_threads, task_worker_new, task_worker_join);
}

bool ecs_using_task_threads(
ecs_world_t *world)
{
return world->task_worker_new != 0 && world->task_worker_join != 0;
}

#endif

/**
Expand Down Expand Up @@ -18019,6 +18107,12 @@ bool ecs_progress(
flecs_run_startup_systems(world);
}

/* create any worker task threads request */
if (ecs_using_task_threads(world))
{
flecs_create_worker_threads(world);
}

ecs_dbg_3("#[bold]progress#[reset](dt = %.2f)", (double)delta_time);
ecs_log_push_3();
const EcsPipeline *p = ecs_get(world, world->pipeline, EcsPipeline);
Expand All @@ -18028,6 +18122,12 @@ bool ecs_progress(

ecs_frame_end(world);

if (ecs_using_task_threads(world))
{
/* task threads were temporary and may now be joined */
flecs_join_worker_threads(world);
}

return !ECS_BIT_IS_SET(world->flags, EcsWorldQuit);
error:
return false;
Expand Down
57 changes: 53 additions & 4 deletions flecs.h
Original file line number Diff line number Diff line change
Expand Up @@ -4415,9 +4415,9 @@ void ecs_set_automerge(
* multiple threads, where each thread gets its own queue, and commands are
* merged when threads are synchronized.
*
* Note that the ecs_set_threads function already creates the appropriate
* number of stages. The set_stage_count() operation is useful for applications that
* want to manage their own stages and/or threads.
* Note that the ecs_set_threads and ecs_set_task_threads functions already create
* the appropriate number of stages. The set_stage_count() operation is useful
* for applications that want to manage their own stages and/or threads.
*
* @param world The world.
* @param stages The number of stages.
Expand Down Expand Up @@ -10517,12 +10517,39 @@ void ecs_run_pipeline(
* Setting this value to a value higher than 1 will start as many threads and
* will cause systems to evenly distribute matched entities across threads. The
* operation may be called multiple times to reconfigure the number of threads
* used, but never while running a system / pipeline. */
* used, but never while running a system / pipeline.
* Calling ecs_set_threads will also end the use of task threads setup with
* ecs_set_task_threads and vice-versa */
FLECS_API
void ecs_set_threads(
ecs_world_t *world,
int32_t threads);

/** Set number of worker task threads.
* ecs_set_task_threads is similar to ecs_set_threads, except threads are treated
* as short-lived tasks and will be created and joined around each update of the world.
* Creation and joining of these tasks will use the APIs provided rather than those
* in the os_api_t structure, although they may be the same if desired.
* This function is useful for multithreading world updates using an external
* asynchronous job system rather than long running threads by providing the APIs
* to create tasks for your job system and then wait on their conclusion.
* The operation may be called multiple times to reconfigure the number of task threads
* used, but never while running a system / pipeline.
* Calling ecs_set_task_threads will also end the use of threads setup with
* ecs_set_threads and vice-versa */

FLECS_API
void ecs_set_task_threads(
ecs_world_t *world,
int32_t task_threads,
ecs_os_api_thread_new_t task_worker_new,
ecs_os_api_thread_join_t task_worker_join);

/** Returns true if task thread use have been requested. */
FLECS_API
bool ecs_using_task_threads(
ecs_world_t *world);

////////////////////////////////////////////////////////////////////////////////
//// Module
////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -19621,6 +19648,18 @@ void set_threads(int32_t threads) const;
*/
int32_t get_threads() const;

/** Set number of task threads.
* @see ecs_set_task_threads
*/
void set_task_threads(int32_t task_threads,
ecs_os_api_thread_new_t task_worker_new,
ecs_os_api_thread_join_t task_worker_join) const;

/** Returns true if task thread use has been requested.
* @see ecs_using_task_threads
*/
bool using_task_threads() const;

/** @} */

# endif
Expand Down Expand Up @@ -27737,6 +27776,16 @@ inline int32_t world::get_threads() const {
return ecs_get_stage_count(m_world);
}

inline void world::set_task_threads(int32_t task_threads,
ecs_os_api_thread_new_t task_worker_new,
ecs_os_api_thread_join_t task_worker_join) const {
ecs_set_task_threads(m_world, task_threads, task_worker_new, task_worker_join);
}

inline bool world::using_task_threads() const {
return ecs_using_task_threads(m_world);
}

}

#endif
Expand Down
10 changes: 10 additions & 0 deletions include/flecs/addons/cpp/mixins/pipeline/impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,14 @@ inline int32_t world::get_threads() const {
return ecs_get_stage_count(m_world);
}

inline void world::set_task_threads(int32_t task_threads,
ecs_os_api_thread_new_t task_worker_new,
ecs_os_api_thread_join_t task_worker_join) const {
ecs_set_task_threads(m_world, task_threads, task_worker_new, task_worker_join);
}

inline bool world::using_task_threads() const {
return ecs_using_task_threads(m_world);
}

}
Loading

0 comments on commit d4461b9

Please sign in to comment.