Merge pull request #312 from shintaro-iwasaki/pr/poolquickcheck
pool: check the pool's emptiness before taking a lock
shintaro-iwasaki authored Mar 15, 2021
2 parents 6fbeec1 + 3028387 commit 0a24d76
Showing 2 changed files with 167 additions and 112 deletions.
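
The change replaces the unconditional ABTD_spinlock_acquire in the pop paths with a check of a new atomic is_empty flag, so an empty pool is detected without touching the lock at all. Below is a minimal sketch of the idea using C11 <stdatomic.h> and plain atomic integers as stand-ins for the ABTD_atomic/ABTD_spinlock wrappers; queue_t and try_lock_if_not_empty are illustrative names, not Argobots API.

#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    atomic_int lock;      /* 0 = free, 1 = held */
    atomic_int is_empty;  /* 1 if the queue holds no units */
    /* ... head/tail pointers and a counter would go here ... */
} queue_t;

/* Returns true with the lock held only when the queue may be non-empty.
 * An empty queue is reported without touching the lock, so idle consumers
 * do not contend with producers on the lock's cache line. */
static bool try_lock_if_not_empty(queue_t *q)
{
    if (atomic_load_explicit(&q->is_empty, memory_order_acquire))
        return false;                      /* fast path: empty, lock untouched */
    while (atomic_exchange_explicit(&q->lock, 1, memory_order_acquire)) {
        /* Lock acquisition failed: spin on reads only. */
        for (;;) {
            if (atomic_load_explicit(&q->is_empty, memory_order_acquire))
                return false;              /* queue drained while we waited */
            if (!atomic_load_explicit(&q->lock, memory_order_relaxed))
                break;                     /* lock looks free; retry the exchange */
        }
    }
    return true;                           /* lock held */
}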
214 changes: 127 additions & 87 deletions src/pool/fifo.c
@@ -32,6 +32,9 @@ struct data {
size_t num_threads;
ABTI_thread *p_head;
ABTI_thread *p_tail;
/* If the pool is empty, pop() accesses only is_empty so that pop() does not
* slow down a push operation. */
ABTD_atomic_int is_empty; /* Whether the pool is empty or not. */
};
typedef struct data data_t;
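
Continuing the sketch above, the is_empty field is maintained by the push and pop paths roughly as follows; list manipulation is elided, and lock_queue/unlock_queue are hypothetical helpers. The release store in push pairs with the acquire load in the pop fast path, so a consumer that sees is_empty == 0 goes on to take the lock, while all queue manipulation still happens under the lock.

static void lock_queue(queue_t *q)
{
    while (atomic_exchange_explicit(&q->lock, 1, memory_order_acquire))
        ;                                  /* simple spin */
}

static void unlock_queue(queue_t *q)
{
    atomic_store_explicit(&q->lock, 0, memory_order_release);
}

static void queue_push(queue_t *q /* , unit to link */)
{
    lock_queue(q);
    /* ... link the unit and bump the count ... */
    atomic_store_explicit(&q->is_empty, 0, memory_order_release);
    unlock_queue(q);
}

static bool queue_pop(queue_t *q /* , unit out-parameter */)
{
    if (!try_lock_if_not_empty(q))
        return false;                      /* nothing to pop; lock never taken */
    /* ... unlink one unit; if it was the last one:
     *     atomic_store_explicit(&q->is_empty, 1, memory_order_release); ... */
    unlock_queue(q);
    return true;
}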

@@ -40,6 +43,28 @@ static inline data_t *pool_get_data_ptr(void *p_data)
return (data_t *)p_data;
}

ABTU_ret_err static inline int spinlock_acquire_if_not_empty(data_t *p_data)
{
if (ABTD_atomic_acquire_load_int(&p_data->is_empty)) {
/* The pool is empty. Lock is not taken. */
return 1;
}
while (ABTD_spinlock_try_acquire(&p_data->mutex)) {
/* Lock acquisition failed. Check the size. */
while (1) {
if (ABTD_atomic_acquire_load_int(&p_data->is_empty)) {
/* The pool becomes empty. Lock is not taken. */
return 1;
} else if (!ABTD_spinlock_is_locked(&p_data->mutex)) {
/* Lock seems released. Let's try to take a lock again. */
break;
}
}
}
/* Lock is acquired. */
return 0;
}

/* Obtain the FIFO pool definition according to the access type */
ABTU_ret_err int ABTI_pool_get_fifo_def(ABT_pool_access access,
ABTI_pool_def *p_def)
@@ -105,6 +130,7 @@ static int pool_init(ABT_pool pool, ABT_pool_config config)
p_data->num_threads = 0;
p_data->p_head = NULL;
p_data->p_tail = NULL;
ABTD_atomic_relaxed_store_int(&p_data->is_empty, 1);

p_pool->data = p_data;

@@ -141,6 +167,8 @@ static void pool_push_shared(ABT_pool pool, ABT_unit unit)
p_thread->p_next = p_thread;
p_data->p_head = p_thread;
p_data->p_tail = p_thread;
p_data->num_threads = 1;
ABTD_atomic_release_store_int(&p_data->is_empty, 0);
} else {
ABTI_thread *p_head = p_data->p_head;
ABTI_thread *p_tail = p_data->p_tail;
@@ -149,8 +177,8 @@ static void pool_push_shared(ABT_pool pool, ABT_unit unit)
p_thread->p_prev = p_tail;
p_thread->p_next = p_head;
p_data->p_tail = p_thread;
p_data->num_threads++;
}
p_data->num_threads++;

ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1);
ABTD_spinlock_release(&p_data->mutex);
Expand All @@ -167,6 +195,8 @@ static void pool_push_private(ABT_pool pool, ABT_unit unit)
p_thread->p_next = p_thread;
p_data->p_head = p_thread;
p_data->p_tail = p_thread;
p_data->num_threads = 1;
ABTD_atomic_release_store_int(&p_data->is_empty, 0);
} else {
ABTI_thread *p_head = p_data->p_head;
ABTI_thread *p_tail = p_data->p_tail;
@@ -175,8 +205,8 @@ static void pool_push_private(ABT_pool pool, ABT_unit unit)
p_thread->p_prev = p_tail;
p_thread->p_next = p_head;
p_data->p_tail = p_thread;
p_data->num_threads++;
}
p_data->num_threads++;

ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1);
}
@@ -186,148 +216,154 @@ static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
data_t *p_data = pool_get_data_ptr(p_pool->data);
ABTI_thread *p_thread = NULL;
ABT_unit h_unit = ABT_UNIT_NULL;

double time_start = 0.0;

do {
ABTD_spinlock_acquire(&p_data->mutex);
if (p_data->num_threads > 0) {
p_thread = p_data->p_head;
if (p_data->num_threads == 1) {
p_data->p_head = NULL;
p_data->p_tail = NULL;
} else {
p_thread->p_prev->p_next = p_thread->p_next;
p_thread->p_next->p_prev = p_thread->p_prev;
p_data->p_head = p_thread->p_next;
while (1) {
if (spinlock_acquire_if_not_empty(p_data) == 0) {
ABT_unit h_unit = ABT_UNIT_NULL;
if (p_data->num_threads > 0) {
p_thread = p_data->p_head;
if (p_data->num_threads == 1) {
p_data->p_head = NULL;
p_data->p_tail = NULL;
p_data->num_threads = 0;
ABTD_atomic_release_store_int(&p_data->is_empty, 1);
} else {
p_thread->p_prev->p_next = p_thread->p_next;
p_thread->p_next->p_prev = p_thread->p_prev;
p_data->p_head = p_thread->p_next;
p_data->num_threads--;
}

p_thread->p_prev = NULL;
p_thread->p_next = NULL;
ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);

h_unit = (ABT_unit)p_thread;
}
p_data->num_threads--;

p_thread->p_prev = NULL;
p_thread->p_next = NULL;
ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);

h_unit = (ABT_unit)p_thread;
ABTD_spinlock_release(&p_data->mutex);
if (h_unit != ABT_UNIT_NULL)
return h_unit;
}
if (time_start == 0.0) {
time_start = ABTI_get_wtime();
} else {
ABTD_spinlock_release(&p_data->mutex);
if (time_start == 0.0) {
time_start = ABTI_get_wtime();
} else {
double elapsed = ABTI_get_wtime() - time_start;
if (elapsed > time_secs)
break;
double elapsed = ABTI_get_wtime() - time_start;
if (elapsed > time_secs)
return ABT_UNIT_NULL;
}
/* Sleep. */
const int sleep_nsecs = 100;
struct timespec ts = { 0, sleep_nsecs };
nanosleep(&ts, NULL);
}
}

static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
{
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
data_t *p_data = pool_get_data_ptr(p_pool->data);
ABTI_thread *p_thread = NULL;

while (1) {
if (spinlock_acquire_if_not_empty(p_data) == 0) {
ABT_unit h_unit = ABT_UNIT_NULL;
if (p_data->num_threads > 0) {
p_thread = p_data->p_head;
if (p_data->num_threads == 1) {
p_data->p_head = NULL;
p_data->p_tail = NULL;
p_data->num_threads = 0;
ABTD_atomic_release_store_int(&p_data->is_empty, 1);
} else {
p_thread->p_prev->p_next = p_thread->p_next;
p_thread->p_next->p_prev = p_thread->p_prev;
p_data->p_head = p_thread->p_next;
p_data->num_threads--;
}

p_thread->p_prev = NULL;
p_thread->p_next = NULL;
ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);

h_unit = (ABT_unit)p_thread;
}
/* Sleep. */
const int sleep_nsecs = 100;
struct timespec ts = { 0, sleep_nsecs };
nanosleep(&ts, NULL);
ABTD_spinlock_release(&p_data->mutex);
if (h_unit != ABT_UNIT_NULL)
return h_unit;
}
} while (h_unit == ABT_UNIT_NULL);
const int sleep_nsecs = 100;
struct timespec ts = { 0, sleep_nsecs };
nanosleep(&ts, NULL);

return h_unit;
if (ABTI_get_wtime() > abstime_secs)
return ABT_UNIT_NULL;
}
}

static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
static ABT_unit pool_pop_shared(ABT_pool pool)
{
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
data_t *p_data = pool_get_data_ptr(p_pool->data);
ABTI_thread *p_thread = NULL;
ABT_unit h_unit = ABT_UNIT_NULL;

do {
ABTD_spinlock_acquire(&p_data->mutex);
if (spinlock_acquire_if_not_empty(p_data) == 0) {
ABT_unit h_unit = ABT_UNIT_NULL;
if (p_data->num_threads > 0) {
p_thread = p_data->p_head;
if (p_data->num_threads == 1) {
p_data->p_head = NULL;
p_data->p_tail = NULL;
p_data->num_threads = 0;
ABTD_atomic_release_store_int(&p_data->is_empty, 1);
} else {
p_thread->p_prev->p_next = p_thread->p_next;
p_thread->p_next->p_prev = p_thread->p_prev;
p_data->p_head = p_thread->p_next;
p_data->num_threads--;
}
p_data->num_threads--;

p_thread->p_prev = NULL;
p_thread->p_next = NULL;
ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);

h_unit = (ABT_unit)p_thread;
ABTD_spinlock_release(&p_data->mutex);
} else {
ABTD_spinlock_release(&p_data->mutex);
/* Sleep. */
const int sleep_nsecs = 100;
struct timespec ts = { 0, sleep_nsecs };
nanosleep(&ts, NULL);

if (ABTI_get_wtime() > abstime_secs)
break;
}
} while (h_unit == ABT_UNIT_NULL);

return h_unit;
}

static ABT_unit pool_pop_shared(ABT_pool pool)
{
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
data_t *p_data = pool_get_data_ptr(p_pool->data);
ABTI_thread *p_thread = NULL;
ABT_unit h_unit = ABT_UNIT_NULL;

ABTD_spinlock_acquire(&p_data->mutex);
if (p_data->num_threads > 0) {
p_thread = p_data->p_head;
if (p_data->num_threads == 1) {
p_data->p_head = NULL;
p_data->p_tail = NULL;
} else {
p_thread->p_prev->p_next = p_thread->p_next;
p_thread->p_next->p_prev = p_thread->p_prev;
p_data->p_head = p_thread->p_next;
}
p_data->num_threads--;

p_thread->p_prev = NULL;
p_thread->p_next = NULL;
ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);

h_unit = (ABT_unit)p_thread;
ABTD_spinlock_release(&p_data->mutex);
return h_unit;
} else {
return ABT_UNIT_NULL;
}
ABTD_spinlock_release(&p_data->mutex);

return h_unit;
}

static ABT_unit pool_pop_private(ABT_pool pool)
{
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
data_t *p_data = pool_get_data_ptr(p_pool->data);
ABTI_thread *p_thread = NULL;
ABT_unit h_unit = ABT_UNIT_NULL;

ABT_unit h_unit = ABT_UNIT_NULL;
if (p_data->num_threads > 0) {
p_thread = p_data->p_head;
if (p_data->num_threads == 1) {
p_data->p_head = NULL;
p_data->p_tail = NULL;
p_data->num_threads = 0;
ABTD_atomic_relaxed_store_int(&p_data->is_empty, 1);
} else {
p_thread->p_prev->p_next = p_thread->p_next;
p_thread->p_next->p_prev = p_thread->p_prev;
p_data->p_head = p_thread->p_next;
p_data->num_threads--;
}
p_data->num_threads--;

p_thread->p_prev = NULL;
p_thread->p_next = NULL;
ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);

h_unit = (ABT_unit)p_thread;
}

return h_unit;
}

@@ -345,6 +381,8 @@ static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
if (p_data->num_threads == 1) {
p_data->p_head = NULL;
p_data->p_tail = NULL;
p_data->num_threads = 0;
ABTD_atomic_release_store_int(&p_data->is_empty, 1);
} else {
p_thread->p_prev->p_next = p_thread->p_next;
p_thread->p_next->p_prev = p_thread->p_prev;
@@ -353,8 +391,8 @@ static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
} else if (p_thread == p_data->p_tail) {
p_data->p_tail = p_thread->p_prev;
}
p_data->num_threads--;
}
p_data->num_threads--;

ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
ABTD_spinlock_release(&p_data->mutex);
Expand All @@ -378,6 +416,8 @@ static int pool_remove_private(ABT_pool pool, ABT_unit unit)
if (p_data->num_threads == 1) {
p_data->p_head = NULL;
p_data->p_tail = NULL;
p_data->num_threads = 0;
ABTD_atomic_relaxed_store_int(&p_data->is_empty, 1);
} else {
p_thread->p_prev->p_next = p_thread->p_next;
p_thread->p_next->p_prev = p_thread->p_prev;
@@ -386,8 +426,8 @@ static int pool_remove_private(ABT_pool pool, ABT_unit unit)
} else if (p_thread == p_data->p_tail) {
p_data->p_tail = p_thread->p_prev;
}
p_data->num_threads--;
}
p_data->num_threads--;

ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
p_thread->p_prev = NULL;
