Skip to content

Commit

Permalink
test: queue_perf: add queue pair test mode
Browse files Browse the repository at this point in the history
Add pair test mode ('-m 1') where events are moved between queue pairs
connecting two or more workers. Requires an even number of both queues and
workers.

Signed-off-by: Matias Elo <matias.elo@nokia.com>
  • Loading branch information
MatiasElo committed Oct 30, 2024
1 parent 610ac0c commit 250db5f
Showing 1 changed file with 91 additions and 27 deletions.
118 changes: 91 additions & 27 deletions test/performance/odp_queue_perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@

#define MAX_QUEUES (32 * 1024)

typedef enum {
TEST_MODE_LOOP = 0,
TEST_MODE_PAIR,
} test_mode_t;

typedef struct test_options_t {
uint32_t num_queue;
uint32_t num_event;
uint32_t num_round;
uint32_t max_burst;
uint32_t num_cpu;
odp_nonblocking_t nonblock;
test_mode_t mode;
odp_bool_t private_queues;
odp_bool_t single;

Expand Down Expand Up @@ -80,6 +86,12 @@ static void print_usage(void)
"\n"
"Usage: odp_queue_perf [options]\n"
"\n"
" -m, --mode <arg> Test mode:\n"
" 0: Loop: events are enqueued back to the same queue they\n"
" were dequeued from (default)\n"
" 1: Pair: queues are paired and events are always moved\n"
" between the queues when doing dequeue/enqueue. Requires\n"
" an even number of both queues and workers.\n"
" -c, --num_cpu Number of worker threads. Default: 1\n"
" -q, --num_queue Number of queues. Default: 1\n"
" -e, --num_event Number of events per queue. Default: 1\n"
Expand All @@ -95,15 +107,15 @@ static void print_usage(void)

static int parse_options(int argc, char *argv[], test_options_t *test_options)
{
int opt;
int long_index;
int opt, long_index, num_cpu;
int ret = 0;

static const struct option longopts[] = {
{"num_cpu", required_argument, NULL, 'c'},
{"num_queue", required_argument, NULL, 'q'},
{"num_event", required_argument, NULL, 'e'},
{"burst_size", required_argument, NULL, 'b'},
{"mode", required_argument, NULL, 'm'},
{"private", no_argument, NULL, 'p'},
{"num_round", required_argument, NULL, 'r'},
{"lockfree", no_argument, NULL, 'l'},
Expand All @@ -113,12 +125,13 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
{NULL, 0, NULL, 0}
};

static const char *shortopts = "+c:q:e:b:pr:lwsh";
static const char *shortopts = "+c:q:e:b:m:pr:lwsh";

test_options->num_cpu = 1;
test_options->num_queue = 1;
test_options->num_event = 1;
test_options->max_burst = 1;
test_options->mode = TEST_MODE_LOOP;
test_options->num_round = 1000;
test_options->nonblock = ODP_BLOCKING;
test_options->single = false;
Expand All @@ -143,6 +156,10 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
case 'b':
test_options->max_burst = atoi(optarg);
break;
case 'm':
if (atoi(optarg) == TEST_MODE_PAIR)
test_options->mode = TEST_MODE_PAIR;
break;
case 'r':
test_options->num_round = atoi(optarg);
break;
Expand All @@ -167,29 +184,36 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
}
}

