diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c
index a90ac3d93004..c3c5e220eff0 100644
--- a/src/cart/crt_context.c
+++ b/src/cart/crt_context.c
@@ -1986,6 +1986,22 @@ crt_context_set_timeout(crt_context_t crt_ctx, uint32_t timeout_sec)
 	return rc;
 }
 
+int
+crt_context_get_timeout(crt_context_t crt_ctx, uint32_t *timeout_sec)
+{
+	struct crt_context	*ctx = crt_ctx;
+	int			 rc = 0;
+
+	if (crt_ctx == CRT_CONTEXT_NULL) {
+		D_ERROR("NULL context passed\n");
+		rc = -DER_INVAL;
+	} else {
+		*timeout_sec = ctx->cc_timeout_sec;
+	}
+
+	return rc;
+}
+
 /* Force complete the rpc. Used for handling of unreachable rpcs */
 void
 crt_req_force_completion(struct crt_rpc_priv *rpc_priv)
diff --git a/src/dtx/dtx_coll.c b/src/dtx/dtx_coll.c
index 11e02ba2d39f..3ae69648c02d 100644
--- a/src/dtx/dtx_coll.c
+++ b/src/dtx/dtx_coll.c
@@ -305,7 +305,7 @@ dtx_coll_local_one(void *args)
 		rc = vos_dtx_abort(cont->sc_hdl, &dcla->dcla_xid, dcla->dcla_epoch);
 		break;
 	case DTX_COLL_CHECK:
-		rc = vos_dtx_check(cont->sc_hdl, &dcla->dcla_xid, NULL, NULL, NULL, NULL, false);
+		rc = vos_dtx_check(cont->sc_hdl, &dcla->dcla_xid, NULL, NULL, NULL, false);
 		if (rc == DTX_ST_INITED) {
 			/*
 			 * For DTX_CHECK, non-ready one is equal to non-exist. Do not directly
diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c
index 7b879ff2900b..ae9055793981 100644
--- a/src/dtx/dtx_common.c
+++ b/src/dtx/dtx_common.c
@@ -49,6 +49,7 @@ struct dtx_batched_cont_args {
 	struct dtx_batched_pool_args	*dbca_pool;
 	int				 dbca_refs;
 	uint32_t			 dbca_reg_gen;
+	uint32_t			 dbca_cleanup_thd;
 	uint32_t			 dbca_deregister:1,
 					 dbca_cleanup_done:1,
 					 dbca_commit_done:1,
@@ -68,6 +69,7 @@ struct dtx_cleanup_cb_args {
 	d_list_t		dcca_pc_list;
 	int			dcca_st_count;
 	int			dcca_pc_count;
+	uint32_t		dcca_cleanup_thd;
 };
 
 static inline void
@@ -226,7 +228,7 @@ dtx_cleanup_iter_cb(uuid_t co_uuid, vos_iter_entry_t *ent, void *args)
 		return 0;
 
 	/* Stop the iteration if current DTX is not too old. */
-	if (dtx_sec2age(ent->ie_dtx_start_time) <= DTX_CLEANUP_THD_AGE_LO)
+	if (dtx_sec2age(ent->ie_dtx_start_time) <= dcca->dcca_cleanup_thd)
		return 1;
 
 	D_ASSERT(ent->ie_dtx_mbs_dsize > 0);
@@ -336,6 +338,8 @@ dtx_cleanup(void *arg)
 	D_INIT_LIST_HEAD(&dcca.dcca_pc_list);
 	dcca.dcca_st_count = 0;
 	dcca.dcca_pc_count = 0;
+	/* Clean up stale DTX entries in a window of about 10 seconds each time.
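+	 * For example (hypothetical numbers): if dbca_cleanup_thd is 80 seconds,
+	 * this pass sweeps the entries that are older than 80 - 10 = 70 seconds.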
+	 */
+	dcca.dcca_cleanup_thd = dbca->dbca_cleanup_thd - 10;
 	rc = ds_cont_iter(cont->sc_pool->spc_hdl, cont->sc_uuid,
 			  dtx_cleanup_iter_cb, &dcca, VOS_ITER_DTX, 0);
 	if (rc < 0)
@@ -626,7 +630,7 @@ dtx_batched_commit_one(void *arg)
 		int			  rc;
 
 		cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT, NULL,
-					    DAOS_EPOCH_MAX, &dtes, &dcks, &dce);
+					    DAOS_EPOCH_MAX, false, &dtes, &dcks, &dce);
 		if (cnt == 0)
 			break;
@@ -754,7 +758,7 @@ dtx_batched_commit(void *arg)
 		if (dtx_cont_opened(cont) &&
 		    !dbca->dbca_deregister && dbca->dbca_cleanup_req == NULL &&
 		    stat.dtx_oldest_active_time != 0 &&
-		    dtx_sec2age(stat.dtx_oldest_active_time) >= DTX_CLEANUP_THD_AGE_UP) {
+		    dtx_sec2age(stat.dtx_oldest_active_time) >= dbca->dbca_cleanup_thd) {
 			D_ASSERT(!dbca->dbca_cleanup_done);
 			dtx_get_dbca(dbca);
@@ -1267,6 +1271,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
 	uint32_t			 flags;
 	int				 status = -1;
 	int				 rc = 0;
+	int				 i;
 	bool				 aborted = false;
 	bool				 unpin = false;
@@ -1366,7 +1371,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
 	D_ASSERT(dth->dth_mbs != NULL);
 
 	if (dlh->dlh_coll) {
-		rc = dtx_add_cos(cont, dlh->dlh_coll_entry, &dth->dth_leader_oid,
+		rc = dtx_cos_add(cont, dlh->dlh_coll_entry, &dth->dth_leader_oid,
 				 dth->dth_dkey_hash, dth->dth_epoch, DCF_EXP_CMT | DCF_COLL);
 	} else {
 		size = sizeof(*dte) + sizeof(*mbs) + dth->dth_mbs->dm_data_size;
@@ -1391,7 +1396,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
 		else
 			flags = 0;
 
-		rc = dtx_add_cos(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash,
+		rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash,
 				 dth->dth_epoch, flags);
 		dtx_entry_put(dte);
 	}
@@ -1480,11 +1485,13 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
 	 * It is harmless to keep some partially committed DTX entries in CoS cache.
 	 */
 	if (result == 0 && dth->dth_cos_done) {
-		int	i;
-
 		for (i = 0; i < dth->dth_dti_cos_count; i++)
-			dtx_del_cos(cont, &dth->dth_dti_cos[i],
+			dtx_cos_del(cont, &dth->dth_dti_cos[i],
 				    &dth->dth_leader_oid, dth->dth_dkey_hash);
+	} else {
+		for (i = 0; i < dth->dth_dti_cos_count; i++)
+			dtx_cos_put_piggyback(cont, &dth->dth_dti_cos[i],
+					      &dth->dth_leader_oid, dth->dth_dkey_hash);
 	}
 
 	D_DEBUG(DB_IO, "Stop the DTX "DF_DTI" ver %u, dkey %lu, %s, cos %d/%d: result "DF_RC"\n",
@@ -1644,7 +1651,7 @@ dtx_flush_on_close(struct dss_module_info *dmi, struct dtx_batched_cont_args *db
 		struct dtx_coll_entry	*dce = NULL;
 
 		cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT,
-					    NULL, DAOS_EPOCH_MAX, &dtes, &dcks, &dce);
+					    NULL, DAOS_EPOCH_MAX, true, &dtes, &dcks, &dce);
 		if (cnt <= 0)
 			D_GOTO(out, rc = cnt);
@@ -1774,6 +1781,7 @@ dtx_cont_register(struct ds_cont_child *cont)
 	struct dtx_batched_pool_args	*dbpa = NULL;
 	struct dtx_batched_cont_args	*dbca = NULL;
 	struct umem_attr		 uma;
+	uint32_t			 timeout;
 	int				 rc;
 	bool				 new_pool = true;
@@ -1812,6 +1820,19 @@ dtx_cont_register(struct ds_cont_child *cont)
 	if (dbca == NULL)
 		D_GOTO(out, rc = -DER_NOMEM);
 
+	rc = crt_context_get_timeout(dmi->dmi_ctx, &timeout);
+	if (rc != 0) {
+		D_ERROR("Failed to get DTX cleanup timeout: "DF_RC"\n", DP_RC(rc));
+		goto out;
+	}
+
+	/*
+	 * Give the related DTX leader some time beyond the default RPC timeout to commit
+	 * or abort the DTX. If the DTX is still prepared after that, trigger DTX cleanup
+	 * to handle potential stale DTX entries.
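+	 * For example (hypothetical numbers): with a 60-second default RPC timeout
+	 * and DTX_COMMIT_THRESHOLD_AGE of 10 seconds, dbca_cleanup_thd becomes
+	 * 60 + 10 * 2 = 80 seconds.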
+ */ + dbca->dbca_cleanup_thd = timeout + DTX_COMMIT_THRESHOLD_AGE * 2; + memset(&uma, 0, sizeof(uma)); uma.uma_id = UMEM_CLASS_VMEM; rc = dbtree_create_inplace_ex(DBTREE_CLASS_DTX_COS, 0, @@ -1967,7 +1988,7 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti, */ return -DER_NONEXIST; - rc = vos_dtx_check(coh, dti, epoch, pm_ver, NULL, NULL, false); + rc = vos_dtx_check(coh, dti, epoch, pm_ver, NULL, false); switch (rc) { case DTX_ST_INITED: return -DER_INPROGRESS; @@ -2000,14 +2021,6 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti, } } -/* - * If a large transaction has sub-requests to dispatch to a lot of DTX participants, - * then we may have to split the dispatch process to multiple steps; otherwise, the - * dispatch process may trigger too many in-flight or in-queued RPCs that will hold - * too much resource as to server maybe out of memory. - */ -#define DTX_EXEC_STEP_LENGTH DTX_THRESHOLD_COUNT - struct dtx_chore { struct dss_chore chore; dtx_sub_func_t func; @@ -2186,8 +2199,8 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, dlh->dlh_need_agg = 0; dlh->dlh_agg_done = 0; - if (sub_cnt > DTX_EXEC_STEP_LENGTH) { - dlh->dlh_forward_cnt = DTX_EXEC_STEP_LENGTH; + if (sub_cnt > DTX_RPC_STEP_LENGTH) { + dlh->dlh_forward_cnt = DTX_RPC_STEP_LENGTH; } else { dlh->dlh_forward_cnt = sub_cnt; if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL) @@ -2237,7 +2250,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, sub_cnt -= dlh->dlh_forward_cnt; if (sub_cnt > 0) { dlh->dlh_forward_idx += dlh->dlh_forward_cnt; - if (sub_cnt <= DTX_EXEC_STEP_LENGTH) { + if (sub_cnt <= DTX_RPC_STEP_LENGTH) { dlh->dlh_forward_cnt = sub_cnt; if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL) dlh->dlh_need_agg = 1; @@ -2337,7 +2350,7 @@ dtx_obj_sync(struct ds_cont_child *cont, daos_unit_oid_t *oid, struct dtx_coll_entry *dce = NULL; cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT, oid, - epoch, &dtes, &dcks, &dce); + epoch, true, &dtes, &dcks, &dce); if (cnt <= 0) { rc = cnt; if (rc < 0) diff --git a/src/dtx/dtx_cos.c b/src/dtx/dtx_cos.c index 9442adf2248e..926aeba51994 100644 --- a/src/dtx/dtx_cos.c +++ b/src/dtx/dtx_cos.c @@ -60,8 +60,9 @@ struct dtx_cos_rec_child { }; /* The DTX epoch. */ daos_epoch_t dcrc_epoch; - /* For non-collective DTX, it points to the dtx_cos_rec. */ struct dtx_cos_rec *dcrc_ptr; + uint32_t dcrc_piggyback_refs; + uint32_t dcrc_coll:1; /* For collective DTX. */ }; struct dtx_cos_rec_bundle { @@ -129,14 +130,13 @@ dtx_cos_rec_alloc(struct btr_instance *tins, d_iov_t *key_iov, } dcrc->dcrc_epoch = rbund->epoch; + dcrc->dcrc_ptr = dcr; if (rbund->flags & DCF_COLL) { - /* Set dcrc_ptr as NULL to indicate that it is collective DTX. 
-		 */
-		dcrc->dcrc_ptr = NULL;
+		dcrc->dcrc_coll = 1;
 		dcrc->dcrc_dce = dtx_coll_entry_get(rbund->entry);
 		d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_coll_list);
 		cont->sc_dtx_committable_coll_count++;
 	} else {
-		dcrc->dcrc_ptr = dcr;
 		dcrc->dcrc_dte = dtx_entry_get(rbund->entry);
 		d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_cos_list);
 	}
@@ -177,11 +177,11 @@ dtx_cos_rec_free(struct btr_instance *tins, struct btr_record *rec, void *args)
 			      dcrc_lo_link) {
 		d_list_del(&dcrc->dcrc_lo_link);
 		d_list_del(&dcrc->dcrc_gl_committable);
-		if (dcrc->dcrc_ptr != NULL) {
-			dtx_entry_put(dcrc->dcrc_dte);
-		} else {
+		if (dcrc->dcrc_coll) {
 			dtx_coll_entry_put(dcrc->dcrc_dce);
 			coll++;
+		} else {
+			dtx_entry_put(dcrc->dcrc_dte);
 		}
 		D_FREE(dcrc);
 		dec++;
@@ -190,11 +190,11 @@ dtx_cos_rec_free(struct btr_instance *tins, struct btr_record *rec, void *args)
 			      dcrc_lo_link) {
 		d_list_del(&dcrc->dcrc_lo_link);
 		d_list_del(&dcrc->dcrc_gl_committable);
-		if (dcrc->dcrc_ptr != NULL) {
-			dtx_entry_put(dcrc->dcrc_dte);
-		} else {
+		if (dcrc->dcrc_coll) {
 			dtx_coll_entry_put(dcrc->dcrc_dce);
 			coll++;
+		} else {
+			dtx_entry_put(dcrc->dcrc_dte);
 		}
 		D_FREE(dcrc);
 		dec++;
@@ -203,11 +203,11 @@ dtx_cos_rec_free(struct btr_instance *tins, struct btr_record *rec, void *args)
 			      dcrc_lo_link) {
 		d_list_del(&dcrc->dcrc_lo_link);
 		d_list_del(&dcrc->dcrc_gl_committable);
-		if (dcrc->dcrc_ptr != NULL) {
-			dtx_entry_put(dcrc->dcrc_dte);
-		} else {
+		if (dcrc->dcrc_coll) {
 			dtx_coll_entry_put(dcrc->dcrc_dce);
 			coll++;
+		} else {
+			dtx_entry_put(dcrc->dcrc_dte);
 		}
 		D_FREE(dcrc);
 		dec++;
@@ -257,14 +257,13 @@ dtx_cos_rec_update(struct btr_instance *tins, struct btr_record *rec,
 		return -DER_NOMEM;
 
 	dcrc->dcrc_epoch = rbund->epoch;
+	dcrc->dcrc_ptr = dcr;
 	if (rbund->flags & DCF_COLL) {
-		/* Set dcrc_ptr as NULL to indicate that it is collective DTX. */
-		dcrc->dcrc_ptr = NULL;
+		dcrc->dcrc_coll = 1;
 		dcrc->dcrc_dce = dtx_coll_entry_get(rbund->entry);
 		d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_coll_list);
 		cont->sc_dtx_committable_coll_count++;
 	} else {
-		dcrc->dcrc_ptr = dcr;
 		dcrc->dcrc_dte = dtx_entry_get(rbund->entry);
 		d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_cos_list);
 	}
@@ -297,7 +296,7 @@ btr_ops_t dtx_btr_cos_ops = {
 
 int
 dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
-		      daos_unit_oid_t *oid, daos_epoch_t epoch,
+		      daos_unit_oid_t *oid, daos_epoch_t epoch, bool force,
 		      struct dtx_entry ***dtes, struct dtx_cos_key **dcks,
 		      struct dtx_coll_entry **p_dce)
 {
@@ -307,9 +306,11 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
 	uint32_t		  count;
 	uint32_t		  i = 0;
 
+	/* Process collective DTX with higher priority.
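+	 * (A collective DTX usually spans many more targets than a regular one,
+	 * so committing it earlier is expected to release more resources.)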
+	 */
 	if (!d_list_empty(&cont->sc_dtx_coll_list) && oid == NULL) {
 		d_list_for_each_entry(dcrc, &cont->sc_dtx_coll_list, dcrc_gl_committable) {
-			if (epoch >= dcrc->dcrc_epoch) {
+			if (epoch >= dcrc->dcrc_epoch &&
+			    (dcrc->dcrc_piggyback_refs == 0 || force)) {
 				D_ALLOC_PTR(dck_buf);
 				if (dck_buf == NULL)
 					return -DER_NOMEM;
@@ -346,17 +347,16 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
 		    daos_unit_oid_compare(dcrc->dcrc_ptr->dcr_oid, *oid) != 0)
 			continue;
 
-		if (epoch < dcrc->dcrc_epoch)
+		if (epoch < dcrc->dcrc_epoch || (dcrc->dcrc_piggyback_refs > 0 && !force))
 			continue;
 
-		dck_buf[i].oid = dcrc->dcrc_ptr->dcr_oid;
-		dck_buf[i].dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash;
-
-		if (unlikely(oid != NULL && dcrc->dcrc_ptr == NULL)) {
+		if (unlikely(oid != NULL && dcrc->dcrc_coll)) {
 			if (i > 0)
 				continue;
 
 			D_FREE(dte_buf);
+			dck_buf[i].oid = dcrc->dcrc_ptr->dcr_oid;
+			dck_buf[i].dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash;
 			*dcks = dck_buf;
 			*p_dce = dtx_coll_entry_get(dcrc->dcrc_dce);
 
@@ -364,6 +364,9 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
 		}
 
 		dte_buf[i] = dtx_entry_get(dcrc->dcrc_dte);
+		dck_buf[i].oid = dcrc->dcrc_ptr->dcr_oid;
+		dck_buf[i].dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash;
+
 		if (++i >= count)
 			break;
 	}
@@ -381,8 +384,8 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
 }
 
 int
-dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid,
-	     uint64_t dkey_hash, int max, struct dtx_id **dtis)
+dtx_cos_get_piggyback(struct ds_cont_child *cont, daos_unit_oid_t *oid,
+		      uint64_t dkey_hash, int max, struct dtx_id **dtis)
 {
 	struct dtx_cos_key	key;
 	d_iov_t			kiov;
@@ -423,6 +426,7 @@ dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid,
 
 	d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) {
 		dti[i] = dcrc->dcrc_dte->dte_xid;
+		dcrc->dcrc_piggyback_refs++;
 		if (++i >= count)
 			break;
 	}
@@ -433,8 +437,40 @@ dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid,
 	return count;
 }
 
+void
+dtx_cos_put_piggyback(struct ds_cont_child *cont, struct dtx_id *xid,
+		      daos_unit_oid_t *oid, uint64_t dkey_hash)
+{
+	struct dtx_cos_key		 key;
+	d_iov_t				 kiov;
+	d_iov_t				 riov;
+	struct dtx_cos_rec		*dcr;
+	struct dtx_cos_rec_child	*dcrc;
+	int				 rc;
+
+	key.oid = *oid;
+	key.dkey_hash = dkey_hash;
+	d_iov_set(&kiov, &key, sizeof(key));
+	d_iov_set(&riov, NULL, 0);
+
+	/* It is normal that the DTX entry (to be put) in CoS has already been removed by a race.
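+	 * Each reference taken via dtx_cos_get_piggyback() is expected to be dropped
+	 * by exactly one matching dtx_cos_put_piggyback() call once the piggybacked
+	 * DTX has been handled.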
+	 */
+
+	rc = dbtree_lookup(cont->sc_dtx_cos_hdl, &kiov, &riov);
+	if (rc != 0)
+		return;
+
+	dcr = (struct dtx_cos_rec *)riov.iov_buf;
+
+	d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) {
+		if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) {
+			dcrc->dcrc_piggyback_refs--;
+			return;
+		}
+	}
+}
+
 int
-dtx_add_cos(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid,
+dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid,
 	    uint64_t dkey_hash, daos_epoch_t epoch, uint32_t flags)
 {
 	struct dtx_cos_key	key;
@@ -475,7 +511,7 @@ dtx_add_cos(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid,
 }
 
 int
-dtx_del_cos(struct ds_cont_child *cont, struct dtx_id *xid,
+dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid,
 	    daos_unit_oid_t *oid, uint64_t dkey_hash)
 {
 	struct dtx_cos_key	key;
@@ -503,11 +539,11 @@ dtx_del_cos(struct ds_cont_child *cont, struct dtx_id *xid,
 		d_list_del(&dcrc->dcrc_gl_committable);
 		d_list_del(&dcrc->dcrc_lo_link);
 
-		if (dcrc->dcrc_ptr != NULL) {
-			dtx_entry_put(dcrc->dcrc_dte);
-		} else {
+		if (dcrc->dcrc_coll) {
 			dtx_coll_entry_put(dcrc->dcrc_dce);
 			cont->sc_dtx_committable_coll_count--;
+		} else {
+			dtx_entry_put(dcrc->dcrc_dte);
 		}
 		D_FREE(dcrc);
 
@@ -523,11 +559,11 @@ dtx_del_cos(struct ds_cont_child *cont, struct dtx_id *xid,
 		d_list_del(&dcrc->dcrc_gl_committable);
 		d_list_del(&dcrc->dcrc_lo_link);
 
-		if (dcrc->dcrc_ptr != NULL) {
-			dtx_entry_put(dcrc->dcrc_dte);
-		} else {
+		if (dcrc->dcrc_coll) {
 			dtx_coll_entry_put(dcrc->dcrc_dce);
 			cont->sc_dtx_committable_coll_count--;
+		} else {
+			dtx_entry_put(dcrc->dcrc_dte);
 		}
 		D_FREE(dcrc);
 
@@ -543,11 +579,11 @@ dtx_del_cos(struct ds_cont_child *cont, struct dtx_id *xid,
 		d_list_del(&dcrc->dcrc_gl_committable);
 		d_list_del(&dcrc->dcrc_lo_link);
 
-		if (dcrc->dcrc_ptr != NULL) {
-			dtx_entry_put(dcrc->dcrc_dte);
-		} else {
+		if (dcrc->dcrc_coll) {
 			dtx_coll_entry_put(dcrc->dcrc_dce);
 			cont->sc_dtx_committable_coll_count--;
+		} else {
+			dtx_entry_put(dcrc->dcrc_dte);
 		}
 		D_FREE(dcrc);
 
@@ -591,3 +627,63 @@ dtx_cos_oldest(struct ds_cont_child *cont)
 
 	return dcrc->dcrc_epoch;
 }
+
+void
+dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid,
+	     daos_unit_oid_t *oid, uint64_t dkey_hash)
+{
+	struct dtx_cos_key		 key;
+	d_iov_t				 kiov;
+	d_iov_t				 riov;
+	struct dtx_cos_rec		*dcr;
+	struct dtx_cos_rec_child	*dcrc;
+	int				 rc;
+	bool				 found = false;
+
+	key.oid = *oid;
+	key.dkey_hash = dkey_hash;
+	d_iov_set(&kiov, &key, sizeof(key));
+	d_iov_set(&riov, NULL, 0);
+
+	rc = dbtree_lookup(cont->sc_dtx_cos_hdl, &kiov, &riov);
+	if (rc != 0)
+		goto out;
+
+	dcr = (struct dtx_cos_rec *)riov.iov_buf;
+
+	d_list_for_each_entry(dcrc, &dcr->dcr_reg_list, dcrc_lo_link) {
+		if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) {
+			d_list_del(&dcrc->dcrc_lo_link);
+			d_list_add(&dcrc->dcrc_lo_link, &dcr->dcr_prio_list);
+			dcr->dcr_reg_count--;
+			dcr->dcr_prio_count++;
+
+			D_GOTO(out, found = true);
+		}
+	}
+
+	d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) {
+		if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) {
+			d_list_del(&dcrc->dcrc_lo_link);
+			d_list_add(&dcrc->dcrc_lo_link, &dcr->dcr_prio_list);
+
+			D_GOTO(out, found = true);
+		}
+	}
+
+	d_list_for_each_entry(dcrc, &dcr->dcr_expcmt_list, dcrc_lo_link) {
+		if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0)
+			D_GOTO(out, found = true);
+	}
+
+out:
+	if (found) {
+		d_list_del(&dcrc->dcrc_gl_committable);
+		if (dcrc->dcrc_coll)
+			d_list_add(&dcrc->dcrc_gl_committable, &cont->sc_dtx_coll_list);
+		else
+			d_list_add(&dcrc->dcrc_gl_committable, &cont->sc_dtx_cos_list);
+	}
+
+	/* It is normal that the DTX entry (for priority) in CoS has been committed by a race. */
+}
diff --git a/src/dtx/dtx_internal.h b/src/dtx/dtx_internal.h
index e59baede09c4..a42bcc1d7d68 100644
--- a/src/dtx/dtx_internal.h
+++ b/src/dtx/dtx_internal.h
@@ -94,17 +94,6 @@ CRT_RPC_DECLARE(dtx_coll, DAOS_ISEQ_COLL_DTX, DAOS_OSEQ_COLL_DTX);
 
 #define DTX_YIELD_CYCLE		(DTX_THRESHOLD_COUNT >> 3)
 
-/* The time threshold for triggering DTX cleanup of stale entries.
- * If the oldest active DTX exceeds such threshold, it will trigger
- * DTX cleanup locally.
- */
-#define DTX_CLEANUP_THD_AGE_UP	90
-
-/* If DTX cleanup for stale entries is triggered, then the DTXs with
- * older ages than this threshold will be cleanup.
- */
-#define DTX_CLEANUP_THD_AGE_LO	75
-
 /* The count threshold (per pool) for triggering DTX aggregation. */
 #define DTX_AGG_THD_CNT_MAX	(1 << 24)
 #define DTX_AGG_THD_CNT_MIN	(1 << 20)
@@ -192,6 +181,14 @@ extern uint32_t dtx_batched_ult_max;
  */
 #define DTX_COLL_TREE_WIDTH	8
 
+/*
+ * If a large transaction has sub-requests to dispatch to a lot of DTX participants,
+ * then we may have to split the dispatch process into multiple steps; otherwise, the
+ * dispatch process may trigger so many in-flight or in-queued RPCs that the held
+ * resources could cause the server to run out of memory.
+ */
+#define DTX_RPC_STEP_LENGTH	DTX_THRESHOLD_COUNT
+
 extern struct crt_corpc_ops dtx_coll_commit_co_ops;
 extern struct crt_corpc_ops dtx_coll_abort_co_ops;
 extern struct crt_corpc_ops dtx_coll_check_co_ops;
@@ -255,14 +252,16 @@ int dtx_leader_get(struct ds_pool *pool, struct dtx_memberships *mbs,
 
 /* dtx_cos.c */
 int dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
-			  daos_unit_oid_t *oid, daos_epoch_t epoch,
+			  daos_unit_oid_t *oid, daos_epoch_t epoch, bool force,
 			  struct dtx_entry ***dtes, struct dtx_cos_key **dcks,
 			  struct dtx_coll_entry **p_dce);
-int dtx_add_cos(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid,
+int dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid,
 		uint64_t dkey_hash, daos_epoch_t epoch, uint32_t flags);
-int dtx_del_cos(struct ds_cont_child *cont, struct dtx_id *xid,
+int dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid,
 		daos_unit_oid_t *oid, uint64_t dkey_hash);
 uint64_t dtx_cos_oldest(struct ds_cont_child *cont);
+void dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid,
+		  daos_unit_oid_t *oid, uint64_t dkey_hash);
 
 /* dtx_rpc.c */
 int dtx_check(struct ds_cont_child *cont, struct dtx_entry *dte,
diff --git a/src/dtx/dtx_resync.c b/src/dtx/dtx_resync.c
index 3698f2384420..b98a6469954c 100644
--- a/src/dtx/dtx_resync.c
+++ b/src/dtx/dtx_resync.c
@@ -85,7 +85,7 @@ dtx_resync_commit(struct ds_cont_child *cont,
 	 * committed or aborted the DTX during we handling other
 	 * DTXs. So double check the status before current commit.
 	 */
-	rc = vos_dtx_check(cont->sc_hdl, &dre->dre_xid, NULL, NULL, NULL, NULL, false);
+	rc = vos_dtx_check(cont->sc_hdl, &dre->dre_xid, NULL, NULL, NULL, false);
 
 	/* Skip this DTX since it has been committed or aggregated. */
 	if (rc == DTX_ST_COMMITTED || rc == DTX_ST_COMMITTABLE || rc == -DER_NONEXIST)
@@ -301,7 +301,7 @@ dtx_status_handle_one(struct ds_cont_child *cont, struct dtx_entry *dte, daos_un
 	 * committed or aborted the DTX during we handling other
 	 * DTXs. So double check the status before next action.
 	 */
-	rc = vos_dtx_check(cont->sc_hdl, &dte->dte_xid, NULL, NULL, NULL, NULL, false);
+	rc = vos_dtx_check(cont->sc_hdl, &dte->dte_xid, NULL, NULL, NULL, false);
 
 	/* Skip the DTX that may has been committed or aborted. */
 	if (rc == DTX_ST_COMMITTED || rc == DTX_ST_COMMITTABLE || rc == -DER_NONEXIST)
diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c
index 2af60538348a..f44edf895721 100644
--- a/src/dtx/dtx_rpc.c
+++ b/src/dtx/dtx_rpc.c
@@ -354,7 +354,7 @@ struct dtx_common_args {
 	daos_handle_t		 dca_tree_hdl;
 	daos_epoch_t		 dca_epoch;
 	int			 dca_count;
-	int			 dca_committed;
+	int			 dca_steps;
 	d_rank_t		 dca_rank;
 	uint32_t		 dca_tgtid;
 	struct ds_cont_child	*dca_cont;
@@ -369,37 +369,27 @@ struct dtx_common_args {
 
 /* If is_reentrance, this function ignores len. */
 static int
-dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len, bool is_reentrance)
+dtx_req_list_send(struct dtx_common_args *dca, bool is_reentrance)
 {
 	struct dtx_req_args	*dra = &dca->dca_dra;
 	int			 rc;
 
 	if (!is_reentrance) {
-		dra->dra_length = len;
+		dra->dra_length = dca->dca_steps;
+		dca->dca_i = 0;
 
-		rc = ABT_future_create(len, dtx_req_list_cb, &dra->dra_future);
+		rc = ABT_future_create(dca->dca_steps, dtx_req_list_cb, &dra->dra_future);
 		if (rc != ABT_SUCCESS) {
-			D_ERROR("ABT_future_create failed for opc %x, len = %d: "
-				"rc = %d.\n", dra->dra_opc, len, rc);
+			D_ERROR("ABT_future_create failed for opc %x, len %d: rc %d.\n",
+				dra->dra_opc, dca->dca_steps, rc);
 			return dss_abterr2der(rc);
 		}
 
 		D_DEBUG(DB_TRACE, "%p: DTX req for opc %x, future %p (%d) start.\n",
-			&dca->dca_chore, dra->dra_opc, dra->dra_future, len);
+			&dca->dca_chore, dra->dra_opc, dra->dra_future, dca->dca_steps);
 	}
 
-	/*
-	 * Begin or continue an iteration over dca_head. When beginning the
-	 * iteration, dca->dca_drr does not point to a real entry, and is only
-	 * safe for d_list_for_each_entry_continue.
-	 */
-	if (!is_reentrance) {
-		dca->dca_drr = d_list_entry(&dca->dca_head, struct dtx_req_rec, drr_link);
-		dca->dca_i = 0;
-	}
-	/* DO NOT add any line here! See the comment on dca->dca_drr above. */
-	d_list_for_each_entry_continue(dca->dca_drr, &dca->dca_head, drr_link)
-	{
+	while (1) {
 		D_DEBUG(DB_TRACE, "chore=%p: drr=%p i=%d\n", &dca->dca_chore,
 			dca->dca_drr, dca->dca_i);
@@ -410,7 +400,7 @@ dtx_req_list_send(struct dtx_common_args *dca, bool is_reentrance)
 			    DAOS_FAIL_CHECK(DAOS_DTX_FAIL_COMMIT)))
 			rc = dtx_req_send(dca->dca_drr, 1);
 		else
-			rc = dtx_req_send(dca->dca_drr, epoch);
+			rc = dtx_req_send(dca->dca_drr, dca->dca_epoch);
 		if (rc != 0) {
 			/* If the first sub-RPC failed, then break, otherwise
 			 * other remote replicas may have already received the
@@ -422,8 +412,15 @@ dtx_req_list_send(struct dtx_common_args *dca, bool is_reentrance)
 			}
 		}
 
+		/* dca->dca_drr may not point to a real entry if all RPCs have been sent. */
+		dca->dca_drr = d_list_entry(dca->dca_drr->drr_link.next,
+					    struct dtx_req_rec, drr_link);
+
+		if (++(dca->dca_i) >= dca->dca_steps)
+			break;
+
 		/* Yield to avoid holding CPU for too long time.
 		 */
-		if (++(dca->dca_i) % DTX_RPC_YIELD_THD == 0)
+		if (dca->dca_i % DTX_RPC_YIELD_THD == 0)
 			return DSS_CHORE_YIELD;
 	}
 
@@ -615,63 +612,13 @@ static enum dss_chore_status
 dtx_rpc_helper(struct dss_chore *chore, bool is_reentrance)
 {
 	struct dtx_common_args	*dca = container_of(chore, struct dtx_common_args, dca_chore);
-	struct ds_pool		*pool = dca->dca_cont->sc_pool->spc_pool;
-	struct umem_attr	 uma = { 0 };
-	int			 length = 0;
 	int			 rc;
-	int			 i;
-
-	if (is_reentrance) {
-		D_DEBUG(DB_TRACE, "%p: skip to send\n", &dca->dca_chore);
-		goto send;
-	}
-
-	if (dca->dca_dtes != NULL) {
-		D_ASSERT(dca->dca_dtis != NULL);
-		if (dca->dca_count > 1) {
-			uma.uma_id = UMEM_CLASS_VMEM;
-			rc = dbtree_create_inplace(DBTREE_CLASS_DTX_CF, 0, DTX_CF_BTREE_ORDER,
-						   &uma, &dca->dca_tree_root, &dca->dca_tree_hdl);
-			if (rc != 0)
-				goto done;
-		}
-
-		ABT_rwlock_rdlock(pool->sp_lock);
-		for (i = 0; i < dca->dca_count; i++) {
-			rc = dtx_classify_one(pool, dca->dca_tree_hdl, &dca->dca_head, &length,
-					      dca->dca_dtes[i], dca->dca_count,
-					      dca->dca_rank, dca->dca_tgtid, dca->dca_dra.dra_opc);
-			if (rc < 0) {
-				ABT_rwlock_unlock(pool->sp_lock);
-				goto done;
-			}
-
-			daos_dti_copy(&dca->dca_dtis[i], &dca->dca_dtes[i]->dte_xid);
-		}
-		ABT_rwlock_unlock(pool->sp_lock);
-
-		/* For DTX_CHECK, if no other available target(s), then current target is the
-		 * unique valid one (and also 'prepared'), then related DTX can be committed.
-		 */
-		if (d_list_empty(&dca->dca_head)) {
-			rc = (dca->dca_dra.dra_opc == DTX_CHECK ? DTX_ST_PREPARED : 0);
-			goto done;
-		}
-	} else {
-		length = dca->dca_count;
-	}
-
-	D_ASSERT(length > 0);
-
-send:
-	rc = dtx_req_list_send(dca, dca->dca_epoch, length, is_reentrance);
+	rc = dtx_req_list_send(dca, is_reentrance);
 	if (rc == DSS_CHORE_YIELD)
 		return DSS_CHORE_YIELD;
 	if (rc == DSS_CHORE_DONE)
 		rc = 0;
-
-done:
 	if (rc != 0)
 		dca->dca_dra.dra_result = rc;
 	D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE, "%p: DTX RPC chore for %u done: %d\n", chore,
@@ -684,12 +631,17 @@ dtx_rpc_helper(struct dss_chore *chore, bool is_reentrance)
 }
 
 static int
-dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes,
-	     uint32_t count, int opc, daos_epoch_t epoch, d_list_t *cmt_list,
-	     d_list_t *abt_list, d_list_t *act_list, struct dtx_common_args *dca)
+dtx_rpc(struct ds_cont_child *cont, d_list_t *dti_list, struct dtx_entry **dtes, uint32_t count,
+	int opc, daos_epoch_t epoch, d_list_t *cmt_list, d_list_t *abt_list, d_list_t *act_list,
+	bool keep_head, struct dtx_common_args *dca)
 {
+	struct ds_pool		*pool = cont->sc_pool->spc_pool;
+	struct dtx_req_rec	*drr;
 	struct dtx_req_args	*dra;
+	struct umem_attr	 uma = { 0 };
+	int			 length = 0;
 	int			 rc = 0;
+	int			 i;
 
 	memset(dca, 0, sizeof(*dca));
 
@@ -709,7 +661,7 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **
 	dra->dra_abt_list = abt_list;
 	dra->dra_act_list = act_list;
 	dra->dra_opc = opc;
-	uuid_copy(dra->dra_po_uuid, cont->sc_pool->spc_pool->sp_uuid);
+	uuid_copy(dra->dra_po_uuid, pool->sp_uuid);
 	uuid_copy(dra->dra_co_uuid, cont->sc_uuid);
 
 	if (dti_list != NULL) {
@@ -725,39 +677,111 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **
 		}
 	}
 
-	/* Use helper ULT to handle DTX RPC if there are enough helper XS.
-	 */
-	if (dss_has_enough_helper()) {
-		rc = ABT_eventual_create(0, &dca->dca_chore_eventual);
-		if (rc != ABT_SUCCESS) {
-			D_ERROR("failed to create eventual: %d\n", rc);
-			rc = dss_abterr2der(rc);
+	if (dca->dca_dtes != NULL) {
+		D_ASSERT(dca->dca_dtis != NULL);
+
+		if (dca->dca_count > 1) {
+			uma.uma_id = UMEM_CLASS_VMEM;
+			rc = dbtree_create_inplace(DBTREE_CLASS_DTX_CF, 0, DTX_CF_BTREE_ORDER,
+						   &uma, &dca->dca_tree_root, &dca->dca_tree_hdl);
+			if (rc != 0)
+				goto out;
+		}
+
+		ABT_rwlock_rdlock(pool->sp_lock);
+		for (i = 0; i < dca->dca_count; i++) {
+			rc = dtx_classify_one(pool, dca->dca_tree_hdl, &dca->dca_head, &length,
+					      dca->dca_dtes[i], dca->dca_count,
+					      dca->dca_rank, dca->dca_tgtid, dca->dca_dra.dra_opc);
+			if (rc != 0) {
+				ABT_rwlock_unlock(pool->sp_lock);
+				goto out;
+			}
+
+			daos_dti_copy(&dca->dca_dtis[i], &dca->dca_dtes[i]->dte_xid);
+		}
+		ABT_rwlock_unlock(pool->sp_lock);
+
+		/* For DTX_CHECK, if no other available target(s), then current target is the
+		 * unique valid one (and also 'prepared'), then related DTX can be committed.
+		 */
+		if (d_list_empty(&dca->dca_head)) {
+			rc = (dca->dca_dra.dra_opc == DTX_CHECK ? DTX_ST_PREPARED : 0);
 			goto out;
 		}
-		rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper);
 	} else {
-		dss_chore_diy(&dca->dca_chore, dtx_rpc_helper);
-		rc = dca->dca_dra.dra_result;
+		D_ASSERT(!d_list_empty(&dca->dca_head));
+
+		length = dca->dca_count;
 	}
-out:
-	return rc;
-}
+	dca->dca_drr = d_list_entry(dca->dca_head.next, struct dtx_req_rec, drr_link);
 
-static int
-dtx_rpc_post(struct dtx_common_args *dca, int ret, bool keep_head)
-{
-	struct dtx_req_rec	*drr;
-	int			 rc;
+	/*
+	 * Do not send out the batched RPCs all at once; instead, send them step by step
+	 * to avoid holding too many system resources for a relatively long time. This is
+	 * also helpful to reduce the whole network peak load and the pressure on related
+	 * peers.
+	 */
+	while (length > 0) {
+		if (length > DTX_RPC_STEP_LENGTH && opc != DTX_CHECK)
+			dca->dca_steps = DTX_RPC_STEP_LENGTH;
+		else
+			dca->dca_steps = length;
+
+		/* Use helper ULT to handle DTX RPC if there are enough helper XS. */
+		if (dss_has_enough_helper()) {
+			rc = ABT_eventual_create(0, &dca->dca_chore_eventual);
+			if (rc != ABT_SUCCESS) {
+				D_ERROR("failed to create eventual: %d\n", rc);
+				rc = dss_abterr2der(rc);
+				goto out;
+			}
 
-	if (dca->dca_chore_eventual != ABT_EVENTUAL_NULL) {
-		rc = ABT_eventual_wait(dca->dca_chore_eventual, NULL);
-		D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_wait: %d\n", rc);
-		rc = ABT_eventual_free(&dca->dca_chore_eventual);
-		D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_free: %d\n", rc);
-	}
+			rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper);
+			if (rc != 0)
+				goto out;
+
+			rc = ABT_eventual_wait(dca->dca_chore_eventual, NULL);
+			D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_wait: %d\n", rc);
+
+			rc = ABT_eventual_free(&dca->dca_chore_eventual);
+			D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_free: %d\n", rc);
+		} else {
+			dss_chore_diy(&dca->dca_chore, dtx_rpc_helper);
+		}
+
+		rc = dtx_req_wait(&dca->dca_dra);
+		if (rc == 0 || rc == -DER_NONEXIST)
+			goto cont;
+
+		switch (opc) {
+		case DTX_COMMIT:
+		case DTX_ABORT:
+			if (rc != -DER_EXCLUDED && rc != -DER_OOG)
+				goto out;
+			break;
+		case DTX_CHECK:
+			if (rc == DTX_ST_COMMITTED || rc == DTX_ST_COMMITTABLE)
+				goto out;
+			/*
+			 * Go ahead even if some target failed: a subsequent check may return
+			 * 'COMMITTED', which overwrites the former failure.
+ */ + break; + case DTX_REFRESH: + D_ASSERTF(length < DTX_RPC_STEP_LENGTH, + "Too long list for DTX refresh: %u vs %u\n", + length, DTX_RPC_STEP_LENGTH); + break; + default: + D_ASSERTF(0, "Invalid DTX opc %u\n", opc); + } - rc = dtx_req_wait(&dca->dca_dra); +cont: + length -= dca->dca_steps; + } +out: if (daos_handle_is_valid(dca->dca_tree_hdl)) dbtree_destroy(dca->dca_tree_hdl, NULL); @@ -767,7 +791,7 @@ dtx_rpc_post(struct dtx_common_args *dca, int ret, bool keep_head) dtx_drr_cleanup(drr); } - return ret != 0 ? ret : rc; + return rc; } /** @@ -796,8 +820,6 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, int rc1 = 0; int i; - rc = dtx_rpc_prep(cont, NULL, dtes, count, DTX_COMMIT, 0, NULL, NULL, NULL, &dca); - /* * NOTE: Before committing the DTX on remote participants, we cannot remove the active * DTX locally; otherwise, the local committed DTX entry may be removed via DTX @@ -806,10 +828,8 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, * then it will get -DER_TX_UNCERTAIN, that may cause related application to be * failed. So here, we let remote participants to commit firstly, if failed, we * will ask the leader to retry the commit until all participants got committed. - * - * Some RPC may has been sent, so need to wait even if dtx_rpc_prep hit failure. */ - rc = dtx_rpc_post(&dca, rc, false); + rc = dtx_rpc(cont, NULL, dtes, count, DTX_COMMIT, 0, NULL, NULL, NULL, false, &dca); if (rc > 0 || rc == -DER_NONEXIST || rc == -DER_EXCLUDED || rc == -DER_OOG) rc = 0; @@ -846,7 +866,7 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, for (i = 0; i < count; i++) { if (rm_cos[i]) { D_ASSERT(!daos_oid_is_null(dcks[i].oid.id_pub)); - dtx_del_cos(cont, &dca.dca_dtis[i], &dcks[i].oid, + dtx_cos_del(cont, &dca.dca_dtis[i], &dcks[i].oid, dcks[i].dkey_hash); } } @@ -878,13 +898,10 @@ dtx_abort(struct ds_cont_child *cont, struct dtx_entry *dte, daos_epoch_t epoch) struct dtx_common_args dca; int rc; int rc1; - int rc2; - rc = dtx_rpc_prep(cont, NULL, &dte, 1, DTX_ABORT, epoch, NULL, NULL, NULL, &dca); - - rc2 = dtx_rpc_post(&dca, rc, false); - if (rc2 > 0 || rc2 == -DER_NONEXIST) - rc2 = 0; + rc = dtx_rpc(cont, NULL, &dte, 1, DTX_ABORT, epoch, NULL, NULL, NULL, false, &dca); + if (rc > 0 || rc == -DER_NONEXIST) + rc = 0; /* * NOTE: The DTX abort maybe triggered by dtx_leader_end() for timeout on some DTX @@ -902,10 +919,10 @@ dtx_abort(struct ds_cont_child *cont, struct dtx_entry *dte, daos_epoch_t epoch) if (rc1 > 0 || rc1 == -DER_NONEXIST) rc1 = 0; - D_CDEBUG(rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE, "Abort DTX "DF_DTI": rc %d %d %d\n", - DP_DTI(&dte->dte_xid), rc, rc1, rc2); + D_CDEBUG(rc1 != 0 || rc != 0, DLOG_ERR, DB_TRACE, "Abort DTX "DF_DTI": rc %d %d\n", + DP_DTI(&dte->dte_xid), rc, rc1); - return rc1 != 0 ? rc1 : rc2; + return rc != 0 ? rc : rc1; } int @@ -913,7 +930,6 @@ dtx_check(struct ds_cont_child *cont, struct dtx_entry *dte, daos_epoch_t epoch) { struct dtx_common_args dca; int rc; - int rc1; /* If no other target, then current target is the unique * one and 'prepared', then related DTX can be committed. 
@@ -921,14 +937,12 @@ dtx_check(struct ds_cont_child *cont, struct dtx_entry *dte, daos_epoch_t epoch) if (dte->dte_mbs->dm_tgt_cnt == 1) return DTX_ST_PREPARED; - rc = dtx_rpc_prep(cont, NULL, &dte, 1, DTX_CHECK, epoch, NULL, NULL, NULL, &dca); + rc = dtx_rpc(cont, NULL, &dte, 1, DTX_CHECK, epoch, NULL, NULL, NULL, false, &dca); - rc1 = dtx_rpc_post(&dca, rc, false); + D_CDEBUG(rc < 0 && rc != -DER_NONEXIST, DLOG_ERR, DB_TRACE, + "Check DTX "DF_DTI": rc %d\n", DP_DTI(&dte->dte_xid), rc); - D_CDEBUG(rc1 < 0 && rc1 != -DER_NONEXIST, DLOG_ERR, DB_TRACE, - "Check DTX "DF_DTI": rc %d %d\n", DP_DTI(&dte->dte_xid), rc, rc1); - - return rc1; + return rc; } int @@ -1079,9 +1093,8 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che } if (len > 0) { - rc = dtx_rpc_prep(cont, &head, NULL, len, DTX_REFRESH, 0, - cmt_list, abt_list, act_list, &dca); - rc = dtx_rpc_post(&dca, rc, for_io); + rc = dtx_rpc(cont, &head, NULL, len, DTX_REFRESH, 0, cmt_list, abt_list, act_list, + for_io, &dca); /* * For IO case, the DTX refresh failure caused by network trouble may be not fatal @@ -1151,7 +1164,7 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che */ rc1 = vos_dtx_check(cont->sc_hdl, &dsp->dsp_xid, - NULL, NULL, NULL, NULL, false); + NULL, NULL, NULL, false); if (rc1 == DTX_ST_COMMITTED || rc1 == DTX_ST_COMMITTABLE || rc1 == -DER_NONEXIST) { d_list_del(&dsp->dsp_link); @@ -1182,7 +1195,7 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che dtx_dsp_free(dsp); } else if (dsp->dsp_status == -DER_INPROGRESS) { rc1 = vos_dtx_check(cont->sc_hdl, &dsp->dsp_xid, - NULL, NULL, NULL, NULL, false); + NULL, NULL, NULL, false); if (rc1 != DTX_ST_COMMITTED && rc1 != DTX_ST_ABORTED && rc1 != -DER_NONEXIST) { if (!for_io) @@ -1602,7 +1615,7 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d * Otherwise, the batched commit ULT may be blocked by such "bad" entry. */ if (rc2 == 0 && dck != NULL) - dtx_del_cos(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash); + dtx_cos_del(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash); D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE, "Collectively commit DTX "DF_DTI": %d/%d/%d\n", diff --git a/src/dtx/dtx_srv.c b/src/dtx/dtx_srv.c index cfba7862baba..93cc6744a2df 100644 --- a/src/dtx/dtx_srv.c +++ b/src/dtx/dtx_srv.c @@ -151,7 +151,6 @@ dtx_handler(crt_rpc_t *rpc) struct dtx_out *dout = crt_reply_get(rpc); struct ds_cont_child *cont = NULL; struct dtx_id *dtis; - struct dtx_memberships *mbs[DTX_REFRESH_MAX] = { 0 }; struct dtx_cos_key dcks[DTX_REFRESH_MAX] = { 0 }; uint32_t vers[DTX_REFRESH_MAX] = { 0 }; uint32_t opc = opc_get(rpc->cr_opc); @@ -243,7 +242,7 @@ dtx_handler(crt_rpc_t *rpc) D_GOTO(out, rc = -DER_PROTO); rc = vos_dtx_check(cont->sc_hdl, din->di_dtx_array.ca_arrays, - NULL, NULL, NULL, NULL, false); + NULL, NULL, NULL, false); if (rc == DTX_ST_INITED) { /* For DTX_CHECK, non-ready one is equal to non-exist. 
 			 * return 'DTX_ST_INITED' to avoid interoperability trouble if related
@@ -287,8 +286,7 @@ dtx_handler(crt_rpc_t *rpc)
 		for (i = 0, rc1 = 0; i < count; i++) {
 			ptr = (int *)dout->do_sub_rets.ca_arrays + i;
 			dtis = (struct dtx_id *)din->di_dtx_array.ca_arrays + i;
-			*ptr = vos_dtx_check(cont->sc_hdl, dtis, NULL, &vers[i], &mbs[i], &dcks[i],
-					     true);
+			*ptr = vos_dtx_check(cont->sc_hdl, dtis, NULL, &vers[i], &dcks[i], true);
 			if (*ptr == -DER_NONEXIST && !(flags[i] & DRF_INITIAL_LEADER)) {
 				struct dtx_stat		stat = { 0 };
@@ -312,10 +310,10 @@ dtx_handler(crt_rpc_t *rpc)
 				 * it will cause interoperability trouble if remote server is old.
 				 */
 				*ptr = DTX_ST_PREPARED;
+			} else if (*ptr == DTX_ST_COMMITTABLE) {
+				/* Raise the priority of the DTX so that it can be committed ASAP. */
+				dtx_cos_prio(cont, dtis, &dcks[i].oid, dcks[i].dkey_hash);
 			}
-
-			if (mbs[i] != NULL)
-				rc1++;
 		}
 		break;
 	default:
@@ -340,47 +338,6 @@ dtx_handler(crt_rpc_t *rpc)
 	if (likely(dpm != NULL))
 		d_tm_inc_counter(dpm->dpm_total[opc], 1);
 
-	if (opc == DTX_REFRESH && rc1 > 0) {
-		struct dtx_entry	 dtes[DTX_REFRESH_MAX] = { 0 };
-		struct dtx_entry	*pdte[DTX_REFRESH_MAX] = { 0 };
-		int			 j;
-
-		for (i = 0, j = 0; i < count; i++) {
-			if (mbs[i] == NULL)
-				continue;
-
-			/* For collective DTX, it will be committed soon. */
-			if (mbs[i]->dm_flags & DMF_COLL_TARGET) {
-				D_FREE(mbs[i]);
-				continue;
-			}
-
-			daos_dti_copy(&dtes[j].dte_xid,
-				      (struct dtx_id *)din->di_dtx_array.ca_arrays + i);
-			dtes[j].dte_ver = vers[i];
-			dtes[j].dte_refs = 1;
-			dtes[j].dte_mbs = mbs[i];
-
-			pdte[j] = &dtes[j];
-			dcks[j] = dcks[i];
-			j++;
-		}
-
-		if (j > 0) {
-			/*
-			 * Commit the DTX after replied the original refresh request to
-			 * avoid further query the same DTX.
-			 */
-			rc = dtx_commit(cont, pdte, dcks, j);
-			if (rc < 0)
-				D_WARN("Failed to commit DTX "DF_DTI", count %d: "
-				       DF_RC"\n", DP_DTI(&dtes[0].dte_xid), j, DP_RC(rc));
-
-			for (i = 0; i < j; i++)
-				D_FREE(pdte[i]->dte_mbs);
-		}
-	}
-
 	D_FREE(dout->do_sub_rets.ca_arrays);
 	dout->do_sub_rets.ca_count = 0;
 
diff --git a/src/include/cart/api.h b/src/include/cart/api.h
index 2c82b8f4303a..21941d7cb983 100644
--- a/src/include/cart/api.h
+++ b/src/include/cart/api.h
@@ -188,6 +188,19 @@ crt_context_create_on_iface(const char *iface_name, crt_context_t *crt_ctx);
 int
 crt_context_set_timeout(crt_context_t crt_ctx, uint32_t timeout_sec);
 
+/**
+ * Get the default timeout value for the RPC requests created on the specified context.
+ *
+ * This is an optional function.
+ *
+ * \param[in]  crt_ctx		CRT transport context
+ * \param[out] timeout_sec	timeout value in seconds
+ *
+ * \return			DER_SUCCESS on success, negative value if error
+ */
+int
+crt_context_get_timeout(crt_context_t crt_ctx, uint32_t *timeout_sec);
+
 /**
  * Destroy CRT transport context.
  *
diff --git a/src/include/daos_srv/dtx_srv.h b/src/include/daos_srv/dtx_srv.h
index f7be21b2e0e1..7b3f7e9ee57c 100644
--- a/src/include/daos_srv/dtx_srv.h
+++ b/src/include/daos_srv/dtx_srv.h
@@ -315,8 +315,11 @@ dtx_begin(daos_handle_t xoh, struct dtx_id *dti, struct dtx_epoch *epoch,
 int
 dtx_end(struct dtx_handle *dth, struct ds_cont_child *cont, int result);
 int
-dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid,
-	     uint64_t dkey_hash, int max, struct dtx_id **dtis);
+dtx_cos_get_piggyback(struct ds_cont_child *cont, daos_unit_oid_t *oid, uint64_t dkey_hash,
+		      int max, struct dtx_id **dtis);
+void
+dtx_cos_put_piggyback(struct ds_cont_child *cont, struct dtx_id *xid,
+		      daos_unit_oid_t *oid, uint64_t dkey_hash);
 int
 dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
 		    dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg);
diff --git a/src/include/daos_srv/vos.h b/src/include/daos_srv/vos.h
index 3c6ee71a8886..3b838c2b4a60 100644
--- a/src/include/daos_srv/vos.h
+++ b/src/include/daos_srv/vos.h
@@ -74,7 +74,6 @@ vos_dtx_validation(struct dtx_handle *dth);
 * \param[in,out] epoch	Pointer to current epoch, if it is zero and if the DTX exists, then
 *			the DTX's epoch will be saved in it.
 * \param[out] pm_ver	Hold the DTX's pool map version.
- * \param[out] mbs	Pointer to the DTX participants information.
 * \param[out] dck	Pointer to the key for CoS cache.
 * \param[in] for_refresh It is for DTX_REFRESH or not.
 *
@@ -95,8 +94,7 @@ vos_dtx_validation(struct dtx_handle *dth);
 */
 int
 vos_dtx_check(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch,
-	      uint32_t *pm_ver, struct dtx_memberships **mbs, struct dtx_cos_key *dck,
-	      bool for_refresh);
+	      uint32_t *pm_ver, struct dtx_cos_key *dck, bool for_refresh);
 
 /**
 * Load participants information for the given DTX.
diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c
index 0619298977f4..8caa1ea05662 100644
--- a/src/object/srv_obj.c
+++ b/src/object/srv_obj.c
@@ -2934,9 +2934,8 @@ ds_obj_rw_handler(crt_rpc_t *rpc)
 		 * them before real modifications to avoid availability issues.
 		 */
 		D_FREE(dti_cos);
-		dti_cos_cnt = dtx_list_cos(ioc.ioc_coc, &orw->orw_oid,
-					   orw->orw_dkey_hash, DTX_THRESHOLD_COUNT,
-					   &dti_cos);
+		dti_cos_cnt = dtx_cos_get_piggyback(ioc.ioc_coc, &orw->orw_oid, orw->orw_dkey_hash,
+						    DTX_THRESHOLD_COUNT, &dti_cos);
 		if (dti_cos_cnt < 0)
 			D_GOTO(out, rc = dti_cos_cnt);
 
@@ -3852,9 +3851,8 @@ ds_obj_punch_handler(crt_rpc_t *rpc)
 		 * them before real modifications to avoid availability issues.
 		 */
 		D_FREE(dti_cos);
-		dti_cos_cnt = dtx_list_cos(ioc.ioc_coc, &opi->opi_oid,
-					   opi->opi_dkey_hash, DTX_THRESHOLD_COUNT,
-					   &dti_cos);
+		dti_cos_cnt = dtx_cos_get_piggyback(ioc.ioc_coc, &opi->opi_oid, opi->opi_dkey_hash,
+						    DTX_THRESHOLD_COUNT, &dti_cos);
 		if (dti_cos_cnt < 0)
 			D_GOTO(out, rc = dti_cos_cnt);
 
diff --git a/src/vos/tests/vts_dtx.c b/src/vos/tests/vts_dtx.c
index 5024e3e2bd82..bd54dd52838c 100644
--- a/src/vos/tests/vts_dtx.c
+++ b/src/vos/tests/vts_dtx.c
@@ -1,5 +1,5 @@
 /**
- * (C) Copyright 2019-2023 Intel Corporation.
+ * (C) Copyright 2019-2024 Intel Corporation.
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -831,7 +831,7 @@ dtx_18(void **state)
 	assert_rc_equal(rc, 10);
 
 	for (i = 0; i < 10; i++) {
-		rc = vos_dtx_check(args->ctx.tc_co_hdl, &xid[i], NULL, NULL, NULL, NULL, false);
+		rc = vos_dtx_check(args->ctx.tc_co_hdl, &xid[i], NULL, NULL, NULL, false);
 		assert_int_equal(rc, DTX_ST_COMMITTED);
 	}
 
@@ -842,7 +842,7 @@ dtx_18(void **state)
 	assert_rc_equal(rc, 0);
 
 	for (i = 0; i < 10; i++) {
-		rc = vos_dtx_check(args->ctx.tc_co_hdl, &xid[i], NULL, NULL, NULL, NULL, false);
+		rc = vos_dtx_check(args->ctx.tc_co_hdl, &xid[i], NULL, NULL, NULL, false);
 		assert_rc_equal(rc, -DER_NONEXIST);
 	}
 
diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c
index 1851331dd6a0..5c57c0ee4c70 100644
--- a/src/vos/vos_dtx.c
+++ b/src/vos/vos_dtx.c
@@ -1831,8 +1831,7 @@ vos_dtx_pack_mbs(struct umem_instance *umm, struct vos_dtx_act_ent *dae)
 
 int
 vos_dtx_check(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch,
-	      uint32_t *pm_ver, struct dtx_memberships **mbs, struct dtx_cos_key *dck,
-	      bool for_refresh)
+	      uint32_t *pm_ver, struct dtx_cos_key *dck, bool for_refresh)
 {
 	struct vos_container	*cont;
 	struct vos_dtx_act_ent	*dae;
@@ -1878,9 +1877,6 @@ vos_dtx_check(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch,
 	}
 
 	if (dae->dae_committable || DAE_FLAGS(dae) & DTE_PARTIAL_COMMITTED) {
-		if (mbs != NULL)
-			*mbs = vos_dtx_pack_mbs(vos_cont2umm(cont), dae);
-
 		if (epoch != NULL)
 			*epoch = DAE_EPOCH(dae);
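
Usage sketch (illustrative, not part of the patch): how a caller might read back a
context's default RPC timeout through the new crt_context_get_timeout() API. The
helper name and the printf-based reporting are assumptions made for this example:

	#include <stdio.h>
	#include <cart/api.h>

	static int
	print_default_rpc_timeout(crt_context_t ctx)
	{
		uint32_t	timeout_sec = 0;
		int		rc;

		/* Returns -DER_INVAL when ctx is CRT_CONTEXT_NULL. */
		rc = crt_context_get_timeout(ctx, &timeout_sec);
		if (rc != 0)
			return rc;

		printf("default RPC timeout: %u seconds\n", timeout_sec);
		return 0;
	}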