diff --git a/src/include/abti.h b/src/include/abti.h index b2d71acc..bfdbebb6 100644 --- a/src/include/abti.h +++ b/src/include/abti.h @@ -37,6 +37,7 @@ #define ABTI_SCHED_REQ_FINISH (1 << 0) #define ABTI_SCHED_REQ_EXIT (1 << 1) +#define ABTI_SCHED_REQ_REPLACE (1 << 2) #define ABTI_THREAD_REQ_JOIN (1 << 0) #define ABTI_THREAD_REQ_TERMINATE (1 << 1) @@ -286,15 +287,18 @@ struct ABTI_xstream { }; struct ABTI_sched { - ABTI_sched_used used; /* To know if it is used and how */ - ABT_bool automatic; /* To know if automatic data free */ - ABTI_sched_kind kind; /* Kind of the scheduler */ - ABT_sched_type type; /* Can yield or not (ULT or task) */ - ABTD_atomic_uint32 request; /* Request */ - ABT_pool *pools; /* Thread pools */ - size_t num_pools; /* Number of thread pools */ - ABTI_ythread *p_ythread; /* Associated ULT */ - void *data; /* Data for a specific scheduler */ + ABTI_sched_used used; /* To know if it is used and how */ + ABT_bool automatic; /* To know if automatic data free */ + ABTI_sched_kind kind; /* Kind of the scheduler */ + ABT_sched_type type; /* Can yield or not (ULT or task) */ + ABTI_sched *p_replace_sched; /* Main scheduler that should replace this. + * ABTI_SCHED_REQ_REPLACE should be set. */ + ABTI_ythread *p_replace_waiter; /* Thread waiting for replacement. 
*/ + ABTD_atomic_uint32 request; /* Request */ + ABT_pool *pools; /* Thread pools */ + size_t num_pools; /* Number of thread pools */ + ABTI_ythread *p_ythread; /* Associated ULT */ + void *data; /* Data for a specific scheduler */ /* Scheduler functions */ ABT_sched_init_fn init; diff --git a/src/sched/sched.c b/src/sched/sched.c index 779e8d97..43bbae3e 100644 --- a/src/sched/sched.c +++ b/src/sched/sched.c @@ -878,7 +878,7 @@ ABT_bool ABTI_sched_has_to_stop(ABTI_local **pp_local, ABTI_sched *p_sched) if (ABTI_sched_get_effective_size(*pp_local, p_sched) == 0) { if (ABTD_atomic_acquire_load_uint32(&p_sched->request) & - ABTI_SCHED_REQ_FINISH) { + (ABTI_SCHED_REQ_FINISH | ABTI_SCHED_REQ_REPLACE)) { /* Check join request */ if (ABTI_sched_get_effective_size(*pp_local, p_sched) == 0) return ABT_TRUE; @@ -1104,6 +1104,8 @@ ABTU_ret_err static int sched_create(ABT_sched_def *def, int num_pools, p_sched->used = ABTI_SCHED_NOT_USED; p_sched->automatic = automatic; p_sched->kind = sched_get_kind(def); + p_sched->p_replace_sched = NULL; + p_sched->p_replace_waiter = NULL; ABTD_atomic_relaxed_store_uint32(&p_sched->request, 0); p_sched->pools = pool_list; p_sched->num_pools = num_pools; diff --git a/src/stream.c b/src/stream.c index 001af33c..d3712ac6 100644 --- a/src/stream.c +++ b/src/stream.c @@ -2101,33 +2101,34 @@ static int xstream_update_main_sched(ABTI_global *p_global, } } - /* Set the scheduler as a main scheduler */ - p_sched->used = ABTI_SCHED_MAIN; - - /* Finish the current main scheduler */ - ABTI_sched_set_request(p_main_sched, ABTI_SCHED_REQ_FINISH); - - /* If the ES is secondary, we should take the associated ULT of the - * current main scheduler and keep it in the new scheduler. */ - p_sched->p_ythread = p_main_sched->p_ythread; - - /* Set the scheduler */ - p_xstream->p_main_sched = p_sched; + if (p_main_sched->p_replace_sched) { + /* We need to overwrite the scheduler. Free the existing one. 
*/ + ABTI_ythread *p_waiter = p_main_sched->p_replace_waiter; + ABTI_sched_discard_and_free(p_global, + ABTI_xstream_get_local( + *pp_local_xstream), + p_main_sched->p_replace_sched, + ABT_FALSE); + p_main_sched->p_replace_sched = NULL; + p_main_sched->p_replace_waiter = NULL; + /* Resume the waiter. This waiter sees that the scheduler finished + * immediately and was replaced by this new scheduler. */ + ABTI_ythread_set_ready(ABTI_xstream_get_local(*pp_local_xstream), + p_waiter); + } + ABTI_ythread_set_blocked(p_ythread); + /* Set the replace scheduler */ + p_main_sched->p_replace_sched = p_sched; + p_main_sched->p_replace_waiter = p_ythread; + /* Ask the current main scheduler to replace its scheduler */ + ABTI_sched_set_request(p_main_sched, ABTI_SCHED_REQ_REPLACE); /* Switch to the current main scheduler. The current ULT is pushed to * the new scheduler's pool so that when the new scheduler starts, this - * ULT can be scheduled by the new scheduler. When the current ULT - * resumes its execution, it will free the current main scheduler - * (see below). */ - ABTI_ythread_context_switch_to_parent(pp_local_xstream, p_ythread, - ABT_SYNC_EVENT_TYPE_OTHER, NULL); - - /* Now, we free the current main scheduler. p_main_sched->p_ythread must - * be NULL to avoid freeing it in ABTI_sched_discard_and_free(). */ - p_main_sched->p_ythread = NULL; - ABTI_sched_discard_and_free(p_global, - ABTI_xstream_get_local(*pp_local_xstream), - p_main_sched, ABT_FALSE); + * ULT can be scheduled by the new scheduler. The existing main + * scheduler will be freed by ABTI_SCHED_REQ_REPLACE. 
*/ + ABTI_ythread_suspend(pp_local_xstream, p_ythread, + ABT_SYNC_EVENT_TYPE_OTHER, NULL); return ABT_SUCCESS; } } diff --git a/src/thread.c b/src/thread.c index f5eb407a..59974765 100644 --- a/src/thread.c +++ b/src/thread.c @@ -3009,12 +3009,34 @@ static void thread_main_sched_func(void *arg) LOG_DEBUG("[S%" PRIu64 "] start\n", p_sched->id); p_sched->run(ABTI_sched_get_handle(p_sched)); - /* From here the main scheduler can have been already replaced. */ - /* The main scheduler must be executed on the same execution stream. */ - ABTI_ASSERT(p_local == ABTI_local_get_local_uninlined()); LOG_DEBUG("[S%" PRIu64 "] end\n", p_sched->id); + /* The main scheduler's thread must be executed on the same execution + * stream. */ + ABTI_ASSERT(p_local == ABTI_local_get_local_uninlined()); - p_sched = p_local_xstream->p_main_sched; + /* We free the current main scheduler and replace it if requested. */ + if (ABTD_atomic_relaxed_load_uint32(&p_sched->request) & + ABTI_SCHED_REQ_REPLACE) { + ABTI_ythread *p_waiter = p_sched->p_replace_waiter; + ABTI_sched *p_new_sched = p_sched->p_replace_sched; + /* Set this scheduler as a main scheduler */ + p_new_sched->used = ABTI_SCHED_MAIN; + /* Take the ULT of the current main scheduler and use it for the new + * scheduler. */ + p_new_sched->p_ythread = p_sched->p_ythread; + p_local_xstream->p_main_sched = p_new_sched; + /* Now, we free the current main scheduler. p_sched->p_ythread must + * be NULL to avoid freeing it in ABTI_sched_discard_and_free(). */ + p_sched->p_ythread = NULL; + ABTI_sched_discard_and_free(ABTI_global_get_global(), p_local, + p_sched, ABT_FALSE); + /* We do not need to unset ABTI_SCHED_REQ_REPLACE since that p_sched + * has already been replaced. */ + p_sched = p_new_sched; + /* Resume the waiter. 
*/ + ABTI_ythread_set_ready(p_local, p_waiter); + } + ABTI_ASSERT(p_sched == p_local_xstream->p_main_sched); uint32_t request = ABTD_atomic_acquire_load_uint32( &p_sched->p_ythread->thread.request); diff --git a/test/.gitignore b/test/.gitignore index 70150d7a..eb57d7a4 100644 --- a/test/.gitignore +++ b/test/.gitignore @@ -5,6 +5,7 @@ basic/xstream_revive basic/xstream_affinity basic/xstream_barrier basic/xstream_rank +basic/xstream_set_main_sched basic/thread_create basic/thread_create2 basic/thread_create_on_xstream diff --git a/test/basic/Makefile.am b/test/basic/Makefile.am index 63a150aa..02b27941 100644 --- a/test/basic/Makefile.am +++ b/test/basic/Makefile.am @@ -10,6 +10,7 @@ TESTS = \ xstream_affinity \ xstream_barrier \ xstream_rank \ + xstream_set_main_sched \ thread_create \ thread_create2 \ thread_create_on_xstream \ @@ -105,6 +106,7 @@ xstream_revive_SOURCES = xstream_revive.c xstream_affinity_SOURCES = xstream_affinity.c xstream_barrier_SOURCES = xstream_barrier.c xstream_rank_SOURCES = xstream_rank.c +xstream_set_main_sched_SOURCES = xstream_set_main_sched.c thread_create_SOURCES = thread_create.c thread_create2_SOURCES = thread_create2.c thread_create_on_xstream_SOURCES = thread_create_on_xstream.c @@ -186,6 +188,7 @@ testing: ./xstream_affinity ./xstream_barrier ./xstream_rank + ./xstream_set_main_sched ./thread_create ./thread_create2 ./thread_create_on_xstream diff --git a/test/basic/xstream_set_main_sched.c b/test/basic/xstream_set_main_sched.c new file mode 100644 index 00000000..4f8d2896 --- /dev/null +++ b/test/basic/xstream_set_main_sched.c @@ -0,0 +1,262 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ +/* + * See COPYRIGHT in top-level directory. 
+ */ + +#include <stdio.h> +#include <stdlib.h> +#include "abt.h" +#include "abttest.h" + +#define NUM_POOLS 3 +#define NUM_ITERS 100 +#define DEFAULT_NUM_XSTREAMS 4 +#define DEFAULT_NUM_THREADS 10 + +int g_num_xstreams; +ABT_pool *g_pools; + +typedef struct { + int num_pools; + ABT_pool *pools; +} sched_data_t; + +int sched_init(ABT_sched sched, ABT_sched_config config) +{ + int ret; + sched_data_t *p_data = (sched_data_t *)malloc(sizeof(sched_data_t)); + ret = ABT_sched_get_num_pools(sched, &p_data->num_pools); + ATS_ERROR(ret, "ABT_sched_get_num_pools"); + assert(p_data->num_pools >= 0); + p_data->pools = (ABT_pool *)malloc(sizeof(ABT_pool) * p_data->num_pools); + ret = ABT_sched_get_pools(sched, p_data->num_pools, 0, p_data->pools); + ATS_ERROR(ret, "ABT_sched_get_pools"); + ret = ABT_sched_set_data(sched, (void *)p_data); + ATS_ERROR(ret, "ABT_sched_set_data"); + return ABT_SUCCESS; +} + +void sched_run(ABT_sched sched) +{ + int ret; + sched_data_t *p_data; + ret = ABT_sched_get_data(sched, (void **)&p_data); + ATS_ERROR(ret, "ABT_sched_get_data"); + int work_count = 0; + while (1) { + int i; + for (i = 0; i < p_data->num_pools; i++) { + ABT_unit unit; + ret = ABT_pool_pop(p_data->pools[i], &unit); + ATS_ERROR(ret, "ABT_pool_pop"); + if (unit != ABT_UNIT_NULL) { + ABT_xstream_run_unit(unit, p_data->pools[i]); + } + } + if (++work_count >= 16) { + ret = ABT_xstream_check_events(sched); + ATS_ERROR(ret, "ABT_xstream_check_events"); + ABT_bool stop; + ret = ABT_sched_has_to_stop(sched, &stop); + ATS_ERROR(ret, "ABT_sched_has_to_stop"); + if (stop == ABT_TRUE) + break; + work_count = 0; + } + } +} + +int sched_free(ABT_sched sched) +{ + sched_data_t *p_data; + int ret = ABT_sched_get_data(sched, (void **)&p_data); + ATS_ERROR(ret, "ABT_sched_get_data"); + free(p_data->pools); + free(p_data); + return ABT_SUCCESS; +} + +/* Change the scheduler. 
*/ +void change_main_sched(int num_pools, ABT_pool *pools, int is_basic) +{ + int ret; + while (1) { + int rank; + ret = ABT_self_get_xstream_rank(&rank); + ATS_ERROR(ret, "ABT_get_xstream_rank"); + if (rank != g_num_xstreams - 1) + break; + + /* The last execution stream should keep the main scheduler that has all + * the NUM_POOLS pools: this is necessary to keep all the pools not + * automatically freed by the runtime. */ + ret = ABT_self_yield(); + ATS_ERROR(ret, "ABT_self_yield"); + } + ABT_xstream self_xstream; + ret = ABT_self_get_xstream(&self_xstream); + ATS_ERROR(ret, "ABT_self_get_xstream"); + if (is_basic) { + ret = ABT_xstream_set_main_sched_basic(self_xstream, ABT_SCHED_DEFAULT, + num_pools, pools); + ATS_ERROR(ret, "ABT_xstream_set_main_sched_basic"); + } else { + /* Create a custom scheduler. */ + ABT_sched_def sched_def = { .type = ABT_SCHED_TYPE_ULT, + .init = sched_init, + .run = sched_run, + .free = sched_free, + .get_migr_pool = NULL }; + ABT_sched_config config; + ret = ABT_sched_config_create(&config, ABT_sched_config_automatic, 1, + ABT_sched_config_var_end); + ATS_ERROR(ret, "ABT_sched_config_create"); + ABT_sched sched; + ret = ABT_sched_create(&sched_def, num_pools, pools, config, &sched); + ATS_ERROR(ret, "ABT_sched_create"); + ret = ABT_sched_config_free(&config); + ATS_ERROR(ret, "ABT_sched_config_free"); + ret = ABT_xstream_set_main_sched(self_xstream, sched); + ATS_ERROR(ret, "ABT_xstream_set_main_sched_basic"); + /* sched will be freed automatically. 
*/ + } +} + +void thread_func(void *arg) +{ + int i; + ABT_pool *pools = (ABT_pool *)malloc(sizeof(ABT_pool) * NUM_POOLS); + for (i = 0; i < NUM_ITERS; i++) { + int num_pools = 1; + pools[0] = g_pools[i % NUM_POOLS]; + int is_basic = (i / NUM_POOLS) % 2; + if (((i / NUM_POOLS / 2) % 2) && NUM_POOLS > 1) { + num_pools++; + pools[1] = g_pools[(i + 1) % NUM_POOLS]; + } + if (((i / NUM_POOLS / 4) % 2) && NUM_POOLS > 2) { + num_pools++; + pools[2] = g_pools[(i + 2) % NUM_POOLS]; + } + change_main_sched(num_pools, pools, is_basic); + /* Sometimes we can yield. */ + if ((i / NUM_POOLS / 8) % 2 == 0) { + int ret = ABT_self_yield(); + ATS_ERROR(ret, "ABT_self_yield"); + } + } + /* Before finishing this thread, we should guarantee that each pool is + * checked by at least one scheduler. */ + change_main_sched(NUM_POOLS, g_pools, 1); + free(pools); +} + +int main(int argc, char *argv[]) +{ + int num_threads; + int i, ret; + + /* Initialize */ + ATS_read_args(argc, argv); + if (argc < 2) { + g_num_xstreams = DEFAULT_NUM_XSTREAMS; + num_threads = DEFAULT_NUM_THREADS; + } else { + g_num_xstreams = ATS_get_arg_val(ATS_ARG_N_ES); + num_threads = ATS_get_arg_val(ATS_ARG_N_ULT); + } + if (g_num_xstreams < 2) + g_num_xstreams = 2; /* The last execution stream is to keep pools. */ + + /* Allocate memory. */ + ABT_xstream *xstreams = + (ABT_xstream *)malloc(sizeof(ABT_xstream) * g_num_xstreams); + g_pools = (ABT_pool *)malloc(sizeof(ABT_pool) * NUM_POOLS); + ABT_thread *threads = + (ABT_thread *)malloc(sizeof(ABT_thread) * num_threads); + + /* Initialize Argobots. */ + ATS_init(argc, argv, g_num_xstreams); + + ret = ABT_self_get_xstream(&xstreams[0]); + ATS_ERROR(ret, "ABT_self_get_xstream"); + + /* Set up pools. */ + /* pools[0]: the original main pool. */ + ret = ABT_xstream_get_main_pools(xstreams[0], 1, &g_pools[0]); + ATS_ERROR(ret, "ABT_xstream_get_main_pools"); + /* pools[1:NUM_POOLS]: the built-in FIFO pools. 
*/ + for (i = 1; i < NUM_POOLS; i++) { + ret = ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, + ABT_TRUE, &g_pools[i]); + ATS_ERROR(ret, "ABT_pool_create_basic"); + } + + /* Create Execution Streams */ + for (i = 1; i < g_num_xstreams; i++) { + ret = ABT_xstream_create_basic(ABT_SCHED_DEFAULT, NUM_POOLS, g_pools, + ABT_SCHED_CONFIG_NULL, &xstreams[i]); + ATS_ERROR(ret, "ABT_xstream_create_basic"); + } + /* Change the scheduler of the primary execution stream. */ + change_main_sched(NUM_POOLS, g_pools, NUM_POOLS); + + /* Create ULTs that randomly change the main scheduler. */ + for (i = 0; i < num_threads; i++) { + ret = ABT_thread_create(g_pools[i % NUM_POOLS], thread_func, NULL, + ABT_THREAD_ATTR_NULL, &threads[i]); + ATS_ERROR(ret, "ABT_thread_create"); + } + + thread_func(NULL); + + /* Join and free ULTs */ + for (i = 0; i < num_threads; i++) { + ret = ABT_thread_free(&threads[i]); + ATS_ERROR(ret, "ABT_thread_free"); + } + + /* Yield myself until this thread is running on the primary execution + * stream. */ + i = 0; + while (1) { + ABT_bool on_primary_xstream = ABT_FALSE; + ret = ABT_self_on_primary_xstream(&on_primary_xstream); + ATS_ERROR(ret, "ABT_self_on_primary_xstream"); + if (on_primary_xstream) + break; + ret = ABT_self_set_associated_pool(g_pools[i]); + ATS_ERROR(ret, "ABT_self_set_associated_pool"); + ret = ABT_self_yield(); + ATS_ERROR(ret, "ABT_self_yield"); + i = (i + 1) % NUM_POOLS; + } + + /* Before freeing the other execution stream, we should guarantee that all + * pools are associated with the primary execution stream's scheduler. 
*/ + change_main_sched(NUM_POOLS, g_pools, 1); + + /* Join and free execution streams */ + for (i = 1; i < g_num_xstreams; i++) { + while (1) { + ABT_bool on_primary_xstream = ABT_FALSE; + ret = ABT_self_on_primary_xstream(&on_primary_xstream); + ATS_ERROR(ret, "ABT_self_on_primary_xstream"); + if (on_primary_xstream) + break; + ret = ABT_self_yield(); + ATS_ERROR(ret, "ABT_self_yield"); + } + ret = ABT_xstream_free(&xstreams[i]); + ATS_ERROR(ret, "ABT_xstream_free"); + } + + /* Finalize */ + ret = ATS_finalize(0); + + free(xstreams); + free(g_pools); + free(threads); + + return ret; +}