Skip to content

Commit

Permalink
test: queue_perf: add option for using private queues
Browse files Browse the repository at this point in the history
Add new option ('-p') for using private queues per worker. In private queue
mode each worker thread operates on separate queues which can improve
performance scaling.

Signed-off-by: Matias Elo <matias.elo@nokia.com>
  • Loading branch information
MatiasElo committed Oct 23, 2024
1 parent b4de7da commit 23d3b0a
Showing 1 changed file with 102 additions and 30 deletions.
132 changes: 102 additions & 30 deletions test/performance/odp_queue_perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/**
* @example odp_queue_perf.c
*
* Performance test application for queue APIs
* Performance test application for plain queues
*
* @cond _ODP_HIDE_FROM_DOXYGEN_
*/
Expand All @@ -30,9 +30,10 @@ typedef struct test_options_t {
uint32_t num_event;
uint32_t num_round;
uint32_t max_burst;
uint32_t num_cpu;
odp_nonblocking_t nonblock;
int single;
int num_cpu;
odp_bool_t private_queues;
odp_bool_t single;

} test_options_t;

Expand All @@ -46,6 +47,14 @@ typedef struct test_stat_t {

} test_stat_t;

typedef struct {
odp_barrier_t *barrier;
test_options_t *options;
test_stat_t *stats;
odp_queue_t queue[MAX_QUEUES];
uint32_t num_queues;
} thread_args_t;

typedef struct test_global_t {
odp_barrier_t barrier;
test_options_t options;
Expand All @@ -54,6 +63,7 @@ typedef struct test_global_t {
odp_pool_t pool;
odp_queue_t queue[MAX_QUEUES];
odph_thread_t thread_tbl[ODP_THREAD_COUNT_MAX];
thread_args_t thread_args[ODP_THREAD_COUNT_MAX];
test_stat_t stat[ODP_THREAD_COUNT_MAX];
test_common_options_t common_options;

Expand All @@ -70,6 +80,7 @@ static void print_usage(void)
" -q, --num_queue Number of queues. Default: 1\n"
" -e, --num_event Number of events per queue. Default: 1\n"
" -b, --burst_size Maximum number of events per operation. Default: 1\n"
" -p, --private Use separate queues for each worker.\n"
" -r, --num_round Number of rounds\n"
" -l, --lockfree Lockfree queues\n"
" -w, --waitfree Waitfree queues\n"
Expand All @@ -89,6 +100,7 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
{"num_queue", required_argument, NULL, 'q'},
{"num_event", required_argument, NULL, 'e'},
{"burst_size", required_argument, NULL, 'b'},
{"private", no_argument, NULL, 'p'},
{"num_round", required_argument, NULL, 'r'},
{"lockfree", no_argument, NULL, 'l'},
{"waitfree", no_argument, NULL, 'w'},
Expand All @@ -97,15 +109,16 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
{NULL, 0, NULL, 0}
};

static const char *shortopts = "+c:q:e:b:r:lwsh";
static const char *shortopts = "+c:q:e:b:pr:lwsh";

test_options->num_cpu = 1;
test_options->num_queue = 1;
test_options->num_event = 1;
test_options->max_burst = 1;
test_options->num_round = 1000;
test_options->nonblock = ODP_BLOCKING;
test_options->single = 0;
test_options->single = false;
test_options->private_queues = false;

while (1) {
opt = getopt_long(argc, argv, shortopts, longopts, &long_index);
Expand Down Expand Up @@ -135,8 +148,11 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
case 'w':
test_options->nonblock = ODP_NONBLOCKING_WF;
break;
case 'p':
test_options->private_queues = true;
break;
case 's':
test_options->single = 1;
test_options->single = true;
break;
case 'h':
/* fall through */
Expand All @@ -153,11 +169,23 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
return -1;
}

if (test_options->single && test_options->num_cpu != 1) {
if (test_options->single && !test_options->private_queues && test_options->num_cpu != 1) {
ODPH_ERR("Using single prod/cons queue(s) with multiple workers not supported.\n");
return -1;
}

if (test_options->private_queues) {
int num_cpu = test_options->num_cpu;

if (num_cpu == 0)
num_cpu = odp_cpumask_default_worker(NULL, 0);

if ((int)test_options->num_queue < num_cpu) {
ODPH_ERR("Not enough queues for %d workers.\n", num_cpu);
return -1;
}
}

return ret;
}

Expand All @@ -183,7 +211,8 @@ static int create_queues(test_global_t *global)
nonblock == ODP_BLOCKING ? "NORMAL" :
(nonblock == ODP_NONBLOCKING_LF ? "LOCKFREE" :
(nonblock == ODP_NONBLOCKING_WF ? "WAITFREE" : "???")));
printf(" single prod/cons %d\n", test_options->single);
printf(" private queues %s\n", test_options->private_queues ? "yes" : "no");
printf(" single prod/cons %s\n", test_options->single ? "yes" : "no");
printf(" num rounds %u\n", num_round);
printf(" num queues %u\n", num_queue);
printf(" num events per queue %u\n", num_event);
Expand Down Expand Up @@ -374,25 +403,24 @@ static int run_test(void *arg)
odp_time_t t1, t2;
uint32_t rounds;
int num_ev;
test_stat_t *stat;
test_global_t *global = arg;
test_options_t *test_options = &global->options;
thread_args_t *thr_args = arg;
test_stat_t *stat = thr_args->stats;
odp_queue_t queue;
uint64_t num_deq_retry = 0;
uint64_t num_enq_retry = 0;
uint64_t events = 0;
uint32_t num_queue = test_options->num_queue;
uint32_t num_round = test_options->num_round;
int thr = odp_thread_id();
int ret = 0;
uint32_t i = 0;
uint32_t max_burst = test_options->max_burst;
const uint32_t num_queue = thr_args->num_queues;
const uint32_t num_round = thr_args->options->num_round;
const uint32_t max_burst = thr_args->options->max_burst;
uint32_t queue_idx = 0;
odp_event_t ev[max_burst];
odp_queue_t queue_tbl[MAX_QUEUES];

