diff --git a/src/engine/srv.c b/src/engine/srv.c
index 9461a18e9d9..79b1b39e5e7 100644
--- a/src/engine/srv.c
+++ b/src/engine/srv.c
@@ -214,6 +214,44 @@ dss_sleep(uint64_t msec)
 	return 0;
 }
 
+/**
+ * Poll an eventual until it is ready or the timeout expires.
+ *
+ * \param[in]  eventual	eventual to be waited on.
+ * \param[out] status	buffer for the eventual value, may be NULL.
+ * \param[in]  timeout	timeout in seconds.
+ * \param[in]  step	polling interval in seconds while not ready.
+ *
+ * \return		0 if the eventual became ready within the timeout,
+ *			-DER_TIMEDOUT if it is not ready in time, or a
+ *			dss_abterr2der() converted error if Argobots failed.
+ */
+int
+dss_eventual_timeout_wait(ABT_eventual eventual, void *status, uint32_t timeout, uint32_t step)
+{
+	ABT_bool	is_ready;
+	uint64_t	start = daos_gettime_coarse();
+	int		rc;
+
+	while (1) {
+		rc = ABT_eventual_test(eventual, status, &is_ready);
+		if (rc != ABT_SUCCESS || is_ready == ABT_TRUE) {
+			if (rc != ABT_SUCCESS)
+				rc = dss_abterr2der(rc);
+			break;
+		}
+
+		if (daos_gettime_coarse() - start >= timeout) {
+			D_ERROR("Started from "DF_U64", timeout after %u\n", start, timeout);
+			rc = -DER_TIMEDOUT;
+			break;
+		}
+		dss_sleep(step * 1000); /* dss_sleep() takes milliseconds; step is seconds */
+	}
+
+	return rc;
+}
+
 struct dss_rpc_cntr *
 dss_rpc_cntr_get(enum dss_rpc_cntr_id id)
 {
diff --git a/src/include/daos/common.h b/src/include/daos/common.h
index 6bad86f91b8..a6098ae14ad 100644
--- a/src/include/daos/common.h
+++ b/src/include/daos/common.h
@@ -906,6 +906,9 @@ enum {
 #define DAOS_POOL_EVICT_FAIL (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa0)
 #define DAOS_POOL_RFCHECK_FAIL (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa1)
 
+#define DAOS_OBJ_FAIL_BULK_TIMEOUT (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa3)
+#define DAOS_OBJ_FAIL_BULK_ABORT (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa4)
+
 #define DAOS_CHK_CONT_ORPHAN (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xb0)
 #define DAOS_CHK_CONT_BAD_LABEL (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xb1)
 #define DAOS_CHK_LEADER_BLOCK (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xb2)
diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h
index b5faa001176..77eaff5a20e 100644
--- a/src/include/daos_srv/daos_engine.h
+++ b/src/include/daos_srv/daos_engine.h
@@ -451,6 +451,9 @@ int dss_ult_periodic(void (*func)(void *), void *arg, int xs_type, int tgt_id,
 
 int dss_sleep(uint64_t ms);
 
+int dss_eventual_timeout_wait(ABT_eventual eventual, void *status, uint32_t timeout,
+			      uint32_t step);
+
 /* Pack return codes with additional argument to reduce */
 struct dss_stream_arg_type {
 	/** return value */
diff --git a/src/object/srv_internal.h b/src/object/srv_internal.h
index a24986247a5..1613cf67033 100644
--- a/src/object/srv_internal.h
+++ b/src/object/srv_internal.h
@@ -116,6 +116,10 @@ struct migrate_cont_hdl {
 struct obj_bulk_args {
 	ABT_eventual	eventual;
 	uint64_t	bulk_size;
+	crt_bulk_t	*bulk_hdls;
+	crt_bulk_opid_t	*bulk_opids;
+	uint16_t	bulk_hdls_max_size;
+	uint16_t	bulk_hdls_size;
 	int		bulks_inflight;
 	int		result;
 	bool		inited;
diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c
index a51682b4785..24d5a12d685 100644
--- a/src/object/srv_obj.c
+++ b/src/object/srv_obj.c
@@ -230,6 +230,67 @@ obj_rw_reply(crt_rpc_t *rpc, int status, uint64_t epoch,
 	}
 }
 
+#define BULK_HANDLE_MIN_SIZE	5
+/* Track a (bulk_hdl, bulk_opid) pair so an in-flight bulk can be aborted on timeout. */
+static int
+obj_bulk_args_insert_hdl_opid(struct obj_bulk_args *arg, crt_bulk_t bulk_hdl,
+			      crt_bulk_opid_t bulk_opid)
+{
+	D_ASSERT(arg->bulk_hdls_size <= arg->bulk_hdls_max_size);
+	if (arg->bulk_hdls_size == arg->bulk_hdls_max_size - 1) {
+		uint16_t	 new_size = arg->bulk_hdls_size + BULK_HANDLE_MIN_SIZE;
+		crt_bulk_t	*new_hdls;
+		crt_bulk_opid_t	*new_opids;
+
+		D_ALLOC_ARRAY(new_hdls, new_size);
+		if (new_hdls == NULL)
+			return -DER_NOMEM;
+
+		D_ALLOC_ARRAY(new_opids, new_size);
+		if (new_opids == NULL) {
+			D_FREE(new_hdls);
+			return -DER_NOMEM;
+		}
+
+		memcpy(new_hdls, arg->bulk_hdls, arg->bulk_hdls_size * sizeof(*arg->bulk_hdls));
+		memcpy(new_opids, arg->bulk_opids, arg->bulk_hdls_size * sizeof(*arg->bulk_opids));
+		/* The initial arrays are the caller's stack buffers; only free grown ones. */
+		if (arg->bulk_hdls_size > BULK_HANDLE_MIN_SIZE) {
+			D_FREE(arg->bulk_hdls);
+			D_FREE(arg->bulk_opids);
+		}
+		arg->bulk_hdls = new_hdls;
+		arg->bulk_opids = new_opids;
+		arg->bulk_hdls_max_size = new_size;
+	}
+
+	/* Insert bulk_hdl(key) and bulk_opid(value) into the obj_bulk_args; when the
+	 * bulk transfer completes the pair is removed, otherwise the bulk will be
+	 * aborted if the transfer cannot be finished in time.
+	 */
+	arg->bulk_hdls[arg->bulk_hdls_size] = bulk_hdl;
+	arg->bulk_opids[arg->bulk_hdls_size] = bulk_opid;
+	arg->bulk_hdls_size++;
+
+	return 0;
+}
+
+/* Untrack a completed bulk so it is no longer a candidate for abort. */
+static void
+obj_bulk_args_delete_hdl_opid(struct obj_bulk_args *arg, crt_bulk_t bulk_hdl)
+{
+	int i;
+
+	D_ASSERT(arg->bulk_hdls_size <= arg->bulk_hdls_max_size);
+	for (i = 0; i < arg->bulk_hdls_size; i++) {
+		if (arg->bulk_hdls[i] == bulk_hdl) {
+			arg->bulk_hdls[i] = NULL;
+			arg->bulk_opids[i] = NULL;
+			return;
+		}
+	}
+}
+
 static int
 obj_bulk_comp_cb(const struct crt_bulk_cb_info *cb_info)
 {
@@ -252,27 +313,54 @@ obj_bulk_comp_cb(const struct crt_bulk_cb_info *cb_info)
 	D_ASSERT(arg->bulks_inflight > 0);
 	arg->bulks_inflight--;
-	if (arg->bulks_inflight == 0)
+	/* The fault injection skips the wakeup to emulate a stuck bulk transfer. */
+	if (arg->bulks_inflight == 0 &&
+	    !DAOS_FAIL_CHECK(DAOS_OBJ_FAIL_BULK_TIMEOUT))
 		ABT_eventual_set(arg->eventual, &arg->result, sizeof(arg->result));
 
+	obj_bulk_args_delete_hdl_opid((struct obj_bulk_args *)cb_info->bci_arg,
+				      bulk_desc->bd_local_hdl);
 	crt_req_decref(rpc);
 	return cb_info->bci_rc;
 }
 
+/* Abort every bulk transfer still tracked in @arg, i.e. not yet completed. */
+static void
+obj_bulk_args_bulks_abort(crt_rpc_t *rpc, struct obj_bulk_args *arg)
+{
+	int	i;
+
+	D_ASSERT(arg->bulk_hdls_size <= arg->bulk_hdls_max_size);
+	for (i = 0; i < arg->bulk_hdls_size; i++) {
+		if (arg->bulk_opids[i] != NULL) {
+			D_DEBUG(DB_IO, "abort bulk %p/%p\n", arg->bulk_opids[i],
+				arg->bulk_hdls[i]);
+			D_ASSERT(arg->bulk_hdls[i] != NULL);
+			crt_bulk_abort(rpc->cr_ctx, arg->bulk_opids[i]);
+		}
+	}
+}
+
 static inline int
 bulk_cp(const struct crt_bulk_cb_info *cb_info)
 {
 	struct crt_bulk_desc	*bulk_desc;
+	int			rc;
+
+	/* Run the completion callback first: it looks up bd_local_hdl in the
+	 * tracking table, so the handle must still be valid at that point.
+	 */
+	rc = obj_bulk_comp_cb(cb_info);
 
 	bulk_desc = cb_info->bci_bulk_desc;
 	D_ASSERT(bulk_desc->bd_local_hdl != CRT_BULK_NULL);
 	crt_bulk_free(bulk_desc->bd_local_hdl);
 	bulk_desc->bd_local_hdl = CRT_BULK_NULL;
 
-	return obj_bulk_comp_cb(cb_info);
+	return rc;
 }
 
 static inline int
 cached_bulk_cp(const struct crt_bulk_cb_info *cb_info)
 {
 	return obj_bulk_comp_cb(cb_info);
 }
@@ -439,6 +527,9 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk,
 		D_ASSERT(remote_size > remote_off);
 		if (length > (remote_size - remote_off)) {
 			rc = -DER_OVERFLOW;
+			if (!cached_bulk)
+				crt_bulk_free(local_bulk);
+
 			D_ERROR("Remote bulk isn't large enough. local_sz:%zu, remote_sz:%zu, "
 				"remote_off:%zu, "DF_RC"\n", length, remote_size, remote_off,
 				DP_RC(rc));
@@ -475,6 +566,12 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk,
 		}
 
 		remote_off += length;
+		/* No progress happens between crt_bulk_transfer() and here, so
+		 * cached_bulk_cp/bulk_cp cannot have been called yet and the
+		 * handle is still valid to track.
+		 */
+		if (obj_bulk_args_insert_hdl_opid(p_arg, local_bulk, bulk_opid) != 0)
+			D_WARN("bulk %p will not be abortable on timeout\n", local_bulk);
 		/* Give cart progress a chance to complete some in-flight bulk transfers */
 		if (bulk_iovs >= MAX_BULK_IOVS) {
 			bulk_iovs = 0;
@@ -491,10 +588,13 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bulk_t *remote_bulks,
 		  int sgl_nr, int bulk_nr, struct obj_bulk_args *p_arg, struct ds_cont_hdl *coh)
 {
 	struct obj_bulk_args	arg = { 0 };
+	crt_bulk_t		bulk_hdls[BULK_HANDLE_MIN_SIZE];
+	crt_bulk_opid_t		bulk_opids[BULK_HANDLE_MIN_SIZE];
 	int			i, rc, *status, ret;
 	int			skip_nr = 0;
 	bool			async = true;
 	uint64_t		time = daos_get_ntime();
+	uint32_t		timeout;
 
 	if (unlikely(sgl_nr > bulk_nr)) {
 		D_ERROR("Invalid sgl_nr vs bulk_nr: %d/%d\n", sgl_nr, bulk_nr);
@@ -515,6 +615,14 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bulk_t *remote_bulks,
 	if (rc != 0)
 		return dss_abterr2der(rc);
 
+	/* NOTE(review): bulk_hdls/bulk_opids are this function's stack arrays and the
+	 * completion callbacks dereference them; for async callers (early return
+	 * below) they could be touched after this frame is gone -- confirm async
+	 * completion cannot outlive this call.
+	 */
+	p_arg->bulk_hdls = bulk_hdls;
+	p_arg->bulk_opids = bulk_opids;
+	p_arg->bulk_hdls_max_size = BULK_HANDLE_MIN_SIZE;
+	p_arg->bulk_hdls_size = 0;
 	p_arg->inited = true;
 
 	D_DEBUG(DB_IO, "bulk_op %d, sgl_nr %d, bulk_nr %d\n", bulk_op, sgl_nr, bulk_nr);
@@ -589,9 +697,25 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bulk_t *remote_bulks,
 	if (async)
 		return rc;
 
-	ret = ABT_eventual_wait(p_arg->eventual, (void **)&status);
+	/* Fault injection: zero timeout forces the bulk-abort path immediately. */
+	if (DAOS_FAIL_CHECK(DAOS_OBJ_FAIL_BULK_ABORT))
+		timeout = 0;
+	else
+		crt_req_get_timeout(rpc, &timeout);
+
+	ret = dss_eventual_timeout_wait(p_arg->eventual, (void **)&status, timeout, 1);
+	if (ret == -DER_TIMEDOUT) {
+		obj_bulk_args_bulks_abort(rpc, p_arg);
+		/* Completion callbacks still reference the tracking arrays; wait
+		 * for them all to drain before freeing anything.
+		 */
+		while (p_arg->bulks_inflight > 0) {
+			D_INFO("Wait for comp callback to be executed inflight %d\n",
+			       p_arg->bulks_inflight);
+			dss_sleep(1);
+		}
+	}
 
 	if (rc == 0)
-		rc = ret ? dss_abterr2der(ret) : *status;
+		rc = ret ? ret : *status;
 
 	ABT_eventual_free(&p_arg->eventual);
@@ -620,6 +744,10 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bulk_t *remote_bulks,
 		*fbuffer += 0x2;
 		d_sgl_fini(&fsgl, false);
 	}
+	if (p_arg->bulk_hdls != bulk_hdls)
+		D_FREE(p_arg->bulk_hdls);
+	if (p_arg->bulk_opids != bulk_opids)
+		D_FREE(p_arg->bulk_opids);
 	return rc;
 }
 
diff --git a/src/tests/suite/daos_obj.c b/src/tests/suite/daos_obj.c
index 960c5b0a02e..b094d25fa02 100644
--- a/src/tests/suite/daos_obj.c
+++ b/src/tests/suite/daos_obj.c
@@ -5362,6 +5362,64 @@ io_57(void **state)
 	reintegrate_single_pool_rank(arg, 0, false);
 }
 
+void
+io_bulk_timeout(void **state)
+{
+	daos_obj_id_t	 oid;
+	test_arg_t	*arg = *state;
+	struct ioreq	 req;
+	char		*fetch_buf;
+	char		*update_buf;
+	int		 size = 1048576;
+
+	oid = daos_test_oid_gen(arg->coh, dts_obj_class, 0, 0, arg->myrank);
+	ioreq_init(&req, arg->coh, oid, DAOS_IOD_ARRAY, arg);
+
+	D_ALLOC(fetch_buf, size);
+	assert_non_null(fetch_buf);
+	D_ALLOC(update_buf, size);
+	assert_non_null(update_buf);
+	dts_buf_render(update_buf, size);
+
+	print_message("timeout bulk\n");
+	if (arg->myrank == 0)
+		daos_debug_set_params(arg->group, -1, DMG_KEY_FAIL_LOC,
+				      DAOS_OBJ_FAIL_BULK_TIMEOUT | DAOS_FAIL_ONCE,
+				      0, NULL);
+
+	/** Insert */
+	insert_single("dkey", "akey", 0, update_buf, size, DAOS_TX_NONE, &req);
+
+	/** Lookup */
+	memset(fetch_buf, 0, size);
+	lookup_single("dkey", "akey", 0, fetch_buf, size, DAOS_TX_NONE, &req);
+
+	/** Verify data consistency */
+	assert_int_equal(req.iod[0].iod_size, size);
+	assert_memory_equal(update_buf, fetch_buf, size);
+
+	print_message("abort bulk\n");
+	if (arg->myrank == 0)
+		daos_debug_set_params(arg->group, -1, DMG_KEY_FAIL_LOC,
+				      DAOS_OBJ_FAIL_BULK_ABORT | DAOS_FAIL_ONCE,
+				      0, NULL);
+
+	/** Insert */
+	insert_single("dkey1", "akey", 0, update_buf, size, DAOS_TX_NONE, &req);
+
+	/** Lookup */
+	memset(fetch_buf, 0, size);
+	lookup_single("dkey1", "akey", 0, fetch_buf, size, DAOS_TX_NONE, &req);
+
+	/** Verify data consistency */
+	assert_int_equal(req.iod[0].iod_size, size);
+	assert_memory_equal(update_buf, fetch_buf, size);
+
+	D_FREE(update_buf);
+	D_FREE(fetch_buf);
+	ioreq_fini(&req);
+}
+
 static const struct CMUnitTest io_tests[] = {
 	{ "IO1: simple update/fetch/verify", io_simple,
 	  async_disable, test_case_teardown},
@@ -5476,6 +5534,8 @@ static const struct CMUnitTest io_tests[] = {
 	  io_56, async_disable, test_case_teardown},
 	{ "IO57: collective object query with rank_0 excluded",
 	  io_57, rebuild_sub_rf1_setup, test_teardown},
+	{ "IO58: IO bulk timeout",
+	  io_bulk_timeout, async_disable, test_case_teardown},
 };
 
 int