Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-15540 object: add bulk transfer timeout #15204

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/engine/srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,43 @@ dss_sleep(uint64_t msec)
return 0;
}

/**
 * Poll an eventual until it becomes ready, the test call fails, or the timeout elapses.
 *
 * The eventual is tested first on every iteration, so it is always checked at least once even
 * when \a timeout is 0.
 *
 * \param[in]	eventual	eventual to be waited on.
 * \param[out]	status		forwarded unchanged to ABT_eventual_test() as its value argument.
 * \param[in]	timeout		maximum wait, compared against the difference of two
 *				daos_gettime_coarse() readings (presumably seconds - confirm).
 * \param[in]	step		value handed to dss_sleep() between two polls; dss_sleep()
 *				takes milliseconds.
 *
 * \return	ABT_SUCCESS	the eventual is ready.
 *		-DER_TIMEDOUT	the eventual was not ready within \a timeout.
 *		other		ABT_eventual_test() failure converted by dss_abterr2der().
 */
int
dss_eventual_timeout_wait(ABT_eventual eventual, void *status, uint32_t timeout, uint32_t step)
{
	uint64_t	begin = daos_gettime_coarse();
	ABT_bool	ready;
	int		ret;

	for (;;) {
		ret = ABT_eventual_test(eventual, status, &ready);
		if (ret != ABT_SUCCESS)
			return dss_abterr2der(ret);
		if (ready == ABT_TRUE)
			return ret;

		if (daos_gettime_coarse() - begin >= timeout) {
			D_ERROR("Started from "DF_U64", timeout after %u\n", begin, timeout);
			return -DER_TIMEDOUT;
		}

		/* Not ready yet: back off before the next poll */
		dss_sleep(step);
	}
}

