DAOS-15914 dtx: control DTX RPC to reduce network load
1. Send DTX batched commit RPCs step by step

Currently, each DTX batched commit operation handles up to 512 DTX entries,
which may generate DTX commit RPCs to thousands of DAOS targets. Instead of
sending all of those batched RPCs at once, we now send them step by step:
after each step, the logic yields and waits for the replies, then sends the
next batch of RPCs. That avoids holding too many system resources for a
relatively long time, and it also helps to reduce the peak network load of
the whole system and the pressure on the related targets.
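
A minimal sketch of the step-wise pattern (only DTX_RPC_STEP_LENGTH, struct
ds_cont_child and struct dtx_entry come from this patch; the helpers are
hypothetical, not the actual patch code):

/*
 * Illustrative sketch only, not code from this patch: issue the batched
 * commit RPCs in steps of at most DTX_RPC_STEP_LENGTH entries, yielding
 * between steps. send_commit_step() and wait_step_replies() are
 * hypothetical stand-ins for the real RPC dispatch and reply handling.
 */
static int
commit_in_steps(struct ds_cont_child *cont, struct dtx_entry **dtes, int total)
{
	int	done = 0;
	int	step;
	int	rc = 0;

	while (done < total && rc == 0) {
		step = total - done;
		if (step > DTX_RPC_STEP_LENGTH)
			step = DTX_RPC_STEP_LENGTH;

		/* Send one step worth of commit RPCs ... */
		rc = send_commit_step(cont, &dtes[done], step);
		if (rc == 0)
			/* ... then yield and wait for the replies before
			 * starting the next step.
			 */
			rc = wait_step_replies(cont);

		done += step;
	}

	return rc;
}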

2. Cleanup stale DTX based on global RPC timeout

Originally, the DTX cleanup logic was triggered once the life of a stale DTX
exceeded the fixed threshold DTX_CLEANUP_THD_AGE_UP (90 seconds). That
threshold may be smaller than the global default RPC timeout, so the DTX
refresh RPCs used by the cleanup logic could be sent out too early, before
the related modification RPC(s) had timed out, increasing the network load
unnecessarily.

Now the DTX cleanup threshold is derived from the global default RPC timeout,
giving the related DTX leader some time after the default RPC timeout to
commit or abort the DTX. If the DTX is still prepared after that, DTX cleanup
is triggered to handle potential stale DTX entries.
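
Concretely, the per-container threshold is now derived at registration time
from the cart context's default RPC timeout; condensed from the
dtx_cont_register() hunk below:

/* Derive the cleanup threshold from the global default RPC timeout plus
 * two commit-threshold ages, instead of the fixed DTX_CLEANUP_THD_AGE_UP.
 */
uint32_t	timeout;

rc = crt_context_get_timeout(dmi->dmi_ctx, &timeout);
if (rc == 0)
	dbca->dbca_cleanup_thd = timeout + DTX_COMMIT_THRESHOLD_AGE * 2;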

3. Reorg DTX CoS logic

Reduce the RPCs caused by potentially repeated DTX commits, and give the
DTX CoS APIs clearer names.
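
For reference, dtx_add_cos()/dtx_del_cos() become dtx_cos_add()/dtx_cos_del(),
and a new else branch in dtx_leader_end() hands entries whose piggybacked
commit did not complete to dtx_cos_put_piggyback(), which, per the intent
above, helps avoid repeated commit RPCs for them; condensed from the
src/dtx/dtx_common.c hunk below:

/* Condensed from the dtx_leader_end() change in src/dtx/dtx_common.c. */
if (result == 0 && dth->dth_cos_done) {
	for (i = 0; i < dth->dth_dti_cos_count; i++)
		dtx_cos_del(cont, &dth->dth_dti_cos[i],
			    &dth->dth_leader_oid, dth->dth_dkey_hash);
} else {
	for (i = 0; i < dth->dth_dti_cos_count; i++)
		dtx_cos_put_piggyback(cont, &dth->dth_dti_cos[i],
				      &dth->dth_leader_oid, dth->dth_dkey_hash);
}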

Signed-off-by: Fan Yong <fan.yong@intel.com>
Nasf-Fan authored and daltonbohning committed Jul 3, 2024
1 parent 4d00fa8 commit 30ef9f8
Showing 14 changed files with 373 additions and 271 deletions.
16 changes: 16 additions & 0 deletions src/cart/crt_context.c
@@ -1986,6 +1986,22 @@ crt_context_set_timeout(crt_context_t crt_ctx, uint32_t timeout_sec)
return rc;
}

int
crt_context_get_timeout(crt_context_t crt_ctx, uint32_t *timeout_sec)
{
struct crt_context *ctx = crt_ctx;
int rc = 0;

if (crt_ctx == CRT_CONTEXT_NULL) {
D_ERROR("NULL context passed\n");
rc = -DER_INVAL;
} else {
*timeout_sec = ctx->cc_timeout_sec;
}

return rc;
}

/* Force complete the rpc. Used for handling of unreachable rpcs */
void
crt_req_force_completion(struct crt_rpc_priv *rpc_priv)
2 changes: 1 addition & 1 deletion src/dtx/dtx_coll.c
@@ -305,7 +305,7 @@ dtx_coll_local_one(void *args)
rc = vos_dtx_abort(cont->sc_hdl, &dcla->dcla_xid, dcla->dcla_epoch);
break;
case DTX_COLL_CHECK:
rc = vos_dtx_check(cont->sc_hdl, &dcla->dcla_xid, NULL, NULL, NULL, NULL, false);
rc = vos_dtx_check(cont->sc_hdl, &dcla->dcla_xid, NULL, NULL, NULL, false);
if (rc == DTX_ST_INITED) {
/*
* For DTX_CHECK, non-ready one is equal to non-exist. Do not directly
57 changes: 35 additions & 22 deletions src/dtx/dtx_common.c
@@ -49,6 +49,7 @@ struct dtx_batched_cont_args {
struct dtx_batched_pool_args *dbca_pool;
int dbca_refs;
uint32_t dbca_reg_gen;
uint32_t dbca_cleanup_thd;
uint32_t dbca_deregister:1,
dbca_cleanup_done:1,
dbca_commit_done:1,
@@ -68,6 +69,7 @@ struct dtx_cleanup_cb_args {
d_list_t dcca_pc_list;
int dcca_st_count;
int dcca_pc_count;
uint32_t dcca_cleanup_thd;
};

static inline void
@@ -226,7 +228,7 @@ dtx_cleanup_iter_cb(uuid_t co_uuid, vos_iter_entry_t *ent, void *args)
return 0;

/* Stop the iteration if current DTX is not too old. */
if (dtx_sec2age(ent->ie_dtx_start_time) <= DTX_CLEANUP_THD_AGE_LO)
if (dtx_sec2age(ent->ie_dtx_start_time) <= dcca->dcca_cleanup_thd)
return 1;

D_ASSERT(ent->ie_dtx_mbs_dsize > 0);
@@ -336,6 +338,8 @@ dtx_cleanup(void *arg)
D_INIT_LIST_HEAD(&dcca.dcca_pc_list);
dcca.dcca_st_count = 0;
dcca.dcca_pc_count = 0;
/* Clean up stale DTX entries within a window of about 10 seconds each time. */
dcca.dcca_cleanup_thd = dbca->dbca_cleanup_thd - 10;
rc = ds_cont_iter(cont->sc_pool->spc_hdl, cont->sc_uuid,
dtx_cleanup_iter_cb, &dcca, VOS_ITER_DTX, 0);
if (rc < 0)
@@ -626,7 +630,7 @@ dtx_batched_commit_one(void *arg)
int rc;

cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT, NULL,
DAOS_EPOCH_MAX, &dtes, &dcks, &dce);
DAOS_EPOCH_MAX, false, &dtes, &dcks, &dce);
if (cnt == 0)
break;

@@ -754,7 +758,7 @@ dtx_batched_commit(void *arg)
if (dtx_cont_opened(cont) &&
!dbca->dbca_deregister && dbca->dbca_cleanup_req == NULL &&
stat.dtx_oldest_active_time != 0 &&
dtx_sec2age(stat.dtx_oldest_active_time) >= DTX_CLEANUP_THD_AGE_UP) {
dtx_sec2age(stat.dtx_oldest_active_time) >= dbca->dbca_cleanup_thd) {
D_ASSERT(!dbca->dbca_cleanup_done);
dtx_get_dbca(dbca);

@@ -1267,6 +1271,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
uint32_t flags;
int status = -1;
int rc = 0;
int i;
bool aborted = false;
bool unpin = false;

@@ -1366,7 +1371,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
D_ASSERT(dth->dth_mbs != NULL);

if (dlh->dlh_coll) {
rc = dtx_add_cos(cont, dlh->dlh_coll_entry, &dth->dth_leader_oid,
rc = dtx_cos_add(cont, dlh->dlh_coll_entry, &dth->dth_leader_oid,
dth->dth_dkey_hash, dth->dth_epoch, DCF_EXP_CMT | DCF_COLL);
} else {
size = sizeof(*dte) + sizeof(*mbs) + dth->dth_mbs->dm_data_size;
@@ -1391,7 +1396,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
else
flags = 0;

rc = dtx_add_cos(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash,
rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash,
dth->dth_epoch, flags);
dtx_entry_put(dte);
}
@@ -1480,11 +1485,13 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
* It is harmless to keep some partially committed DTX entries in CoS cache.
*/
if (result == 0 && dth->dth_cos_done) {
int i;

for (i = 0; i < dth->dth_dti_cos_count; i++)
dtx_del_cos(cont, &dth->dth_dti_cos[i],
dtx_cos_del(cont, &dth->dth_dti_cos[i],
&dth->dth_leader_oid, dth->dth_dkey_hash);
} else {
for (i = 0; i < dth->dth_dti_cos_count; i++)
dtx_cos_put_piggyback(cont, &dth->dth_dti_cos[i],
&dth->dth_leader_oid, dth->dth_dkey_hash);
}

D_DEBUG(DB_IO, "Stop the DTX "DF_DTI" ver %u, dkey %lu, %s, cos %d/%d: result "DF_RC"\n",
@@ -1644,7 +1651,7 @@ dtx_flush_on_close(struct dss_module_info *dmi, struct dtx_batched_cont_args *db
struct dtx_coll_entry *dce = NULL;

cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT,
NULL, DAOS_EPOCH_MAX, &dtes, &dcks, &dce);
NULL, DAOS_EPOCH_MAX, true, &dtes, &dcks, &dce);
if (cnt <= 0)
D_GOTO(out, rc = cnt);

@@ -1774,6 +1781,7 @@ dtx_cont_register(struct ds_cont_child *cont)
struct dtx_batched_pool_args *dbpa = NULL;
struct dtx_batched_cont_args *dbca = NULL;
struct umem_attr uma;
uint32_t timeout;
int rc;
bool new_pool = true;

@@ -1812,6 +1820,19 @@ dtx_cont_register(struct ds_cont_child *cont)
if (dbca == NULL)
D_GOTO(out, rc = -DER_NOMEM);

rc = crt_context_get_timeout(dmi->dmi_ctx, &timeout);
if (rc != 0) {
D_ERROR("Failed to get DTX cleanup timeout: "DF_RC"\n", DP_RC(rc));
goto out;
}

/*
* Give the related DTX leader some time after the default RPC timeout to commit
* or abort the DTX. If the DTX is still prepared after that, trigger DTX cleanup
* to handle potential stale DTX entries.
*/
dbca->dbca_cleanup_thd = timeout + DTX_COMMIT_THRESHOLD_AGE * 2;

memset(&uma, 0, sizeof(uma));
uma.uma_id = UMEM_CLASS_VMEM;
rc = dbtree_create_inplace_ex(DBTREE_CLASS_DTX_COS, 0,
@@ -1967,7 +1988,7 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti,
*/
return -DER_NONEXIST;

rc = vos_dtx_check(coh, dti, epoch, pm_ver, NULL, NULL, false);
rc = vos_dtx_check(coh, dti, epoch, pm_ver, NULL, false);
switch (rc) {
case DTX_ST_INITED:
return -DER_INPROGRESS;
@@ -2000,14 +2021,6 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti,
}
}

/*
* If a large transaction has sub-requests to dispatch to a lot of DTX participants,
* then we may have to split the dispatch process to multiple steps; otherwise, the
* dispatch process may trigger too many in-flight or in-queued RPCs that will hold
* too much resource as to server maybe out of memory.
*/
#define DTX_EXEC_STEP_LENGTH DTX_THRESHOLD_COUNT

struct dtx_chore {
struct dss_chore chore;
dtx_sub_func_t func;
@@ -2186,8 +2199,8 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
dlh->dlh_need_agg = 0;
dlh->dlh_agg_done = 0;

if (sub_cnt > DTX_EXEC_STEP_LENGTH) {
dlh->dlh_forward_cnt = DTX_EXEC_STEP_LENGTH;
if (sub_cnt > DTX_RPC_STEP_LENGTH) {
dlh->dlh_forward_cnt = DTX_RPC_STEP_LENGTH;
} else {
dlh->dlh_forward_cnt = sub_cnt;
if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL)
@@ -2237,7 +2250,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
sub_cnt -= dlh->dlh_forward_cnt;
if (sub_cnt > 0) {
dlh->dlh_forward_idx += dlh->dlh_forward_cnt;
if (sub_cnt <= DTX_EXEC_STEP_LENGTH) {
if (sub_cnt <= DTX_RPC_STEP_LENGTH) {
dlh->dlh_forward_cnt = sub_cnt;
if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL)
dlh->dlh_need_agg = 1;
@@ -2337,7 +2350,7 @@ dtx_obj_sync(struct ds_cont_child *cont, daos_unit_oid_t *oid,
struct dtx_coll_entry *dce = NULL;

cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT, oid,
epoch, &dtes, &dcks, &dce);
epoch, true, &dtes, &dcks, &dce);
if (cnt <= 0) {
rc = cnt;
if (rc < 0)