Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sched: add a registration-based scheduler to argobots #373

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/scheduling/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ TESTS = \
sched_predef \
sched_shared_pool \
sched_stack \
sched_user
sched_user \
sched_reg

check_PROGRAMS = $(TESTS)
noinst_PROGRAMS = $(TESTS)
Expand All @@ -20,3 +21,4 @@ sched_predef_SOURCES = sched_predef.c
sched_shared_pool_SOURCES = sched_shared_pool.c
sched_stack_SOURCES = sched_stack.c
sched_user_SOURCES = sched_user.c
sched_reg_SOURCES = sched_reg.c
146 changes: 146 additions & 0 deletions examples/scheduling/sched_reg.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* See COPYRIGHT in top-level directory.
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "abt.h"

#define NUM_XSTREAMS 4
#define NUM_THREADS 4


static void create_threads(void *arg);
static void thread_hello(void *arg);

int main(int argc, char *argv[])
{
ABT_xstream xstreams[NUM_XSTREAMS];
ABT_sched scheds[NUM_XSTREAMS];
ABT_pool pools[NUM_XSTREAMS];
ABT_thread threads[NUM_XSTREAMS];
int i;

ABT_init(argc, argv);

/* Create pools */
for (i = 0; i < NUM_XSTREAMS; i++) {
ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, ABT_TRUE,
&pools[i]);
}

/* Create schedulers */
//create_scheds(NUM_XSTREAMS, pools, scheds);
ABT_sched_policy policies[NUM_XSTREAMS];
for (int i = 0; i < NUM_XSTREAMS; i++) {
policies[i].ready_at = 0;
policies[i].ready_at_wt = ABT_get_wtime();
policies[i].priority = i;
policies[i].max_successes = 10;
policies[i].min_successes = 2 * i + 2;
policies[i].success_timeout = 5;
policies[i].fail_timeout = 100;
policies[i].success_timeout_wt = 0.0;
policies[i].fail_timeout_wt = 5.0;
}

policies[0].max_successes = 100;
policies[0].fail_timeout_wt=10.0;
for (int i = 0; i < NUM_XSTREAMS; i++) {
for (int j = 0; j < NUM_XSTREAMS;j++) {
policies[(i+j) % NUM_XSTREAMS].priority = j;
}
ABT_sched_create_reg(NUM_XSTREAMS, pools, NUM_XSTREAMS, policies, &scheds[i]);
}

/* Create ESs */
ABT_xstream_self(&xstreams[0]);
ABT_xstream_set_main_sched(xstreams[0], scheds[0]);
for (i = 1; i < NUM_XSTREAMS; i++) {
ABT_xstream_create(scheds[i], &xstreams[i]);
}

/* Create ULTs */
for (i = 0; i < NUM_XSTREAMS; i++) {
size_t tid = (size_t)i;
ABT_thread_create(pools[i], create_threads, (void *)tid,
ABT_THREAD_ATTR_NULL, &threads[i]);
}
/* Join & Free */
for (i = 0; i < NUM_XSTREAMS; i++) {
ABT_thread_join(threads[i]);
ABT_thread_free(&threads[i]);
}
for (i = 1; i < NUM_XSTREAMS; i++) {
ABT_xstream_join(xstreams[i]);
ABT_xstream_free(&xstreams[i]);
}
/* Free schedulers */
/* Note that we do not need to free the scheduler for the primary ES,
* i.e., xstreams[0], because its scheduler will be automatically freed in
* ABT_finalize(). */
for (i = 1; i < NUM_XSTREAMS; i++) {
ABT_sched_free(&scheds[i]);
}

/* Finalize */
ABT_finalize();

return 0;
}

static void create_threads(void *arg)
{
int i, rank, tid = (int)(size_t)arg;
ABT_xstream xstream;
ABT_pool pools[NUM_XSTREAMS];
ABT_thread *threads;

ABT_xstream_self(&xstream);
ABT_xstream_get_main_pools(xstream, NUM_XSTREAMS, pools);

ABT_xstream_get_rank(xstream, &rank);
printf("[U%d:E%d] creating ULTs\n", tid, rank);

threads = (ABT_thread *)malloc(sizeof(ABT_thread) * NUM_THREADS);
for (i = 0; i < NUM_THREADS; i++) {
size_t id = (rank + 1) * 10 + i;
ABT_thread_create(pools[rank], thread_hello, (void *)id, ABT_THREAD_ATTR_NULL,
&threads[i]);
}

ABT_xstream_get_rank(xstream, &rank);
printf("[U%d:E%d] freeing ULTs\n", tid, rank);
for (i = 0; i < NUM_THREADS; i++) {
ABT_thread_free(&threads[i]);
}
free(threads);
}

