Skip to content

Commit

Permalink
DAOS-15931 rebuild: fix data corruption caused by partial parity rebuild epoch (#14519)
Browse files Browse the repository at this point in the history

Rebuild code change:
1. __migrate_fetch_update_parity(): fix a bug when setting the partial replica
   rebuild epoch for parity shard rebuild.
2. __migrate_fetch_update_bulk() should carry the DIOF_FOR_MIGRATION flag.
3. migrate_fetch_update_parity(): fix the parameters passed when calling
   __migrate_fetch_update_parity().

EC aggregation change:
1. ds_obj_ec_rep_handler() and ds_obj_ec_agg_handler(): the vos_update_begin()
   call should carry VOS_OF_REBUILD to avoid -DER_VOS_PARTIAL_UPDATE failures.
2. Give EC aggregation more chances to abort once rebuild has started, to
   reduce the conflict window with rebuild.

Includes backports of:
DAOS-15007 object: fix EC aggregation's ap_min_unagg_eph set (#13875)
DAOS-15262 vos: Fix probe issue in vos iterator (#13918)
DAOS-14908 vos: Reduce aggregation conflicts (#14143)

Signed-off-by: Jeff Olivier <jeffolivier@google.com>
Signed-off-by: Xuezhao Liu <xuezhao.liu@intel.com>
Signed-off-by: Niu Yawei <yawei.niu@intel.com>
  • Loading branch information
jolivier23 authored Jun 6, 2024
1 parent 72f6096 commit c9b6084
Show file tree
Hide file tree
Showing 21 changed files with 474 additions and 309 deletions.
8 changes: 4 additions & 4 deletions src/container/srv_container.c
Original file line number Diff line number Diff line change
Expand Up @@ -1740,19 +1740,19 @@ ds_cont_leader_update_agg_eph(uuid_t pool_uuid, uuid_t cont_uuid,

if (i == ec_agg->ea_servers_num) {
if (!retried) {
D_DEBUG(DB_MD, "rank %u eph "DF_U64" retry for"
D_DEBUG(DB_MD, "rank %u eph "DF_X64" retry for"
DF_CONT"\n", rank, eph,
DP_CONT(pool_uuid, cont_uuid));
retried = true;
ec_agg->ea_deleted = 1;
goto retry;
} else {
D_WARN("rank %u eph "DF_U64" does not exist for "
D_WARN("rank %u eph "DF_X64" does not exist for "
DF_CONT"\n", rank, eph,
DP_CONT(pool_uuid, cont_uuid));
}
} else {
D_DEBUG(DB_MD, DF_CONT" update eph rank %u eph "DF_U64"\n",
D_DEBUG(DB_MD, DF_CONT" update eph rank %u eph "DF_X64"\n",
DP_CONT(pool_uuid, cont_uuid), rank, eph);
}

Expand Down Expand Up @@ -1871,7 +1871,7 @@ cont_agg_eph_leader_ult(void *arg)
* server might cause the minimum epoch is less than
* ea_current_eph.
*/
D_DEBUG(DB_MD, DF_CONT" minimum "DF_U64" current "DF_U64"\n",
D_DEBUG(DB_MD, DF_CONT" minimum "DF_U64" current "DF_X64"\n",
DP_CONT(svc->cs_pool_uuid, ec_agg->ea_cont_uuid),
min_eph, ec_agg->ea_current_eph);

Expand Down
35 changes: 18 additions & 17 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,8 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,
* see pool_iv_pre_sync(), the IV fetch from the following
* ds_cont_csummer_init() will fail anyway.
*/
D_DEBUG(DB_EPC, DF_CONT": skip aggregation "
"No pool map yet or stopping %d\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
D_DEBUG(DB_EPC, DF_CONT ": skip %s aggregation: No pool map yet or stopping %d\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), vos_agg ? "VOS" : "EC",
pool->sp_stopping);
return false;
}
Expand Down Expand Up @@ -206,15 +205,17 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,
if (cont->sc_props.dcp_dedup_enabled ||
cont->sc_props.dcp_compress_enabled ||
cont->sc_props.dcp_encrypt_enabled) {
D_DEBUG(DB_EPC, DF_CONT": skip aggregation for "
"deduped/compressed/encrypted container\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid));
D_DEBUG(DB_EPC,
DF_CONT ": skip %s aggregation for deduped/compressed/encrypted"
" container\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), vos_agg ? "VOS" : "EC");
return false;
}

/* snapshot list isn't fetched yet */
if (cont->sc_aggregation_max == 0) {
D_DEBUG(DB_EPC, "No aggregation before snapshots fetched\n");
D_DEBUG(DB_EPC, "No %s aggregation before snapshots fetched\n",
vos_agg ? "VOS" : "EC");
/* fetch snapshot list */
if (dss_get_module_info()->dmi_tgt_id == 0)
ds_cont_tgt_snapshots_refresh(cont->sc_pool->spc_uuid,
Expand All @@ -237,8 +238,8 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,

if (pool->sp_reclaim == DAOS_RECLAIM_LAZY && dss_xstream_is_busy() &&
sched_req_space_check(req) == SCHED_SPACE_PRESS_NONE) {
D_DEBUG(DB_EPC, "Pool reclaim strategy is lazy, service is "
"busy and no space pressure\n");
D_DEBUG(DB_EPC, "Pool reclaim strategy is lazy, service is busy and no space"
" pressure\n");
return false;
}

Expand Down Expand Up @@ -449,9 +450,9 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb,
struct sched_request *req = cont2req(cont, param->ap_vos_agg);
int rc = 0;

D_DEBUG(DB_EPC, DF_CONT"[%d]: Aggregation ULT started\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
dmi->dmi_tgt_id);
D_DEBUG(DB_EPC, DF_CONT "[%d]: %s Aggregation ULT started\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), dmi->dmi_tgt_id,
param->ap_vos_agg ? "VOS" : "EC");

if (req == NULL)
goto out;
Expand All @@ -473,9 +474,9 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb,
break; /* pool destroyed */
} else if (rc < 0) {
D_CDEBUG(rc == -DER_BUSY, DB_EPC, DLOG_ERR,
DF_CONT": VOS aggregate failed. "DF_RC"\n",
DF_CONT ": %s aggregate failed. " DF_RC "\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
DP_RC(rc));
param->ap_vos_agg ? "VOS" : "EC", DP_RC(rc));
} else if (sched_req_space_check(req) != SCHED_SPACE_PRESS_NONE) {
/* Don't sleep too long when there is space pressure */
msecs = 2ULL * 100;
Expand All @@ -487,9 +488,9 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb,
sched_req_sleep(req, msecs);
}
out:
D_DEBUG(DB_EPC, DF_CONT"[%d]: Aggregation ULT stopped\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
dmi->dmi_tgt_id);
D_DEBUG(DB_EPC, DF_CONT "[%d]: %s Aggregation ULT stopped\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), dmi->dmi_tgt_id,
param->ap_vos_agg ? "VOS" : "EC");
}

static int
Expand Down
26 changes: 14 additions & 12 deletions src/include/daos_srv/vos_types.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2015-2023 Intel Corporation.
* (C) Copyright 2015-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -339,29 +339,31 @@ D_CASSERT((VOS_USE_TIMESTAMPS & (VOS_GET_MAX | VOS_GET_MIN | VOS_GET_DKEY |

enum {
/** The absence of any flags means iterate all unsorted extents */
VOS_IT_RECX_ALL = 0,
VOS_IT_RECX_ALL = 0,
/** Include visible extents in sorted iteration */
VOS_IT_RECX_VISIBLE = (1 << 0),
VOS_IT_RECX_VISIBLE = (1 << 0),
/** Include covered extents, implies VOS_IT_RECX_VISIBLE */
VOS_IT_RECX_COVERED = (1 << 1) | VOS_IT_RECX_VISIBLE,
VOS_IT_RECX_COVERED = (1 << 1) | VOS_IT_RECX_VISIBLE,
/** Include hole extents in sorted iteration
* Only applicable if VOS_IT_RECX_COVERED is not set
*/
VOS_IT_RECX_SKIP_HOLES = (1 << 2),
VOS_IT_RECX_SKIP_HOLES = (1 << 2),
/** When sorted iteration is enabled, iterate in reverse */
VOS_IT_RECX_REVERSE = (1 << 3),
VOS_IT_RECX_REVERSE = (1 << 3),
/** The iterator is for purge operation */
VOS_IT_FOR_PURGE = (1 << 4),
VOS_IT_FOR_PURGE = (1 << 4),
/** The iterator is for data migration scan */
VOS_IT_FOR_MIGRATION = (1 << 5),
VOS_IT_FOR_MIGRATION = (1 << 5),
/** Iterate only show punched records in interval */
VOS_IT_PUNCHED = (1 << 6),
VOS_IT_PUNCHED = (1 << 6),
/** Cleanup stale DTX entry. */
VOS_IT_FOR_DISCARD = (1 << 7),
VOS_IT_FOR_DISCARD = (1 << 7),
/** Entry is not committed */
VOS_IT_UNCOMMITTED = (1 << 8),
VOS_IT_UNCOMMITTED = (1 << 8),
/** The iterator is for an aggregation operation (EC or VOS) */
VOS_IT_FOR_AGG = (1 << 9),
/** Mask for all flags */
VOS_IT_MASK = (1 << 9) - 1,
VOS_IT_MASK = (1 << 10) - 1,
};

typedef struct {
Expand Down
2 changes: 2 additions & 0 deletions src/object/obj_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ struct obj_pool_metrics {
struct d_tm_node_t *opm_update_ec_full;
/** Total number of EC partial update operations (type = counter) */
struct d_tm_node_t *opm_update_ec_partial;
/** Total number of EC agg conflicts with VOS aggregation or discard */
struct d_tm_node_t *opm_ec_agg_blocked;
};

void
Expand Down
9 changes: 9 additions & 0 deletions src/object/obj_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,15 @@ obj_metrics_alloc_internal(const char *path, int tgt_id, bool server)
if (rc)
D_WARN("Failed to create EC partial update counter: " DF_RC "\n", DP_RC(rc));

/** Total number of times EC aggregation conflicts with discard or VOS
* aggregation
*/
rc = d_tm_add_metric(&metrics->opm_ec_agg_blocked, D_TM_COUNTER,
"total number of EC agg pauses due to VOS discard or agg", NULL,
"%s/EC_agg/blocked%s", path, tgt_path);
if (rc)
D_WARN("Failed to create EC agg blocked counter: " DF_RC "\n", DP_RC(rc));

return metrics;
}

Expand Down
61 changes: 43 additions & 18 deletions src/object/srv_ec_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1438,14 +1438,22 @@ agg_peer_update(struct ec_agg_entry *entry, bool write_parity)

D_ASSERT(!write_parity ||
entry->ae_sgl.sg_iovs[AGG_IOV_PARITY].iov_buf);
agg_param = container_of(entry, struct ec_agg_param, ap_agg_entry);

/* If rebuild started, abort it before sending RPC to save conflict window with rebuild
* (see obj_inflight_io_check()).
*/
if (agg_param->ap_pool_info.api_pool->sp_rebuilding > 0) {
D_DEBUG(DB_EPC, DF_UOID " abort as rebuild started\n", DP_UOID(entry->ae_oid));
return -1;
}

rc = agg_get_obj_handle(entry);
if (rc) {
D_ERROR("Failed to open object: "DF_RC"\n", DP_RC(rc));
return rc;
}

agg_param = container_of(entry, struct ec_agg_param, ap_agg_entry);
rc = pool_map_find_failed_tgts(agg_param->ap_pool_info.api_pool->sp_map,
&targets, &failed_tgts_cnt);
if (rc) {
Expand Down Expand Up @@ -1706,6 +1714,15 @@ agg_process_holes(struct ec_agg_entry *entry)
int tid, rc = 0;
int *status;

agg_param = container_of(entry, struct ec_agg_param, ap_agg_entry);
/* If rebuild started, abort it before sending RPC to save conflict window with rebuild
* (see obj_inflight_io_check()).
*/
if (agg_param->ap_pool_info.api_pool->sp_rebuilding > 0) {
D_DEBUG(DB_EPC, DF_UOID " abort as rebuild started\n", DP_UOID(entry->ae_oid));
return -1;
}

D_ALLOC_ARRAY(stripe_ud.asu_recxs,
entry->ae_cur_stripe.as_extent_cnt + 1);
if (stripe_ud.asu_recxs == NULL) {
Expand All @@ -1723,8 +1740,6 @@ agg_process_holes(struct ec_agg_entry *entry)
if (rc)
goto out;

agg_param = container_of(entry, struct ec_agg_param,
ap_agg_entry);
rc = ABT_eventual_create(sizeof(*status), &stripe_ud.asu_eventual);
if (rc != ABT_SUCCESS) {
rc = dss_abterr2der(rc);
Expand Down Expand Up @@ -2583,10 +2598,12 @@ static int
cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
uint32_t flags, struct agg_param *agg_param)
{
struct obj_pool_metrics *opm;
struct ec_agg_param *ec_agg_param = agg_param->ap_data;
vos_iter_param_t iter_param = { 0 };
struct vos_iter_anchors anchors = { 0 };
int rc = 0;
int blocks = 0;

/*
* Avoid calling into vos_aggregate() when aborting aggregation
Expand All @@ -2602,6 +2619,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
return rc;
}

ec_agg_param->ap_min_unagg_eph = DAOS_EPOCH_MAX;
if (flags & VOS_AGG_FL_FORCE_SCAN) {
/** We don't want to use the latest container aggregation epoch for the filter
* in this case. We instead use the lower bound of the epoch range.
Expand All @@ -2619,25 +2637,21 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
goto update_hae;
}

rc = vos_aggregate_enter(cont->sc_hdl, epr);
if (rc)
goto update_hae;

iter_param.ip_hdl = cont->sc_hdl;
iter_param.ip_epr.epr_lo = epr->epr_lo;
iter_param.ip_epr.epr_hi = epr->epr_hi;
iter_param.ip_epc_expr = VOS_IT_EPC_RR;
iter_param.ip_flags = VOS_IT_RECX_VISIBLE;
iter_param.ip_flags = VOS_IT_RECX_VISIBLE | VOS_IT_FOR_AGG;
iter_param.ip_recx.rx_idx = 0ULL;
iter_param.ip_recx.rx_nr = ~PARITY_INDICATOR;
iter_param.ip_filter_cb = agg_filter;
iter_param.ip_filter_arg = ec_agg_param;

agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL);

ec_agg_param->ap_min_unagg_eph = DAOS_EPOCH_MAX;
rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors,
agg_iterate_pre_cb, agg_iterate_post_cb, ec_agg_param, NULL);
retry:
rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, agg_iterate_pre_cb,
agg_iterate_post_cb, ec_agg_param, NULL);

/* Post_cb may not being executed in some cases */
agg_clear_extents(&ec_agg_param->ap_agg_entry);
Expand All @@ -2648,17 +2662,28 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
ec_agg_param->ap_agg_entry.ae_obj_hdl = DAOS_HDL_INVAL;
}

if (cont->sc_pool->spc_pool->sp_rebuilding > 0 && !cont->sc_stopping) {
/* There is rebuild going on, and we can't proceed EC aggregate boundary,
* Let's wait for 5 seconds for another EC aggregation.
if (rc == -DER_BUSY && cont->sc_pool->spc_pool->sp_rebuilding == 0) {
/** Hit an object conflict VOS aggregation or discard. Rather than exiting, let's
* yield and try again.
*/
D_ASSERT(cont->sc_ec_agg_req != NULL);
sched_req_sleep(cont->sc_ec_agg_req, 5 * 1000);
opm = cont->sc_pool->spc_metrics[DAOS_OBJ_MODULE];
d_tm_inc_counter(opm->opm_ec_agg_blocked, 1);
blocks++;
/** Warn once if it goes over 20 times */
D_CDEBUG(blocks == 20, DLOG_WARN, DB_EPC,
"EC agg hit conflict with VOS agg or discard (nr=%d), retrying...\n",
blocks);
ec_aggregate_yield(ec_agg_param);
goto retry;
}

vos_aggregate_exit(cont->sc_hdl);

update_hae:
/* clear the flag before next turn's cont_aggregate_runnable(), to save conflict
* window with rebuild (see obj_inflight_io_check()).
*/
if (cont->sc_pool->spc_pool->sp_rebuilding > 0)
cont->sc_ec_agg_active = 0;

if (rc == 0) {
cont->sc_ec_agg_eph = max(cont->sc_ec_agg_eph, epr->epr_hi);
if (!cont->sc_stopping && cont->sc_ec_query_agg_eph) {
Expand Down
9 changes: 4 additions & 5 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -2479,8 +2479,8 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc)
dkey = (daos_key_t *)&oer->er_dkey;
iod = (daos_iod_t *)&oer->er_iod;
iod_csums = oer->er_iod_csums.ca_arrays;
rc = vos_update_begin(ioc.ioc_coc->sc_hdl, oer->er_oid, oer->er_epoch_range.epr_hi, 0,
dkey, 1, iod, iod_csums, 0, &ioh, NULL);
rc = vos_update_begin(ioc.ioc_coc->sc_hdl, oer->er_oid, oer->er_epoch_range.epr_hi,
VOS_OF_REBUILD, dkey, 1, iod, iod_csums, 0, &ioh, NULL);
if (rc) {
D_ERROR(DF_UOID" Update begin failed: "DF_RC"\n",
DP_UOID(oer->er_oid), DP_RC(rc));
Expand Down Expand Up @@ -2555,9 +2555,8 @@ ds_obj_ec_agg_handler(crt_rpc_t *rpc)
D_ASSERT(ioc.ioc_coc != NULL);
dkey = (daos_key_t *)&oea->ea_dkey;
if (parity_bulk != CRT_BULK_NULL) {
rc = vos_update_begin(ioc.ioc_coc->sc_hdl, oea->ea_oid,
oea->ea_epoch_range.epr_hi, 0, dkey, 1,
iod, iod_csums, 0, &ioh, NULL);
rc = vos_update_begin(ioc.ioc_coc->sc_hdl, oea->ea_oid, oea->ea_epoch_range.epr_hi,
VOS_OF_REBUILD, dkey, 1, iod, iod_csums, 0, &ioh, NULL);
if (rc) {
D_ERROR(DF_UOID" Update begin failed: "DF_RC"\n",
DP_UOID(oea->ea_oid), DP_RC(rc));
Expand Down
12 changes: 6 additions & 6 deletions src/object/srv_obj_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ __migrate_fetch_update_parity(struct migrate_one *mrone, daos_handle_t oh,
ptr += size * iods[i].iod_size;
offset = recx->rx_idx;
size = recx->rx_nr;
parity_eph = ephs[i][j];
parity_eph = encode ? ephs[i][j] : mrone->mo_epoch;
}

if (size > 0)
Expand Down Expand Up @@ -1198,9 +1198,8 @@ migrate_fetch_update_parity(struct migrate_one *mrone, daos_handle_t oh,

update_eph = mrone->mo_iods_update_ephs_from_parity[i][j];
update_eph_p = &update_eph;
rc = __migrate_fetch_update_parity(mrone, oh, &iod, fetch_eph, &update_eph_p,
mrone->mo_iods_num_from_parity, ds_cont,
true);
rc = __migrate_fetch_update_parity(mrone, oh, &iod, fetch_eph,
&update_eph_p, 1, ds_cont, true);
if (rc)
return rc;
}
Expand Down Expand Up @@ -1553,8 +1552,9 @@ migrate_fetch_update_bulk(struct migrate_one *mrone, daos_handle_t oh,
fetch_eph = mrone->mo_iods_update_ephs_from_parity[i][j];

update_eph = mrone->mo_iods_update_ephs_from_parity[i][j];
rc = __migrate_fetch_update_bulk(mrone, oh, &iod, 1, fetch_eph, update_eph,
DIOF_EC_RECOV_FROM_PARITY, ds_cont);
rc = __migrate_fetch_update_bulk(
mrone, oh, &iod, 1, fetch_eph, update_eph,
DIOF_EC_RECOV_FROM_PARITY | DIOF_FOR_MIGRATION, ds_cont);
if (rc != 0)
D_GOTO(out, rc);
}
Expand Down
4 changes: 2 additions & 2 deletions src/vos/tests/vts_aggregate.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2019-2022 Intel Corporation.
* (C) Copyright 2019-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -206,7 +206,7 @@ lookup_object(struct io_test_args *arg, daos_unit_oid_t oid)
vos_hdl2cont(arg->ctx.tc_co_hdl), oid, &epr, 0,
VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, &obj, 0);
if (rc == 0)
vos_obj_release(vos_obj_cache_current(true), obj, false);
vos_obj_release(vos_obj_cache_current(true), obj, 0, false);
return rc;
}

Expand Down
Loading

0 comments on commit c9b6084

Please sign in to comment.