Skip to content

Commit

Permalink
HG perf: autodetect MPI implementation
Browse files Browse the repository at this point in the history
MPI can now be autodetected and dynamically loaded,
even if MERCURY_TEST_ENABLE_PARALLEL was turned off. If
MERCURY_TEST_ENABLE_PARALLEL is turned on, tests remain
linked against MPI as they used to be.

Add client and server comm size information.
  • Loading branch information
soumagne committed Dec 14, 2023
1 parent c299d33 commit dda8ccb
Show file tree
Hide file tree
Showing 13 changed files with 769 additions and 212 deletions.
2 changes: 2 additions & 0 deletions Testing/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ configure_file(
set(NA_TEST_COMMON_SRCS
${CMAKE_CURRENT_SOURCE_DIR}/na_test.c
${CMAKE_CURRENT_SOURCE_DIR}/na_test_getopt.c
${CMAKE_CURRENT_SOURCE_DIR}/na_test_mpi.c
)

set(MERCURY_TEST_COMMON_SRCS
Expand All @@ -109,6 +110,7 @@ set(NA_TEST_COMMON_PRIVATE_HEADERS
${CMAKE_CURRENT_BINARY_DIR}/mercury_test_config.h
${CMAKE_CURRENT_SOURCE_DIR}/na_test.h
${CMAKE_CURRENT_SOURCE_DIR}/na_test_getopt.h
${CMAKE_CURRENT_SOURCE_DIR}/na_test_mpi.h
)

set(MERCURY_TEST_COMMON_PRIVATE_HEADERS
Expand Down
13 changes: 4 additions & 9 deletions Testing/common/mercury_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -248,26 +248,21 @@ HG_Test_init(int argc, char *argv[], struct hg_test_info *hg_test_info)
hg_test_info->hg_class = hg_test_info->hg_classes[0]; /* default */

if (hg_test_info->na_test_info.listen) {
#ifdef HG_TEST_HAS_PARALLEL
int j;
for (j = 0; j < hg_test_info->na_test_info.mpi_comm_size; j++) {
if (hg_test_info->na_test_info.mpi_comm_rank == j) {
#endif
for (j = 0; j < hg_test_info->na_test_info.mpi_info.size; j++) {
if (hg_test_info->na_test_info.mpi_info.rank == j) {
for (i = 0; i < hg_test_info->na_test_info.max_classes; i++) {
ret = hg_test_self_addr_publish(hg_test_info->hg_classes[i],
i > 0 || hg_test_info->na_test_info.mpi_comm_rank != 0);
i > 0 || hg_test_info->na_test_info.mpi_info.rank != 0);
HG_TEST_CHECK_HG_ERROR(
error, ret, "hg_test_self_addr_publish() failed");
}

#ifdef HG_TEST_HAS_PARALLEL
}
NA_Test_barrier(&hg_test_info->na_test_info);
}
/* If static client, must wait for server to write config file */
if (hg_test_info->na_test_info.mpi_static)
MPI_Barrier(MPI_COMM_WORLD);
#endif
na_test_mpi_barrier_world();
}

return HG_SUCCESS;
Expand Down
192 changes: 28 additions & 164 deletions Testing/common/na_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
#include "na_test.h"
#include "na_test_getopt.h"

#ifdef NA_HAS_MPI
# include "na_mpi.h"
#endif

#include "mercury_util.h"

#include <stdio.h>
Expand Down Expand Up @@ -57,14 +53,6 @@ na_test_parse_options(
static size_t
na_test_parse_size(const char *str);

#ifdef HG_TEST_HAS_PARALLEL
static na_return_t
na_test_mpi_init(struct na_test_info *na_test_info);

static void
na_test_mpi_finalize(struct na_test_info *na_test_info);
#endif

static char *
na_test_gen_config(struct na_test_info *na_test_info, unsigned int i);

Expand Down Expand Up @@ -254,106 +242,6 @@ na_test_parse_size(const char *str)
return 0;
}

/*---------------------------------------------------------------------------*/
#ifdef HG_TEST_HAS_PARALLEL
static na_return_t
na_test_mpi_init(struct na_test_info *na_test_info)
{
int mpi_initialized = 0;
na_return_t ret;
int rc;

rc = MPI_Initialized(&mpi_initialized);
NA_TEST_CHECK_ERROR(rc != MPI_SUCCESS, error, ret, NA_PROTOCOL_ERROR,
"MPI_Initialized() failed");
NA_TEST_CHECK_ERROR(mpi_initialized, error, ret, NA_PROTOCOL_ERROR,
"MPI was already initialized");

# ifdef NA_MPI_HAS_GNI_SETUP
/* Setup GNI job before initializing MPI */
ret = NA_MPI_Gni_job_setup();
NA_TEST_CHECK_NA_ERROR(error, ret, "Could not setup GNI job");
# endif

if ((na_test_info->listen && na_test_info->use_threads) ||
na_test_info->mpi_static) {
int provided;

rc = MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided);
NA_TEST_CHECK_ERROR(rc != MPI_SUCCESS, error, ret, NA_PROTOCOL_ERROR,
"MPI_Init_thread() failed");

NA_TEST_CHECK_ERROR(provided != MPI_THREAD_MULTIPLE, error, ret,
NA_PROTOCOL_ERROR, "MPI_THREAD_MULTIPLE cannot be set");

/* Only if we do static MPMD MPI */
if (na_test_info->mpi_static) {
int color, global_rank;

rc = MPI_Comm_rank(MPI_COMM_WORLD, &global_rank);
NA_TEST_CHECK_ERROR(rc != MPI_SUCCESS, error, ret,
NA_PROTOCOL_ERROR, "MPI_Comm_rank() failed");

/* Color is 1 for server, 2 for client */
color = (na_test_info->listen) ? 1 : 2;

/* Assume that the application did not split MPI_COMM_WORLD already
*/
rc = MPI_Comm_split(
MPI_COMM_WORLD, color, global_rank, &na_test_info->mpi_comm);
NA_TEST_CHECK_ERROR(rc != MPI_SUCCESS, error, ret,
NA_PROTOCOL_ERROR, "MPI_Comm_split() failed");

# ifdef NA_HAS_MPI
/* Set init comm that will be used to setup NA MPI */
NA_MPI_Set_init_intra_comm(na_test_info->mpi_comm);
# endif
} else
na_test_info->mpi_comm = MPI_COMM_WORLD; /* default */
} else {
rc = MPI_Init(NULL, NULL);
NA_TEST_CHECK_ERROR(rc != MPI_SUCCESS, error, ret, NA_PROTOCOL_ERROR,
"MPI_Init() failed");

na_test_info->mpi_comm = MPI_COMM_WORLD; /* default */
}

rc = MPI_Comm_rank(na_test_info->mpi_comm, &na_test_info->mpi_comm_rank);
NA_TEST_CHECK_ERROR(rc != MPI_SUCCESS, error, ret, NA_PROTOCOL_ERROR,
"MPI_Comm_rank() failed");

rc = MPI_Comm_size(na_test_info->mpi_comm, &na_test_info->mpi_comm_size);
NA_TEST_CHECK_ERROR(rc != MPI_SUCCESS, error, ret, NA_PROTOCOL_ERROR,
"MPI_Comm_size() failed");

return NA_SUCCESS;

error:
na_test_mpi_finalize(na_test_info);

return ret;
}

/*---------------------------------------------------------------------------*/
static void
na_test_mpi_finalize(struct na_test_info *na_test_info)
{
int mpi_finalized = 0;

if (na_test_info->mpi_no_finalize)
return;

(void) MPI_Finalized(&mpi_finalized);
if (mpi_finalized)
return;

if (na_test_info->mpi_static && na_test_info->mpi_comm != MPI_COMM_NULL)
(void) MPI_Comm_free(&na_test_info->mpi_comm);

(void) MPI_Finalize();
}
#endif

/*---------------------------------------------------------------------------*/
static char *
na_test_gen_config(struct na_test_info *na_test_info, unsigned int i)
Expand Down Expand Up @@ -554,27 +442,22 @@ NA_Test_init(int argc, char *argv[], struct na_test_info *na_test_info)

na_test_parse_options(argc, argv, na_test_info);

#ifdef HG_TEST_HAS_PARALLEL
/* Test run in parallel using mpirun so must intialize MPI to get
* basic setup info etc */
ret = na_test_mpi_init(na_test_info);
ret = na_test_mpi_init(&na_test_info->mpi_info, na_test_info->listen,
na_test_info->use_threads, na_test_info->mpi_static);
NA_TEST_CHECK_NA_ERROR(error, ret, "na_test_mpi_init() failed");
na_test_info->max_number_of_peers = MPIEXEC_MAX_NUMPROCS;
#else
na_test_info->mpi_comm_rank = 0;
na_test_info->mpi_comm_size = 1;
na_test_info->max_number_of_peers = 1;
#endif

if (na_test_info->max_classes == 0)
na_test_info->max_classes = 1;

/* Call cleanup before doing anything */
if (na_test_info->listen && na_test_info->mpi_comm_rank == 0)
if (na_test_info->listen && na_test_info->mpi_info.rank == 0)
NA_Cleanup();

if (na_test_info->busy_wait) {
na_init_info.progress_mode = NA_NO_BLOCK;
if (na_test_info->mpi_comm_rank == 0)
if (na_test_info->mpi_info.rank == 0)
printf("# Initializing NA in busy wait mode\n");
}
na_init_info.auth_key = na_test_info->key;
Expand All @@ -593,12 +476,12 @@ NA_Test_init(int argc, char *argv[], struct na_test_info *na_test_info)
for (i = 0; i < na_test_info->max_classes; i++) {
/* Generate NA init string and get config options */
info_string = na_test_gen_config(na_test_info,
(unsigned int) (i + (unsigned int) na_test_info->mpi_comm_rank *
(unsigned int) (i + (unsigned int) na_test_info->mpi_info.rank *
na_test_info->max_classes));
NA_TEST_CHECK_ERROR(info_string == NULL, error, ret, NA_PROTOCOL_ERROR,
"Could not generate config string");

if (na_test_info->mpi_comm_rank == 0)
if (na_test_info->mpi_info.rank == 0)
printf("# Class %zu using info string: %s\n", i + 1, info_string);

na_test_info->na_classes[i] =
Expand All @@ -619,20 +502,17 @@ NA_Test_init(int argc, char *argv[], struct na_test_info *na_test_info)
error, ret, "na_test_self_addr_publish() failed");
}

#ifdef HG_TEST_HAS_PARALLEL
/* If static client must wait for server to write config file */
if (na_test_info->mpi_static)
MPI_Barrier(MPI_COMM_WORLD);
#endif
na_test_mpi_barrier_world();
}
/* Get config from file if self option is not passed */
else if (!na_test_info->listen && !na_test_info->self_send) {
#ifdef HG_TEST_HAS_PARALLEL
/* If static client must wait for server to write config file */
if (na_test_info->mpi_static)
MPI_Barrier(MPI_COMM_WORLD);
#endif
if (na_test_info->mpi_comm_rank == 0) {
na_test_mpi_barrier_world();

if (na_test_info->mpi_info.rank == 0) {
size_t count = 0;

ret = na_test_get_config(NULL, &count);
Expand All @@ -643,48 +523,45 @@ NA_Test_init(int argc, char *argv[], struct na_test_info *na_test_info)
na_test_info->max_targets = (uint32_t) count;
}

#ifdef HG_TEST_HAS_PARALLEL
if (na_test_info->mpi_comm_size > 1)
MPI_Bcast(&na_test_info->max_targets, 1, MPI_UINT32_T, 0,
na_test_info->mpi_comm);
#endif
if (na_test_info->mpi_info.size > 1)
na_test_mpi_bcast(&na_test_info->mpi_info,
&na_test_info->max_targets, sizeof(uint32_t), 0);

na_test_info->target_names = (char **) malloc(
na_test_info->max_targets * sizeof(*na_test_info->target_names));
NA_TEST_CHECK_ERROR(na_test_info->target_names == NULL, error, ret,
NA_NOMEM, "Could not allocated target name array");

if (na_test_info->mpi_comm_rank == 0) {
if (na_test_info->mpi_info.rank == 0) {
size_t count = na_test_info->max_targets;

ret = na_test_get_config(na_test_info->target_names, &count);
NA_TEST_CHECK_NA_ERROR(error, ret,
"na_test_get_config() failed (%s)", NA_Error_to_string(ret));
}

#ifdef HG_TEST_HAS_PARALLEL
if (na_test_info->mpi_comm_size > 1) {
if (na_test_info->mpi_info.size > 1) {
uint32_t j;

for (j = 0; j < na_test_info->max_targets; j++) {
char test_addr_name[NA_TEST_MAX_ADDR_NAME];

if (na_test_info->mpi_comm_rank == 0)
if (na_test_info->mpi_info.rank == 0)
strcpy(test_addr_name, na_test_info->target_names[j]);

MPI_Bcast(test_addr_name, NA_TEST_MAX_ADDR_NAME, MPI_BYTE, 0,
na_test_info->mpi_comm);
na_test_mpi_bcast(&na_test_info->mpi_info, test_addr_name,
NA_TEST_MAX_ADDR_NAME, 0);

if (na_test_info->mpi_comm_rank != 0) {
if (na_test_info->mpi_info.rank != 0) {
na_test_info->target_names[j] = strdup(test_addr_name);
NA_TEST_CHECK_ERROR(na_test_info->target_names[j] == NULL,
error, ret, NA_NOMEM, "strdup() of target_name failed");
}
}
}
#endif

na_test_info->target_name = na_test_info->target_names[0];
if (na_test_info->mpi_comm_rank == 0) {
if (na_test_info->mpi_info.rank == 0) {
uint32_t j;

printf("# %" PRIu32 " target name(s) read:\n",
Expand Down Expand Up @@ -754,9 +631,7 @@ NA_Test_finalize(struct na_test_info *na_test_info)
na_test_info->key = NULL;
}

#ifdef HG_TEST_HAS_PARALLEL
na_test_mpi_finalize(na_test_info);
#endif
na_test_mpi_finalize(&na_test_info->mpi_info);

done:
return ret;
Expand All @@ -766,26 +641,15 @@ NA_Test_finalize(struct na_test_info *na_test_info)
void
NA_Test_barrier(const struct na_test_info *na_test_info)
{
#ifdef HG_TEST_HAS_PARALLEL
if (na_test_info->mpi_comm_size > 1)
MPI_Barrier(na_test_info->mpi_comm);
#else
(void) na_test_info;
#endif
if (na_test_info->mpi_info.size > 1)
na_test_mpi_barrier(&na_test_info->mpi_info);
}

/*---------------------------------------------------------------------------*/
void
NA_Test_bcast(
char *buf, int count, int root, const struct na_test_info *na_test_info)
void *buf, size_t size, int root, const struct na_test_info *na_test_info)
{
#ifdef HG_TEST_HAS_PARALLEL
if (na_test_info->mpi_comm_size > 1)
MPI_Bcast(buf, count, MPI_BYTE, root, na_test_info->mpi_comm);
#else
(void) na_test_info;
(void) count;
(void) root;
(void) buf;
#endif
if (na_test_info->mpi_info.size > 1)
na_test_mpi_bcast(&na_test_info->mpi_info, buf, size, root);
}
17 changes: 3 additions & 14 deletions Testing/common/na_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@
#ifndef NA_TEST_H
#define NA_TEST_H

#include "mercury_test_config.h"
#include "na.h"

#ifdef HG_TEST_HAS_PARALLEL
# include <mpi.h>
#endif
#include "na_test_mpi.h"

/*************************************/
/* Public Type and Struct Definition */
Expand Down Expand Up @@ -43,13 +38,7 @@ struct na_test_info {
size_t buf_size_max; /* Max buffer size */
size_t buf_count; /* Buffer count */
bool verbose; /* Verbose mode */
int max_number_of_peers; /* Max number of peers */
#ifdef HG_TEST_HAS_PARALLEL
MPI_Comm mpi_comm; /* MPI comm */
bool mpi_no_finalize; /* Prevent from finalizing MPI */
#endif
int mpi_comm_rank; /* MPI comm rank */
int mpi_comm_size; /* MPI comm size */
struct na_test_mpi_info mpi_info;
bool extern_init; /* Extern init */
bool use_threads; /* Use threads */
bool force_register; /* Force registration each iteration */
Expand Down Expand Up @@ -216,7 +205,7 @@ NA_Test_barrier(const struct na_test_info *na_test_info);
*/
void
NA_Test_bcast(
char *buf, int count, int root, const struct na_test_info *na_test_info);
void *buf, size_t size, int root, const struct na_test_info *na_test_info);

#ifdef __cplusplus
}
Expand Down
Loading

0 comments on commit dda8ccb

Please sign in to comment.