Skip to content

Commit

Permalink
Merge pull request #320 from shintaro-iwasaki/pr/fix_set_main_sched2
Browse files Browse the repository at this point in the history
xstream: fix xstream_set_main_sched (cont.)
  • Loading branch information
shintaro-iwasaki committed Mar 25, 2021
2 parents 75bbd3b + cc32038 commit bb94949
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 38 deletions.
22 changes: 13 additions & 9 deletions src/include/abti.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#define ABTI_SCHED_REQ_FINISH (1 << 0)
#define ABTI_SCHED_REQ_EXIT (1 << 1)
#define ABTI_SCHED_REQ_REPLACE (1 << 2)

#define ABTI_THREAD_REQ_JOIN (1 << 0)
#define ABTI_THREAD_REQ_TERMINATE (1 << 1)
Expand Down Expand Up @@ -286,15 +287,18 @@ struct ABTI_xstream {
};

struct ABTI_sched {
ABTI_sched_used used; /* To know if it is used and how */
ABT_bool automatic; /* To know if automatic data free */
ABTI_sched_kind kind; /* Kind of the scheduler */
ABT_sched_type type; /* Can yield or not (ULT or task) */
ABTD_atomic_uint32 request; /* Request */
ABT_pool *pools; /* Thread pools */
size_t num_pools; /* Number of thread pools */
ABTI_ythread *p_ythread; /* Associated ULT */
void *data; /* Data for a specific scheduler */
ABTI_sched_used used; /* To know if it is used and how */
ABT_bool automatic; /* To know if automatic data free */
ABTI_sched_kind kind; /* Kind of the scheduler */
ABT_sched_type type; /* Can yield or not (ULT or task) */
ABTI_sched *p_replace_sched; /* Main scheduler that should replace this.
* ABTI_SCHED_REQ_REPLACE should be set. */
ABTI_ythread *p_replace_waiter; /* Thread waiting for replacement. */
ABTD_atomic_uint32 request; /* Request */
ABT_pool *pools; /* Thread pools */
size_t num_pools; /* Number of thread pools */
ABTI_ythread *p_ythread; /* Associated ULT */
void *data; /* Data for a specific scheduler */

/* Scheduler functions */
ABT_sched_init_fn init;
Expand Down
4 changes: 3 additions & 1 deletion src/sched/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ ABT_bool ABTI_sched_has_to_stop(ABTI_local **pp_local, ABTI_sched *p_sched)

if (ABTI_sched_get_effective_size(*pp_local, p_sched) == 0) {
if (ABTD_atomic_acquire_load_uint32(&p_sched->request) &
ABTI_SCHED_REQ_FINISH) {
(ABTI_SCHED_REQ_FINISH | ABTI_SCHED_REQ_REPLACE)) {
/* Check join request */
if (ABTI_sched_get_effective_size(*pp_local, p_sched) == 0)
return ABT_TRUE;
Expand Down Expand Up @@ -1104,6 +1104,8 @@ ABTU_ret_err static int sched_create(ABT_sched_def *def, int num_pools,
p_sched->used = ABTI_SCHED_NOT_USED;
p_sched->automatic = automatic;
p_sched->kind = sched_get_kind(def);
p_sched->p_replace_sched = NULL;
p_sched->p_replace_waiter = NULL;
ABTD_atomic_relaxed_store_uint32(&p_sched->request, 0);
p_sched->pools = pool_list;
p_sched->num_pools = num_pools;
Expand Down
49 changes: 25 additions & 24 deletions src/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -2101,33 +2101,34 @@ static int xstream_update_main_sched(ABTI_global *p_global,
}
}

/* Set the scheduler as a main scheduler */
p_sched->used = ABTI_SCHED_MAIN;

/* Finish the current main scheduler */
ABTI_sched_set_request(p_main_sched, ABTI_SCHED_REQ_FINISH);

/* If the ES is secondary, we should take the associated ULT of the
* current main scheduler and keep it in the new scheduler. */
p_sched->p_ythread = p_main_sched->p_ythread;

/* Set the scheduler */
p_xstream->p_main_sched = p_sched;
if (p_main_sched->p_replace_sched) {
/* We need to overwrite the scheduler. Free the existing one. */
ABTI_ythread *p_waiter = p_main_sched->p_replace_waiter;
ABTI_sched_discard_and_free(p_global,
ABTI_xstream_get_local(
*pp_local_xstream),
p_main_sched->p_replace_sched,
ABT_FALSE);
p_main_sched->p_replace_sched = NULL;
p_main_sched->p_replace_waiter = NULL;
/* Resume the waiter. This waiter sees that the scheduler finished
* immediately and was replaced by this new scheduler. */
ABTI_ythread_set_ready(ABTI_xstream_get_local(*pp_local_xstream),
p_waiter);
}
ABTI_ythread_set_blocked(p_ythread);
/* Set the replace scheduler */
p_main_sched->p_replace_sched = p_sched;
p_main_sched->p_replace_waiter = p_ythread;
/* Ask the current main scheduler to replace its scheduler */
ABTI_sched_set_request(p_main_sched, ABTI_SCHED_REQ_REPLACE);

/* Switch to the current main scheduler. The current ULT is pushed to
* the new scheduler's pool so that when the new scheduler starts, this
* ULT can be scheduled by the new scheduler. When the current ULT
* resumes its execution, it will free the current main scheduler
* (see below). */
ABTI_ythread_context_switch_to_parent(pp_local_xstream, p_ythread,
ABT_SYNC_EVENT_TYPE_OTHER, NULL);

/* Now, we free the current main scheduler. p_main_sched->p_ythread must
* be NULL to avoid freeing it in ABTI_sched_discard_and_free(). */
p_main_sched->p_ythread = NULL;
ABTI_sched_discard_and_free(p_global,
ABTI_xstream_get_local(*pp_local_xstream),
p_main_sched, ABT_FALSE);
* ULT can be scheduled by the new scheduler. The existing main
* scheduler will be freed by ABTI_SCHED_REQ_RELEASE. */
ABTI_ythread_suspend(pp_local_xstream, p_ythread,
ABT_SYNC_EVENT_TYPE_OTHER, NULL);
return ABT_SUCCESS;
}
}
Expand Down
30 changes: 26 additions & 4 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -3009,12 +3009,34 @@ static void thread_main_sched_func(void *arg)

LOG_DEBUG("[S%" PRIu64 "] start\n", p_sched->id);
p_sched->run(ABTI_sched_get_handle(p_sched));
/* From here the main scheduler can have been already replaced. */
/* The main scheduler must be executed on the same execution stream. */
ABTI_ASSERT(p_local == ABTI_local_get_local_uninlined());
LOG_DEBUG("[S%" PRIu64 "] end\n", p_sched->id);
/* The main scheduler's thread must be executed on the same execution
* stream. */
ABTI_ASSERT(p_local == ABTI_local_get_local_uninlined());

p_sched = p_local_xstream->p_main_sched;
/* We free the current main scheduler and replace it if requested. */
if (ABTD_atomic_relaxed_load_uint32(&p_sched->request) &
ABTI_SCHED_REQ_REPLACE) {
ABTI_ythread *p_waiter = p_sched->p_replace_waiter;
ABTI_sched *p_new_sched = p_sched->p_replace_sched;
/* Set this scheduler as a main scheduler */
p_new_sched->used = ABTI_SCHED_MAIN;
/* Take the ULT of the current main scheduler and use it for the new
* scheduler. */
p_new_sched->p_ythread = p_sched->p_ythread;
p_local_xstream->p_main_sched = p_new_sched;
/* Now, we free the current main scheduler. p_sched->p_ythread must
* be NULL to avoid freeing it in ABTI_sched_discard_and_free(). */
p_sched->p_ythread = NULL;
ABTI_sched_discard_and_free(ABTI_global_get_global(), p_local,
p_sched, ABT_FALSE);
/* We do not need to unset ABTI_SCHED_REQ_REPLACE since that p_sched
* has already been replaced. */
p_sched = p_new_sched;
/* Resume the waiter. */
ABTI_ythread_set_ready(p_local, p_waiter);
}
ABTI_ASSERT(p_sched == p_local_xstream->p_main_sched);
uint32_t request = ABTD_atomic_acquire_load_uint32(
&p_sched->p_ythread->thread.request);

Expand Down
1 change: 1 addition & 0 deletions test/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ basic/xstream_revive
basic/xstream_affinity
basic/xstream_barrier
basic/xstream_rank
basic/xstream_set_main_sched
basic/thread_create
basic/thread_create2
basic/thread_create_on_xstream
Expand Down
3 changes: 3 additions & 0 deletions test/basic/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ TESTS = \
xstream_affinity \
xstream_barrier \
xstream_rank \
xstream_set_main_sched \
thread_create \
thread_create2 \
thread_create_on_xstream \
Expand Down Expand Up @@ -105,6 +106,7 @@ xstream_revive_SOURCES = xstream_revive.c
xstream_affinity_SOURCES = xstream_affinity.c
xstream_barrier_SOURCES = xstream_barrier.c
xstream_rank_SOURCES = xstream_rank.c
xstream_set_main_sched_SOURCES = xstream_set_main_sched.c
thread_create_SOURCES = thread_create.c
thread_create2_SOURCES = thread_create2.c
thread_create_on_xstream_SOURCES = thread_create_on_xstream.c
Expand Down Expand Up @@ -186,6 +188,7 @@ testing:
./xstream_affinity
./xstream_barrier
./xstream_rank
./xstream_set_main_sched
./thread_create
./thread_create2
./thread_create_on_xstream
Expand Down
Loading

0 comments on commit bb94949

Please sign in to comment.