From cdd912628f0450802c5bdca771ead649c4db6937 Mon Sep 17 00:00:00 2001 From: Alexey Radul Date: Fri, 17 Nov 2023 11:08:09 -0500 Subject: [PATCH 1/3] Draft a runtime function for executing Dex loops with a tree-pattern for stealable work chunks. --- src/lib/work-stealing.c | 84 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 81 insertions(+), 3 deletions(-) diff --git a/src/lib/work-stealing.c b/src/lib/work-stealing.c index 05fa93866..19455e05f 100644 --- a/src/lib/work-stealing.c +++ b/src/lib/work-stealing.c @@ -18,13 +18,16 @@ struct Work; -// A Task is a function pointer that consumes a Work* and returns a Work* -// The input is the `Work` Always passed a pointer to the containing Work struct +// A Task is a function pointer that consumes a Work* and returns a Work*. +// The input `Work` is always a pointer to the Work struct containing that +// Task, which it accepts in order to be able to deallocate it. // Question: Do we also want to tell the task the thread id of the worker // that's running it? Maybe to support thread-local accumulators for // commutative reductions? // Oh yeah, also to know which worker's queue to put more stuff onto. -// Trampoline: returns the next work to do, if ready, or NULL if not. +// +// The return value is a trampoline: a `Task` returns the next work to do, if +// it's runnable, or NULL if there isn't one. typedef struct Work* (*Task)(struct Work*); typedef struct Work { @@ -215,6 +218,81 @@ void* thread(void* payload) { return NULL; } +////////////////////// +// Dex loop support // +////////////////////// + +// Return a `Work*` such that joining it `joiners` times is equivalent to joining +// the argument `cont` once. +// - `joiners` >= 1. +// - Do not use `cont` directly afterward, as this is allowed to mutate it. +Work* increase_cont_capacity(Work* cont, int joiners) { + // One way to achieve the goal is to just atomically increase the `join_count` + // of `cont` by `joiners - 1` and reuse it: + atomic_add(&cont->join_count, joiners - 1); + return cont; + // An alternative would be allocate a new `Work` with `join_count` equal to + // `joiners` and `task` to `join` the current `cont`. The advantage of this + // alternative is avoiding the atomic increment (on a potentially contentious + // variable if `cont` has many joiners already); the disadvantage is the + // allocation (which presumably entails some synchronization of its own), and + // an extra indirection at the end due to executing that mini-task. +} + +Work* execute_pure_loop(Work* cont, Task body, void* args[], int start_iter, int end_iter) { + if (end_iter - start_iter <= 1) { + // Few enough iterations; just do it. + // TODO: Can we avoid allocating this Work struct that the body is going to just + // immediately deallocate? I guess the type of `body` would then have to change + // from `Task`, and we'd have to code-gen it differently than if it were a `Task`. + Work* work = (Work*) malloc(sizeof(Work) + 3 * sizeof(int*)); + work->code = body; + work->join_count = 0; + work->args[0] = args; + work->args[1] = *start_iter; + work->args[2] = cont; + return body(work); + } else { + // Create Works that represent schedulable pieces of the loop. 
+ int branching_factor = 2; + div_t iters_per_branch = div(end_iter - start_iter, branching_factor); + int this_iter = start_iter; + Work* subcont = increase_cont_capacity(cont, branching_factor); + for (i = 0; i < branching_factor; i++) { + int next_iter = this_iter + iters_per_branch.quot; + if (i < iters_per_branch.rem) { + next_iter++; + } + Work* section = (Work*) malloc(sizeof(Work) + 5 * sizeof(int*)); + section->code = &execute_pure_loop_task; + section->join_count = 0; + section->args[0] = subcont; + section->args[1] = body; + section->args[2] = args; + section->args[3] = this_iter; + section->args[4] = next_iter; + // TODO I need to know my id so that I push onto my own queue + int me = 0; + if (i == branching_factor - 1) { + // TODO Maybe I could skip allocating this Work, too? + return do_one_work(me, section); + } else { + push(&thread_queues[me], section); + } + this_iter = next_iter; + } // This loop never completes because it hits the `return` + } +} + +Work* execute_pure_loop_task(Work* self) { + Work* cont = self->args[0]; + Task body = self->args[1]; + void* args[] = self->args[2]; + int start_iter = self->args[3]; // Do I have to box these ints, or can I just store them? + int end_iter = self->args[4]; + return execute_pure_loop(cont, body, args, start_iter, end_iter); +} + //////////////////// // Client program // //////////////////// From e50e920f77ec208812b8901d61a959509340a9da Mon Sep 17 00:00:00 2001 From: Alexey Radul Date: Fri, 17 Nov 2023 13:33:33 -0500 Subject: [PATCH 2/3] Draft a complete work-stealing API for Dex code-generation to work against, and test it on a trivial one-task program. The test exposed a queue initialization bug, which is that the initial buffer indices should be 1, not 0. --- src/lib/work-stealing.c | 228 +++++++++++++++++++++++++--------------- 1 file changed, 144 insertions(+), 84 deletions(-) diff --git a/src/lib/work-stealing.c b/src/lib/work-stealing.c index 19455e05f..e45c1076a 100644 --- a/src/lib/work-stealing.c +++ b/src/lib/work-stealing.c @@ -28,7 +28,10 @@ struct Work; // // The return value is a trampoline: a `Task` returns the next work to do, if // it's runnable, or NULL if there isn't one. -typedef struct Work* (*Task)(struct Work*); +// +// `Task`s are internal to the work-stealing system; the client does not +// provide or consume `Task`s. +typedef struct Work* (*Task)(int thread_id, struct Work*); typedef struct Work { Task code; @@ -36,8 +39,8 @@ typedef struct Work { void* args[]; } Work; -Work* EMPTY = -1; -Work* ABORT = -2; +Work* EMPTY = (Work*)-1; +Work* ABORT = (Work*)-2; ///////////////////////// // Work-stealing deque // @@ -59,8 +62,13 @@ typedef struct { void init(Deque* q, int size_hint) { // This does not appear in https://fzn.fr/readings/ppopp13.pdf; I am imputing // it. - atomic_init(&q->top, 0); - atomic_init(&q->bottom, 0); + // Initialize the buffer indices at 1 to prevent underflow. The buffer + // indices are of type `size_t`; the top index never decreases, and the bottom + // index is never less than the top index at rest. The smallest intermediate + // value ever used is `bottom-1`, inside `take`. Initializing `top` and + // `bottom` at 1 suffices to prevent this computation from underflowing. 
+ atomic_init(&q->top, 1); + atomic_init(&q->bottom, 1); Array* a = (Array*) malloc(sizeof(Array) + sizeof(Work*) * size_hint); atomic_init(&a->size, size_hint); atomic_init(&q->array, a); @@ -150,7 +158,7 @@ Work* steal(Deque *q) { // Worker loop // ///////////////// -#define nthreads 24 +int thread_count; Deque* thread_queues; @@ -159,7 +167,7 @@ atomic_bool done; // Trampoline: Returns the next item to work on, or NULL if there aren't any. Work* do_one_work(int id, Work* work) { printf("Worker %d running item %p\n", id, work); - return (*(work->code))(work); + return (*(work->code))(id, work); } void do_work(int id, Work* work) { @@ -188,7 +196,7 @@ void* thread(void* payload) { } else { // No work in my own queue Work* stolen = EMPTY; - for (int i = 0; i < nthreads; ++i) { + for (int i = 0; i < thread_count; ++i) { if (i == id) continue; stolen = steal(&thread_queues[i]); if (stolen == ABORT) { @@ -201,7 +209,7 @@ void* thread(void* payload) { } } if (stolen == EMPTY) { - // Even though the queues we all empty when I tried them, somebody + // Even though the queues were all empty when I tried them, somebody // might have added some more work since. Busy-wait until the global // "done" flag is set. if (atomic_load(&done)) { @@ -218,9 +226,46 @@ void* thread(void* payload) { return NULL; } -////////////////////// -// Dex loop support // -////////////////////// +/////////////////////////// +// Dex codegen interface // +/////////////////////////// + +// A (pointer to a) code-generated function. +// This should either return the result of calling `begin_pure_loop` or return `NULL`. +typedef Work* (*GenBlock)(int thread_id, void** env); + +// A (pointer to a) code-generated function that is a loop body. +// This should either return the result of calling `begin_pure_loop` or return `NULL`. +typedef Work* (*GenLoopBody)(int thread_id, int iteration, void** env); + +// Call this from Haskell once at the start of the process. +// The integer is the number of OS threads to spawn to run work-stealing. +void initialize_work_stealing(int nthreads); + +// Call this from Haskell to run a top block with work-stealing. When this +// exits, the work-stealing system is stopped, and results are written to their +// proper `Dest`s. +void execute_top_block(GenBlock body, void** env); + +// Call this from code-gen at the end of each top-level block. +void finish_work_stealing(); + +// Call this from code-gen to start a loop that you want work-stealing to +// parallelize. +// This assumes that the environment frame for the loop body and for the +// continuation is the same. That assumption isn't hard to change. +Work* begin_pure_loop(int thread_id, GenLoopBody body, GenBlock cont, void** env, int trip_count); + +///////////////////////// +// Dex codegen support // +///////////////////////// + +Work* run_gen_block(int thread_id, Work* self) { + GenBlock body = (GenBlock)self->args[0]; + void** env = (void**)self->args[1]; + free(self); + return body(thread_id, env); +} // Return a `Work*` such that joining it `joiners` times is equivalent to joining // the argument `cont` once. 
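
The hunk below carries, as context, the comment in `increase_cont_capacity` that describes an alternative scheme: allocate a fresh `Work` whose `join_count` is `joiners` and whose task joins the original `cont`. A minimal sketch of that alternative, not part of the patch series; `forward_join_task` and `increase_cont_capacity_alt` are illustrative names, and the sketch assumes `join_work` decrements a `Work`'s `join_count` and returns the `Work` only once the count reaches zero:

Work* forward_join_task(int thread_id, Work* self) {
  // Runs only after this relay Work has been joined `joiners` times;
  // forward exactly one join to the original continuation.
  Work* cont = (Work*)self->args[0];
  free(self);
  // Trampoline: if that was the last outstanding join on `cont`, hand it
  // back to the worker loop to run next.
  return join_work(cont);
}

Work* increase_cont_capacity_alt(Work* cont, int joiners) {
  Work* relay = (Work*) malloc(sizeof(Work) + 1 * sizeof(void*));
  relay->code = &forward_join_task;
  relay->join_count = joiners;  // joining this `joiners` times...
  relay->args[0] = cont;        // ...joins `cont` exactly once.
  return relay;
}

This trades the atomic increment on a possibly contended `join_count` for an allocation and the extra indirection of running the relay task, which is exactly the trade-off the comment weighs; the patches keep the simpler atomic version.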
@@ -229,7 +274,7 @@ void* thread(void* payload) { Work* increase_cont_capacity(Work* cont, int joiners) { // One way to achieve the goal is to just atomically increase the `join_count` // of `cont` by `joiners - 1` and reuse it: - atomic_add(&cont->join_count, joiners - 1); + atomic_fetch_add(&cont->join_count, joiners - 1); return cont; // An alternative would be allocate a new `Work` with `join_count` equal to // `joiners` and `task` to `join` the current `cont`. The advantage of this @@ -239,26 +284,23 @@ Work* increase_cont_capacity(Work* cont, int joiners) { // an extra indirection at the end due to executing that mini-task. } -Work* execute_pure_loop(Work* cont, Task body, void* args[], int start_iter, int end_iter) { +Work* execute_pure_loop_task(int id, Work* self); + +Work* execute_pure_loop(int thread_id, Work* cont, GenLoopBody body, void** env, int start_iter, int end_iter) { if (end_iter - start_iter <= 1) { - // Few enough iterations; just do it. - // TODO: Can we avoid allocating this Work struct that the body is going to just - // immediately deallocate? I guess the type of `body` would then have to change - // from `Task`, and we'd have to code-gen it differently than if it were a `Task`. - Work* work = (Work*) malloc(sizeof(Work) + 3 * sizeof(int*)); - work->code = body; - work->join_count = 0; - work->args[0] = args; - work->args[1] = *start_iter; - work->args[2] = cont; - return body(work); + // Few enough iterations; just do them. + for (int i = start_iter; i < end_iter; i++) { + do_work(thread_id, body(thread_id, i, env)); + } + return join_work(cont); } else { // Create Works that represent schedulable pieces of the loop. int branching_factor = 2; div_t iters_per_branch = div(end_iter - start_iter, branching_factor); int this_iter = start_iter; Work* subcont = increase_cont_capacity(cont, branching_factor); - for (i = 0; i < branching_factor; i++) { + // Queue up all but one chunk of the loop + for (int i = 0; i < branching_factor - 1; i++) { int next_iter = this_iter + iters_per_branch.quot; if (i < iters_per_branch.rem) { next_iter++; @@ -268,88 +310,106 @@ Work* execute_pure_loop(Work* cont, Task body, void* args[], int start_iter, int section->join_count = 0; section->args[0] = subcont; section->args[1] = body; - section->args[2] = args; - section->args[3] = this_iter; - section->args[4] = next_iter; - // TODO I need to know my id so that I push onto my own queue - int me = 0; - if (i == branching_factor - 1) { - // TODO Maybe I could skip allocating this Work, too? - return do_one_work(me, section); - } else { - push(&thread_queues[me], section); - } + section->args[2] = env; + // TODO Is just casting ok here, or do I have to heap-allocate these ints? + // gcc complains about the integer and the pointer having different sizes. + section->args[3] = (void*)this_iter; + section->args[4] = (void*)next_iter; + push(&thread_queues[thread_id], section); this_iter = next_iter; - } // This loop never completes because it hits the `return` + } + // Do the last chunk directly yourself + return execute_pure_loop(thread_id, subcont, body, env, this_iter, end_iter); } } -Work* execute_pure_loop_task(Work* self) { +Work* execute_pure_loop_task(int id, Work* self) { Work* cont = self->args[0]; - Task body = self->args[1]; - void* args[] = self->args[2]; - int start_iter = self->args[3]; // Do I have to box these ints, or can I just store them? 
- int end_iter = self->args[4]; - return execute_pure_loop(cont, body, args, start_iter, end_iter); + GenLoopBody body = self->args[1]; + void** env = self->args[2]; + int start_iter = (int)self->args[3]; + int end_iter = (int)self->args[4]; + return execute_pure_loop(id, cont, body, env, start_iter, end_iter); } -//////////////////// -// Client program // -//////////////////// - -Work* print_task(Work* w) { - int* payload = (int*)w->args[0]; - int item = *payload; - printf("Did item %p with payload %d\n", w, item); - Work* cont = (Work*)w->args[1]; - free(payload); - free(w); - return join_work(cont); +Work* begin_pure_loop(int thread_id, GenLoopBody body, GenBlock cont, void** env, int trip_count) { + // TODO: If the whole loop is smaller than the grain size for + // execute_pure_loop, I can avoid allocating the `Work` for the continuation + // too by just executing the iterations inline here. + Work* k = (Work*) malloc(sizeof(Work) + 5 * sizeof(int*)); + k->code = &run_gen_block; + k->join_count = 1; + k->args[0] = cont; + k->args[1] = env; + return execute_pure_loop(thread_id, k, body, env, 0, trip_count); } -Work* done_task(Work* w) { - free(w); - atomic_store(&done, true); - return NULL; -} +pthread_t* the_threads; +int* tids; -int main(int argc, char **argv) { +void initialize_work_stealing(int nthreads) { // Check that top and bottom are 64-bit so they never overflow assert(sizeof(atomic_size_t) == 8); - pthread_t threads[nthreads]; - int tids[nthreads]; + the_threads = (pthread_t*) malloc(nthreads * sizeof(pthread_t)); + tids = (int*) malloc(nthreads * sizeof(int)); thread_queues = (Deque*) malloc(nthreads * sizeof(Deque)); - int nprints = 10; - atomic_store(&done, false); - Work* done_work = (Work*) malloc(sizeof(Work)); - done_work->code = &done_task; - done_work->join_count = nthreads * nprints; for (int i = 0; i < nthreads; ++i) { tids[i] = i; init(&thread_queues[i], 8); - for (int j = 0; j < nprints; ++j) { - Work* work = (Work*) malloc(sizeof(Work) + 2 * sizeof(int*)); - work->code = &print_task; - work->join_count = 0; - int* payload = malloc(sizeof(int)); - *payload = 1000 * i + j; - work->args[0] = payload; - work->args[1] = done_work; - push(&thread_queues[i], work); - } } - for (int i = 0; i < nthreads; ++i) { - if (pthread_create(&threads[i], NULL, thread, &tids[i]) != 0) { + thread_count = nthreads; +} + +void execute_top_block(GenBlock body, void** env) { + Work* job = (Work*) malloc(sizeof(Work) + 2 * sizeof(int*)); + job->code = &run_gen_block; + job->join_count = 0; + job->args[0] = body; + job->args[1] = env; + atomic_store(&done, false); + push(&thread_queues[0], job); + // TODO: Do we really want to start and kill all the threads for every top + // level block, or is there a way to suspend and reuse them? + for (int i = 0; i < thread_count; ++i) { + if (pthread_create(&the_threads[i], NULL, thread, &tids[i]) != 0) { perror("failed to start the thread"); exit(EXIT_FAILURE); } } - for (int i = 0; i < nthreads; ++i) { - if (pthread_join(threads[i], NULL) != 0) { + for (int i = 0; i < thread_count; ++i) { + if (pthread_join(the_threads[i], NULL) != 0) { perror("failed to join the thread"); exit(EXIT_FAILURE); } } - printf("Expect %d lines of output (including this one)\n", 2 * nthreads * nprints + nthreads + 2); + // We expect all the queues to be empty at this point. TODO: Check? 
+}
+
+void finish_work_stealing() {
+  atomic_store(&done, true);
+}
+
+////////////////////
+// Client program //
+////////////////////
+
+Work* print_gen_block(int thread_id, void** env) {
+  int* payload = (int*)env[0];
+  int item = *payload;
+  printf("Did payload %d\n", item);
+  free(payload);
+  free(env);
+  finish_work_stealing();
+  return NULL;
+}
+
+int main(int argc, char **argv) {
+  initialize_work_stealing(24);
+  void** env = malloc(sizeof(int*));
+  int* payload = malloc(sizeof(int));
+  *payload = 17;
+  env[0] = payload;
+  execute_top_block(&print_gen_block, env);
+  printf("Expect %d lines of output (including this one)\n", 3 + thread_count);
   return 0;
 }

From 77ed709402b751603e48d7a0c44877bcbf5690ea Mon Sep 17 00:00:00 2001
From: Alexey Radul
Date: Fri, 17 Nov 2023 13:56:23 -0500
Subject: [PATCH 3/3] Test on a larger program that also exercises the loop execution code path.

---
 src/lib/work-stealing.c | 42 ++++++++++++++++++++++++++++++++++++-----
 1 file changed, 37 insertions(+), 5 deletions(-)

diff --git a/src/lib/work-stealing.c b/src/lib/work-stealing.c
index e45c1076a..3a79fcd50 100644
--- a/src/lib/work-stealing.c
+++ b/src/lib/work-stealing.c
@@ -393,23 +393,55 @@ void finish_work_stealing() {
 // Client program //
 ////////////////////
 
-Work* print_gen_block(int thread_id, void** env) {
+// A slightly silly program that iterates a single loop a dynamic number of
+// times, and has each loop iteration (and the coda) echo the trip count + 1,
+// just to show that data can be mutated.
+
+Work* gen_loop_body(int thread_id, int iteration, void** env) {
+  int* payload = (int*)env[0];
+  int item = *payload;
+  printf("Loop iteration %d on worker %d, payload %d\n",
+         iteration, thread_id, item);
+  return NULL;
+}
+
+Work* end_gen_block(int thread_id, void** env) {
   int* payload = (int*)env[0];
   int item = *payload;
-  printf("Did payload %d\n", item);
+  printf("Finishing on worker %d, payload %d\n", thread_id, item);
   free(payload);
   free(env);
   finish_work_stealing();
   return NULL;
 }
 
+Work* start_gen_block(int thread_id, void** env) {
+  int* payload = (int*)env[0];
+  int item = *payload;
+  printf("Starting on worker %d, payload %d\n", thread_id, item);
+  *payload = item + 1;
+  return begin_pure_loop(thread_id, gen_loop_body, end_gen_block, env, item);
+}
+
 int main(int argc, char **argv) {
   initialize_work_stealing(24);
   void** env = malloc(sizeof(int*));
   int* payload = malloc(sizeof(int));
-  *payload = 17;
+  int num_iters = 200;
+  *payload = num_iters;
   env[0] = payload;
-  execute_top_block(&print_gen_block, env);
-  printf("Expect %d lines of output (including this one)\n", 3 + thread_count);
+  execute_top_block(&start_gen_block, env);
+  int expected_output_lines =
+      1 // "Starting"
+      + 1 // "Finishing"
+      + thread_count // "Worker n finished"
+      + 1 // Expected line report
+      + num_iters // Each loop iteration
+      + 1 // "Worker running item" for the entry point
+      + 1 // "Worker running item" for the end
+      + (num_iters - 1) // "Worker running item" for intermediates in the loop tree
+      ;
+  printf("Expect %d lines of output (including this one)\n",
+         expected_output_lines);
   return 0;
 }
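
For concreteness, here is how the line count in the final test works out for the values `main` actually uses (24 worker threads, `num_iters = 200`). This is only a check of the sum above, not additional patch content, and it assumes each worker thread prints one "finished" line, as the comment already counts:

//   1            "Starting on worker ..."
// + 1            "Finishing on worker ..."
// + 24           one "finished" line per worker thread
// + 1            the "Expect N lines of output" report itself
// + 200          one "Loop iteration ..." line per iteration
// + 1            do_one_work's "running item" line for the entry block
// + 1            the same, for the end-of-loop continuation
// + 199          the same, for the stealable interior chunks of the loop
//                tree: each two-way split in execute_pure_loop pushes one
//                section, and splitting 200 single-iteration leaves takes
//                199 splits, hence the (num_iters - 1) term
// = 428 lines of output expected in total.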