stat = &global->stat[thr];
for (uint32_t i = 0; i < num_queue; i++)
queue_tbl[i] = thr_args->queue[i];

/* Start all workers at the same time */
odp_barrier_wait(&global->barrier);
odp_barrier_wait(thr_args->barrier);

t1 = odp_time_local_strict();
c1 = odp_cpu_cycles();
Expand All @@ -401,10 +429,10 @@ static int run_test(void *arg)
int num_enq = 0;

do {
queue = global->queue[i++];
queue = queue_tbl[queue_idx++];

if (i == num_queue)
i = 0;
if (queue_idx == num_queue)
queue_idx = 0;

num_ev = odp_queue_deq_multi(queue, ev, max_burst);

Expand Down Expand Up @@ -443,13 +471,54 @@ static int run_test(void *arg)
stat->deq_retry = num_deq_retry;
stat->enq_retry = num_enq_retry;

return ret;
return 0;
}

static void map_queues_to_threads(test_global_t *global)
{
test_options_t *opt = &global->options;

/* All queues shared amongst workers */
if (!opt->private_queues) {
for (uint32_t i = 0; i < opt->num_queue; i++) {
for (uint32_t j = 0; j < opt->num_cpu; j++) {
global->thread_args[j].queue[i] = global->queue[i];
global->thread_args[j].num_queues++;
}
}
return;
}

if (opt->num_queue % opt->num_cpu)
ODPH_ERR("Warn: %" PRIu32 " queues shared unevenly amongst %" PRIu32 " workers.\n",
opt->num_queue, opt->num_cpu);

for (uint32_t i = 0; i < opt->num_queue; i++) {
uint32_t thread_idx = i % opt->num_cpu;
uint32_t queue_idx = global->thread_args[thread_idx].num_queues;

global->thread_args[thread_idx].queue[queue_idx] = global->queue[i];
global->thread_args[thread_idx].num_queues++;
}
}

static void init_thread_args(test_global_t *global)
{
map_queues_to_threads(global);

for (uint32_t i = 0; i < global->options.num_cpu; i++) {
thread_args_t *thread_args = &global->thread_args[i];

thread_args->stats = &global->stat[i];
thread_args->options = &global->options;
thread_args->barrier = &global->barrier;
}
}

static int start_workers(test_global_t *global)
{
odph_thread_common_param_t thr_common;
odph_thread_param_t thr_param;
odph_thread_param_t thr_param[ODP_THREAD_COUNT_MAX];
odp_cpumask_t cpumask;
int ret;
test_options_t *test_options = &global->options;
Expand All @@ -475,14 +544,17 @@ static int start_workers(test_global_t *global)
odph_thread_common_param_init(&thr_common);
thr_common.instance = global->instance;
thr_common.cpumask = &cpumask;
thr_common.share_param = 1;

odph_thread_param_init(&thr_param);
thr_param.start = run_test;
thr_param.arg = global;
thr_param.thr_type = ODP_THREAD_WORKER;
init_thread_args(global);

for (int i = 0; i < num_cpu; i++) {
odph_thread_param_init(&thr_param[i]);
thr_param[i].start = run_test;
thr_param[i].arg = &global->thread_args[i];
thr_param[i].thr_type = ODP_THREAD_WORKER;
}

if (odph_thread_create(global->thread_tbl, &thr_common, &thr_param,
if (odph_thread_create(global->thread_tbl, &thr_common, thr_param,
num_cpu) != num_cpu)
return -1;

Expand Down

0 comments on commit 23d3b0a

Please sign in to comment.