diff --git a/examples/scheduling/Makefile.am b/examples/scheduling/Makefile.am index 4cc282f6..694da021 100644 --- a/examples/scheduling/Makefile.am +++ b/examples/scheduling/Makefile.am @@ -8,7 +8,8 @@ TESTS = \ sched_predef \ sched_shared_pool \ sched_stack \ - sched_user + sched_user \ + sched_reg check_PROGRAMS = $(TESTS) noinst_PROGRAMS = $(TESTS) @@ -20,3 +21,4 @@ sched_predef_SOURCES = sched_predef.c sched_shared_pool_SOURCES = sched_shared_pool.c sched_stack_SOURCES = sched_stack.c sched_user_SOURCES = sched_user.c +sched_reg_SOURCES = sched_reg.c diff --git a/examples/scheduling/sched_reg.c b/examples/scheduling/sched_reg.c new file mode 100644 index 00000000..ca7ed611 --- /dev/null +++ b/examples/scheduling/sched_reg.c @@ -0,0 +1,146 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ +/* + * See COPYRIGHT in top-level directory. + */ + +#include +#include +#include +#include +#include "abt.h" + +#define NUM_XSTREAMS 4 +#define NUM_THREADS 4 + + +static void create_threads(void *arg); +static void thread_hello(void *arg); + +int main(int argc, char *argv[]) +{ + ABT_xstream xstreams[NUM_XSTREAMS]; + ABT_sched scheds[NUM_XSTREAMS]; + ABT_pool pools[NUM_XSTREAMS]; + ABT_thread threads[NUM_XSTREAMS]; + int i; + + ABT_init(argc, argv); + + /* Create pools */ + for (i = 0; i < NUM_XSTREAMS; i++) { + ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, ABT_TRUE, + &pools[i]); + } + + /* Create schedulers */ + //create_scheds(NUM_XSTREAMS, pools, scheds); + ABT_sched_policy policies[NUM_XSTREAMS]; + for (int i = 0; i < NUM_XSTREAMS; i++) { + policies[i].ready_at = 0; + policies[i].ready_at_wt = ABT_get_wtime(); + policies[i].priority = i; + policies[i].max_successes = 10; + policies[i].min_successes = 2 * i + 2; + policies[i].success_timeout = 5; + policies[i].fail_timeout = 100; + policies[i].success_timeout_wt = 0.0; + policies[i].fail_timeout_wt = 5.0; + } + + policies[0].max_successes = 100; + policies[0].fail_timeout_wt=10.0; + for (int i = 0; i < NUM_XSTREAMS; i++) { + for (int j = 0; j < NUM_XSTREAMS;j++) { + policies[(i+j) % NUM_XSTREAMS].priority = j; + } + ABT_sched_create_reg(NUM_XSTREAMS, pools, NUM_XSTREAMS, policies, &scheds[i]); + } + + /* Create ESs */ + ABT_xstream_self(&xstreams[0]); + ABT_xstream_set_main_sched(xstreams[0], scheds[0]); + for (i = 1; i < NUM_XSTREAMS; i++) { + ABT_xstream_create(scheds[i], &xstreams[i]); + } + + /* Create ULTs */ + for (i = 0; i < NUM_XSTREAMS; i++) { + size_t tid = (size_t)i; + ABT_thread_create(pools[i], create_threads, (void *)tid, + ABT_THREAD_ATTR_NULL, &threads[i]); + } + /* Join & Free */ + for (i = 0; i < NUM_XSTREAMS; i++) { + ABT_thread_join(threads[i]); + ABT_thread_free(&threads[i]); + } + for (i = 1; i < NUM_XSTREAMS; i++) { + ABT_xstream_join(xstreams[i]); + ABT_xstream_free(&xstreams[i]); + } + /* Free schedulers */ + /* Note that we do not need to free the scheduler for the primary ES, + * i.e., xstreams[0], because its scheduler will be automatically freed in + * ABT_finalize(). */ + for (i = 1; i < NUM_XSTREAMS; i++) { + ABT_sched_free(&scheds[i]); + } + + /* Finalize */ + ABT_finalize(); + + return 0; +} + +static void create_threads(void *arg) +{ + int i, rank, tid = (int)(size_t)arg; + ABT_xstream xstream; + ABT_pool pools[NUM_XSTREAMS]; + ABT_thread *threads; + + ABT_xstream_self(&xstream); + ABT_xstream_get_main_pools(xstream, NUM_XSTREAMS, pools); + + ABT_xstream_get_rank(xstream, &rank); + printf("[U%d:E%d] creating ULTs\n", tid, rank); + + threads = (ABT_thread *)malloc(sizeof(ABT_thread) * NUM_THREADS); + for (i = 0; i < NUM_THREADS; i++) { + size_t id = (rank + 1) * 10 + i; + ABT_thread_create(pools[rank], thread_hello, (void *)id, ABT_THREAD_ATTR_NULL, + &threads[i]); + } + + ABT_xstream_get_rank(xstream, &rank); + printf("[U%d:E%d] freeing ULTs\n", tid, rank); + for (i = 0; i < NUM_THREADS; i++) { + ABT_thread_free(&threads[i]); + } + free(threads); +} + +static void thread_hello(void *arg) +{ + int tid = (int)(size_t)arg; + int old_rank, cur_rank; + char *msg; + + ABT_xstream_self_rank(&cur_rank); + + printf(" [U%d:E%d] Hello, world!\n", tid, cur_rank); + + ABT_thread_yield(); + + old_rank = cur_rank; + ABT_xstream_self_rank(&cur_rank); + msg = (cur_rank == old_rank) ? "" : " (stolen)"; + printf(" [U%d:E%d] Hello again.%s\n", tid, cur_rank, msg); + + ABT_thread_yield(); + + old_rank = cur_rank; + ABT_xstream_self_rank(&cur_rank); + msg = (cur_rank == old_rank) ? "" : " (stolen)"; + printf(" [U%d:E%d] Goodbye, world!%s\n", tid, cur_rank, msg); +} diff --git a/src/include/abt.h.in b/src/include/abt.h.in index 0d9f295b..cf4e7741 100644 --- a/src/include/abt.h.in +++ b/src/include/abt.h.in @@ -2309,6 +2309,87 @@ typedef void (*ABT_tool_thread_callback_fn)(ABT_thread, ABT_xstream, uint64_t ev typedef void (*ABT_tool_task_callback_fn)(ABT_task, ABT_xstream, uint64_t event, ABT_tool_context context, void *user_arg); +/** + * @ingroup SCHED + * @brief A struct that defines the scheduling policy for an associated pool. + */ +typedef struct { + /** + * @brief Pool priority. + * + * This value determines the relative priority of the associated pool when + * the scheduler is selecting between multiple ready pools. The scheduler + * will favor pools with lower priority. Value must be non-negative. + */ + int priority; + /** + * @brief Minimum number of successful attempts. + * + * This value determines how many work units will be pulled from this pool + * before returning to the scheduler to check for interrupts and/or higher + * priority pools that may have become ready. + */ + int min_successes; + /** + * @brief Maximum number of successful attempts. + * + * This value determines the maximum number of work units will be executed + * from this pool before the pool is timed-out to allow for the scheduler + * to check other pools. + */ + int max_successes; + /** + * @brief Scheduler event number at which this pool becomes ready. + * + * This value determines the point in time that this pool becomes ready + * after being timed-out. It is specified in terms of number of work units + * that the scheduler has executed. + */ + int ready_at; + /** + * @brief Timeout value when the scheduler fails to pop a work unit. + * + * This value determines how many work units the scheduler should execute + * from OTHER pools before this pool becomes ready again in the case where + * the scheduler fails to execute a work unit from this pool (because the + * pool has become empty). + */ + int fail_timeout; + /** + * @brief Timeout value after the maximum number of successful pops. + * + * This value determines how many work units the scheduler should execute + * from OTHER pools before this pool becomes ready again in the case where + * the scheduler has executed the maximum number of events from this pool, + * as specified by max_successes. + */ + int success_timeout; + /** + * @brief Walltime at which this pool becomes ready after a timeout. + * + * This value determines the walltime at which this pool becomes ready + * again after a timeout. It works in conjunction with ready_at. For a pool + * to be ready, it must have reached both the ready_at_wt walltime and the + * ready_at scheduler event number. + */ + double ready_at_wt; + /** + * @brief Timeout value (in seconds) when the scheduler fails to pop. + * + * This value determines how many seconds this pool will be timed out + * in the case where the scheduler fails to pop an work unit from this pool. + */ + double fail_timeout_wt; + /** + * @brief Timeout value (in seconds) after the maximum successful pops. + * + * This value determines how many seconds this pool will be timed out + * in the case where the scheduler fails pops the maximum number of work + * units from this pool as determined by max_successes. + */ + double success_timeout_wt; +} ABT_sched_policy; + /* Init & Finalize */ int ABT_init(int argc, char **argv) ABT_API_PUBLIC; int ABT_finalize(void) ABT_API_PUBLIC; @@ -2363,6 +2444,9 @@ int ABT_sched_create(ABT_sched_def *def, int num_pools, ABT_pool *pools, int ABT_sched_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABT_sched *newsched) ABT_API_PUBLIC; +void ABT_sched_create_reg(int num_pools, ABT_pool *pools, + int num_policies, ABT_sched_policy *policies, + ABT_sched *newsched) ABT_API_PUBLIC; int ABT_sched_free(ABT_sched *sched) ABT_API_PUBLIC; int ABT_sched_set_data(ABT_sched sched, void *data) ABT_API_PUBLIC; int ABT_sched_get_data(ABT_sched sched, void **data) ABT_API_PUBLIC; diff --git a/src/sched/Makefile.mk b/src/sched/Makefile.mk index f87ab1f2..926fa822 100644 --- a/src/sched/Makefile.mk +++ b/src/sched/Makefile.mk @@ -8,6 +8,7 @@ abt_sources += \ sched/basic_wait.c \ sched/prio.c \ sched/randws.c \ + sched/reg.c \ sched/sched.c \ sched/sched_config.c diff --git a/src/sched/reg.c b/src/sched/reg.c new file mode 100644 index 00000000..8942ff29 --- /dev/null +++ b/src/sched/reg.c @@ -0,0 +1,136 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ +/* + * See COPYRIGHT in top-level directory. + */ + +#include +#include +#include "abti.h" + +typedef struct { + uint64_t event_num; + uint32_t num_pools; + uint32_t prev_pool; + uint32_t successes; + ABT_sched_policy *policies; +} sched_data_t; + +static int sched_init(ABT_sched sched, ABT_sched_config config) { + ABT_sched_policy *policies; + + sched_data_t *p_data = (sched_data_t *)calloc(1, sizeof(sched_data_t)); + + p_data->event_num = 0; + p_data->prev_pool = -1; + p_data->successes = 0; + ABT_sched_get_num_pools(sched, &p_data->num_pools); + p_data->policies = (ABT_sched_policy *)calloc(p_data->num_pools, sizeof(ABT_sched_policy)); + + ABT_sched_config_read(config, 1, &policies); + memcpy(p_data->policies, policies, p_data->num_pools * sizeof(ABT_sched_policy)); + + ABT_sched_set_data(sched, (void *)p_data); + + return ABT_SUCCESS; +} + +static void sched_run(ABT_sched sched) { + int cur_rank; + sched_data_t* p_data; + ABT_bool stop; + + int num_pools; + ABT_pool *pools; + + ABT_xstream_self_rank(&cur_rank); + ABT_sched_get_data(sched, (void **)&p_data); + + ABT_sched_get_num_pools(sched, &num_pools); + pools = (ABT_pool *)malloc(num_pools * sizeof(ABT_pool)); + ABT_sched_get_pools(sched, num_pools, 0, pools); + + while (1) { + int pool = -1; + int prio = -1; + double current_time = ABT_get_wtime(); + // TODO: Can also check "isEmpty" so we don't take from pools we don't want to + for (int i = 0; i < num_pools; i++) { + if ((p_data->policies[i].ready_at <= p_data->event_num && + p_data->policies[i].ready_at_wt <= current_time) && + (p_data->policies[i].priority < prio || prio == -1)) { + pool = i; + prio = p_data->policies[i].priority; + } + } + if (pool != p_data->prev_pool) { + p_data->prev_pool = pool; + p_data->successes = 0; + } + if (pool == -1) { + p_data->event_num++; + } else { + ABT_sched_policy *policy = &p_data->policies[pool]; + for (int i = 0; i < policy->min_successes; i++) { + /* Execute one work unit from the scheduler's pool */ + ABT_thread thread; + current_time = ABT_get_wtime(); + ABT_pool_pop_thread(pools[pool], &thread); + if (thread != ABT_THREAD_NULL) { + /* "thread" is associated with its original pool (pools[0]). */ + ABT_self_schedule(thread, ABT_POOL_NULL); + p_data->event_num++; + p_data->successes++; + if (p_data->successes >= policy->max_successes) { + p_data->successes = 0; + policy->ready_at = p_data->event_num + policy->success_timeout; + policy->ready_at_wt = current_time + policy->success_timeout_wt; + break; + } + } else { + p_data->successes = 0; + policy->ready_at = p_data->event_num + policy->fail_timeout; + policy->ready_at_wt = current_time + policy->fail_timeout_wt; + break; + } + } + } + ABT_sched_has_to_stop(sched, &stop); + if (stop == ABT_TRUE) + break; + ABT_xstream_check_events(sched); + } + + free(pools); +} + +static int sched_free(ABT_sched sched) { + sched_data_t* p_data; + + ABT_sched_get_data(sched, (void **)&p_data); + free(p_data->policies); + free(p_data); + + return ABT_SUCCESS; +} + +void ABT_sched_create_reg(int num_pools, ABT_pool *pools, + int num_policies, ABT_sched_policy *policies, + ABT_sched *sched) { + ABT_sched_config config; + + ABT_sched_config_var cv_policies = { .idx = 0, + .type = ABT_SCHED_CONFIG_PTR }; + + ABT_sched_def sched_def = { .type = ABT_SCHED_TYPE_ULT, + .init = sched_init, + .run = sched_run, + .free = sched_free, + .get_migr_pool = NULL }; + + /* Create a scheduler config */ + ABT_sched_config_create(&config, cv_policies, policies, + ABT_sched_config_var_end); + + ABT_sched_create(&sched_def, num_pools, pools, config, sched); + ABT_sched_config_free(&config); +}