Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

an "operations" field #24

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ EXTRA_DIST =
BUILT_SOURCES =
include_HEADERS = include/quintain.h \
include/quintain-server.h \
include/quintain-client.h
include/quintain-client.h \
include/quintain-macros.h

TESTS_ENVIRONMENT =

Expand Down
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,21 @@ mochi-quintain is a microservice (i.e., a Mochi provider) that responds to
parameterized RPCs to generate synthetic concurrent server load.

## Provider
The provider portion of mochi-quintain can be started with bedrock.
The provider portion of mochi-quintain can be started with bedrock. bedrock
will need to be able to find the libquintain-bedrock.so library, so either
install quintain via spack or set up your `LD_LIBRARY_PATH` apropriately.

configure --prefix=${QUINTAIN_DIR}
make install
# spack does this for us...
export LD_LIBRARY_PATH=${QUINTAIN_DIR}/lib:$LD_LIBRARY_PATH}
bedrock -c tests/mochi-quintain-provider.json tcp
./src/quintain-benchmark -g quintain.ssg -j ../tests/quintain-benchmark-example.json -o test-output
bedrock-shutdown -s quintain.ssg tcp://


The test script 'basic.sh' does all this for you.


### Bedrock and JX9

Expand Down
4 changes: 2 additions & 2 deletions include/quintain-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ int quintain_work(quintain_provider_handle_t provider,
int req_buffer_size,
int resp_buffer_size,
hg_size_t bulk_size,
hg_bulk_op_t bulk_op,
void* bulk_buffer,
int flags);
int flags,
int op_flags);

#ifdef __cplusplus
}
Expand Down
File renamed without changes.
11 changes: 11 additions & 0 deletions include/quintain.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ extern "C" {
/* flags for workload operations */
#define QTN_WORK_USE_SERVER_POOLSET 1

/* op flags: caller can combine multiple operations into a single call */
typedef enum {
QTN_NOOP = 0,
QTN_BULK_PULL = 1 << 0,
QTN_BULK_PUSH = 1 << 1,
QTN_ABTIO_READ = 1 << 2,
QTN_ABTIO_WRITE = 1 << 3,
QTN_MEM_GET = 1 << 4,
QTN_MEM_PUT = 1 << 5
} qtn_operations;

#ifdef __cplusplus
}
#endif
Expand Down
39 changes: 19 additions & 20 deletions src/quintain-benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ int main(int argc, char** argv)
struct json_object* json_cfg;
int req_buffer_size, resp_buffer_size, duration_seconds, warmup_iterations,
bulk_size;
hg_bulk_op_t bulk_op;
double this_ts, prev_ts, start_ts;
double* samples;
int sample_index = 0;
Expand All @@ -85,6 +84,8 @@ int main(int argc, char** argv)
ssg_member_id_t svr_id;
void* bulk_buffer = NULL;
int work_flags = 0;
int op_flags = 0;
const char* op_str = NULL;
struct margo_init_info mii = {0};
struct json_object* margo_config = NULL;
struct json_object* svr_config = NULL;
Expand Down Expand Up @@ -207,22 +208,17 @@ int main(int argc, char** argv)
if (json_object_get_boolean(
json_object_object_get(json_cfg, "use_server_poolset")))
work_flags |= QTN_WORK_USE_SERVER_POOLSET;

/* todo: make this an object, not a string, so we can buld up / time a
* sequence of operations */
op_str
= json_object_get_string(json_object_object_get(json_cfg, "operation"));
roblatham00 marked this conversation as resolved.
Show resolved Hide resolved
if (op_str && strcmp("bulk_get", op_str) == 0)
op_flags = QTN_BULK_PULL;
else if (op_str && strcmp("bulk_put", op_str) == 0)
op_flags = QTN_BULK_PUSH;
bulk_size
= json_object_get_int(json_object_object_get(json_cfg, "bulk_size"));
if (strcmp("pull", json_object_get_string(
json_object_object_get(json_cfg, "bulk_direction"))))
bulk_op = HG_BULK_PULL;
else if (strcmp("push", json_object_get_string(json_object_object_get(
json_cfg, "bulk_direction"))))
bulk_op = HG_BULK_PUSH;
else {
fprintf(stderr,
"Error: invalid bulk_direction parameter: %s (must be push or "
"pull).\n",
json_object_get_string(
json_object_object_get(json_cfg, "bulk_direction")));
goto err_qtn_cleanup;
}