if (test_options->num_queue > MAX_QUEUES) {
ODPH_ERR("Too many queues %u. Test maximum %u.\n",
if (test_options->num_queue > MAX_QUEUES || test_options->num_queue == 0) {
ODPH_ERR("Invalid number of queues %u. Test maximum %u.\n",
test_options->num_queue, MAX_QUEUES);
return -1;
}

if (test_options->single && !test_options->private_queues && test_options->num_cpu != 1) {
ODPH_ERR("Using single prod/cons queue(s) with multiple workers not supported.\n");
return -1;
}
num_cpu = test_options->num_cpu;
if (num_cpu == 0)
num_cpu = odp_cpumask_default_worker(NULL, 0);

if (test_options->private_queues) {
int num_cpu = test_options->num_cpu;

if (num_cpu == 0)
num_cpu = odp_cpumask_default_worker(NULL, 0);

if ((int)test_options->num_queue < num_cpu) {
ODPH_ERR("Not enough queues for %d workers.\n", num_cpu);
return -1;
}
}

if (test_options->single && !test_options->private_queues) {
if ((test_options->mode == TEST_MODE_LOOP && num_cpu != 1) ||
(test_options->mode == TEST_MODE_PAIR && num_cpu != 2)) {
ODPH_ERR("Using single prod/cons queue(s) with multiple workers not allowed.\n");
return -1;
}
}

if (test_options->mode == TEST_MODE_PAIR && (test_options->num_queue % 2 || num_cpu % 2)) {
ODPH_ERR("Pair mode requires an even number of queues and workers.\n");
return -1;
}

return ret;
}

Expand All @@ -215,6 +239,8 @@ static int create_queues(test_global_t *global)
nonblock == ODP_BLOCKING ? "NORMAL" :
(nonblock == ODP_NONBLOCKING_LF ? "LOCKFREE" :
(nonblock == ODP_NONBLOCKING_WF ? "WAITFREE" : "???")));
printf(" mode %s\n", test_options->mode == TEST_MODE_LOOP ?
"loop" : "pair");
printf(" private queues %s\n", test_options->private_queues ? "yes" : "no");
printf(" single prod/cons %s\n", test_options->single ? "yes" : "no");
printf(" num rounds %u\n", num_round);
Expand Down Expand Up @@ -488,28 +514,66 @@ static void map_queues_to_threads(test_global_t *global)
{
test_options_t *opt = &global->options;

if (!opt->private_queues) {
if (opt->private_queues && opt->num_queue % opt->num_cpu)
ODPH_ERR("Warn: %" PRIu32 " queues shared unevenly amongst %" PRIu32 " workers.\n",
opt->num_queue, opt->num_cpu);

if (opt->mode == TEST_MODE_LOOP) {
if (!opt->private_queues) {
for (uint32_t i = 0; i < opt->num_queue; i++) {
for (uint32_t j = 0; j < opt->num_cpu; j++) {
global->thread_args[j].src_queue_id[i] = i;
global->thread_args[j].dst_queue_id[i] = i;
global->thread_args[j].num_queues++;
}
}
return;
}

for (uint32_t i = 0; i < opt->num_queue; i++) {
uint32_t thread_idx = i % opt->num_cpu;
uint32_t queue_idx = global->thread_args[thread_idx].num_queues;

global->thread_args[thread_idx].src_queue_id[queue_idx] = i;
global->thread_args[thread_idx].dst_queue_id[queue_idx] = i;
global->thread_args[thread_idx].num_queues++;
}
return;
}
/* Pair mode. Always an even number of both queues and CPUs. */
if (!opt->private_queues) {
for (uint32_t i = 0; i < opt->num_queue; i += 2) {
for (uint32_t j = 0; j < opt->num_cpu; j++) {
global->thread_args[j].src_queue_id[i] = i;
global->thread_args[j].dst_queue_id[i] = i;
global->thread_args[j].num_queues++;
thread_args_t *thread_args = &global->thread_args[j];
uint32_t num_queues = thread_args->num_queues;

if (j % 2 == 0) {
thread_args->src_queue_id[num_queues] = i;
thread_args->dst_queue_id[num_queues] = i + 1;
} else {
thread_args->src_queue_id[num_queues] = i + 1;
thread_args->dst_queue_id[num_queues] = i;
}
thread_args->num_queues++;
}
}
return;
}

if (opt->num_queue % opt->num_cpu)
ODPH_ERR("Warn: %" PRIu32 " queues shared unevenly amongst %" PRIu32 " workers.\n",
opt->num_queue, opt->num_cpu);
for (uint32_t i = 0; i < opt->num_queue; i += 2) {
uint32_t num_queues;
uint32_t thread_a = i % opt->num_cpu;
uint32_t thread_b = thread_a + 1;

for (uint32_t i = 0; i < opt->num_queue; i++) {
uint32_t thread_idx = i % opt->num_cpu;
uint32_t queue_idx = global->thread_args[thread_idx].num_queues;
num_queues = global->thread_args[thread_a].num_queues;
global->thread_args[thread_a].src_queue_id[num_queues] = i;
global->thread_args[thread_a].dst_queue_id[num_queues] = i + 1;
global->thread_args[thread_a].num_queues++;

global->thread_args[thread_idx].src_queue_id[queue_idx] = i;
global->thread_args[thread_idx].dst_queue_id[queue_idx] = i;
global->thread_args[thread_idx].num_queues++;
num_queues = global->thread_args[thread_b].num_queues;
global->thread_args[thread_b].src_queue_id[num_queues] = i + 1;
global->thread_args[thread_b].dst_queue_id[num_queues] = i;
global->thread_args[thread_b].num_queues++;
}
}

Expand Down

0 comments on commit 250db5f

Please sign in to comment.