From 30ef9f825dafa6ca9e30064fae9ea6e588547413 Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Thu, 30 May 2024 13:45:47 +0800 Subject: [PATCH] DAOS-15914 dtx: control DTX RPC to reduce network load 1. Send DTX batched commit RPCs step by step Currently, for each DTX batched commit operation, it will handle at most 512 DTX entries that may generate DTX commit RPCs to thousands of DAOS targets. We will not send out the batched RPCs all together, instead, we will send them step by step. After each step, the logic will yield and wait until replied, and then next batched RPCs. That can avoid holding too much system resources for relative long time. It is also helpful to reduce the whole system network peak load and the pressure on related targets. 2. Cleanup stale DTX based on global RPC timeout Originally, DTX cleanup logic will be triggered if the life for some stale DTX exceeds the fixed threshold DTX_CLEANUP_THD_AGE_UP (90 sec) that maybe smaller than global default RPC timeout, as to related DTX refresh RPC for cleanup logic maybe send out too early before related modification RPC(s) timeout. It increases network load unnecessarily. Then we adjust the DTX cleanup threshold based on global default RPC timeout value, and give related DTX leader sometime after default RPC timeout to commit or abort the DTX. If the DTX is still prepared after that, then trigger DTX cleanup to handle potential stale DTX entries. 3. Reorg DTX CoS logic Reduce the RPCs caused by potential repeated DTX commit. More clear names for DTX CoS API. 
Signed-off-by: Fan Yong --- src/cart/crt_context.c | 16 ++ src/dtx/dtx_coll.c | 2 +- src/dtx/dtx_common.c | 57 ++++--- src/dtx/dtx_cos.c | 168 +++++++++++++++----- src/dtx/dtx_internal.h | 27 ++-- src/dtx/dtx_resync.c | 4 +- src/dtx/dtx_rpc.c | 271 +++++++++++++++++---------------- src/dtx/dtx_srv.c | 53 +------ src/include/cart/api.h | 13 ++ src/include/daos_srv/dtx_srv.h | 7 +- src/include/daos_srv/vos.h | 4 +- src/object/srv_obj.c | 10 +- src/vos/tests/vts_dtx.c | 6 +- src/vos/vos_dtx.c | 6 +- 14 files changed, 373 insertions(+), 271 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index a90ac3d93004..c3c5e220eff0 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -1986,6 +1986,22 @@ crt_context_set_timeout(crt_context_t crt_ctx, uint32_t timeout_sec) return rc; } +int +crt_context_get_timeout(crt_context_t crt_ctx, uint32_t *timeout_sec) +{ + struct crt_context *ctx = crt_ctx; + int rc = 0; + + if (crt_ctx == CRT_CONTEXT_NULL) { + D_ERROR("NULL context passed\n"); + rc = -DER_INVAL; + } else { + *timeout_sec = ctx->cc_timeout_sec; + } + + return rc; +} + /* Force complete the rpc. Used for handling of unreachable rpcs */ void crt_req_force_completion(struct crt_rpc_priv *rpc_priv) diff --git a/src/dtx/dtx_coll.c b/src/dtx/dtx_coll.c index 11e02ba2d39f..3ae69648c02d 100644 --- a/src/dtx/dtx_coll.c +++ b/src/dtx/dtx_coll.c @@ -305,7 +305,7 @@ dtx_coll_local_one(void *args) rc = vos_dtx_abort(cont->sc_hdl, &dcla->dcla_xid, dcla->dcla_epoch); break; case DTX_COLL_CHECK: - rc = vos_dtx_check(cont->sc_hdl, &dcla->dcla_xid, NULL, NULL, NULL, NULL, false); + rc = vos_dtx_check(cont->sc_hdl, &dcla->dcla_xid, NULL, NULL, NULL, false); if (rc == DTX_ST_INITED) { /* * For DTX_CHECK, non-ready one is equal to non-exist. 
Do not directly diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index 7b879ff2900b..ae9055793981 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -49,6 +49,7 @@ struct dtx_batched_cont_args { struct dtx_batched_pool_args *dbca_pool; int dbca_refs; uint32_t dbca_reg_gen; + uint32_t dbca_cleanup_thd; uint32_t dbca_deregister:1, dbca_cleanup_done:1, dbca_commit_done:1, @@ -68,6 +69,7 @@ struct dtx_cleanup_cb_args { d_list_t dcca_pc_list; int dcca_st_count; int dcca_pc_count; + uint32_t dcca_cleanup_thd; }; static inline void @@ -226,7 +228,7 @@ dtx_cleanup_iter_cb(uuid_t co_uuid, vos_iter_entry_t *ent, void *args) return 0; /* Stop the iteration if current DTX is not too old. */ - if (dtx_sec2age(ent->ie_dtx_start_time) <= DTX_CLEANUP_THD_AGE_LO) + if (dtx_sec2age(ent->ie_dtx_start_time) <= dcca->dcca_cleanup_thd) return 1; D_ASSERT(ent->ie_dtx_mbs_dsize > 0); @@ -336,6 +338,8 @@ dtx_cleanup(void *arg) D_INIT_LIST_HEAD(&dcca.dcca_pc_list); dcca.dcca_st_count = 0; dcca.dcca_pc_count = 0; + /* Cleanup stale DTX entries within about 10 seconds windows each time. 
*/ + dcca.dcca_cleanup_thd = dbca->dbca_cleanup_thd - 10; rc = ds_cont_iter(cont->sc_pool->spc_hdl, cont->sc_uuid, dtx_cleanup_iter_cb, &dcca, VOS_ITER_DTX, 0); if (rc < 0) @@ -626,7 +630,7 @@ dtx_batched_commit_one(void *arg) int rc; cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT, NULL, - DAOS_EPOCH_MAX, &dtes, &dcks, &dce); + DAOS_EPOCH_MAX, false, &dtes, &dcks, &dce); if (cnt == 0) break; @@ -754,7 +758,7 @@ dtx_batched_commit(void *arg) if (dtx_cont_opened(cont) && !dbca->dbca_deregister && dbca->dbca_cleanup_req == NULL && stat.dtx_oldest_active_time != 0 && - dtx_sec2age(stat.dtx_oldest_active_time) >= DTX_CLEANUP_THD_AGE_UP) { + dtx_sec2age(stat.dtx_oldest_active_time) >= dbca->dbca_cleanup_thd) { D_ASSERT(!dbca->dbca_cleanup_done); dtx_get_dbca(dbca); @@ -1267,6 +1271,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul uint32_t flags; int status = -1; int rc = 0; + int i; bool aborted = false; bool unpin = false; @@ -1366,7 +1371,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul D_ASSERT(dth->dth_mbs != NULL); if (dlh->dlh_coll) { - rc = dtx_add_cos(cont, dlh->dlh_coll_entry, &dth->dth_leader_oid, + rc = dtx_cos_add(cont, dlh->dlh_coll_entry, &dth->dth_leader_oid, dth->dth_dkey_hash, dth->dth_epoch, DCF_EXP_CMT | DCF_COLL); } else { size = sizeof(*dte) + sizeof(*mbs) + dth->dth_mbs->dm_data_size; @@ -1391,7 +1396,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul else flags = 0; - rc = dtx_add_cos(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash, + rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash, dth->dth_epoch, flags); dtx_entry_put(dte); } @@ -1480,11 +1485,13 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul * It is harmless to keep some partially committed DTX entries in CoS cache. 
*/ if (result == 0 && dth->dth_cos_done) { - int i; - for (i = 0; i < dth->dth_dti_cos_count; i++) - dtx_del_cos(cont, &dth->dth_dti_cos[i], + dtx_cos_del(cont, &dth->dth_dti_cos[i], &dth->dth_leader_oid, dth->dth_dkey_hash); + } else { + for (i = 0; i < dth->dth_dti_cos_count; i++) + dtx_cos_put_piggyback(cont, &dth->dth_dti_cos[i], + &dth->dth_leader_oid, dth->dth_dkey_hash); } D_DEBUG(DB_IO, "Stop the DTX "DF_DTI" ver %u, dkey %lu, %s, cos %d/%d: result "DF_RC"\n", @@ -1644,7 +1651,7 @@ dtx_flush_on_close(struct dss_module_info *dmi, struct dtx_batched_cont_args *db struct dtx_coll_entry *dce = NULL; cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT, - NULL, DAOS_EPOCH_MAX, &dtes, &dcks, &dce); + NULL, DAOS_EPOCH_MAX, true, &dtes, &dcks, &dce); if (cnt <= 0) D_GOTO(out, rc = cnt); @@ -1774,6 +1781,7 @@ dtx_cont_register(struct ds_cont_child *cont) struct dtx_batched_pool_args *dbpa = NULL; struct dtx_batched_cont_args *dbca = NULL; struct umem_attr uma; + uint32_t timeout; int rc; bool new_pool = true; @@ -1812,6 +1820,19 @@ dtx_cont_register(struct ds_cont_child *cont) if (dbca == NULL) D_GOTO(out, rc = -DER_NOMEM); + rc = crt_context_get_timeout(dmi->dmi_ctx, &timeout); + if (rc != 0) { + D_ERROR("Failed to get DTX cleanup timeout: "DF_RC"\n", DP_RC(rc)); + goto out; + } + + /* + * Give related DTX leader sometime after default RPC timeout to commit or abort + * the DTX. If the DTX is still prepared after that, then trigger DTX cleanup to + * handle potential stale DTX entries. 
+ */ + dbca->dbca_cleanup_thd = timeout + DTX_COMMIT_THRESHOLD_AGE * 2; + memset(&uma, 0, sizeof(uma)); uma.uma_id = UMEM_CLASS_VMEM; rc = dbtree_create_inplace_ex(DBTREE_CLASS_DTX_COS, 0, @@ -1967,7 +1988,7 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti, */ return -DER_NONEXIST; - rc = vos_dtx_check(coh, dti, epoch, pm_ver, NULL, NULL, false); + rc = vos_dtx_check(coh, dti, epoch, pm_ver, NULL, false); switch (rc) { case DTX_ST_INITED: return -DER_INPROGRESS; @@ -2000,14 +2021,6 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti, } } -/* - * If a large transaction has sub-requests to dispatch to a lot of DTX participants, - * then we may have to split the dispatch process to multiple steps; otherwise, the - * dispatch process may trigger too many in-flight or in-queued RPCs that will hold - * too much resource as to server maybe out of memory. - */ -#define DTX_EXEC_STEP_LENGTH DTX_THRESHOLD_COUNT - struct dtx_chore { struct dss_chore chore; dtx_sub_func_t func; @@ -2186,8 +2199,8 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, dlh->dlh_need_agg = 0; dlh->dlh_agg_done = 0; - if (sub_cnt > DTX_EXEC_STEP_LENGTH) { - dlh->dlh_forward_cnt = DTX_EXEC_STEP_LENGTH; + if (sub_cnt > DTX_RPC_STEP_LENGTH) { + dlh->dlh_forward_cnt = DTX_RPC_STEP_LENGTH; } else { dlh->dlh_forward_cnt = sub_cnt; if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL) @@ -2237,7 +2250,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, sub_cnt -= dlh->dlh_forward_cnt; if (sub_cnt > 0) { dlh->dlh_forward_idx += dlh->dlh_forward_cnt; - if (sub_cnt <= DTX_EXEC_STEP_LENGTH) { + if (sub_cnt <= DTX_RPC_STEP_LENGTH) { dlh->dlh_forward_cnt = sub_cnt; if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL) dlh->dlh_need_agg = 1; @@ -2337,7 +2350,7 @@ dtx_obj_sync(struct ds_cont_child *cont, daos_unit_oid_t *oid, struct dtx_coll_entry *dce = NULL; cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT, oid, - epoch, &dtes, &dcks, 
&dce); + epoch, true, &dtes, &dcks, &dce); if (cnt <= 0) { rc = cnt; if (rc < 0) diff --git a/src/dtx/dtx_cos.c b/src/dtx/dtx_cos.c index 9442adf2248e..926aeba51994 100644 --- a/src/dtx/dtx_cos.c +++ b/src/dtx/dtx_cos.c @@ -60,8 +60,9 @@ struct dtx_cos_rec_child { }; /* The DTX epoch. */ daos_epoch_t dcrc_epoch; - /* For non-collective DTX, it points to the dtx_cos_rec. */ struct dtx_cos_rec *dcrc_ptr; + uint32_t dcrc_piggyback_refs; + uint32_t dcrc_coll:1; /* For collective DTX. */ }; struct dtx_cos_rec_bundle { @@ -129,14 +130,13 @@ dtx_cos_rec_alloc(struct btr_instance *tins, d_iov_t *key_iov, } dcrc->dcrc_epoch = rbund->epoch; + dcrc->dcrc_ptr = dcr; if (rbund->flags & DCF_COLL) { - /* Set dcrc_ptr as NULL to indicate that it is collective DTX. */ - dcrc->dcrc_ptr = NULL; + dcrc->dcrc_coll = 1; dcrc->dcrc_dce = dtx_coll_entry_get(rbund->entry); d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_coll_list); cont->sc_dtx_committable_coll_count++; } else { - dcrc->dcrc_ptr = dcr; dcrc->dcrc_dte = dtx_entry_get(rbund->entry); d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_cos_list); } @@ -177,11 +177,11 @@ dtx_cos_rec_free(struct btr_instance *tins, struct btr_record *rec, void *args) dcrc_lo_link) { d_list_del(&dcrc->dcrc_lo_link); d_list_del(&dcrc->dcrc_gl_committable); - if (dcrc->dcrc_ptr != NULL) { - dtx_entry_put(dcrc->dcrc_dte); - } else { + if (dcrc->dcrc_coll) { dtx_coll_entry_put(dcrc->dcrc_dce); coll++; + } else { + dtx_entry_put(dcrc->dcrc_dte); } D_FREE(dcrc); dec++; @@ -190,11 +190,11 @@ dtx_cos_rec_free(struct btr_instance *tins, struct btr_record *rec, void *args) dcrc_lo_link) { d_list_del(&dcrc->dcrc_lo_link); d_list_del(&dcrc->dcrc_gl_committable); - if (dcrc->dcrc_ptr != NULL) { - dtx_entry_put(dcrc->dcrc_dte); - } else { + if (dcrc->dcrc_coll) { dtx_coll_entry_put(dcrc->dcrc_dce); coll++; + } else { + dtx_entry_put(dcrc->dcrc_dte); } D_FREE(dcrc); dec++; @@ -203,11 +203,11 @@ dtx_cos_rec_free(struct btr_instance *tins, struct 
btr_record *rec, void *args) dcrc_lo_link) { d_list_del(&dcrc->dcrc_lo_link); d_list_del(&dcrc->dcrc_gl_committable); - if (dcrc->dcrc_ptr != NULL) { - dtx_entry_put(dcrc->dcrc_dte); - } else { + if (dcrc->dcrc_coll) { dtx_coll_entry_put(dcrc->dcrc_dce); coll++; + } else { + dtx_entry_put(dcrc->dcrc_dte); } D_FREE(dcrc); dec++; @@ -257,14 +257,13 @@ dtx_cos_rec_update(struct btr_instance *tins, struct btr_record *rec, return -DER_NOMEM; dcrc->dcrc_epoch = rbund->epoch; + dcrc->dcrc_ptr = dcr; if (rbund->flags & DCF_COLL) { - /* Set dcrc_ptr as NULL to indicate that it is collective DTX. */ - dcrc->dcrc_ptr = NULL; + dcrc->dcrc_coll = 1; dcrc->dcrc_dce = dtx_coll_entry_get(rbund->entry); d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_coll_list); cont->sc_dtx_committable_coll_count++; } else { - dcrc->dcrc_ptr = dcr; dcrc->dcrc_dte = dtx_entry_get(rbund->entry); d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_cos_list); } @@ -297,7 +296,7 @@ btr_ops_t dtx_btr_cos_ops = { int dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, - daos_unit_oid_t *oid, daos_epoch_t epoch, + daos_unit_oid_t *oid, daos_epoch_t epoch, bool force, struct dtx_entry ***dtes, struct dtx_cos_key **dcks, struct dtx_coll_entry **p_dce) { @@ -307,9 +306,11 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, uint32_t count; uint32_t i = 0; + /* Process collective DXT with higher priority. 
*/ if (!d_list_empty(&cont->sc_dtx_coll_list) && oid == NULL) { d_list_for_each_entry(dcrc, &cont->sc_dtx_coll_list, dcrc_gl_committable) { - if (epoch >= dcrc->dcrc_epoch) { + if (epoch >= dcrc->dcrc_epoch && + (dcrc->dcrc_piggyback_refs == 0 || force)) { D_ALLOC_PTR(dck_buf); if (dck_buf == NULL) return -DER_NOMEM; @@ -346,17 +347,16 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, daos_unit_oid_compare(dcrc->dcrc_ptr->dcr_oid, *oid) != 0) continue; - if (epoch < dcrc->dcrc_epoch) + if (epoch < dcrc->dcrc_epoch || (dcrc->dcrc_piggyback_refs > 0 && !force)) continue; - dck_buf[i].oid = dcrc->dcrc_ptr->dcr_oid; - dck_buf[i].dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash; - - if (unlikely(oid != NULL && dcrc->dcrc_ptr == NULL)) { + if (unlikely(oid != NULL && dcrc->dcrc_coll)) { if (i > 0) continue; D_FREE(dte_buf); + dck_buf[i].oid = dcrc->dcrc_ptr->dcr_oid; + dck_buf[i].dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash; *dcks = dck_buf; *p_dce = dtx_coll_entry_get(dcrc->dcrc_dce); @@ -364,6 +364,9 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, } dte_buf[i] = dtx_entry_get(dcrc->dcrc_dte); + dck_buf[i].oid = dcrc->dcrc_ptr->dcr_oid; + dck_buf[i].dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash; + if (++i >= count) break; } @@ -381,8 +384,8 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, } int -dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid, - uint64_t dkey_hash, int max, struct dtx_id **dtis) +dtx_cos_get_piggyback(struct ds_cont_child *cont, daos_unit_oid_t *oid, + uint64_t dkey_hash, int max, struct dtx_id **dtis) { struct dtx_cos_key key; d_iov_t kiov; @@ -423,6 +426,7 @@ dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid, d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) { dti[i] = dcrc->dcrc_dte->dte_xid; + dcrc->dcrc_piggyback_refs++; if (++i >= count) break; } @@ -433,8 +437,40 @@ dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid, return count; } +void 
+dtx_cos_put_piggyback(struct ds_cont_child *cont, struct dtx_id *xid, + daos_unit_oid_t *oid, uint64_t dkey_hash) +{ + struct dtx_cos_key key; + d_iov_t kiov; + d_iov_t riov; + struct dtx_cos_rec *dcr; + struct dtx_cos_rec_child *dcrc; + int rc; + + key.oid = *oid; + key.dkey_hash = dkey_hash; + d_iov_set(&kiov, &key, sizeof(key)); + d_iov_set(&riov, NULL, 0); + + /* It is normal that the DTX entry (to be put) in CoS has already been removed by race. */ + + rc = dbtree_lookup(cont->sc_dtx_cos_hdl, &kiov, &riov); + if (rc != 0) + return; + + dcr = (struct dtx_cos_rec *)riov.iov_buf; + + d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) { + if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { + dcrc->dcrc_piggyback_refs--; + return; + } + } +} + int -dtx_add_cos(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid, +dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid, uint64_t dkey_hash, daos_epoch_t epoch, uint32_t flags) { struct dtx_cos_key key; @@ -475,7 +511,7 @@ dtx_add_cos(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid, } int -dtx_del_cos(struct ds_cont_child *cont, struct dtx_id *xid, +dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid, daos_unit_oid_t *oid, uint64_t dkey_hash) { struct dtx_cos_key key; @@ -503,11 +539,11 @@ dtx_del_cos(struct ds_cont_child *cont, struct dtx_id *xid, d_list_del(&dcrc->dcrc_gl_committable); d_list_del(&dcrc->dcrc_lo_link); - if (dcrc->dcrc_ptr != NULL) { - dtx_entry_put(dcrc->dcrc_dte); - } else { + if (dcrc->dcrc_coll) { dtx_coll_entry_put(dcrc->dcrc_dce); cont->sc_dtx_committable_coll_count--; + } else { + dtx_entry_put(dcrc->dcrc_dte); } D_FREE(dcrc); @@ -523,11 +559,11 @@ dtx_del_cos(struct ds_cont_child *cont, struct dtx_id *xid, d_list_del(&dcrc->dcrc_gl_committable); d_list_del(&dcrc->dcrc_lo_link); - if (dcrc->dcrc_ptr != NULL) { - dtx_entry_put(dcrc->dcrc_dte); - } else { + if (dcrc->dcrc_coll) { dtx_coll_entry_put(dcrc->dcrc_dce); 
cont->sc_dtx_committable_coll_count--; + } else { + dtx_entry_put(dcrc->dcrc_dte); } D_FREE(dcrc); @@ -543,11 +579,11 @@ dtx_del_cos(struct ds_cont_child *cont, struct dtx_id *xid, d_list_del(&dcrc->dcrc_gl_committable); d_list_del(&dcrc->dcrc_lo_link); - if (dcrc->dcrc_ptr != NULL) { - dtx_entry_put(dcrc->dcrc_dte); - } else { + if (dcrc->dcrc_coll) { dtx_coll_entry_put(dcrc->dcrc_dce); cont->sc_dtx_committable_coll_count--; + } else { + dtx_entry_put(dcrc->dcrc_dte); } D_FREE(dcrc); @@ -591,3 +627,63 @@ dtx_cos_oldest(struct ds_cont_child *cont) return dcrc->dcrc_epoch; } + +void +dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid, + daos_unit_oid_t *oid, uint64_t dkey_hash) +{ + struct dtx_cos_key key; + d_iov_t kiov; + d_iov_t riov; + struct dtx_cos_rec *dcr; + struct dtx_cos_rec_child *dcrc; + int rc; + bool found = false; + + key.oid = *oid; + key.dkey_hash = dkey_hash; + d_iov_set(&kiov, &key, sizeof(key)); + d_iov_set(&riov, NULL, 0); + + rc = dbtree_lookup(cont->sc_dtx_cos_hdl, &kiov, &riov); + if (rc != 0) + goto out; + + dcr = (struct dtx_cos_rec *)riov.iov_buf; + + d_list_for_each_entry(dcrc, &dcr->dcr_reg_list, dcrc_lo_link) { + if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { + d_list_del(&dcrc->dcrc_lo_link); + d_list_add(&dcrc->dcrc_lo_link, &dcr->dcr_prio_list); + dcr->dcr_reg_count--; + dcr->dcr_prio_count++; + + D_GOTO(out, found = true); + } + } + + d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) { + if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { + d_list_del(&dcrc->dcrc_lo_link); + d_list_add(&dcrc->dcrc_lo_link, &dcr->dcr_prio_list); + + D_GOTO(out, found = true); + } + } + + d_list_for_each_entry(dcrc, &dcr->dcr_expcmt_list, dcrc_lo_link) { + if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) + D_GOTO(out, found = true); + } + +out: + if (found) { + d_list_del(&dcrc->dcrc_gl_committable); + if (dcrc->dcrc_coll) + d_list_add(&dcrc->dcrc_gl_committable, &cont->sc_dtx_coll_list); 
+ else + d_list_add(&dcrc->dcrc_gl_committable, &cont->sc_dtx_cos_list); + } + + /* It is normal that the DTX entry (for priority) in CoS has been committed by race. */ +} diff --git a/src/dtx/dtx_internal.h b/src/dtx/dtx_internal.h index e59baede09c4..a42bcc1d7d68 100644 --- a/src/dtx/dtx_internal.h +++ b/src/dtx/dtx_internal.h @@ -94,17 +94,6 @@ CRT_RPC_DECLARE(dtx_coll, DAOS_ISEQ_COLL_DTX, DAOS_OSEQ_COLL_DTX); #define DTX_YIELD_CYCLE (DTX_THRESHOLD_COUNT >> 3) -/* The time threshold for triggering DTX cleanup of stale entries. - * If the oldest active DTX exceeds such threshold, it will trigger - * DTX cleanup locally. - */ -#define DTX_CLEANUP_THD_AGE_UP 90 - -/* If DTX cleanup for stale entries is triggered, then the DTXs with - * older ages than this threshold will be cleanup. - */ -#define DTX_CLEANUP_THD_AGE_LO 75 - /* The count threshold (per pool) for triggering DTX aggregation. */ #define DTX_AGG_THD_CNT_MAX (1 << 24) #define DTX_AGG_THD_CNT_MIN (1 << 20) @@ -192,6 +181,14 @@ extern uint32_t dtx_batched_ult_max; */ #define DTX_COLL_TREE_WIDTH 8 +/* + * If a large transaction has sub-requests to dispatch to a lot of DTX participants, + * then we may have to split the dispatch process to multiple steps; otherwise, the + * dispatch process may trigger too many in-flight or in-queued RPCs that will hold + * too much resource as to server maybe out of memory. 
+ */ +#define DTX_RPC_STEP_LENGTH DTX_THRESHOLD_COUNT + extern struct crt_corpc_ops dtx_coll_commit_co_ops; extern struct crt_corpc_ops dtx_coll_abort_co_ops; extern struct crt_corpc_ops dtx_coll_check_co_ops; @@ -255,14 +252,16 @@ int dtx_leader_get(struct ds_pool *pool, struct dtx_memberships *mbs, /* dtx_cos.c */ int dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, - daos_unit_oid_t *oid, daos_epoch_t epoch, + daos_unit_oid_t *oid, daos_epoch_t epoch, bool force, struct dtx_entry ***dtes, struct dtx_cos_key **dcks, struct dtx_coll_entry **p_dce); -int dtx_add_cos(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid, +int dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid, uint64_t dkey_hash, daos_epoch_t epoch, uint32_t flags); -int dtx_del_cos(struct ds_cont_child *cont, struct dtx_id *xid, +int dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid, daos_unit_oid_t *oid, uint64_t dkey_hash); uint64_t dtx_cos_oldest(struct ds_cont_child *cont); +void dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid, + daos_unit_oid_t *oid, uint64_t dkey_hash); /* dtx_rpc.c */ int dtx_check(struct ds_cont_child *cont, struct dtx_entry *dte, diff --git a/src/dtx/dtx_resync.c b/src/dtx/dtx_resync.c index 3698f2384420..b98a6469954c 100644 --- a/src/dtx/dtx_resync.c +++ b/src/dtx/dtx_resync.c @@ -85,7 +85,7 @@ dtx_resync_commit(struct ds_cont_child *cont, * committed or aborted the DTX during we handling other * DTXs. So double check the status before current commit. */ - rc = vos_dtx_check(cont->sc_hdl, &dre->dre_xid, NULL, NULL, NULL, NULL, false); + rc = vos_dtx_check(cont->sc_hdl, &dre->dre_xid, NULL, NULL, NULL, false); /* Skip this DTX since it has been committed or aggregated. 
*/ if (rc == DTX_ST_COMMITTED || rc == DTX_ST_COMMITTABLE || rc == -DER_NONEXIST) @@ -301,7 +301,7 @@ dtx_status_handle_one(struct ds_cont_child *cont, struct dtx_entry *dte, daos_un * committed or aborted the DTX during we handling other * DTXs. So double check the status before next action. */ - rc = vos_dtx_check(cont->sc_hdl, &dte->dte_xid, NULL, NULL, NULL, NULL, false); + rc = vos_dtx_check(cont->sc_hdl, &dte->dte_xid, NULL, NULL, NULL, false); /* Skip the DTX that may has been committed or aborted. */ if (rc == DTX_ST_COMMITTED || rc == DTX_ST_COMMITTABLE || rc == -DER_NONEXIST) diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index 2af60538348a..f44edf895721 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -354,7 +354,7 @@ struct dtx_common_args { daos_handle_t dca_tree_hdl; daos_epoch_t dca_epoch; int dca_count; - int dca_committed; + int dca_steps; d_rank_t dca_rank; uint32_t dca_tgtid; struct ds_cont_child *dca_cont; @@ -369,37 +369,27 @@ struct dtx_common_args { /* If is_reentrance, this function ignores len. 
*/ static int -dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len, bool is_reentrance) +dtx_req_list_send(struct dtx_common_args *dca, bool is_reentrance) { struct dtx_req_args *dra = &dca->dca_dra; int rc; if (!is_reentrance) { - dra->dra_length = len; + dra->dra_length = dca->dca_steps; + dca->dca_i = 0; - rc = ABT_future_create(len, dtx_req_list_cb, &dra->dra_future); + rc = ABT_future_create(dca->dca_steps, dtx_req_list_cb, &dra->dra_future); if (rc != ABT_SUCCESS) { - D_ERROR("ABT_future_create failed for opc %x, len = %d: " - "rc = %d.\n", dra->dra_opc, len, rc); + D_ERROR("ABT_future_create failed for opc %x, len %d: rc %d.\n", + dra->dra_opc, dca->dca_steps, rc); return dss_abterr2der(rc); } D_DEBUG(DB_TRACE, "%p: DTX req for opc %x, future %p (%d) start.\n", - &dca->dca_chore, dra->dra_opc, dra->dra_future, len); + &dca->dca_chore, dra->dra_opc, dra->dra_future, dca->dca_steps); } - /* - * Begin or continue an iteration over dca_head. When beginning the - * iteration, dca->dca_drr does not point to a real entry, and is only - * safe for d_list_for_each_entry_continue. - */ - if (!is_reentrance) { - dca->dca_drr = d_list_entry(&dca->dca_head, struct dtx_req_rec, drr_link); - dca->dca_i = 0; - } - /* DO NOT add any line here! See the comment on dca->dca_drr above. 
*/ - d_list_for_each_entry_continue(dca->dca_drr, &dca->dca_head, drr_link) - { + while (1) { D_DEBUG(DB_TRACE, "chore=%p: drr=%p i=%d\n", &dca->dca_chore, dca->dca_drr, dca->dca_i); @@ -410,7 +400,7 @@ dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len, bool DAOS_FAIL_CHECK(DAOS_DTX_FAIL_COMMIT))) rc = dtx_req_send(dca->dca_drr, 1); else - rc = dtx_req_send(dca->dca_drr, epoch); + rc = dtx_req_send(dca->dca_drr, dca->dca_epoch); if (rc != 0) { /* If the first sub-RPC failed, then break, otherwise * other remote replicas may have already received the @@ -422,8 +412,15 @@ dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len, bool } } + /* dca->dca_drr maybe not points to a real entry if all RPCs have been sent. */ + dca->dca_drr = d_list_entry(dca->dca_drr->drr_link.next, + struct dtx_req_rec, drr_link); + + if (++(dca->dca_i) >= dca->dca_steps) + break; + /* Yield to avoid holding CPU for too long time. */ - if (++(dca->dca_i) % DTX_RPC_YIELD_THD == 0) + if (dca->dca_i % DTX_RPC_YIELD_THD == 0) return DSS_CHORE_YIELD; } @@ -615,63 +612,13 @@ static enum dss_chore_status dtx_rpc_helper(struct dss_chore *chore, bool is_reentrance) { struct dtx_common_args *dca = container_of(chore, struct dtx_common_args, dca_chore); - struct ds_pool *pool = dca->dca_cont->sc_pool->spc_pool; - struct umem_attr uma = { 0 }; - int length = 0; int rc; - int i; - - if (is_reentrance) { - D_DEBUG(DB_TRACE, "%p: skip to send\n", &dca->dca_chore); - goto send; - } - - if (dca->dca_dtes != NULL) { - D_ASSERT(dca->dca_dtis != NULL); - if (dca->dca_count > 1) { - uma.uma_id = UMEM_CLASS_VMEM; - rc = dbtree_create_inplace(DBTREE_CLASS_DTX_CF, 0, DTX_CF_BTREE_ORDER, - &uma, &dca->dca_tree_root, &dca->dca_tree_hdl); - if (rc != 0) - goto done; - } - - ABT_rwlock_rdlock(pool->sp_lock); - for (i = 0; i < dca->dca_count; i++) { - rc = dtx_classify_one(pool, dca->dca_tree_hdl, &dca->dca_head, &length, - dca->dca_dtes[i], dca->dca_count, - dca->dca_rank, 
dca->dca_tgtid, dca->dca_dra.dra_opc); - if (rc < 0) { - ABT_rwlock_unlock(pool->sp_lock); - goto done; - } - - daos_dti_copy(&dca->dca_dtis[i], &dca->dca_dtes[i]->dte_xid); - } - ABT_rwlock_unlock(pool->sp_lock); - - /* For DTX_CHECK, if no other available target(s), then current target is the - * unique valid one (and also 'prepared'), then related DTX can be committed. - */ - if (d_list_empty(&dca->dca_head)) { - rc = (dca->dca_dra.dra_opc == DTX_CHECK ? DTX_ST_PREPARED : 0); - goto done; - } - } else { - length = dca->dca_count; - } - - D_ASSERT(length > 0); - -send: - rc = dtx_req_list_send(dca, dca->dca_epoch, length, is_reentrance); + rc = dtx_req_list_send(dca, is_reentrance); if (rc == DSS_CHORE_YIELD) return DSS_CHORE_YIELD; if (rc == DSS_CHORE_DONE) rc = 0; - -done: if (rc != 0) dca->dca_dra.dra_result = rc; D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE, "%p: DTX RPC chore for %u done: %d\n", chore, @@ -684,12 +631,17 @@ dtx_rpc_helper(struct dss_chore *chore, bool is_reentrance) } static int -dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes, - uint32_t count, int opc, daos_epoch_t epoch, d_list_t *cmt_list, - d_list_t *abt_list, d_list_t *act_list, struct dtx_common_args *dca) +dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes, uint32_t count, + int opc, daos_epoch_t epoch, d_list_t *cmt_list, d_list_t *abt_list, d_list_t *act_list, + bool keep_head, struct dtx_common_args *dca) { + struct ds_pool *pool = cont->sc_pool->spc_pool; + struct dtx_req_rec *drr; struct dtx_req_args *dra; + struct umem_attr uma = { 0 }; + int length = 0; int rc = 0; + int i; memset(dca, 0, sizeof(*dca)); @@ -709,7 +661,7 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry ** dra->dra_abt_list = abt_list; dra->dra_act_list = act_list; dra->dra_opc = opc; - uuid_copy(dra->dra_po_uuid, cont->sc_pool->spc_pool->sp_uuid); + uuid_copy(dra->dra_po_uuid, pool->sp_uuid); uuid_copy(dra->dra_co_uuid, 
cont->sc_uuid); if (dti_list != NULL) { @@ -725,39 +677,111 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry ** } } - /* Use helper ULT to handle DTX RPC if there are enough helper XS. */ - if (dss_has_enough_helper()) { - rc = ABT_eventual_create(0, &dca->dca_chore_eventual); - if (rc != ABT_SUCCESS) { - D_ERROR("failed to create eventual: %d\n", rc); - rc = dss_abterr2der(rc); + if (dca->dca_dtes != NULL) { + D_ASSERT(dca->dca_dtis != NULL); + + if (dca->dca_count > 1) { + uma.uma_id = UMEM_CLASS_VMEM; + rc = dbtree_create_inplace(DBTREE_CLASS_DTX_CF, 0, DTX_CF_BTREE_ORDER, + &uma, &dca->dca_tree_root, &dca->dca_tree_hdl); + if (rc != 0) + goto out; + } + + ABT_rwlock_rdlock(pool->sp_lock); + for (i = 0; i < dca->dca_count; i++) { + rc = dtx_classify_one(pool, dca->dca_tree_hdl, &dca->dca_head, &length, + dca->dca_dtes[i], dca->dca_count, + dca->dca_rank, dca->dca_tgtid, dca->dca_dra.dra_opc); + if (rc != 0) { + ABT_rwlock_unlock(pool->sp_lock); + goto out; + } + + daos_dti_copy(&dca->dca_dtis[i], &dca->dca_dtes[i]->dte_xid); + } + ABT_rwlock_unlock(pool->sp_lock); + + /* For DTX_CHECK, if no other available target(s), then current target is the + * unique valid one (and also 'prepared'), then related DTX can be committed. + */ + if (d_list_empty(&dca->dca_head)) { + rc = (dca->dca_dra.dra_opc == DTX_CHECK ? 
DTX_ST_PREPARED : 0); goto out; } - rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper); } else { - dss_chore_diy(&dca->dca_chore, dtx_rpc_helper); - rc = dca->dca_dra.dra_result; + D_ASSERT(!d_list_empty(&dca->dca_head)); + + length = dca->dca_count; } -out: - return rc; -} + dca->dca_drr = d_list_entry(dca->dca_head.next, struct dtx_req_rec, drr_link); -static int -dtx_rpc_post(struct dtx_common_args *dca, int ret, bool keep_head) -{ - struct dtx_req_rec *drr; - int rc; + /* + * Do not send out the batched RPCs all together, instead, we do that step by step to + * avoid holding too much system resources for relative long time. It is also helpful + * to reduce the whole network peak load and the pressure on related peers. + */ + while (length > 0) { + if (length > DTX_RPC_STEP_LENGTH && opc != DTX_CHECK) + dca->dca_steps = DTX_RPC_STEP_LENGTH; + else + dca->dca_steps = length; + + /* Use helper ULT to handle DTX RPC if there are enough helper XS. */ + if (dss_has_enough_helper()) { + rc = ABT_eventual_create(0, &dca->dca_chore_eventual); + if (rc != ABT_SUCCESS) { + D_ERROR("failed to create eventual: %d\n", rc); + rc = dss_abterr2der(rc); + goto out; + } - if (dca->dca_chore_eventual != ABT_EVENTUAL_NULL) { - rc = ABT_eventual_wait(dca->dca_chore_eventual, NULL); - D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_wait: %d\n", rc); - rc = ABT_eventual_free(&dca->dca_chore_eventual); - D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_free: %d\n", rc); - } + rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper); + if (rc != 0) + goto out; + + rc = ABT_eventual_wait(dca->dca_chore_eventual, NULL); + D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_wait: %d\n", rc); + + rc = ABT_eventual_free(&dca->dca_chore_eventual); + D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_free: %d\n", rc); + } else { + dss_chore_diy(&dca->dca_chore, dtx_rpc_helper); + } + + rc = dtx_req_wait(&dca->dca_dra); + if (rc == 0 || rc == -DER_NONEXIST) + goto cont; + + switch (opc) { + case DTX_COMMIT: + case 
DTX_ABORT: + if (rc != -DER_EXCLUDED && rc != -DER_OOG) + goto out; + break; + case DTX_CHECK: + if (rc == DTX_ST_COMMITTED || rc == DTX_ST_COMMITTABLE) + goto out; + /* + * Go ahead even if someone failed, there may be 'COMMITTED' + * in subsequent check, that will overwrite former failure. + */ + break; + case DTX_REFRESH: + D_ASSERTF(length < DTX_RPC_STEP_LENGTH, + "Too long list for DTX refresh: %u vs %u\n", + length, DTX_RPC_STEP_LENGTH); + break; + default: + D_ASSERTF(0, "Invalid DTX opc %u\n", opc); + } - rc = dtx_req_wait(&dca->dca_dra); +cont: + length -= dca->dca_steps; + } +out: if (daos_handle_is_valid(dca->dca_tree_hdl)) dbtree_destroy(dca->dca_tree_hdl, NULL); @@ -767,7 +791,7 @@ dtx_rpc_post(struct dtx_common_args *dca, int ret, bool keep_head) dtx_drr_cleanup(drr); } - return ret != 0 ? ret : rc; + return rc; } /** @@ -796,8 +820,6 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, int rc1 = 0; int i; - rc = dtx_rpc_prep(cont, NULL, dtes, count, DTX_COMMIT, 0, NULL, NULL, NULL, &dca); - /* * NOTE: Before committing the DTX on remote participants, we cannot remove the active * DTX locally; otherwise, the local committed DTX entry may be removed via DTX @@ -806,10 +828,8 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, * then it will get -DER_TX_UNCERTAIN, that may cause related application to be * failed. So here, we let remote participants to commit firstly, if failed, we * will ask the leader to retry the commit until all participants got committed. - * - * Some RPC may has been sent, so need to wait even if dtx_rpc_prep hit failure. 
*/ - rc = dtx_rpc_post(&dca, rc, false); + rc = dtx_rpc(cont, NULL, dtes, count, DTX_COMMIT, 0, NULL, NULL, NULL, false, &dca); if (rc > 0 || rc == -DER_NONEXIST || rc == -DER_EXCLUDED || rc == -DER_OOG) rc = 0; @@ -846,7 +866,7 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, for (i = 0; i < count; i++) { if (rm_cos[i]) { D_ASSERT(!daos_oid_is_null(dcks[i].oid.id_pub)); - dtx_del_cos(cont, &dca.dca_dtis[i], &dcks[i].oid, + dtx_cos_del(cont, &dca.dca_dtis[i], &dcks[i].oid, dcks[i].dkey_hash); } } @@ -878,13 +898,10 @@ dtx_abort(struct ds_cont_child *cont, struct dtx_entry *dte, daos_epoch_t epoch) struct dtx_common_args dca; int rc; int rc1; - int rc2; - rc = dtx_rpc_prep(cont, NULL, &dte, 1, DTX_ABORT, epoch, NULL, NULL, NULL, &dca); - - rc2 = dtx_rpc_post(&dca, rc, false); - if (rc2 > 0 || rc2 == -DER_NONEXIST) - rc2 = 0; + rc = dtx_rpc(cont, NULL, &dte, 1, DTX_ABORT, epoch, NULL, NULL, NULL, false, &dca); + if (rc > 0 || rc == -DER_NONEXIST) + rc = 0; /* * NOTE: The DTX abort maybe triggered by dtx_leader_end() for timeout on some DTX @@ -902,10 +919,10 @@ dtx_abort(struct ds_cont_child *cont, struct dtx_entry *dte, daos_epoch_t epoch) if (rc1 > 0 || rc1 == -DER_NONEXIST) rc1 = 0; - D_CDEBUG(rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE, "Abort DTX "DF_DTI": rc %d %d %d\n", - DP_DTI(&dte->dte_xid), rc, rc1, rc2); + D_CDEBUG(rc1 != 0 || rc != 0, DLOG_ERR, DB_TRACE, "Abort DTX "DF_DTI": rc %d %d\n", + DP_DTI(&dte->dte_xid), rc, rc1); - return rc1 != 0 ? rc1 : rc2; + return rc != 0 ? rc : rc1; } int @@ -913,7 +930,6 @@ dtx_check(struct ds_cont_child *cont, struct dtx_entry *dte, daos_epoch_t epoch) { struct dtx_common_args dca; int rc; - int rc1; /* If no other target, then current target is the unique * one and 'prepared', then related DTX can be committed. 
@@ -921,14 +937,12 @@ dtx_check(struct ds_cont_child *cont, struct dtx_entry *dte, daos_epoch_t epoch) if (dte->dte_mbs->dm_tgt_cnt == 1) return DTX_ST_PREPARED; - rc = dtx_rpc_prep(cont, NULL, &dte, 1, DTX_CHECK, epoch, NULL, NULL, NULL, &dca); + rc = dtx_rpc(cont, NULL, &dte, 1, DTX_CHECK, epoch, NULL, NULL, NULL, false, &dca); - rc1 = dtx_rpc_post(&dca, rc, false); + D_CDEBUG(rc < 0 && rc != -DER_NONEXIST, DLOG_ERR, DB_TRACE, + "Check DTX "DF_DTI": rc %d\n", DP_DTI(&dte->dte_xid), rc); - D_CDEBUG(rc1 < 0 && rc1 != -DER_NONEXIST, DLOG_ERR, DB_TRACE, - "Check DTX "DF_DTI": rc %d %d\n", DP_DTI(&dte->dte_xid), rc, rc1); - - return rc1; + return rc; } int @@ -1079,9 +1093,8 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che } if (len > 0) { - rc = dtx_rpc_prep(cont, &head, NULL, len, DTX_REFRESH, 0, - cmt_list, abt_list, act_list, &dca); - rc = dtx_rpc_post(&dca, rc, for_io); + rc = dtx_rpc(cont, &head, NULL, len, DTX_REFRESH, 0, cmt_list, abt_list, act_list, + for_io, &dca); /* * For IO case, the DTX refresh failure caused by network trouble may be not fatal @@ -1151,7 +1164,7 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che */ rc1 = vos_dtx_check(cont->sc_hdl, &dsp->dsp_xid, - NULL, NULL, NULL, NULL, false); + NULL, NULL, NULL, false); if (rc1 == DTX_ST_COMMITTED || rc1 == DTX_ST_COMMITTABLE || rc1 == -DER_NONEXIST) { d_list_del(&dsp->dsp_link); @@ -1182,7 +1195,7 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che dtx_dsp_free(dsp); } else if (dsp->dsp_status == -DER_INPROGRESS) { rc1 = vos_dtx_check(cont->sc_hdl, &dsp->dsp_xid, - NULL, NULL, NULL, NULL, false); + NULL, NULL, NULL, false); if (rc1 != DTX_ST_COMMITTED && rc1 != DTX_ST_ABORTED && rc1 != -DER_NONEXIST) { if (!for_io) @@ -1602,7 +1615,7 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d * Otherwise, the batched commit ULT may be blocked by such "bad" entry. 
*/ if (rc2 == 0 && dck != NULL) - dtx_del_cos(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash); + dtx_cos_del(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash); D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE, "Collectively commit DTX "DF_DTI": %d/%d/%d\n", diff --git a/src/dtx/dtx_srv.c b/src/dtx/dtx_srv.c index cfba7862baba..93cc6744a2df 100644 --- a/src/dtx/dtx_srv.c +++ b/src/dtx/dtx_srv.c @@ -151,7 +151,6 @@ dtx_handler(crt_rpc_t *rpc) struct dtx_out *dout = crt_reply_get(rpc); struct ds_cont_child *cont = NULL; struct dtx_id *dtis; - struct dtx_memberships *mbs[DTX_REFRESH_MAX] = { 0 }; struct dtx_cos_key dcks[DTX_REFRESH_MAX] = { 0 }; uint32_t vers[DTX_REFRESH_MAX] = { 0 }; uint32_t opc = opc_get(rpc->cr_opc); @@ -243,7 +242,7 @@ dtx_handler(crt_rpc_t *rpc) D_GOTO(out, rc = -DER_PROTO); rc = vos_dtx_check(cont->sc_hdl, din->di_dtx_array.ca_arrays, - NULL, NULL, NULL, NULL, false); + NULL, NULL, NULL, false); if (rc == DTX_ST_INITED) { /* For DTX_CHECK, non-ready one is equal to non-exist. Do not directly * return 'DTX_ST_INITED' to avoid interoperability trouble if related @@ -287,8 +286,7 @@ dtx_handler(crt_rpc_t *rpc) for (i = 0, rc1 = 0; i < count; i++) { ptr = (int *)dout->do_sub_rets.ca_arrays + i; dtis = (struct dtx_id *)din->di_dtx_array.ca_arrays + i; - *ptr = vos_dtx_check(cont->sc_hdl, dtis, NULL, &vers[i], &mbs[i], &dcks[i], - true); + *ptr = vos_dtx_check(cont->sc_hdl, dtis, NULL, &vers[i], &dcks[i], true); if (*ptr == -DER_NONEXIST && !(flags[i] & DRF_INITIAL_LEADER)) { struct dtx_stat stat = { 0 }; @@ -312,10 +310,10 @@ dtx_handler(crt_rpc_t *rpc) * it will cause interoperability trouble if remote server is old. */ *ptr = DTX_ST_PREPARED; + } else if (*ptr == DTX_ST_COMMITTABLE) { + /* Higher priority for the DTX, then it can be committed ASAP. 
+			 */
+			dtx_cos_prio(cont, dtis, &dcks[i].oid, dcks[i].dkey_hash);
 			}
-
-			if (mbs[i] != NULL)
-				rc1++;
 		}
 		break;
 	default:
@@ -340,47 +338,6 @@ dtx_handler(crt_rpc_t *rpc)
 	if (likely(dpm != NULL))
 		d_tm_inc_counter(dpm->dpm_total[opc], 1);
 
-	if (opc == DTX_REFRESH && rc1 > 0) {
-		struct dtx_entry	 dtes[DTX_REFRESH_MAX] = { 0 };
-		struct dtx_entry	*pdte[DTX_REFRESH_MAX] = { 0 };
-		int			 j;
-
-		for (i = 0, j = 0; i < count; i++) {
-			if (mbs[i] == NULL)
-				continue;
-
-			/* For collective DTX, it will be committed soon. */
-			if (mbs[i]->dm_flags & DMF_COLL_TARGET) {
-				D_FREE(mbs[i]);
-				continue;
-			}
-
-			daos_dti_copy(&dtes[j].dte_xid,
-				      (struct dtx_id *)din->di_dtx_array.ca_arrays + i);
-			dtes[j].dte_ver = vers[i];
-			dtes[j].dte_refs = 1;
-			dtes[j].dte_mbs = mbs[i];
-
-			pdte[j] = &dtes[j];
-			dcks[j] = dcks[i];
-			j++;
-		}
-
-		if (j > 0) {
-			/*
-			 * Commit the DTX after replied the original refresh request to
-			 * avoid further query the same DTX.
-			 */
-			rc = dtx_commit(cont, pdte, dcks, j);
-			if (rc < 0)
-				D_WARN("Failed to commit DTX "DF_DTI", count %d: "
-				       DF_RC"\n", DP_DTI(&dtes[0].dte_xid), j, DP_RC(rc));
-
-			for (i = 0; i < j; i++)
-				D_FREE(pdte[i]->dte_mbs);
-		}
-	}
-
 	D_FREE(dout->do_sub_rets.ca_arrays);
 	dout->do_sub_rets.ca_count = 0;
 
diff --git a/src/include/cart/api.h b/src/include/cart/api.h
index 2c82b8f4303a..21941d7cb983 100644
--- a/src/include/cart/api.h
+++ b/src/include/cart/api.h
@@ -188,6 +188,19 @@ crt_context_create_on_iface(const char *iface_name, crt_context_t *crt_ctx);
 int
 crt_context_set_timeout(crt_context_t crt_ctx, uint32_t timeout_sec);
 
+/**
+ * Get the default timeout value for the RPC requests created on the specified context.
+ *
+ * This is an optional function.
+ *
+ * \param[in] crt_ctx CRT transport context to query
+ * \param[out] timeout_sec timeout value in seconds
+ *
+ * \return DER_SUCCESS on success, negative value if error
+ */
+int
+crt_context_get_timeout(crt_context_t crt_ctx, uint32_t *timeout_sec);
+
 /**
  * Destroy CRT transport context.
* diff --git a/src/include/daos_srv/dtx_srv.h b/src/include/daos_srv/dtx_srv.h index f7be21b2e0e1..7b3f7e9ee57c 100644 --- a/src/include/daos_srv/dtx_srv.h +++ b/src/include/daos_srv/dtx_srv.h @@ -315,8 +315,11 @@ dtx_begin(daos_handle_t xoh, struct dtx_id *dti, struct dtx_epoch *epoch, int dtx_end(struct dtx_handle *dth, struct ds_cont_child *cont, int result); int -dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid, - uint64_t dkey_hash, int max, struct dtx_id **dtis); +dtx_cos_get_piggyback(struct ds_cont_child *cont, daos_unit_oid_t *oid, uint64_t dkey_hash, + int max, struct dtx_id **dtis); +void +dtx_cos_put_piggyback(struct ds_cont_child *cont, struct dtx_id *xid, + daos_unit_oid_t *oid, uint64_t dkey_hash); int dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg); diff --git a/src/include/daos_srv/vos.h b/src/include/daos_srv/vos.h index 3c6ee71a8886..3b838c2b4a60 100644 --- a/src/include/daos_srv/vos.h +++ b/src/include/daos_srv/vos.h @@ -74,7 +74,6 @@ vos_dtx_validation(struct dtx_handle *dth); * \param[in,out] epoch Pointer to current epoch, if it is zero and if the DTX exists, then * the DTX's epoch will be saved in it. * \param[out] pm_ver Hold the DTX's pool map version. - * \param[out] mbs Pointer to the DTX participants information. * \param[out] dck Pointer to the key for CoS cache. * \param[in] for_refresh It is for DTX_REFRESH or not. * @@ -95,8 +94,7 @@ vos_dtx_validation(struct dtx_handle *dth); */ int vos_dtx_check(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch, - uint32_t *pm_ver, struct dtx_memberships **mbs, struct dtx_cos_key *dck, - bool for_refresh); + uint32_t *pm_ver, struct dtx_cos_key *dck, bool for_refresh); /** * Load participants information for the given DTX. 
diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index 0619298977f4..8caa1ea05662 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -2934,9 +2934,8 @@ ds_obj_rw_handler(crt_rpc_t *rpc) * them before real modifications to avoid availability issues. */ D_FREE(dti_cos); - dti_cos_cnt = dtx_list_cos(ioc.ioc_coc, &orw->orw_oid, - orw->orw_dkey_hash, DTX_THRESHOLD_COUNT, - &dti_cos); + dti_cos_cnt = dtx_cos_get_piggyback(ioc.ioc_coc, &orw->orw_oid, orw->orw_dkey_hash, + DTX_THRESHOLD_COUNT, &dti_cos); if (dti_cos_cnt < 0) D_GOTO(out, rc = dti_cos_cnt); @@ -3852,9 +3851,8 @@ ds_obj_punch_handler(crt_rpc_t *rpc) * them before real modifications to avoid availability issues. */ D_FREE(dti_cos); - dti_cos_cnt = dtx_list_cos(ioc.ioc_coc, &opi->opi_oid, - opi->opi_dkey_hash, DTX_THRESHOLD_COUNT, - &dti_cos); + dti_cos_cnt = dtx_cos_get_piggyback(ioc.ioc_coc, &opi->opi_oid, opi->opi_dkey_hash, + DTX_THRESHOLD_COUNT, &dti_cos); if (dti_cos_cnt < 0) D_GOTO(out, rc = dti_cos_cnt); diff --git a/src/vos/tests/vts_dtx.c b/src/vos/tests/vts_dtx.c index 5024e3e2bd82..bd54dd52838c 100644 --- a/src/vos/tests/vts_dtx.c +++ b/src/vos/tests/vts_dtx.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2023 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. 
* * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -831,7 +831,7 @@ dtx_18(void **state) assert_rc_equal(rc, 10); for (i = 0; i < 10; i++) { - rc = vos_dtx_check(args->ctx.tc_co_hdl, &xid[i], NULL, NULL, NULL, NULL, false); + rc = vos_dtx_check(args->ctx.tc_co_hdl, &xid[i], NULL, NULL, NULL, false); assert_int_equal(rc, DTX_ST_COMMITTED); } @@ -842,7 +842,7 @@ dtx_18(void **state) assert_rc_equal(rc, 0); for (i = 0; i < 10; i++) { - rc = vos_dtx_check(args->ctx.tc_co_hdl, &xid[i], NULL, NULL, NULL, NULL, false); + rc = vos_dtx_check(args->ctx.tc_co_hdl, &xid[i], NULL, NULL, NULL, false); assert_rc_equal(rc, -DER_NONEXIST); } diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 1851331dd6a0..5c57c0ee4c70 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -1831,8 +1831,7 @@ vos_dtx_pack_mbs(struct umem_instance *umm, struct vos_dtx_act_ent *dae) int vos_dtx_check(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch, - uint32_t *pm_ver, struct dtx_memberships **mbs, struct dtx_cos_key *dck, - bool for_refresh) + uint32_t *pm_ver, struct dtx_cos_key *dck, bool for_refresh) { struct vos_container *cont; struct vos_dtx_act_ent *dae; @@ -1878,9 +1877,6 @@ vos_dtx_check(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch, } if (dae->dae_committable || DAE_FLAGS(dae) & DTE_PARTIAL_COMMITTED) { - if (mbs != NULL) - *mbs = vos_dtx_pack_mbs(vos_cont2umm(cont), dae); - if (epoch != NULL) *epoch = DAE_EPOCH(dae);