/* allocate with mmap rather than malloc just so we can use the
* MAP_POPULATE flag to get the paging out of the way before we start
Expand All @@ -236,11 +232,15 @@ int main(int argc, char** argv)
goto err_qtn_cleanup;
}

/* Allocate a bulk buffer (if bulk_size > 0) to reuse in all _work()
/* Allocate a bulk buffer to reuse in all _work()
* calls. Note that we do not expliclitly register it for RDMA here;
* that will be handled within the _work() call as needed.
*
* we used to imply the need for bulk operations if bulk_size was nonzero,
* but now we explicitly request bulk operations through the 'operations'
* flag
*/
if (bulk_size > 0) {
if ((op_flags & QTN_BULK_PULL) | (op_flags & QTN_BULK_PUSH)) {
bulk_buffer = malloc(bulk_size);
if (!bulk_buffer) {
perror("malloc");
Expand All @@ -252,7 +252,7 @@ int main(int argc, char** argv)
/* run warm up iterations, if specified */
for (i = 0; i < warmup_iterations; i++) {
ret = quintain_work(qph, req_buffer_size, resp_buffer_size, bulk_size,
bulk_op, bulk_buffer, work_flags);
bulk_buffer, work_flags, op_flags);
if (ret != QTN_SUCCESS) {
fprintf(stderr, "Error: quintain_work() failure: (%d)\n", ret);
goto err_qtn_cleanup;
Expand All @@ -267,7 +267,7 @@ int main(int argc, char** argv)

do {
ret = quintain_work(qph, req_buffer_size, resp_buffer_size, bulk_size,
bulk_op, bulk_buffer, work_flags);
bulk_buffer, work_flags, op_flags);
if (ret != QTN_SUCCESS) {
fprintf(stderr, "Error: quintain_work() failure.\n");
goto err_qtn_cleanup;
Expand Down Expand Up @@ -509,7 +509,6 @@ static int parse_json(const char* json_file, struct json_object** json_cfg)
CONFIG_HAS_OR_CREATE(*json_cfg, int, "req_buffer_size", 128, val);
CONFIG_HAS_OR_CREATE(*json_cfg, int, "resp_buffer_size", 128, val);
CONFIG_HAS_OR_CREATE(*json_cfg, int, "bulk_size", 16384, val);
CONFIG_HAS_OR_CREATE(*json_cfg, string, "bulk_direction", "pull", val);
CONFIG_HAS_OR_CREATE(*json_cfg, int, "warmup_iterations", 10, val);
CONFIG_HAS_OR_CREATE(*json_cfg, boolean, "use_server_poolset", 1, val);
CONFIG_HAS_OR_CREATE(*json_cfg, boolean, "trace", 1, val);
Expand Down
16 changes: 11 additions & 5 deletions src/quintain-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ int quintain_work(quintain_provider_handle_t provider,
int req_buffer_size,
int resp_buffer_size,
hg_size_t bulk_size,
hg_bulk_op_t bulk_op,
void* bulk_buffer,
int flags)
int flags,
int operations)
{
hg_handle_t handle = HG_HANDLE_NULL;
qtn_work_in_t in;
Expand All @@ -125,8 +125,12 @@ int quintain_work(quintain_provider_handle_t provider,
goto finish;
}

in.bulk_op = bulk_op;
if (bulk_op == HG_BULK_PUSH) bulk_flags = HG_BULK_WRITE_ONLY;
if (operations & QTN_BULK_PUSH) {
in.bulk_op = HG_BULK_PUSH;
bulk_flags = HG_BULK_WRITE_ONLY;
} else if (operations & QTN_BULK_PULL) {
in.bulk_op = HG_BULK_PULL;
}
in.bulk_handle = HG_BULK_NULL;
in.resp_buffer_size = resp_buffer_size;
in.req_buffer_size = req_buffer_size;
Expand All @@ -135,7 +139,9 @@ int quintain_work(quintain_provider_handle_t provider,
else
in.req_buffer = NULL;
in.bulk_size = bulk_size;
if (bulk_size) {
in.operation = operations;

if ((operations & QTN_BULK_PULL) | (operations & QTN_BULK_PUSH)) {
hret = margo_bulk_create(provider->client->mid, 1,
(void**)(&bulk_buffer), &bulk_size, bulk_flags,
&in.bulk_handle);
Expand Down
2 changes: 2 additions & 0 deletions src/quintain-rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ typedef struct {
uint32_t flags; /* flags to modify behavior */
uint32_t bulk_op; /* what type of bulk xfer to do */
hg_bulk_t bulk_handle; /* bulk handle (if set) for bulk xfer */
uint32_t operation; /* what type of work we will carry out */
char* req_buffer; /* dummy buffer */
} qtn_work_in_t;
static inline hg_return_t hg_proc_qtn_work_in_t(hg_proc_t proc, void* v_out_p);
Expand All @@ -40,6 +41,7 @@ static inline hg_return_t hg_proc_qtn_work_in_t(hg_proc_t proc, void* v_out_p)
hg_proc_uint64_t(proc, &in->req_buffer_size);
hg_proc_uint64_t(proc, &in->bulk_size);
hg_proc_uint32_t(proc, &in->bulk_op);
hg_proc_uint32_t(proc, &in->operation);
hg_proc_hg_bulk_t(proc, &in->bulk_handle);

/* The remainder of the request contains the req_buffer; differentiate
Expand Down
2 changes: 1 addition & 1 deletion src/quintain-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ static void qtn_work_ult(hg_handle_t handle)
else
out.resp_buffer = NULL;

if (in.bulk_size) {
if ((in.operation & QTN_BULK_PULL) | (in.operation & QTN_BULK_PUSH)) {
/* we were asked to perform a bulk transfer */
if (in.flags & QTN_WORK_USE_SERVER_POOLSET) {
/* get buffer from poolset */
Expand Down