Skip to content

Commit

Permalink
Fix a race condition in DTD for the local termdet
Browse files Browse the repository at this point in the history
DTD uses termination_detected() to destroy the taskpool,
and on_leave_wait() to reset the taskpool and reinitialize
the termination detector.

The 'local' termination detector module had a race condition:
it would mark the taskpool as 'terminated', then call the
termination_detected() callback.

As any thread can detect the termination, a worker thread could
enter taskpool destruction in parallel with the main thread
calling on_leave_wait() because taskpool_state() has returned
TERMINATED already.

Thus, the taskpool could be destroyed *after* its termination
detector is reset, leading to asserts.

Proposed solution consists of adding an intermediary state in
the termination detector, 'TERMINATING'. TERMINATING is shown
as 'BUSY' to the scheduler, so the main thread does not return
from the main loop and does not call on_leave_wait() while the
thread that detected termination is still executing
termination_detected(). TERMINATING is still a necessary
intermediary step in order to ensure that only one thread calls
termination_detected().

Solves issue ICLDisco#634
  • Loading branch information
therault committed Nov 13, 2024
1 parent cb32a62 commit 8dea7af
Showing 1 changed file with 37 additions and 14 deletions.
51 changes: 37 additions & 14 deletions parsec/mca/termdet/local/termdet_local_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,28 @@ const parsec_termdet_module_t parsec_termdet_local_module = {
}
};

/* The local detector does not need to allocate memory:
* we use the constants below to keep track of the state.
* There is no need for a constant for idle, as the termdet
* transitions directly from busy to terminated.
/* The local detector does not need to allocate memory: we use the constants
* below to keep track of the state. There is no need for a constant for idle,
* as the termdet transitions directly from busy to terminating, and then to
* terminated.
*
* The local termdet is in 'terminating' state during the call to the
* termination_detected() function. It shows the state as 'BUSY' to the user
* at this time. This is used to guarantee that a single thread calls
* termination_detected(), yet that the main thread is not allowed to exit the
* progress loop and call on_leave_wait() in parallel with
* termination_detected(). Calling on_leave_wait() and termination_detected()
* in parallel leads to race conditions for some DSLs (e.g. DTD), as they
* want to delete the monitor in termination_detected(), and create a new one
* in on_leave_wait(). As on_leave_wait() is only called by the master thread
* once taskpool_state() has returned TERMINATED, returning BUSY until *after*
* termination_detected() has returned (by any thread) ensures the proper
* order of execution.
*/
#define PARSEC_TERMDET_LOCAL_TERMINATED NULL
#define PARSEC_TERMDET_LOCAL_NOT_READY ((void*)(0x1))
#define PARSEC_TERMDET_LOCAL_BUSY ((void*)(0x2))
#define PARSEC_TERMDET_LOCAL_TERMINATING ((void*)(0x3))

static void parsec_termdet_local_monitor_taskpool(parsec_taskpool_t *tp,
parsec_termdet_termination_detected_function_t cb)
Expand All @@ -94,14 +108,18 @@ static void parsec_termdet_local_unmonitor_taskpool(parsec_taskpool_t *tp)

static parsec_termdet_taskpool_state_t parsec_termdet_local_taskpool_state(parsec_taskpool_t *tp)
{
/* termination_detected() might run in parallel with taskpool_state(),
* so use a local copy of the monitor. It's fine to return an old
* value for the state. */
void *monitor = tp->tdm.monitor;
if( tp->tdm.module == NULL )
return PARSEC_TERM_TP_NOT_MONITORED;
assert(tp->tdm.module == &parsec_termdet_local_module.module);
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_TERMINATED )
if( PARSEC_TERMDET_LOCAL_TERMINATED == monitor )
return PARSEC_TERM_TP_TERMINATED;
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_BUSY )
if( (PARSEC_TERMDET_LOCAL_BUSY == monitor) || (PARSEC_TERMDET_LOCAL_TERMINATING == monitor) )
return PARSEC_TERM_TP_BUSY;
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_NOT_READY )
if( PARSEC_TERMDET_LOCAL_NOT_READY == monitor )
return PARSEC_TERM_TP_NOT_READY;
assert(0);
return -1;
Expand All @@ -111,7 +129,7 @@ static void parsec_termdet_local_termination_detected(parsec_taskpool_t *tp)
{
assert( tp->tdm.module != NULL);
assert( tp->tdm.module == &parsec_termdet_local_module.module );
assert( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_TERMINATED );
assert( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_TERMINATING );

PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "TERMDET-LOCAL\tTASKPOOL %p: termination detected", tp);

Expand All @@ -132,8 +150,9 @@ static int parsec_termdet_local_taskpool_ready(parsec_taskpool_t *tp)
if( tp->nb_pending_actions == 0) {
/* It's possible another thread sees nb_pending_actions == 0 and BUSY before me, so call the callback
* only if I'm the one setting to terminated */
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATED) ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATING) ) {
parsec_termdet_local_termination_detected(tp);
parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_TERMINATING, PARSEC_TERMDET_LOCAL_TERMINATED);
}
}
return PARSEC_SUCCESS;
Expand All @@ -157,8 +176,9 @@ static int32_t parsec_termdet_local_taskpool_set_nb_tasks(parsec_taskpool_t *tp,
}
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_BUSY && nbpa == 0 ) {
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "TERMDET-LOCAL:\tTASKPOOL %p nbpa == 0", tp);
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATED) ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATING) ) {
parsec_termdet_local_termination_detected(tp);
parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_TERMINATING, PARSEC_TERMDET_LOCAL_TERMINATED);
}
}
}
Expand All @@ -174,8 +194,9 @@ static int32_t parsec_termdet_local_taskpool_set_runtime_actions(parsec_taskpool
ov = tp->nb_pending_actions;
} while(!parsec_atomic_cas_int32(&tp->nb_pending_actions, ov, v));
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_BUSY && v == 0 ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATED) ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATING) ) {
parsec_termdet_local_termination_detected(tp);
parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_TERMINATING, PARSEC_TERMDET_LOCAL_TERMINATED);
}
}
return v;
Expand All @@ -199,9 +220,10 @@ static int32_t parsec_termdet_local_taskpool_addto_nb_tasks(parsec_taskpool_t *t
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "TERMDET-LOCAL:\tTASKPOOL %p NB_PA %d -> %d", tp, nbpa+1, nbpa);
}
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_BUSY && nbpa == 0 ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATED) ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATING) ) {
parsec_termdet_local_termination_detected(tp);
}
parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_TERMINATING, PARSEC_TERMDET_LOCAL_TERMINATED);
}
}
return ov+v;
}
Expand All @@ -216,8 +238,9 @@ static int32_t parsec_termdet_local_taskpool_addto_runtime_actions(parsec_taskpo
ov = parsec_atomic_fetch_add_int32(&tp->nb_pending_actions, v);
assert(ov+v >= 0);
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_BUSY && ov+v == 0 ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATED) ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATING) ) {
parsec_termdet_local_termination_detected(tp);
parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_TERMINATING, PARSEC_TERMDET_LOCAL_TERMINATED);
}
}
return ov+v;
Expand Down

0 comments on commit 8dea7af

Please sign in to comment.