static void thread_hello(void *arg)
{
int tid = (int)(size_t)arg;
int old_rank, cur_rank;
char *msg;

ABT_xstream_self_rank(&cur_rank);

printf(" [U%d:E%d] Hello, world!\n", tid, cur_rank);

ABT_thread_yield();

old_rank = cur_rank;
ABT_xstream_self_rank(&cur_rank);
msg = (cur_rank == old_rank) ? "" : " (stolen)";
printf(" [U%d:E%d] Hello again.%s\n", tid, cur_rank, msg);

ABT_thread_yield();

old_rank = cur_rank;
ABT_xstream_self_rank(&cur_rank);
msg = (cur_rank == old_rank) ? "" : " (stolen)";
printf(" [U%d:E%d] Goodbye, world!%s\n", tid, cur_rank, msg);
}
84 changes: 84 additions & 0 deletions src/include/abt.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -2309,6 +2309,87 @@ typedef void (*ABT_tool_thread_callback_fn)(ABT_thread, ABT_xstream, uint64_t ev
typedef void (*ABT_tool_task_callback_fn)(ABT_task, ABT_xstream, uint64_t event,
ABT_tool_context context, void *user_arg);

/**
* @ingroup SCHED
* @brief A struct that defines the scheduling policy for an associated pool.
*/
typedef struct {
/**
* @brief Pool priority.
*
* This value determines the relative priority of the associated pool when
* the scheduler is selecting between multiple ready pools. The scheduler
* will favor pools with lower priority. Value must be non-negative.
*/
int priority;
/**
* @brief Minimum number of successful attempts.
*
* This value determines how many work units will be pulled from this pool
* before returning to the scheduler to check for interrupts and/or higher
* priority pools that may have become ready.
*/
int min_successes;
/**
* @brief Maximum number of successful attempts.
*
* This value determines the maximum number of work units will be executed
* from this pool before the pool is timed-out to allow for the scheduler
* to check other pools.
*/
int max_successes;
/**
* @brief Scheduler event number at which this pool becomes ready.
*
* This value determines the point in time that this pool becomes ready
* after being timed-out. It is specified in terms of number of work units
* that the scheduler has executed.
*/
int ready_at;
/**
* @brief Timeout value when the scheduler fails to pop a work unit.
*
* This value determines how many work units the scheduler should execute
* from OTHER pools before this pool becomes ready again in the case where
* the scheduler fails to execute a work unit from this pool (because the
* pool has become empty).
*/
int fail_timeout;
/**
* @brief Timeout value after the maximum number of successful pops.
*
* This value determines how many work units the scheduler should execute
* from OTHER pools before this pool becomes ready again in the case where
* the scheduler has executed the maximum number of events from this pool,
* as specified by max_successes.
*/
int success_timeout;
/**
* @brief Walltime at which this pool becomes ready after a timeout.
*
* This value determines the walltime at which this pool becomes ready
* again after a timeout. It works in conjunction with ready_at. For a pool
* to be ready, it must have reached both the ready_at_wt walltime and the
* ready_at scheduler event number.
*/
double ready_at_wt;
/**
* @brief Timeout value (in seconds) when the scheduler fails to pop.
*
* This value determines how many seconds this pool will be timed out
* in the case where the scheduler fails to pop an work unit from this pool.
*/
double fail_timeout_wt;
/**
* @brief Timeout value (in seconds) after the maximum successful pops.
*
* This value determines how many seconds this pool will be timed out
* in the case where the scheduler fails pops the maximum number of work
* units from this pool as determined by max_successes.
*/
double success_timeout_wt;
} ABT_sched_policy;

/* Init & Finalize */
int ABT_init(int argc, char **argv) ABT_API_PUBLIC;
int ABT_finalize(void) ABT_API_PUBLIC;
Expand Down Expand Up @@ -2363,6 +2444,9 @@ int ABT_sched_create(ABT_sched_def *def, int num_pools, ABT_pool *pools,
int ABT_sched_create_basic(ABT_sched_predef predef, int num_pools,
ABT_pool *pools, ABT_sched_config config,
ABT_sched *newsched) ABT_API_PUBLIC;
void ABT_sched_create_reg(int num_pools, ABT_pool *pools,
int num_policies, ABT_sched_policy *policies,
ABT_sched *newsched) ABT_API_PUBLIC;
int ABT_sched_free(ABT_sched *sched) ABT_API_PUBLIC;
int ABT_sched_set_data(ABT_sched sched, void *data) ABT_API_PUBLIC;
int ABT_sched_get_data(ABT_sched sched, void **data) ABT_API_PUBLIC;
Expand Down
1 change: 1 addition & 0 deletions src/sched/Makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ abt_sources += \
sched/basic_wait.c \
sched/prio.c \
sched/randws.c \
sched/reg.c \
sched/sched.c \
sched/sched_config.c

Loading