struct dss_rpc_cntr *
dss_rpc_cntr_get(enum dss_rpc_cntr_id id)
{
Expand Down
3 changes: 3 additions & 0 deletions src/include/daos/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,9 @@ enum {
#define DAOS_POOL_EVICT_FAIL (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa0)
#define DAOS_POOL_RFCHECK_FAIL (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa1)

/* Object bulk fault injection:
 * - DAOS_OBJ_FAIL_BULK_TIMEOUT: the bulk completion callback skips setting the eventual when the
 *   last in-flight bulk completes, so the waiter runs into its timeout.
 * - DAOS_OBJ_FAIL_BULK_ABORT: the bulk waiter uses a timeout of 0, which drives it straight into
 *   the timeout/abort path.
 */
#define DAOS_OBJ_FAIL_BULK_TIMEOUT (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa3)
#define DAOS_OBJ_FAIL_BULK_ABORT (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa4)

#define DAOS_CHK_CONT_ORPHAN (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xb0)
#define DAOS_CHK_CONT_BAD_LABEL (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xb1)
#define DAOS_CHK_LEADER_BLOCK (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xb2)
Expand Down
3 changes: 3 additions & 0 deletions src/include/daos_srv/daos_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ int dss_ult_periodic(void (*func)(void *), void *arg, int xs_type, int tgt_id,

int dss_sleep(uint64_t ms);

/**
 * Poll \a eventual until it is ready or \a timeout has elapsed, sleeping \a step (passed to
 * dss_sleep(), i.e. milliseconds) between polls. \a status is forwarded to ABT_eventual_test().
 * Returns -DER_TIMEDOUT when the eventual is not ready in time.
 * NOTE(review): \a timeout is measured with daos_gettime_coarse(), presumably seconds - confirm.
 */
int dss_eventual_timeout_wait(ABT_eventual eventual, void *status, uint32_t timeout,
uint32_t step);

/* Pack return codes with additional argument to reduce */
struct dss_stream_arg_type {
/** return value */
Expand Down
4 changes: 4 additions & 0 deletions src/object/srv_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ struct migrate_cont_hdl {
struct obj_bulk_args {
ABT_eventual eventual;
uint64_t bulk_size;
crt_bulk_t *bulk_hdls;
crt_bulk_opid_t *bulk_opids;
uint16_t bulk_hdls_max_size;
uint16_t bulk_hdls_size;
int bulks_inflight;
int result;
bool inited;
Expand Down
120 changes: 116 additions & 4 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,63 @@ obj_rw_reply(crt_rpc_t *rpc, int status, uint64_t epoch,
}
}

/* Capacity of the initial handle/opid arrays, and the number of slots added on each growth */
#define BULK_HANDLE_MIN_SIZE 5

/**
 * Record an issued bulk transfer (local handle plus operation id) in \a arg so that it can be
 * found again on completion, or aborted if it does not complete in time.
 *
 * The two tracking arrays are grown together by BULK_HANDLE_MIN_SIZE slots when only one free
 * slot is left. Both new arrays are allocated before \a arg is modified, so on failure \a arg is
 * left untouched and nothing is leaked.
 *
 * \param[in,out]	arg		bulk arguments holding the tracking arrays.
 * \param[in]		bulk_hdl	local bulk handle of the transfer.
 * \param[in]		bulk_opid	operation id of the transfer.
 *
 * \return	0		on success.
 *		-DER_NOMEM	an array could not be grown.
 *		-DER_OVERFLOW	the grown capacity would not fit the 16-bit size fields.
 */
static int
obj_bulk_args_insert_hdl_opid(struct obj_bulk_args *arg, crt_bulk_t bulk_hdl,
			      crt_bulk_opid_t bulk_opid)
{
	D_ASSERT(arg->bulk_hdls_size <= arg->bulk_hdls_max_size);
	if (arg->bulk_hdls_size == arg->bulk_hdls_max_size - 1) {
		crt_bulk_t	*new_hdls;
		crt_bulk_opid_t	*new_opids;
		uint32_t	 new_max = (uint32_t)arg->bulk_hdls_size + BULK_HANDLE_MIN_SIZE;
		/* The initial arrays (BULK_HANDLE_MIN_SIZE slots) are provided by the caller and
		 * must not be freed here; they are first outgrown while bulk_hdls_size is still
		 * below BULK_HANDLE_MIN_SIZE, so any larger size means the arrays came from a
		 * previous growth in this function.
		 */
		bool		 on_heap = arg->bulk_hdls_size > BULK_HANDLE_MIN_SIZE;

		/* bulk_hdls_max_size is 16 bits wide; refuse to wrap it */
		if (new_max > UINT16_MAX)
			return -DER_OVERFLOW;

		D_ALLOC_ARRAY(new_hdls, new_max);
		if (new_hdls == NULL)
			return -DER_NOMEM;

		D_ALLOC_ARRAY(new_opids, new_max);
		if (new_opids == NULL) {
			D_FREE(new_hdls);
			return -DER_NOMEM;
		}

		memcpy(new_hdls, arg->bulk_hdls, arg->bulk_hdls_size * sizeof(*arg->bulk_hdls));
		memcpy(new_opids, arg->bulk_opids,
		       arg->bulk_hdls_size * sizeof(*arg->bulk_opids));
		if (on_heap) {
			D_FREE(arg->bulk_hdls);
			D_FREE(arg->bulk_opids);
		}

		arg->bulk_hdls = new_hdls;
		arg->bulk_opids = new_opids;
		arg->bulk_hdls_max_size = (uint16_t)new_max;
	}

	/* Store bulk_hdl (key) and bulk_opid (value) at the same index. The completion callback
	 * clears the pair once the transfer is done; pairs that are still set when the wait times
	 * out are the ones that get aborted.
	 */
	arg->bulk_hdls[arg->bulk_hdls_size] = bulk_hdl;
	arg->bulk_opids[arg->bulk_hdls_size] = bulk_opid;
	arg->bulk_hdls_size++;

	return 0;
}

/**
 * Clear the first tracked entry whose local bulk handle equals \a bulk_hdl.
 *
 * The slot is only reset to NULL (both handle and opid); the arrays are not compacted and
 * bulk_hdls_size is left unchanged. Nothing happens if no entry matches.
 *
 * NOTE(review): matching is done on the handle alone; if the same handle can be tracked more
 * than once, the entry cleared is the first one still set - confirm this is acceptable.
 *
 * \param[in,out]	arg		bulk arguments holding the tracking arrays.
 * \param[in]		bulk_hdl	local bulk handle to look up.
 */
static void
obj_bulk_args_delete_hdl_opid(struct obj_bulk_args *arg, crt_bulk_t bulk_hdl)
{
	int	idx = 0;

	D_ASSERT(arg->bulk_hdls_size <= arg->bulk_hdls_max_size);

	/* Linear scan for the first slot holding this handle */
	while (idx < arg->bulk_hdls_size && arg->bulk_hdls[idx] != bulk_hdl)
		idx++;

	if (idx >= arg->bulk_hdls_size)
		return;

	arg->bulk_hdls[idx] = NULL;
	arg->bulk_opids[idx] = NULL;
}

static int
obj_bulk_comp_cb(const struct crt_bulk_cb_info *cb_info)
{
Expand All @@ -252,30 +309,53 @@ obj_bulk_comp_cb(const struct crt_bulk_cb_info *cb_info)

D_ASSERT(arg->bulks_inflight > 0);
arg->bulks_inflight--;
if (arg->bulks_inflight == 0)
if (arg->bulks_inflight == 0 &&
!DAOS_FAIL_CHECK(DAOS_OBJ_FAIL_BULK_TIMEOUT))
ABT_eventual_set(arg->eventual, &arg->result,
sizeof(arg->result));

obj_bulk_args_delete_hdl_opid((struct obj_bulk_args *)cb_info->bci_arg,
bulk_desc->bd_local_hdl);
crt_req_decref(rpc);
return cb_info->bci_rc;
}

/**
 * Abort every bulk transfer that is still tracked in \a arg.
 *
 * Entries whose opid has been reset to NULL are skipped. A failing crt_bulk_abort() is logged
 * and the remaining entries are still processed.
 *
 * \param[in]	rpc	RPC whose context (cr_ctx) the bulk transfers were issued on.
 * \param[in]	arg	bulk arguments holding the tracking arrays.
 */
static void
obj_bulk_args_bulks_abort(crt_rpc_t *rpc, struct obj_bulk_args *arg)
{
	int	i;
	int	rc;

	D_ASSERT(arg->bulk_hdls_size <= arg->bulk_hdls_max_size);
	for (i = 0; i < arg->bulk_hdls_size; i++) {
		if (arg->bulk_opids[i] == NULL)
			continue;

		D_DEBUG(DB_IO, "abort bulk %p/%p\n", arg->bulk_opids[i],
			arg->bulk_hdls[i]);
		/* handle and opid are always set and cleared as a pair */
		D_ASSERT(arg->bulk_hdls[i] != NULL);
		rc = crt_bulk_abort(rpc->cr_ctx, arg->bulk_opids[i]);
		if (rc != 0)
			D_ERROR("abort bulk %p/%p failed: "DF_RC"\n", arg->bulk_opids[i],
				arg->bulk_hdls[i], DP_RC(rc));
	}
}

/**
 * Bulk completion callback that also releases the local bulk handle of the transfer
 * (presumably used for handles created per transfer rather than cached ones - confirm).
 *
 * obj_bulk_comp_cb() must run exactly once and before the handle is freed, because it uses
 * bd_local_hdl to find the tracked entry of this transfer.
 *
 * \param[in]	cb_info	bulk completion information.
 *
 * \return	the value returned by obj_bulk_comp_cb().
 */
static inline int
bulk_cp(const struct crt_bulk_cb_info *cb_info)
{
	struct crt_bulk_desc	*bulk_desc;
	int			 rc;

	rc = obj_bulk_comp_cb(cb_info);

	bulk_desc = cb_info->bci_bulk_desc;
	D_ASSERT(bulk_desc->bd_local_hdl != CRT_BULK_NULL);
	crt_bulk_free(bulk_desc->bd_local_hdl);
	bulk_desc->bd_local_hdl = CRT_BULK_NULL;

	return rc;
}

/**
 * Bulk completion callback that only runs the common completion handling; unlike bulk_cp() it
 * does not free the local bulk handle.
 *
 * \param[in]	cb_info	bulk completion information.
 *
 * \return	the value returned by obj_bulk_comp_cb().
 */
static inline int
cached_bulk_cp(const struct crt_bulk_cb_info *cb_info)
{
	int	rc = obj_bulk_comp_cb(cb_info);

	return rc;
}

Expand Down Expand Up @@ -439,6 +519,9 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk,
D_ASSERT(remote_size > remote_off);
if (length > (remote_size - remote_off)) {
rc = -DER_OVERFLOW;
if (!cached_bulk)
crt_bulk_free(local_bulk);

D_ERROR("Remote bulk isn't large enough. local_sz:%zu, remote_sz:%zu, "
"remote_off:%zu, "DF_RC"\n", length, remote_size, remote_off,
DP_RC(rc));
Expand Down Expand Up @@ -475,6 +558,10 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk,
}
remote_off += length;

/* No progress is made between crt_bulk_transfer() and this point, so neither
* cached_bulk_cp() nor bulk_cp() should have been called yet.
*/
obj_bulk_args_insert_hdl_opid(p_arg, local_bulk, bulk_opid);
/* Give cart progress a chance to complete some in-flight bulk transfers */
if (bulk_iovs >= MAX_BULK_IOVS) {
bulk_iovs = 0;
Expand All @@ -491,10 +578,13 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul
int sgl_nr, int bulk_nr, struct obj_bulk_args *p_arg, struct ds_cont_hdl *coh)
{
struct obj_bulk_args arg = { 0 };
crt_bulk_t bulk_hdls[BULK_HANDLE_MIN_SIZE];
crt_bulk_opid_t bulk_opids[BULK_HANDLE_MIN_SIZE];
int i, rc, *status, ret;
int skip_nr = 0;
bool async = true;
uint64_t time = daos_get_ntime();
uint32_t timeout;

if (unlikely(sgl_nr > bulk_nr)) {
D_ERROR("Invalid sgl_nr vs bulk_nr: %d/%d\n", sgl_nr, bulk_nr);
Expand All @@ -515,6 +605,10 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul
if (rc != 0)
return dss_abterr2der(rc);

p_arg->bulk_hdls = bulk_hdls;
p_arg->bulk_opids = bulk_opids;
p_arg->bulk_hdls_max_size = BULK_HANDLE_MIN_SIZE;
p_arg->bulk_hdls_size = 0;
p_arg->inited = true;
D_DEBUG(DB_IO, "bulk_op %d, sgl_nr %d, bulk_nr %d\n", bulk_op, sgl_nr, bulk_nr);

Expand Down Expand Up @@ -589,9 +683,23 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul
if (async)
return rc;

ret = ABT_eventual_wait(p_arg->eventual, (void **)&status);
if (DAOS_FAIL_CHECK(DAOS_OBJ_FAIL_BULK_ABORT))
timeout = 0;
else
crt_req_get_timeout(rpc, &timeout);

ret = dss_eventual_timeout_wait(p_arg->eventual, (void **)&status, timeout, 1);
if (ret == -DER_TIMEDOUT) {
obj_bulk_args_bulks_abort(rpc, p_arg);
while (p_arg->bulks_inflight > 0) {
D_INFO("Wait for comp callback to be executed inflight %d\n",
p_arg->bulks_inflight);
dss_sleep(1);
}
}

if (rc == 0)
rc = ret ? dss_abterr2der(ret) : *status;
rc = ret ? ret : *status;

ABT_eventual_free(&p_arg->eventual);

Expand Down Expand Up @@ -620,6 +728,10 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul
*fbuffer += 0x2;
d_sgl_fini(&fsgl, false);
}
if (p_arg->bulk_hdls != bulk_hdls)
D_FREE(p_arg->bulk_hdls);
if (p_arg->bulk_opids != bulk_opids)
D_FREE(p_arg->bulk_opids);
return rc;
}

Expand Down
60 changes: 60 additions & 0 deletions src/tests/suite/daos_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -5362,6 +5362,64 @@ io_57(void **state)
reintegrate_single_pool_rank(arg, 0, false);
}

/**
 * Set \a fail_loc through daos_debug_set_params() (rank 0 only), then update \a dkey/"akey"
 * with \a size bytes from \a update_buf, fetch it back into \a fetch_buf and verify that the
 * fetched size and content match what was written.
 */
static void
io_bulk_fault_roundtrip(test_arg_t *arg, struct ioreq *req, uint64_t fail_loc, const char *dkey,
			char *update_buf, char *fetch_buf, int size)
{
	if (arg->myrank == 0)
		daos_debug_set_params(arg->group, -1, DMG_KEY_FAIL_LOC, fail_loc, 0, NULL);

	/** Insert */
	insert_single(dkey, "akey", 0, update_buf, size, DAOS_TX_NONE, req);

	/** Lookup into a zeroed buffer so stale content cannot satisfy the comparison */
	memset(fetch_buf, 0, size);
	lookup_single(dkey, "akey", 0, fetch_buf, size, DAOS_TX_NONE, req);

	/** Verify data consistency */
	assert_int_equal(req->iod[0].iod_size, size);
	assert_memory_equal(update_buf, fetch_buf, size);
}

/**
 * IO58: write and read back a 1 MiB array value twice, first with
 * DAOS_OBJ_FAIL_BULK_TIMEOUT and then with DAOS_OBJ_FAIL_BULK_ABORT injected (each combined
 * with DAOS_FAIL_ONCE), checking that the data read matches the data written in both rounds.
 */
void
io_bulk_timeout(void **state)
{
	test_arg_t	*arg = *state;
	int		 size = 1048576;
	daos_obj_id_t	 oid;
	struct ioreq	 req;
	char		*fetch_buf;
	char		*update_buf;

	oid = daos_test_oid_gen(arg->coh, dts_obj_class, 0, 0, arg->myrank);
	ioreq_init(&req, arg->coh, oid, DAOS_IOD_ARRAY, arg);

	D_ALLOC(fetch_buf, size);
	assert_non_null(fetch_buf);
	D_ALLOC(update_buf, size);
	assert_non_null(update_buf);
	dts_buf_render(update_buf, size);

	print_message("timeout bulk\n");
	io_bulk_fault_roundtrip(arg, &req, DAOS_OBJ_FAIL_BULK_TIMEOUT | DAOS_FAIL_ONCE, "dkey",
				update_buf, fetch_buf, size);

	print_message("abort bulk\n");
	io_bulk_fault_roundtrip(arg, &req, DAOS_OBJ_FAIL_BULK_ABORT | DAOS_FAIL_ONCE, "dkey1",
				update_buf, fetch_buf, size);

	D_FREE(update_buf);
	D_FREE(fetch_buf);
	ioreq_fini(&req);
}

static const struct CMUnitTest io_tests[] = {
{ "IO1: simple update/fetch/verify",
io_simple, async_disable, test_case_teardown},
Expand Down Expand Up @@ -5476,6 +5534,8 @@ static const struct CMUnitTest io_tests[] = {
io_56, async_disable, test_case_teardown},
{ "IO57: collective object query with rank_0 excluded",
io_57, rebuild_sub_rf1_setup, test_teardown},
{ "IO58: IO bulk timeout",
io_bulk_timeout, async_disable, test_case_teardown},
};

int
Expand Down
Loading