diff --git a/src/include/abt.h.in b/src/include/abt.h.in index c626c3af..0e4e848f 100644 --- a/src/include/abt.h.in +++ b/src/include/abt.h.in @@ -1779,6 +1779,7 @@ int ABT_pool_add_sched(ABT_pool pool, ABT_sched sched) ABT_API_PUBLIC; int ABT_pool_get_id(ABT_pool pool, int *id) ABT_API_PUBLIC; /* Work Unit */ +int ABT_unit_get_thread(ABT_unit unit, ABT_thread *thread) ABT_API_PUBLIC; int ABT_unit_set_associated_pool(ABT_unit unit, ABT_pool pool) ABT_API_PUBLIC; /* User-level Thread (ULT) */ @@ -1805,6 +1806,7 @@ int ABT_thread_get_last_xstream(ABT_thread thread, ABT_xstream *xstream) ABT_API int ABT_thread_get_state(ABT_thread thread, ABT_thread_state *state) ABT_API_PUBLIC; int ABT_thread_get_last_pool(ABT_thread thread, ABT_pool *pool) ABT_API_PUBLIC; int ABT_thread_get_last_pool_id(ABT_thread thread, int *id) ABT_API_PUBLIC; +int ABT_thread_get_unit(ABT_thread thread, ABT_unit *unit) ABT_API_PUBLIC; int ABT_thread_set_associated_pool(ABT_thread thread, ABT_pool pool) ABT_API_PUBLIC; int ABT_thread_yield_to(ABT_thread thread) ABT_API_PUBLIC; int ABT_thread_yield(void) ABT_API_PUBLIC; @@ -1825,6 +1827,7 @@ int ABT_thread_get_stacksize(ABT_thread thread, size_t *stacksize) ABT_API_PUBLI int ABT_thread_get_id(ABT_thread thread, ABT_unit_id *thread_id) ABT_API_PUBLIC; int ABT_thread_set_arg(ABT_thread thread, void *arg) ABT_API_PUBLIC; int ABT_thread_get_arg(ABT_thread thread, void **arg) ABT_API_PUBLIC; +int ABT_thread_get_thread_func(ABT_thread thread, void (**thread_func)(void *)) ABT_API_PUBLIC; int ABT_thread_set_specific(ABT_thread thread, ABT_key key, void *value) ABT_API_PUBLIC; int ABT_thread_get_specific(ABT_thread thread, ABT_key key, void **value) ABT_API_PUBLIC; int ABT_thread_get_attr(ABT_thread thread, ABT_thread_attr *attr) ABT_API_PUBLIC; @@ -1880,12 +1883,16 @@ int ABT_self_get_type(ABT_unit_type *type) ABT_API_PUBLIC; int ABT_self_is_primary(ABT_bool *is_primary) ABT_API_PUBLIC; int ABT_self_on_primary_xstream(ABT_bool *on_primary) ABT_API_PUBLIC; int ABT_self_is_unnamed(ABT_bool *is_unnamed) ABT_API_PUBLIC; +int ABT_self_get_last_pool(ABT_pool *pool) ABT_API_PUBLIC; int ABT_self_get_last_pool_id(int *pool_id) ABT_API_PUBLIC; +int ABT_self_set_associated_pool(ABT_pool pool) ABT_API_PUBLIC; +int ABT_self_get_unit(ABT_unit *unit) ABT_API_PUBLIC; int ABT_self_yield(void) ABT_API_PUBLIC; int ABT_self_suspend(void) ABT_API_PUBLIC; int ABT_self_exit(void) ABT_API_PUBLIC; int ABT_self_set_arg(void *arg) ABT_API_PUBLIC; int ABT_self_get_arg(void **arg) ABT_API_PUBLIC; +int ABT_self_get_thread_func(void (**thread_func)(void *)) ABT_API_PUBLIC; /* ULT-specific data */ int ABT_key_create(void (*destructor)(void *value), ABT_key *newkey) ABT_API_PUBLIC; diff --git a/src/self.c b/src/self.c index bdef4446..2c48a2e1 100644 --- a/src/self.c +++ b/src/self.c @@ -412,6 +412,37 @@ int ABT_self_on_primary_xstream(ABT_bool *on_primary) return ABT_SUCCESS; } +/** + * @ingroup SELF + * @brief Get the last pool of the calling work unit. + * + * \c ABT_self_get_last_pool() returns the last pool associated with the calling + * work unit through \c pool. + * + * @contexts + * \DOC_CONTEXT_INIT_NOEXT \DOC_CONTEXT_NOCTXSWITCH + * + * @errors + * \DOC_ERROR_SUCCESS + * \DOC_ERROR_INV_XSTREAM_EXT + * + * @undefined + * \DOC_UNDEFINED_UNINIT + * \DOC_UNDEFINED_NULL_PTR{\c pool} + * + * @param[out] pool pool handle + * @return Error code + */ +int ABT_self_get_last_pool(ABT_pool *pool) +{ + ABTI_xstream *p_local_xstream; + ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream); + ABTI_thread *p_self = p_local_xstream->p_thread; + ABTI_ASSERT(p_self->p_pool); + *pool = ABTI_pool_get_handle(p_self->p_pool); + return ABT_SUCCESS; +} + /** * @ingroup SELF * @brief Get ID of the last pool of the calling work unit. @@ -459,6 +490,75 @@ int ABT_self_get_last_pool_id(int *pool_id) return ABT_SUCCESS; } +/** + * @ingroup SELF + * @brief Set an associated pool for the calling work unit. + * + * \c ABT_self_set_associated_pool() changes the associated pool of the work + * unit \c thread to the pool \c pool. This routine does not yield the calling + * work unit. + * + * @contexts + * \DOC_CONTEXT_INIT_NOEXT \DOC_CONTEXT_NOCTXSWITCH + * + * @errors + * \DOC_ERROR_SUCCESS + * \DOC_ERROR_INV_XSTREAM_EXT + * \DOC_ERROR_INV_POOL_HANDLE{\c pool} + * \DOC_ERROR_RESOURCE + * \DOC_ERROR_RESOURCE_UNIT_CREATE + * + * @undefined + * \DOC_UNDEFINED_UNINIT + * \DOC_UNDEFINED_THREAD_UNSAFE{the caller} + * + * @param[in] pool pool handle + * @return Error code + */ +int ABT_self_set_associated_pool(ABT_pool pool) +{ + ABTI_global *p_global = ABTI_global_get_global(); + ABTI_xstream *p_local_xstream; + ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream); + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + ABTI_CHECK_NULL_POOL_PTR(p_pool); + ABTI_thread *p_self = p_local_xstream->p_thread; + + int abt_errno = ABTI_thread_set_associated_pool(p_global, p_self, p_pool); + ABTI_CHECK_ERROR(abt_errno); + return ABT_SUCCESS; +} + +/** + * @ingroup SELF + * @brief Get a unit handle of the calling work unit. + * + * \c ABT_self_get_unit() returns the \c ABT_unit handle associated with the + * calling work unit through \c unit. + * + * @contexts + * \DOC_CONTEXT_INIT_NOEXT \DOC_CONTEXT_NOCTXSWITCH + * + * @errors + * \DOC_ERROR_SUCCESS + * \DOC_ERROR_INV_XSTREAM_EXT + * + * @undefined + * \DOC_UNDEFINED_UNINIT + * \DOC_UNDEFINED_NULL_PTR{\c unit} + * + * @param[out] unit work unit handle + * @return Error code + */ +int ABT_self_get_unit(ABT_unit *unit) +{ + /* We don't allow an external thread to call this routine. */ + ABTI_xstream *p_local_xstream; + ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream); + *unit = p_local_xstream->p_thread->unit; + return ABT_SUCCESS; +} + /** * @ingroup SELF * @brief Yield the calling ULT to its parent ULT @@ -647,6 +747,36 @@ int ABT_self_get_arg(void **arg) return ABT_SUCCESS; } +/** + * @ingroup SELF + * @brief Retrieve a work-unit function of the calling work unit + * + * \c ABT_self_get_thread_func() returns the work-unit function of the calling + * work unit through \c thread_func. + * + * @contexts + * \DOC_CONTEXT_INIT_NOEXT \DOC_CONTEXT_NOCTXSWITCH + * + * @errors + * \DOC_ERROR_SUCCESS + * \DOC_ERROR_INV_XSTREAM_EXT + * + * @undefined + * \DOC_UNDEFINED_UNINIT + * \DOC_UNDEFINED_NULL_PTR{\c thread_func} + * + * @param[out] thread_func the caller's function + * @return Error code + */ +int ABT_self_get_thread_func(void (**thread_func)(void *)) +{ + ABTI_xstream *p_local_xstream; + ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream); + + *thread_func = p_local_xstream->p_thread->f_thread; + return ABT_SUCCESS; +} + /** * @ingroup SELF * @brief Check if the calling work unit is unnamed diff --git a/src/thread.c b/src/thread.c index f687cff6..f5eb407a 100644 --- a/src/thread.c +++ b/src/thread.c @@ -935,6 +935,36 @@ int ABT_thread_get_last_pool_id(ABT_thread thread, int *id) return ABT_SUCCESS; } +/** + * @ingroup ULT + * @brief Get a unit handle of the target work unit. + * + * \c ABT_thread_get_unit() returns the \c ABT_unit handle associated with the + * work unit \c thread through \c unit. + * + * @contexts + * \DOC_CONTEXT_INIT \DOC_CONTEXT_NOCTXSWITCH + * + * @errors + * \DOC_ERROR_SUCCESS + * \DOC_ERROR_INV_THREAD_HANDLE{\c thread} + * + * @undefined + * \DOC_UNDEFINED_UNINIT + * \DOC_UNDEFINED_NULL_PTR{\c unit} + * + * @param[in] thread work unit handle + * @param[out] unit work unit handle + * @return Error code + */ +int ABT_thread_get_unit(ABT_thread thread, ABT_unit *unit) +{ + ABTI_thread *p_thread = ABTI_thread_get_ptr(thread); + ABTI_CHECK_NULL_THREAD_PTR(p_thread); + *unit = p_thread->unit; + return ABT_SUCCESS; +} + /** * @ingroup ULT * @brief Set an associated pool for the target work unit. @@ -2021,6 +2051,37 @@ int ABT_thread_get_arg(ABT_thread thread, void **arg) return ABT_SUCCESS; } +/** + * @ingroup ULT + * @brief Retrieve a work-unit function of a work unit. + * + * \c ABT_thread_get_thread_func() returns the work-unit function of the work + * unit \c thread through \c thread_func. + * + * @contexts + * \DOC_CONTEXT_INIT \DOC_CONTEXT_NOCTXSWITCH + * + * @errors + * \DOC_ERROR_SUCCESS + * \DOC_ERROR_INV_THREAD_HANDLE{\c thread} + * + * @undefined + * \DOC_UNDEFINED_UNINIT + * \DOC_UNDEFINED_NULL_PTR{\c thread_func} + * + * @param[in] thread work unit handle + * @param[out] thread_func work-unit function + * @return Error code + */ +int ABT_thread_get_thread_func(ABT_thread thread, void (**thread_func)(void *)) +{ + ABTI_thread *p_thread = ABTI_thread_get_ptr(thread); + ABTI_CHECK_NULL_THREAD_PTR(p_thread); + + *thread_func = p_thread->f_thread; + return ABT_SUCCESS; +} + /** * @ingroup ULT * @brief Set a value with a work-unit-specific data key in a work unit. diff --git a/src/unit.c b/src/unit.c index ffb8b463..e21ccc25 100644 --- a/src/unit.c +++ b/src/unit.c @@ -53,6 +53,37 @@ int ABT_unit_set_associated_pool(ABT_unit unit, ABT_pool pool) return ABT_SUCCESS; } +/** + * @ingroup UNIT + * @brief Get a thread handle of the target work unit. + * + * \c ABT_unit_get_thread() returns the \c ABT_thread handle associated with the + * work unit \c unit through \c thread. + * + * @contexts + * \DOC_CONTEXT_INIT \DOC_CONTEXT_NOCTXSWITCH + * + * @errors + * \DOC_ERROR_SUCCESS + * \DOC_ERROR_INV_UNIT_HANDLE{\c unit} + * + * @undefined + * \DOC_UNDEFINED_UNINIT + * \DOC_UNDEFINED_NULL_PTR{\c thread} + * + * @param[in] unit work unit handle + * @param[out] thread work unit handle + * @return Error code + */ +int ABT_unit_get_thread(ABT_unit unit, ABT_thread *thread) +{ + ABTI_global *p_global = ABTI_global_get_global(); + ABTI_CHECK_TRUE(unit != ABT_UNIT_NULL, ABT_ERR_INV_UNIT); + ABTI_thread *p_thread = ABTI_unit_get_thread(p_global, unit); + *thread = ABTI_thread_get_handle(p_thread); + return ABT_SUCCESS; +} + /*****************************************************************************/ /* Private APIs */ /*****************************************************************************/ diff --git a/test/.gitignore b/test/.gitignore index 71967861..70150d7a 100644 --- a/test/.gitignore +++ b/test/.gitignore @@ -17,6 +17,7 @@ basic/thread_yield_to basic/thread_exit basic/thread_self_suspend_resume basic/thread_get_last_xstream +basic/thread_get_func_arg basic/thread_migrate basic/thread_data basic/thread_data2 @@ -75,6 +76,7 @@ basic/info_print basic/info_print_stack basic/info_stackdump basic/info_stackdump2 +basic/unit basic/error # benchmark diff --git a/test/basic/Makefile.am b/test/basic/Makefile.am index 7893eb76..63a150aa 100644 --- a/test/basic/Makefile.am +++ b/test/basic/Makefile.am @@ -22,6 +22,7 @@ TESTS = \ thread_exit \ thread_self_suspend_resume \ thread_get_last_xstream \ + thread_get_func_arg \ thread_migrate \ thread_data \ thread_data2 \ @@ -80,6 +81,7 @@ TESTS = \ info_print_stack \ info_stackdump \ info_stackdump2 \ + unit \ error XFAIL_TESTS = @@ -115,6 +117,7 @@ thread_yield_to_SOURCES = thread_yield_to.c thread_exit_SOURCES = thread_exit.c thread_self_suspend_resume_SOURCES = thread_self_suspend_resume.c thread_get_last_xstream_SOURCES = thread_get_last_xstream.c +thread_get_func_arg_SOURCES = thread_get_func_arg.c thread_migrate_SOURCES = thread_migrate.c thread_data_SOURCES = thread_data.c thread_data2_SOURCES = thread_data2.c @@ -173,6 +176,7 @@ info_print_SOURCES = info_print.c info_print_stack_SOURCES = info_print_stack.c info_stackdump_SOURCES = info_stackdump.c info_stackdump2_SOURCES = info_stackdump2.c +unit_SOURCES = unit.c error_SOURCES = error.c testing: @@ -194,6 +198,7 @@ testing: ./thread_exit ./thread_self_suspend_resume ./thread_get_last_xstream + ./thread_get_func_arg ./thread_migrate ./thread_data ./thread_data2 @@ -252,4 +257,5 @@ testing: ./info_print_stack ./info_stackdump ./info_stackdump2 + ./unit ./error diff --git a/test/basic/pool_custom.c b/test/basic/pool_custom.c index 2ad2d6a4..40ad9e2b 100644 --- a/test/basic/pool_custom.c +++ b/test/basic/pool_custom.c @@ -25,12 +25,9 @@ void thread_func(void *arg) for (i = 0; i < 10; i++) { if (i % 3 == 0) { ABT_pool target_pool = (ABT_pool)arg; - ABT_thread thread; - ret = ABT_self_get_thread(&thread); - ATS_ERROR(ret, "ABT_self_get_thread"); /* Let's change the associated pool sometimes. */ - ret = ABT_thread_set_associated_pool(thread, target_pool); - ATS_ERROR(ret, "ABT_thread_set_associated_pool"); + ret = ABT_self_set_associated_pool(target_pool); + ATS_ERROR(ret, "ABT_self_set_associated_pool"); } ret = ABT_thread_yield(); ATS_ERROR(ret, "ABT_thread_yield"); @@ -227,12 +224,8 @@ int main(int argc, char *argv[]) /* Move this thread to the main pool. This is needed since the following * user-defined pool_free() checks whether the pool is empty or not. */ - ABT_thread self_thread; - ret = ABT_self_get_thread(&self_thread); - ATS_ERROR(ret, "ABT_thread_self"); - /* Let's change the associated pool sometimes. */ - ret = ABT_thread_set_associated_pool(self_thread, pools[0]); - ATS_ERROR(ret, "ABT_thread_set_associated_pool"); + ret = ABT_self_set_associated_pool(pools[0]); + ATS_ERROR(ret, "ABT_self_set_associated_pool"); /* Free schedulers of the secondary execution streams (since the scheduler * created by ABT_sched_create() are not automatically freed). */ diff --git a/test/basic/task_data.c b/test/basic/task_data.c index 447f4c0a..7e018189 100644 --- a/test/basic/task_data.c +++ b/test/basic/task_data.c @@ -99,14 +99,11 @@ static void task_create(void *arg) { int i, ret; int my_id = (int)(intptr_t)arg; - ABT_thread my_thread; ABT_pool my_pool; ABT_task *tasks; - ret = ABT_thread_self(&my_thread); - ATS_ERROR(ret, "ABT_thread_self"); - ret = ABT_thread_get_last_pool(my_thread, &my_pool); - ATS_ERROR(ret, "ABT_thread_get_last_pool"); + ret = ABT_self_get_last_pool(&my_pool); + ATS_ERROR(ret, "ABT_self_get_last_pool"); /* Create tasklets */ tasks = (ABT_task *)malloc(num_tasks * sizeof(ABT_task)); diff --git a/test/basic/thread_create2.c b/test/basic/thread_create2.c index dfa6895d..c7bdf5d4 100644 --- a/test/basic/thread_create2.c +++ b/test/basic/thread_create2.c @@ -23,13 +23,10 @@ void thread_create(void *arg) { int i, ret; size_t my_id = (size_t)arg; - ABT_thread my_thread; ABT_pool my_pool; - ret = ABT_thread_self(&my_thread); - ATS_ERROR(ret, "ABT_thread_self"); - ret = ABT_thread_get_last_pool(my_thread, &my_pool); - ATS_ERROR(ret, "ABT_thread_get_last_pool"); + ret = ABT_self_get_last_pool(&my_pool); + ATS_ERROR(ret, "ABT_self_get_last_pool"); /* Create threads */ for (i = 0; i < num_threads; i++) { diff --git a/test/basic/thread_data.c b/test/basic/thread_data.c index ba619538..40b316a5 100644 --- a/test/basic/thread_data.c +++ b/test/basic/thread_data.c @@ -104,14 +104,11 @@ static void thread_create(void *arg) { int i, ret; int my_id = (int)(intptr_t)arg; - ABT_thread my_thread; ABT_pool my_pool; ABT_thread *threads; - ret = ABT_thread_self(&my_thread); - ATS_ERROR(ret, "ABT_thread_self"); - ret = ABT_thread_get_last_pool(my_thread, &my_pool); - ATS_ERROR(ret, "ABT_thread_get_last_pool"); + ret = ABT_self_get_last_pool(&my_pool); + ATS_ERROR(ret, "ABT_self_get_last_pool"); /* Create ULTs */ threads = (ABT_thread *)malloc(num_threads * sizeof(ABT_thread)); diff --git a/test/basic/thread_get_func_arg.c b/test/basic/thread_get_func_arg.c new file mode 100644 index 00000000..a1d4702f --- /dev/null +++ b/test/basic/thread_get_func_arg.c @@ -0,0 +1,128 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ +/* + * See COPYRIGHT in top-level directory. + */ + +#include +#include +#include "abt.h" +#include "abttest.h" + +#define DEFAULT_NUM_XSTREAMS 4 +#define DEFAULT_NUM_THREADS 8 +#define DEFAULT_NUM_TASKS 4 + +void check_func_arg(void (*thread_f1)(void *), void *arg1) +{ + int ret; + void *arg2, *arg3; + void (*thread_f2)(void *), (*thread_f3)(void *); + + ret = ABT_self_get_thread_func(&thread_f2); + ATS_ERROR(ret, "ABT_self_get_thread_func"); + assert(thread_f1 == thread_f2); + ret = ABT_self_get_arg(&arg2); + ATS_ERROR(ret, "ABT_self_get_arg"); + assert(arg1 == arg2); + + ABT_thread self_thread; + ret = ABT_self_get_thread(&self_thread); + ATS_ERROR(ret, "ABT_self_get_thread"); + ret = ABT_thread_get_thread_func(self_thread, &thread_f3); + ATS_ERROR(ret, "ABT_thread_get_thread_func"); + assert(thread_f1 == thread_f3); + ret = ABT_thread_get_arg(self_thread, &arg3); + ATS_ERROR(ret, "ABT_thread_get_arg"); + assert(arg1 == arg3); +} + +void thread_func(void *arg) +{ + check_func_arg(thread_func, arg); + int ret = ABT_self_yield(); + ATS_ERROR(ret, "ABT_self_yield"); + check_func_arg(thread_func, arg); +} + +void task_func(void *arg) +{ + check_func_arg(task_func, arg); +} + +int main(int argc, char *argv[]) +{ + int i; + int ret; + int num_xstreams, num_threads, num_tasks; + + /* Initialize */ + ATS_read_args(argc, argv); + if (argc < 2) { + num_xstreams = DEFAULT_NUM_XSTREAMS; + num_threads = DEFAULT_NUM_THREADS; + num_tasks = DEFAULT_NUM_TASKS; + } else { + num_xstreams = ATS_get_arg_val(ATS_ARG_N_ES); + num_threads = ATS_get_arg_val(ATS_ARG_N_ULT); + num_tasks = ATS_get_arg_val(ATS_ARG_N_TASK); + } + + ATS_init(argc, argv, num_xstreams); + ABT_xstream *xstreams = + (ABT_xstream *)malloc(sizeof(ABT_xstream) * num_xstreams); + ABT_thread *threads = + (ABT_thread *)malloc(sizeof(ABT_thread) * num_threads); + ABT_thread *tasks = (ABT_thread *)malloc(sizeof(ABT_thread) * num_tasks); + + /* Create Execution Streams */ + ret = ABT_xstream_self(&xstreams[0]); + ATS_ERROR(ret, "ABT_xstream_self"); + for (i = 1; i < num_xstreams; i++) { + ret = ABT_xstream_create(ABT_SCHED_NULL, &xstreams[i]); + ATS_ERROR(ret, "ABT_xstream_create"); + } + + /* Create ULTs for each ES */ + for (i = 0; i < num_threads; i++) { + void *arg = (void *)(uintptr_t)i; + ret = ABT_thread_create_on_xstream(xstreams[i % num_xstreams], + thread_func, arg, + ABT_THREAD_ATTR_NULL, &threads[i]); + ATS_ERROR(ret, "ABT_thread_create_on_xstream"); + } + + /* Create tasklets for each ES */ + for (i = 0; i < num_tasks; i++) { + void *arg = (void *)(uintptr_t)i; + ret = ABT_task_create_on_xstream(xstreams[i % num_xstreams], task_func, + arg, &tasks[i]); + ATS_ERROR(ret, "ABT_task_create_on_xstream"); + } + + /* Join and free ULTs */ + for (i = 0; i < num_threads; i++) { + ret = ABT_thread_free(&threads[i]); + ATS_ERROR(ret, "ABT_thread_join"); + } + + /* Join and free tasklets */ + for (i = 0; i < num_tasks; i++) { + ret = ABT_thread_free(&tasks[i]); + ATS_ERROR(ret, "ABT_thread_free"); + } + + /* Join and free execution streams */ + for (i = 1; i < num_xstreams; i++) { + ret = ABT_xstream_free(&xstreams[i]); + ATS_ERROR(ret, "ABT_xstream_free"); + } + + free(xstreams); + free(threads); + free(tasks); + + /* Finalize */ + ret = ATS_finalize(0); + + return ret; +} diff --git a/test/basic/timer.c b/test/basic/timer.c index 34d2b8c0..b4631581 100644 --- a/test/basic/timer.c +++ b/test/basic/timer.c @@ -24,7 +24,6 @@ void thread_create(void *arg) { int i, ret; size_t my_id = (size_t)arg; - ABT_thread my_thread; ABT_pool my_pool; ABT_timer my_timer; double t_start = 0.0; @@ -33,10 +32,8 @@ void thread_create(void *arg) ret = ABT_timer_dup(timer, &my_timer); ATS_ERROR(ret, "ABT_timer_dup"); - ret = ABT_thread_self(&my_thread); - ATS_ERROR(ret, "ABT_thread_self"); - ret = ABT_thread_get_last_pool(my_thread, &my_pool); - ATS_ERROR(ret, "ABT_thread_get_last_pool"); + ret = ABT_self_get_last_pool(&my_pool); + ATS_ERROR(ret, "ABT_self_get_last_pool"); ABT_timer_stop_and_read(my_timer, &t_start); diff --git a/test/basic/unit.c b/test/basic/unit.c new file mode 100644 index 00000000..0b5c3bfe --- /dev/null +++ b/test/basic/unit.c @@ -0,0 +1,365 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ +/* + * See COPYRIGHT in top-level directory. + */ + +/* Several types of pools are used. This test checks if corresponding pool + * operations are called properly. */ + +#include +#include +#include "abt.h" +#include "abttest.h" + +void create_sched_def(ABT_sched_def *p_def); +void create_pool_def(ABT_pool_def *p_def); + +#define DEFAULT_NUM_XSTREAMS 2 +#define DEFAULT_NUM_THREADS 100 +#define NUM_POOLS 2 + +void check_self_unit_mapping(void) +{ + int ret; + ABT_thread self_thread1, self_thread2; + ABT_unit self_unit1, self_unit2; + ret = ABT_self_get_thread(&self_thread1); + ATS_ERROR(ret, "ABT_self_get_thread"); + ret = ABT_self_get_unit(&self_unit1); + ATS_ERROR(ret, "ABT_self_get_unit"); + ret = ABT_thread_get_unit(self_thread1, &self_unit2); + ATS_ERROR(ret, "ABT_thread_get_unit"); + ret = ABT_unit_get_thread(self_unit1, &self_thread2); + ATS_ERROR(ret, "ABT_unit_get_thread"); + assert(self_unit1 == self_unit2); + assert(self_thread1 == self_thread2); +} + +void thread_func(void *arg) +{ + int ret, i; + for (i = 0; i < 10; i++) { + if (i % 3 == 0) { + check_self_unit_mapping(); + ABT_pool target_pool = (ABT_pool)arg; + /* Let's change the associated pool sometimes. */ + ret = ABT_self_set_associated_pool(target_pool); + ATS_ERROR(ret, "ABT_self_set_associated_pool"); + } + check_self_unit_mapping(); + ret = ABT_thread_yield(); + ATS_ERROR(ret, "ABT_thread_yield"); + } +} + +int sched_init(ABT_sched sched, ABT_sched_config config) +{ + return ABT_SUCCESS; +} + +void sched_run(ABT_sched sched) +{ + check_self_unit_mapping(); + int ret; + ABT_pool pools[NUM_POOLS]; + ret = ABT_sched_get_pools(sched, NUM_POOLS, 0, pools); + ATS_ERROR(ret, "ABT_sched_get_pools"); + int work_count = 0; + while (1) { + ABT_unit unit; + ABT_pool victim_pool = pools[work_count % NUM_POOLS]; + int no_run = (work_count % 3) == 0; + + ret = ABT_pool_pop(victim_pool, &unit); + ATS_ERROR(ret, "ABT_pool_pop"); + if (unit != ABT_UNIT_NULL) { + ABT_pool target_pool = pools[(work_count / 2) % NUM_POOLS]; + if (no_run) { + /* Push back to the pool. */ + ret = ABT_pool_push(target_pool, unit); + ATS_ERROR(ret, "ABT_pool_push"); + } else { + ret = ABT_xstream_run_unit(unit, target_pool); + ATS_ERROR(ret, "ABT_xstream_run_unit"); + } + } + if (work_count++ % 100 == 0) { + ABT_bool stop; + ret = ABT_sched_has_to_stop(sched, &stop); + ATS_ERROR(ret, "ABT_sched_has_to_stop"); + if (stop == ABT_TRUE) + break; + ret = ABT_xstream_check_events(sched); + ATS_ERROR(ret, "ABT_xstream_check_events"); + } + } +} + +int sched_free(ABT_sched sched) +{ + return ABT_SUCCESS; +} + +void create_sched_def(ABT_sched_def *p_def) +{ + p_def->type = ABT_SCHED_TYPE_ULT; + p_def->init = sched_init; + p_def->run = sched_run; + p_def->free = sched_free; + p_def->get_migr_pool = NULL; +} + +ABT_sched create_sched(int num_pools, ABT_pool *pools) +{ + int ret; + ABT_sched sched; + ABT_sched_def sched_def; + create_sched_def(&sched_def); + ret = ABT_sched_create(&sched_def, num_pools, pools, ABT_SCHED_CONFIG_NULL, + &sched); + ATS_ERROR(ret, "ABT_sched_create"); + return sched; +} + +int main(int argc, char *argv[]) +{ + int i, ret; + int num_xstreams = DEFAULT_NUM_XSTREAMS; + int num_threads = DEFAULT_NUM_THREADS; + + /* Initialize */ + ATS_read_args(argc, argv); + if (argc > 1) { + num_xstreams = ATS_get_arg_val(ATS_ARG_N_ES); + num_threads = ATS_get_arg_val(ATS_ARG_N_ULT); + } + + /* Allocate memory. */ + ABT_xstream *xstreams = + (ABT_xstream *)malloc(sizeof(ABT_xstream) * num_xstreams); + ABT_pool *pools = (ABT_pool *)malloc(sizeof(ABT_pool) * NUM_POOLS); + ABT_sched *scheds = (ABT_sched *)malloc(sizeof(ABT_sched) * num_xstreams); + + /* Initialize Argobots. */ + ATS_init(argc, argv, num_xstreams); + + /* Create pools. */ + /* pools[0]: the built-in FIFO pool. */ + ret = ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, ABT_FALSE, + &pools[0]); + ATS_ERROR(ret, "ABT_pool_create_basic"); + /* pools[1]: user-defined basic pool. */ + ABT_pool_def pool_def; + create_pool_def(&pool_def); + ret = ABT_pool_create(&pool_def, ABT_POOL_CONFIG_NULL, &pools[1]); + ATS_ERROR(ret, "ABT_pool_create"); + + /* Create schedulers. */ + for (i = 0; i < num_xstreams; i++) { + scheds[i] = create_sched(NUM_POOLS, pools); + } + + /* Create secondary execution streams. */ + for (i = 1; i < num_xstreams; i++) { + ret = ABT_xstream_create(scheds[i], &xstreams[i]); + ATS_ERROR(ret, "ABT_xstream_create"); + } + + check_self_unit_mapping(); + + /* Update the main scheduler of the primary execution stream. */ + ret = ABT_xstream_self(&xstreams[0]); + ATS_ERROR(ret, "ABT_xstream_self"); + ret = ABT_xstream_set_main_sched(xstreams[0], scheds[0]); + ATS_ERROR(ret, "ABT_xstream_set_main_sched"); + + check_self_unit_mapping(); + + ABT_thread *threads = + (ABT_thread *)malloc(sizeof(ABT_thread) * num_threads); + /* Create threads. */ + for (i = 0; i < num_threads; i++) { + ABT_pool target_pool = pools[i % NUM_POOLS]; + ABT_pool arg_pool = pools[(i / 2) % NUM_POOLS]; + ret = ABT_thread_create(target_pool, thread_func, (void *)arg_pool, + ABT_THREAD_ATTR_NULL, &threads[i]); + ATS_ERROR(ret, "ABT_thread_create"); + } + + /* Join and revive threads. */ + for (i = 0; i < num_threads; i++) { + ret = ABT_thread_join(threads[i]); + ATS_ERROR(ret, "ABT_thread_join"); + ABT_pool target_pool = pools[(i / 3) % NUM_POOLS]; + ABT_pool arg_pool = pools[(i / 4) % NUM_POOLS]; + ret = ABT_thread_revive(target_pool, thread_func, (void *)arg_pool, + &threads[i]); + ATS_ERROR(ret, "ABT_thread_revive"); + } + + /* Free threads. */ + for (i = 0; i < num_threads; i++) { + ret = ABT_thread_free(&threads[i]); + ATS_ERROR(ret, "ABT_thread_free"); + } + + free(threads); + + check_self_unit_mapping(); + + /* Join and free secondary execution streams. */ + for (i = 1; i < num_xstreams; i++) { + while (1) { + ABT_bool on_primary_xstream = ABT_FALSE; + ret = ABT_self_on_primary_xstream(&on_primary_xstream); + ATS_ERROR(ret, "ABT_self_on_primary_xstream"); + if (on_primary_xstream) + break; + ret = ABT_thread_yield(); + ATS_ERROR(ret, "ABT_thread_yield"); + } + /* Yield myself until this thread is running on the primary execution + * stream. */ + ret = ABT_xstream_free(&xstreams[i]); + ATS_ERROR(ret, "ABT_xstream_free"); + } + + check_self_unit_mapping(); + + /* Free schedulers of the secondary execution streams (since the scheduler + * created by ABT_sched_create() are not automatically freed). */ + for (i = 1; i < num_xstreams; i++) { + ret = ABT_sched_free(&scheds[i]); + ATS_ERROR(ret, "ABT_sched_free"); + } + + check_self_unit_mapping(); + + /* The scheduler of the primary execution stream will be freed by + * ABT_finalize(). Pools are associated with the scheduler of the primary + * execution stream, so they will be freed by ABT_finallize(), too. */ + + /* Finalize Argobots. */ + ret = ATS_finalize(0); + + /* Free allocated memory. */ + free(xstreams); + free(pools); + free(scheds); + + return ret; +} + +/******************************************************************************/ + +typedef struct unit_t { + ABT_thread thread; + struct unit_t *p_prev, *p_next; +} unit_t; + +typedef struct pool_t { + unit_t list; + int size; + pthread_mutex_t lock; +} pool_t; + +ABT_unit pool_unit_create_from_thread(ABT_thread thread) +{ + unit_t *p_unit = (unit_t *)malloc(sizeof(unit_t)); + p_unit->thread = thread; + return (ABT_unit)p_unit; +} + +void pool_unit_free(ABT_unit *p_unit) +{ + free(*p_unit); +} + +int pool_init(ABT_pool pool, ABT_pool_config config) +{ + pool_t *p_pool = (pool_t *)malloc(sizeof(pool_t)); + p_pool->list.p_prev = &p_pool->list; + p_pool->list.p_next = &p_pool->list; + p_pool->size = 0; + pthread_mutex_init(&p_pool->lock, NULL); + int ret = ABT_pool_set_data(pool, (void *)p_pool); + ATS_ERROR(ret, "ABT_pool_set_data"); + return ABT_SUCCESS; +} + +size_t pool_get_size(ABT_pool pool) +{ + pool_t *p_pool; + int ret = ABT_pool_get_data(pool, (void **)&p_pool); + ATS_ERROR(ret, "ABT_pool_get_data"); + return p_pool->size; +} + +void pool_push(ABT_pool pool, ABT_unit unit) +{ + pool_t *p_pool; + int ret = ABT_pool_get_data(pool, (void **)&p_pool); + ATS_ERROR(ret, "ABT_pool_get_data"); + + unit_t *p_unit = (unit_t *)unit; + pthread_mutex_lock(&p_pool->lock); + p_unit->p_next = &p_pool->list; + p_unit->p_prev = p_pool->list.p_prev; + p_pool->list.p_prev->p_next = p_unit; + p_pool->list.p_prev = p_unit; + p_pool->size++; + pthread_mutex_unlock(&p_pool->lock); +} + +ABT_unit pool_pop(ABT_pool pool) +{ + pool_t *p_pool; + int ret = ABT_pool_get_data(pool, (void **)&p_pool); + ATS_ERROR(ret, "ABT_pool_get_data"); + + pthread_mutex_lock(&p_pool->lock); + if (p_pool->size == 0) { + pthread_mutex_unlock(&p_pool->lock); + /* Empty. */ + return ABT_UNIT_NULL; + } else { + p_pool->size--; + unit_t *p_ret = p_pool->list.p_next; + p_pool->list.p_next = p_ret->p_next; + p_pool->list.p_next->p_prev = &p_pool->list; + pthread_mutex_unlock(&p_pool->lock); + return (ABT_unit)p_ret; + } +} + +int pool_free(ABT_pool pool) +{ + pool_t *p_pool; + int ret = ABT_pool_get_data(pool, (void **)&p_pool); + ATS_ERROR(ret, "ABT_pool_get_data"); + + pthread_mutex_destroy(&p_pool->lock); + free(p_pool); + return ABT_SUCCESS; +} + +void create_pool_def(ABT_pool_def *p_def) +{ + p_def->access = ABT_POOL_ACCESS_MPMC; + p_def->u_create_from_thread = pool_unit_create_from_thread; + p_def->u_free = pool_unit_free; + p_def->p_init = pool_init; + p_def->p_get_size = pool_get_size; + p_def->p_push = pool_push; + p_def->p_pop = pool_pop; + p_def->p_free = pool_free; + + /* Optional. */ + p_def->u_is_in_pool = NULL; +#ifdef ABT_ENABLE_VER_20_API + p_def->p_pop_wait = NULL; +#endif + p_def->p_pop_timedwait = NULL; + p_def->p_remove = NULL; + p_def->p_print_all = NULL; +} diff --git a/test/leakcheck/unit.c b/test/leakcheck/unit.c index d85629fe..66af4685 100644 --- a/test/leakcheck/unit.c +++ b/test/leakcheck/unit.c @@ -126,6 +126,11 @@ void thread_func(void *arg) ret = ABT_thread_set_associated_pool(thread, (ABT_pool)arg); /* ABT_thread_set_associated_pool() might fail. */ (void)ret; + + ret = ABT_self_set_associated_pool((ABT_pool)arg); + /* ABT_self_set_associated_pool() might fail, too. */ + (void)ret; + ret = ABT_thread_yield(); assert(ret == ABT_SUCCESS); }