diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index 43ba3d64752..ee119d4b965 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -1914,10 +1914,16 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti, */ #define DTX_EXEC_STEP_LENGTH DTX_THRESHOLD_COUNT -struct dtx_ult_arg { +struct dtx_chore { + struct dss_chore chore; dtx_sub_func_t func; void *func_arg; struct dtx_leader_handle *dlh; + + /* Chore-internal state variables */ + uint32_t i; + uint32_t j; + uint32_t k; }; static void @@ -1970,20 +1976,34 @@ dtx_sub_comp_cb(struct dtx_leader_handle *dlh, int idx, int rc) idx, tgt->st_rank, tgt->st_tgt_idx, tgt->st_flags, rc); } -static void -dtx_leader_exec_ops_ult(void *arg) +static enum dss_chore_status +dtx_leader_exec_ops_chore(struct dss_chore *chore, bool is_reentrance) { - struct dtx_ult_arg *ult_arg = arg; - struct dtx_leader_handle *dlh = ult_arg->dlh; + struct dtx_chore *dtx_chore = container_of(chore, struct dtx_chore, chore); + struct dtx_leader_handle *dlh = dtx_chore->dlh; struct dtx_sub_status *sub; struct daos_shard_tgt *tgt; - uint32_t i; - uint32_t j; - uint32_t k; int rc = 0; - for (i = dlh->dlh_forward_idx, j = 0, k = 0; j < dlh->dlh_forward_cnt; i++, j++) { - sub = &dlh->dlh_subs[i]; + /* + * If this is the first entrance, initialize the chore-internal state + * variables. + */ + if (is_reentrance) { + D_DEBUG(DB_TRACE, "%p: resume: i=%u j=%u k=%u forward_cnt=%u\n", chore, + dtx_chore->i, dtx_chore->j, dtx_chore->k, dlh->dlh_forward_cnt); + dtx_chore->i++; + dtx_chore->j++; + } else { + D_DEBUG(DB_TRACE, "%p: initialize: forward_idx=%u forward_cnt=%u\n", chore, + dlh->dlh_forward_idx, dlh->dlh_forward_cnt); + dtx_chore->i = dlh->dlh_forward_idx; + dtx_chore->j = 0; + dtx_chore->k = 0; + } + + for (; dtx_chore->j < dlh->dlh_forward_cnt; dtx_chore->i++, dtx_chore->j++) { + sub = &dlh->dlh_subs[dtx_chore->i]; tgt = &sub->dss_tgt; if (dlh->dlh_normal_sub_done == 0) { @@ -1991,7 +2011,7 @@ dtx_leader_exec_ops_ult(void *arg) sub->dss_comp = 0; if (unlikely(tgt->st_flags & DTF_DELAY_FORWARD)) { - dtx_sub_comp_cb(dlh, i, 0); + dtx_sub_comp_cb(dlh, dtx_chore->i, 0); continue; } } else { @@ -2003,33 +2023,35 @@ dtx_leader_exec_ops_ult(void *arg) } if (tgt->st_rank == DAOS_TGT_IGNORE || - (i == daos_fail_value_get() && DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) { + (dtx_chore->i == daos_fail_value_get() && + DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) { if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD) - dtx_sub_comp_cb(dlh, i, 0); + dtx_sub_comp_cb(dlh, dtx_chore->i, 0); continue; } - rc = ult_arg->func(dlh, ult_arg->func_arg, i, dtx_sub_comp_cb); + rc = dtx_chore->func(dlh, dtx_chore->func_arg, dtx_chore->i, dtx_sub_comp_cb); if (rc != 0) { if (sub->dss_comp == 0) - dtx_sub_comp_cb(dlh, i, rc); + dtx_sub_comp_cb(dlh, dtx_chore->i, rc); break; } /* Yield to avoid holding CPU for too long time. */ - if ((++k) % DTX_RPC_YIELD_THD == 0) - ABT_thread_yield(); + if (++(dtx_chore->k) % DTX_RPC_YIELD_THD == 0) + return DSS_CHORE_YIELD; } if (rc != 0) { - for (i++, j++; j < dlh->dlh_forward_cnt; i++, j++) { - sub = &dlh->dlh_subs[i]; + for (dtx_chore->i++, dtx_chore->j++; dtx_chore->j < dlh->dlh_forward_cnt; + dtx_chore->i++, dtx_chore->j++) { + sub = &dlh->dlh_subs[dtx_chore->i]; tgt = &sub->dss_tgt; if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD) { sub->dss_result = 0; sub->dss_comp = 0; - dtx_sub_comp_cb(dlh, i, 0); + dtx_sub_comp_cb(dlh, dtx_chore->i, 0); } } } @@ -2039,6 +2061,8 @@ dtx_leader_exec_ops_ult(void *arg) D_ASSERTF(rc == ABT_SUCCESS, "ABT_future_set failed [%u, %u), for delay %s: %d\n", dlh->dlh_forward_idx, dlh->dlh_forward_idx + dlh->dlh_forward_cnt, dlh->dlh_normal_sub_done == 1 ? "yes" : "no", rc); + + return DSS_CHORE_DONE; } /** @@ -2048,15 +2072,15 @@ int dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg) { - struct dtx_ult_arg ult_arg; + struct dtx_chore dtx_chore; int sub_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt; int rc = 0; int local_rc = 0; int remote_rc = 0; - ult_arg.func = func; - ult_arg.func_arg = func_arg; - ult_arg.dlh = dlh; + dtx_chore.func = func; + dtx_chore.func_arg = func_arg; + dtx_chore.dlh = dlh; dlh->dlh_result = 0; dlh->dlh_allow_failure = allow_failure; @@ -2092,15 +2116,10 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, D_GOTO(out, rc = dss_abterr2der(rc)); } - /* - * NOTE: Ideally, we probably should create ULT for each shard, but for performance - * reasons, let's only create one for all remote targets for now. - */ - rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW, - dss_get_module_info()->dmi_tgt_id, DSS_DEEP_STACK_SZ, NULL); + rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore); if (rc != 0) { - D_ERROR("ult create failed [%u, %u] (2): "DF_RC"\n", - dlh->dlh_forward_idx, dlh->dlh_forward_cnt, DP_RC(rc)); + DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx, + dlh->dlh_forward_cnt); ABT_future_free(&dlh->dlh_future); goto out; } @@ -2168,10 +2187,9 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, /* The ones without DELAY flag will be skipped when scan the targets array. */ dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt; - rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW, - dss_get_module_info()->dmi_tgt_id, DSS_DEEP_STACK_SZ, NULL); + rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore); if (rc != 0) { - D_ERROR("ult create failed (4): "DF_RC"\n", DP_RC(rc)); + DL_ERROR(rc, "chore create failed (4)"); ABT_future_free(&dlh->dlh_future); goto out; } diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index 324f8bffd3a..03b0b542383 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -363,6 +363,8 @@ dtx_req_wait(struct dtx_req_args *dra) } struct dtx_common_args { + struct dss_chore dca_chore; + ABT_eventual dca_chore_eventual; struct dtx_req_args dca_dra; d_list_t dca_head; struct btr_root dca_tree_root; @@ -373,57 +375,76 @@ struct dtx_common_args { d_rank_t dca_rank; uint32_t dca_tgtid; struct ds_cont_child *dca_cont; - ABT_thread dca_helper; struct dtx_id dca_dti_inline; struct dtx_id *dca_dtis; struct dtx_entry **dca_dtes; + + /* Chore-internal state variables */ + struct dtx_req_rec *dca_drr; + int dca_i; }; +/* If is_reentrance, this function ignores len. */ static int -dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len) +dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len, bool is_reentrance) { struct dtx_req_args *dra = &dca->dca_dra; - struct dtx_req_rec *drr; int rc; - int i = 0; - dra->dra_length = len; + if (!is_reentrance) { + dra->dra_length = len; + + rc = ABT_future_create(len, dtx_req_list_cb, &dra->dra_future); + if (rc != ABT_SUCCESS) { + D_ERROR("ABT_future_create failed for opc %x, len = %d: " + "rc = %d.\n", dra->dra_opc, len, rc); + return dss_abterr2der(rc); + } - rc = ABT_future_create(len, dtx_req_list_cb, &dra->dra_future); - if (rc != ABT_SUCCESS) { - D_ERROR("ABT_future_create failed for opc %x, len = %d: " - "rc = %d.\n", dra->dra_opc, len, rc); - return dss_abterr2der(rc); + D_DEBUG(DB_TRACE, "%p: DTX req for opc %x, future %p (%d) start.\n", + &dca->dca_chore, dra->dra_opc, dra->dra_future, len); } - D_DEBUG(DB_TRACE, "DTX req for opc %x, future %p start.\n", dra->dra_opc, dra->dra_future); + /* + * Begin or continue an iteration over dca_head. When beginning the + * iteration, dca->dca_drr does not point to a real entry, and is only + * safe for d_list_for_each_entry_continue. + */ + if (!is_reentrance) { + dca->dca_drr = d_list_entry(&dca->dca_head, struct dtx_req_rec, drr_link); + dca->dca_i = 0; + } + /* DO NOT add any line here! See the comment on dca->dca_drr above. */ + d_list_for_each_entry_continue(dca->dca_drr, &dca->dca_head, drr_link) + { + D_DEBUG(DB_TRACE, "chore=%p: drr=%p i=%d\n", &dca->dca_chore, dca->dca_drr, + dca->dca_i); - d_list_for_each_entry(drr, &dca->dca_head, drr_link) { - drr->drr_parent = dra; - drr->drr_result = 0; + dca->dca_drr->drr_parent = dra; + dca->dca_drr->drr_result = 0; - if (unlikely(dra->dra_opc == DTX_COMMIT && i == 0 && + if (unlikely(dra->dra_opc == DTX_COMMIT && dca->dca_i == 0 && DAOS_FAIL_CHECK(DAOS_DTX_FAIL_COMMIT))) - rc = dtx_req_send(drr, 1); + rc = dtx_req_send(dca->dca_drr, 1); else - rc = dtx_req_send(drr, epoch); + rc = dtx_req_send(dca->dca_drr, epoch); if (rc != 0) { /* If the first sub-RPC failed, then break, otherwise * other remote replicas may have already received the * RPC and executed it, so have to go ahead. */ - if (i == 0) { + if (dca->dca_i == 0) { ABT_future_free(&dra->dra_future); return rc; } } /* Yield to avoid holding CPU for too long time. */ - if (++i % DTX_RPC_YIELD_THD == 0) - ABT_thread_yield(); + if (++(dca->dca_i) % DTX_RPC_YIELD_THD == 0) + return DSS_CHORE_YIELD; } - return 0; + return DSS_CHORE_DONE; } static int @@ -599,16 +620,22 @@ dtx_classify_one(struct ds_pool *pool, daos_handle_t tree, d_list_t *head, int * return rc > 0 ? 0 : rc; } -static int -dtx_rpc_internal(struct dtx_common_args *dca) +static enum dss_chore_status +dtx_rpc_helper(struct dss_chore *chore, bool is_reentrance) { + struct dtx_common_args *dca = container_of(chore, struct dtx_common_args, dca_chore); struct ds_pool *pool = dca->dca_cont->sc_pool->spc_pool; struct umem_attr uma = { 0 }; int length = 0; int rc; int i; - if (dca->dca_dra.dra_opc != DTX_REFRESH) { + if (is_reentrance) { + D_DEBUG(DB_TRACE, "%p: skip to send\n", &dca->dca_chore); + goto send; + } + + if (dca->dca_dtes != NULL) { D_ASSERT(dca->dca_dtis != NULL); if (dca->dca_count > 1) { @@ -616,7 +643,7 @@ dtx_rpc_internal(struct dtx_common_args *dca) rc = dbtree_create_inplace(DBTREE_CLASS_DTX_CF, 0, DTX_CF_BTREE_ORDER, &uma, &dca->dca_tree_root, &dca->dca_tree_hdl); if (rc != 0) - return rc; + goto done; } ABT_rwlock_rdlock(pool->sp_lock); @@ -626,7 +653,7 @@ dtx_rpc_internal(struct dtx_common_args *dca) dca->dca_rank, dca->dca_tgtid); if (rc < 0) { ABT_rwlock_unlock(pool->sp_lock); - return rc; + goto done; } daos_dti_copy(&dca->dca_dtis[i], &dca->dca_dtes[i]->dte_xid); @@ -636,30 +663,33 @@ dtx_rpc_internal(struct dtx_common_args *dca) /* For DTX_CHECK, if no other available target(s), then current target is the * unique valid one (and also 'prepared'), then related DTX can be committed. */ - if (d_list_empty(&dca->dca_head)) - return dca->dca_dra.dra_opc == DTX_CHECK ? DTX_ST_PREPARED : 0; + if (d_list_empty(&dca->dca_head)) { + rc = (dca->dca_dra.dra_opc == DTX_CHECK ? DTX_ST_PREPARED : 0); + goto done; + } } else { length = dca->dca_count; } D_ASSERT(length > 0); - return dtx_req_list_send(dca, dca->dca_epoch, length); -} - -static void -dtx_rpc_helper(void *arg) -{ - struct dtx_common_args *dca = arg; - int rc; - - rc = dtx_rpc_internal(dca); +send: + rc = dtx_req_list_send(dca, dca->dca_epoch, length, is_reentrance); + if (rc == DSS_CHORE_YIELD) + return DSS_CHORE_YIELD; + if (rc == DSS_CHORE_DONE) + rc = 0; +done: if (rc != 0) dca->dca_dra.dra_result = rc; - - D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE, - "DTX helper ULT for %u exit: %d\n", dca->dca_dra.dra_opc, rc); + D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE, "%p: DTX RPC chore for %u done: %d\n", chore, + dca->dca_dra.dra_opc, rc); + if (dca->dca_chore_eventual != ABT_EVENTUAL_NULL) { + rc = ABT_eventual_set(dca->dca_chore_eventual, NULL, 0); + D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_set: %d\n", rc); + } + return DSS_CHORE_DONE; } static int @@ -672,6 +702,7 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry ** memset(dca, 0, sizeof(*dca)); + dca->dca_chore_eventual = ABT_EVENTUAL_NULL; D_INIT_LIST_HEAD(&dca->dca_head); dca->dca_tree_hdl = DAOS_HDL_INVAL; dca->dca_epoch = epoch; @@ -679,7 +710,6 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry ** crt_group_rank(NULL, &dca->dca_rank); dca->dca_tgtid = dss_get_module_info()->dmi_tgt_id; dca->dca_cont = cont; - dca->dca_helper = ABT_THREAD_NULL; dca->dca_dtes = dtes; dra = &dca->dca_dra; @@ -705,11 +735,18 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry ** } /* Use helper ULT to handle DTX RPC if there are enough helper XS. */ - if (dss_has_enough_helper()) - rc = dss_ult_create(dtx_rpc_helper, dca, DSS_XS_IOFW, dca->dca_tgtid, - DSS_DEEP_STACK_SZ, &dca->dca_helper); - else - rc = dtx_rpc_internal(dca); + if (dss_has_enough_helper()) { + rc = ABT_eventual_create(0, &dca->dca_chore_eventual); + if (rc != ABT_SUCCESS) { + D_ERROR("failed to create eventual: %d\n", rc); + rc = dss_abterr2der(rc); + goto out; + } + rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper); + } else { + dss_chore_diy(&dca->dca_chore, dtx_rpc_helper); + rc = dca->dca_dra.dra_result; + } out: return rc; @@ -721,8 +758,12 @@ dtx_rpc_post(struct dtx_common_args *dca, int ret, bool keep_head) struct dtx_req_rec *drr; int rc; - if (dca->dca_helper != ABT_THREAD_NULL) - ABT_thread_free(&dca->dca_helper); + if (dca->dca_chore_eventual != ABT_EVENTUAL_NULL) { + rc = ABT_eventual_wait(dca->dca_chore_eventual, NULL); + D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_wait: %d\n", rc); + rc = ABT_eventual_free(&dca->dca_chore_eventual); + D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_free: %d\n", rc); + } rc = dtx_req_wait(&dca->dca_dra); diff --git a/src/engine/sched.c b/src/engine/sched.c index aae2fb5554b..988b8f2b6aa 100644 --- a/src/engine/sched.c +++ b/src/engine/sched.c @@ -1441,8 +1441,8 @@ sched_stop(struct dss_xstream *dx) process_all(dx); } -void -sched_cond_wait(ABT_cond cond, ABT_mutex mutex) +static void +cond_wait(ABT_cond cond, ABT_mutex mutex, bool for_business) { struct dss_xstream *dx = dss_current_xstream(); struct sched_info *info = &dx->dx_sched_info; @@ -1451,6 +1451,20 @@ sched_cond_wait(ABT_cond cond, ABT_mutex mutex) ABT_cond_wait(cond, mutex); D_ASSERT(info->si_wait_cnt > 0); info->si_wait_cnt -= 1; + if (for_business) + info->si_stats.ss_busy_ts = info->si_cur_ts; +} + +void +sched_cond_wait(ABT_cond cond, ABT_mutex mutex) +{ + cond_wait(cond, mutex, false /* for_business */); +} + +void +sched_cond_wait_for_business(ABT_cond cond, ABT_mutex mutex) +{ + cond_wait(cond, mutex, true /* for_business */); } uint64_t diff --git a/src/engine/srv.c b/src/engine/srv.c index 246ee975c64..986d8ed04c4 100644 --- a/src/engine/srv.c +++ b/src/engine/srv.c @@ -370,6 +370,7 @@ dss_srv_handler(void *arg) int rc; bool track_mem = false; bool signal_caller = true; + bool with_chore_queue = dx->dx_iofw && !dx->dx_main_xs; rc = dss_xstream_set_affinity(dx); if (rc) @@ -500,6 +501,16 @@ dss_srv_handler(void *arg) } } + if (with_chore_queue) { + rc = dss_chore_queue_start(dx); + if (rc != 0) { + DL_ERROR(rc, "failed to start chore queue"); + ABT_future_set(dx->dx_shutdown, dx); + wait_all_exited(dx, dmi); + goto nvme_fini; + } + } + dmi->dmi_xstream = dx; ABT_mutex_lock(xstream_data.xd_mutex); /* initialized everything for the ULT, notify the creator */ @@ -546,6 +557,9 @@ dss_srv_handler(void *arg) if (dx->dx_comm) dx->dx_progress_started = false; + if (with_chore_queue) + dss_chore_queue_stop(dx); + wait_all_exited(dx, dmi); if (dmi->dmi_dp) { daos_profile_destroy(dmi->dmi_dp); @@ -755,6 +769,8 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id) } else { dx->dx_main_xs = (xs_id >= dss_sys_xs_nr) && (xs_offset == 0); } + /* See the DSS_XS_IOFW case in sched_ult2xs. */ + dx->dx_iofw = xs_id >= dss_sys_xs_nr && (!dx->dx_main_xs || dss_tgt_offload_xs_nr == 0); dx->dx_dsc_started = false; /** @@ -783,6 +799,12 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id) D_GOTO(out_dx, rc); } + rc = dss_chore_queue_init(dx); + if (rc != 0) { + DL_ERROR(rc, "initialize chore queue fails"); + goto out_sched; + } + dss_mem_stats_init(&dx->dx_mem_stats, xs_id); /** start XS, ABT rank 0 is reserved for the primary xstream */ @@ -790,7 +812,7 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id) &dx->dx_xstream); if (rc != ABT_SUCCESS) { D_ERROR("create xstream fails %d\n", rc); - D_GOTO(out_sched, rc = dss_abterr2der(rc)); + D_GOTO(out_chore_queue, rc = dss_abterr2der(rc)); } rc = ABT_thread_attr_create(&attr); @@ -839,6 +861,8 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id) ABT_thread_attr_free(&attr); ABT_xstream_join(dx->dx_xstream); ABT_xstream_free(&dx->dx_xstream); +out_chore_queue: + dss_chore_queue_fini(dx); out_sched: dss_sched_fini(dx); out_dx: @@ -898,6 +922,7 @@ dss_xstreams_fini(bool force) dx = xstream_data.xd_xs_ptrs[i]; if (dx == NULL) continue; + dss_chore_queue_fini(dx); dss_sched_fini(dx); dss_xstream_free(dx); xstream_data.xd_xs_ptrs[i] = NULL; diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h index 892e6ae3dc4..8621175b44f 100644 --- a/src/engine/srv_internal.h +++ b/src/engine/srv_internal.h @@ -60,6 +60,15 @@ struct mem_stats { uint64_t ms_current; }; +/* See dss_chore. */ +struct dss_chore_queue { + d_list_t chq_list; + bool chq_stop; + ABT_mutex chq_mutex; + ABT_cond chq_cond; + ABT_thread chq_ult; +}; + /** Per-xstream configuration data */ struct dss_xstream { char dx_name[DSS_XS_NAME_LEN]; @@ -85,6 +94,7 @@ struct dss_xstream { unsigned int dx_timeout; bool dx_main_xs; /* true for main XS */ bool dx_comm; /* true with cart context */ + bool dx_iofw; /* true for DSS_XS_IOFW XS */ bool dx_dsc_started; /* DSC progress ULT started */ struct mem_stats dx_mem_stats; /* memory usages stats on this xstream */ #ifdef ULT_MMAP_STACK @@ -93,6 +103,7 @@ struct dss_xstream { #endif bool dx_progress_started; /* Network poll started */ int dx_tag; /** tag for xstream */ + struct dss_chore_queue dx_chore_queue; }; /** Engine module's metrics */ @@ -370,4 +381,9 @@ dss_xstream_has_nvme(struct dss_xstream *dx) return false; } +int dss_chore_queue_init(struct dss_xstream *dx); +int dss_chore_queue_start(struct dss_xstream *dx); +void dss_chore_queue_stop(struct dss_xstream *dx); +void dss_chore_queue_fini(struct dss_xstream *dx); + #endif /* __DAOS_SRV_INTERNAL__ */ diff --git a/src/engine/ult.c b/src/engine/ult.c index 204381755fb..1e8743fcd89 100644 --- a/src/engine/ult.c +++ b/src/engine/ult.c @@ -610,3 +610,212 @@ dss_main_exec(void (*func)(void *), void *arg) return dss_ult_create(func, arg, DSS_XS_SELF, info->dmi_tgt_id, 0, NULL); } + +static void +dss_chore_diy_internal(struct dss_chore *chore) +{ +reenter: + D_DEBUG(DB_TRACE, "%p: status=%d\n", chore, chore->cho_status); + chore->cho_status = chore->cho_func(chore, chore->cho_status == DSS_CHORE_YIELD); + D_ASSERT(chore->cho_status != DSS_CHORE_NEW); + if (chore->cho_status == DSS_CHORE_YIELD) { + ABT_thread_yield(); + goto reenter; + } +} + +static void +dss_chore_ult(void *arg) +{ + struct dss_chore *chore = arg; + + dss_chore_diy_internal(chore); +} + +/** + * Add \a chore for \a func to the chore queue of some other xstream. + * + * \param[in] chore address of the embedded chore object + * \param[in] func function to be executed via \a chore + * + * \retval -DER_CANCEL chore queue stopping + */ +int +dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func) +{ + struct dss_module_info *info = dss_get_module_info(); + int xs_id; + struct dss_xstream *dx; + struct dss_chore_queue *queue; + + chore->cho_status = DSS_CHORE_NEW; + chore->cho_func = func; + + /* + * The dss_chore_queue_ult approach may get insufficient scheduling on + * a "main" xstream when the chore queue is long. So we fall back to + * the one-ULT-per-chore approach if there's no helper xstream. + */ + if (dss_tgt_offload_xs_nr == 0) { + D_INIT_LIST_HEAD(&chore->cho_link); + return dss_ult_create(dss_chore_ult, chore, DSS_XS_IOFW, info->dmi_tgt_id, + 0 /* stack_size */, NULL /* ult */); + } + + /* Find the chore queue. */ + xs_id = sched_ult2xs(DSS_XS_IOFW, info->dmi_tgt_id); + D_ASSERT(xs_id != -DER_INVAL); + dx = dss_get_xstream(xs_id); + D_ASSERT(dx != NULL); + queue = &dx->dx_chore_queue; + D_ASSERT(queue != NULL); + + ABT_mutex_lock(queue->chq_mutex); + if (queue->chq_stop) { + ABT_mutex_unlock(queue->chq_mutex); + return -DER_CANCELED; + } + d_list_add_tail(&chore->cho_link, &queue->chq_list); + ABT_cond_broadcast(queue->chq_cond); + ABT_mutex_unlock(queue->chq_mutex); + + D_DEBUG(DB_TRACE, "%p: tgt_id=%d -> xs_id=%d dx.tgt_id=%d\n", chore, info->dmi_tgt_id, + xs_id, dx->dx_tgt_id); + return 0; +} + +/** + * Do \a chore for \a func synchronously in the current ULT. + * + * \param[in] chore embedded chore object + * \param[in] func function to be executed via \a chore + */ +void +dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func) +{ + D_INIT_LIST_HEAD(&chore->cho_link); + chore->cho_status = DSS_CHORE_NEW; + chore->cho_func = func; + + dss_chore_diy_internal(chore); +} + +static void +dss_chore_queue_ult(void *arg) +{ + struct dss_chore_queue *queue = arg; + d_list_t list = D_LIST_HEAD_INIT(list); + + D_ASSERT(queue != NULL); + D_DEBUG(DB_TRACE, "begin\n"); + + for (;;) { + struct dss_chore *chore; + struct dss_chore *chore_tmp; + bool stop = false; + + /* + * The scheduling order shall be + * + * [queue->chq_list] [list], + * + * where list contains chores that have returned + * DSS_CHORE_YIELD in the previous iteration. + */ + ABT_mutex_lock(queue->chq_mutex); + for (;;) { + if (!d_list_empty(&queue->chq_list)) { + d_list_splice_init(&queue->chq_list, &list); + break; + } + if (!d_list_empty(&list)) + break; + if (queue->chq_stop) { + stop = true; + break; + } + sched_cond_wait_for_business(queue->chq_cond, queue->chq_mutex); + } + ABT_mutex_unlock(queue->chq_mutex); + + if (stop) + break; + + d_list_for_each_entry_safe(chore, chore_tmp, &list, cho_link) { + bool is_reentrance = (chore->cho_status == DSS_CHORE_YIELD); + + D_DEBUG(DB_TRACE, "%p: before: status=%d\n", chore, chore->cho_status); + chore->cho_status = chore->cho_func(chore, is_reentrance); + D_ASSERT(chore->cho_status != DSS_CHORE_NEW); + D_DEBUG(DB_TRACE, "%p: after: status=%d\n", chore, chore->cho_status); + if (chore->cho_status == DSS_CHORE_DONE) + d_list_del_init(&chore->cho_link); + ABT_thread_yield(); + } + } + + D_DEBUG(DB_TRACE, "end\n"); +} + +int +dss_chore_queue_init(struct dss_xstream *dx) +{ + struct dss_chore_queue *queue = &dx->dx_chore_queue; + int rc; + + D_INIT_LIST_HEAD(&queue->chq_list); + queue->chq_stop = false; + + rc = ABT_mutex_create(&queue->chq_mutex); + if (rc != ABT_SUCCESS) { + D_ERROR("failed to create chore queue mutex: %d\n", rc); + return dss_abterr2der(rc); + } + + rc = ABT_cond_create(&queue->chq_cond); + if (rc != ABT_SUCCESS) { + D_ERROR("failed to create chore queue condition variable: %d\n", rc); + ABT_mutex_free(&queue->chq_mutex); + return dss_abterr2der(rc); + } + + return 0; +} + +int +dss_chore_queue_start(struct dss_xstream *dx) +{ + struct dss_chore_queue *queue = &dx->dx_chore_queue; + int rc; + + rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC], + dss_chore_queue_ult, queue, ABT_THREAD_ATTR_NULL, + &queue->chq_ult); + if (rc != 0) { + D_ERROR("failed to create chore queue ULT: %d\n", rc); + return dss_abterr2der(rc); + } + + return 0; +} + +void +dss_chore_queue_stop(struct dss_xstream *dx) +{ + struct dss_chore_queue *queue = &dx->dx_chore_queue; + + ABT_mutex_lock(queue->chq_mutex); + queue->chq_stop = true; + ABT_cond_broadcast(queue->chq_cond); + ABT_mutex_unlock(queue->chq_mutex); + ABT_thread_free(&queue->chq_ult); +} + +void +dss_chore_queue_fini(struct dss_xstream *dx) +{ + struct dss_chore_queue *queue = &dx->dx_chore_queue; + + ABT_cond_free(&queue->chq_cond); + ABT_mutex_free(&queue->chq_mutex); +} diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h index afdf267cd60..06a927b8d3f 100644 --- a/src/include/daos_srv/daos_engine.h +++ b/src/include/daos_srv/daos_engine.h @@ -338,6 +338,13 @@ int sched_req_space_check(struct sched_request *req); */ void sched_cond_wait(ABT_cond cond, ABT_mutex mutex); +/** + * Wrapper of ABT_cond_wait(), inform scheduler that it's going + * to be blocked for a relative long time. Unlike sched_cond_wait, + * after waking up, this function will prevent relaxing for a while. + */ +void sched_cond_wait_for_business(ABT_cond cond, ABT_mutex mutex); + /** * Get current monotonic time in milli-seconds. */ @@ -812,4 +819,40 @@ enum dss_drpc_call_flag { int dss_drpc_call(int32_t module, int32_t method, void *req, size_t req_size, unsigned int flags, Drpc__Response **resp); +/** Status of a chore */ +enum dss_chore_status { + DSS_CHORE_NEW, /**< ready to be scheduled for the first time (private) */ + DSS_CHORE_YIELD, /**< ready to be scheduled again */ + DSS_CHORE_DONE /**< no more scheduling required */ +}; + +struct dss_chore; + +/** + * Must return either DSS_CHORE_YIELD (if yielding to other chores) or + * DSS_CHORE_DONE (if terminating). If \a is_reentrance is true, this is not + * the first time \a chore is scheduled. A typical implementation shall + * initialize its internal state variables if \a is_reentrance is false. See + * dtx_leader_exec_ops_chore for an example. + */ +typedef enum dss_chore_status (*dss_chore_func_t)(struct dss_chore *chore, bool is_reentrance); + +/** + * Chore (opaque) + * + * A simple task (e.g., an I/O forwarding task) that yields by returning + * DSS_CHORE_YIELD instead of calling ABT_thread_yield. This data structure + * shall be embedded in the user's own task data structure, which typically + * also includes arguments and internal state variables for \a cho_func. All + * fields are private. See dtx_chore for an example. + */ +struct dss_chore { + d_list_t cho_link; + enum dss_chore_status cho_status; + dss_chore_func_t cho_func; +}; + +int dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func); +void dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func); + #endif /* __DSS_API_H__ */