From c9b6084c7e240a9d65d3f65b66289e1893ab236d Mon Sep 17 00:00:00 2001 From: Jeff Olivier Date: Thu, 6 Jun 2024 09:53:57 -0600 Subject: [PATCH] DAOS-15931 rebuild: fix data corruption caused by partial parity rebuild epoch (#14519) Rebuild code change: 1. __migrate_fetch_update_parity(), fix a bug when setting the partial replica rebuild epoch for parity shard rebuild. 2. __migrate_fetch_update_bulk() should carry the DIOF_FOR_MIGRATION flag. 3. migrate_fetch_update_parity() parameter fix when calling __migrate_fetch_update_parity(). EC aggregation change: 1. ds_obj_ec_rep_handler() and ds_obj_ec_agg_handler(), the vos_update_begin() should carry VOS_OF_REBUILD to avoid -DER_VOS_PARTIAL_UPDATE failure. 2. give EC aggregation more chances to abort once rebuild has started, to narrow the conflict window. Includes backports of DAOS-15007 object: fix EC aggregation's ap_min_unagg_eph set (#13875) DAOS-15262 vos: Fix probe issue in vos iterator (#13918) DAOS-14908 vos: Reduce aggregation conflicts (#14143) Signed-off-by: Jeff Olivier Signed-off-by: Xuezhao Liu Signed-off-by: Niu Yawei --- src/container/srv_container.c | 8 +- src/container/srv_target.c | 35 +++---- src/include/daos_srv/vos_types.h | 26 ++--- src/object/obj_internal.h | 2 + src/object/obj_utils.c | 9 ++ src/object/srv_ec_aggregate.c | 61 ++++++++---- src/object/srv_obj.c | 9 +- src/object/srv_obj_migrate.c | 12 +-- src/vos/tests/vts_aggregate.c | 4 +- src/vos/tests/vts_io.c | 65 +++++++++--- src/vos/vos_aggregate.c | 99 +++++++++--------- src/vos/vos_common.c | 13 +++ src/vos/vos_dtx.c | 2 +- src/vos/vos_internal.h | 80 +++++++-------- src/vos/vos_io.c | 4 +- src/vos/vos_iterator.c | 166 ++++++++++++++++--------------- src/vos/vos_obj.c | 45 ++++++--- src/vos/vos_obj.h | 22 ++-- src/vos/vos_obj_cache.c | 103 ++++++++++++------- src/vos/vos_obj_index.c | 14 +++ src/vos/vos_query.c | 4 +- 21 files changed, 474 insertions(+), 309 deletions(-) diff --git a/src/container/srv_container.c b/src/container/srv_container.c index 
ce6774f1469..4e6c99c3e7f 100644 --- a/src/container/srv_container.c +++ b/src/container/srv_container.c @@ -1740,19 +1740,19 @@ ds_cont_leader_update_agg_eph(uuid_t pool_uuid, uuid_t cont_uuid, if (i == ec_agg->ea_servers_num) { if (!retried) { - D_DEBUG(DB_MD, "rank %u eph "DF_U64" retry for" + D_DEBUG(DB_MD, "rank %u eph "DF_X64" retry for" DF_CONT"\n", rank, eph, DP_CONT(pool_uuid, cont_uuid)); retried = true; ec_agg->ea_deleted = 1; goto retry; } else { - D_WARN("rank %u eph "DF_U64" does not exist for " + D_WARN("rank %u eph "DF_X64" does not exist for " DF_CONT"\n", rank, eph, DP_CONT(pool_uuid, cont_uuid)); } } else { - D_DEBUG(DB_MD, DF_CONT" update eph rank %u eph "DF_U64"\n", + D_DEBUG(DB_MD, DF_CONT" update eph rank %u eph "DF_X64"\n", DP_CONT(pool_uuid, cont_uuid), rank, eph); } @@ -1871,7 +1871,7 @@ cont_agg_eph_leader_ult(void *arg) * server might cause the minimum epoch is less than * ea_current_eph. */ - D_DEBUG(DB_MD, DF_CONT" minimum "DF_U64" current "DF_U64"\n", + D_DEBUG(DB_MD, DF_CONT" minimum "DF_U64" current "DF_X64"\n", DP_CONT(svc->cs_pool_uuid, ec_agg->ea_cont_uuid), min_eph, ec_agg->ea_current_eph); diff --git a/src/container/srv_target.c b/src/container/srv_target.c index 5615d02d11d..cabbea456ae 100644 --- a/src/container/srv_target.c +++ b/src/container/srv_target.c @@ -173,9 +173,8 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req, * see pool_iv_pre_sync(), the IV fetch from the following * ds_cont_csummer_init() will fail anyway. */ - D_DEBUG(DB_EPC, DF_CONT": skip aggregation " - "No pool map yet or stopping %d\n", - DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), + D_DEBUG(DB_EPC, DF_CONT ": skip %s aggregation: No pool map yet or stopping %d\n", + DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), vos_agg ? 
"VOS" : "EC", pool->sp_stopping); return false; } @@ -206,15 +205,17 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req, if (cont->sc_props.dcp_dedup_enabled || cont->sc_props.dcp_compress_enabled || cont->sc_props.dcp_encrypt_enabled) { - D_DEBUG(DB_EPC, DF_CONT": skip aggregation for " - "deduped/compressed/encrypted container\n", - DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid)); + D_DEBUG(DB_EPC, + DF_CONT ": skip %s aggregation for deduped/compressed/encrypted" + " container\n", + DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), vos_agg ? "VOS" : "EC"); return false; } /* snapshot list isn't fetched yet */ if (cont->sc_aggregation_max == 0) { - D_DEBUG(DB_EPC, "No aggregation before snapshots fetched\n"); + D_DEBUG(DB_EPC, "No %s aggregation before snapshots fetched\n", + vos_agg ? "VOS" : "EC"); /* fetch snapshot list */ if (dss_get_module_info()->dmi_tgt_id == 0) ds_cont_tgt_snapshots_refresh(cont->sc_pool->spc_uuid, @@ -237,8 +238,8 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req, if (pool->sp_reclaim == DAOS_RECLAIM_LAZY && dss_xstream_is_busy() && sched_req_space_check(req) == SCHED_SPACE_PRESS_NONE) { - D_DEBUG(DB_EPC, "Pool reclaim strategy is lazy, service is " - "busy and no space pressure\n"); + D_DEBUG(DB_EPC, "Pool reclaim strategy is lazy, service is busy and no space" + " pressure\n"); return false; } @@ -449,9 +450,9 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb, struct sched_request *req = cont2req(cont, param->ap_vos_agg); int rc = 0; - D_DEBUG(DB_EPC, DF_CONT"[%d]: Aggregation ULT started\n", - DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), - dmi->dmi_tgt_id); + D_DEBUG(DB_EPC, DF_CONT "[%d]: %s Aggregation ULT started\n", + DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), dmi->dmi_tgt_id, + param->ap_vos_agg ? 
"VOS" : "EC"); if (req == NULL) goto out; @@ -473,9 +474,9 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb, break; /* pool destroyed */ } else if (rc < 0) { D_CDEBUG(rc == -DER_BUSY, DB_EPC, DLOG_ERR, - DF_CONT": VOS aggregate failed. "DF_RC"\n", + DF_CONT ": %s aggregate failed. " DF_RC "\n", DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), - DP_RC(rc)); + param->ap_vos_agg ? "VOS" : "EC", DP_RC(rc)); } else if (sched_req_space_check(req) != SCHED_SPACE_PRESS_NONE) { /* Don't sleep too long when there is space pressure */ msecs = 2ULL * 100; @@ -487,9 +488,9 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb, sched_req_sleep(req, msecs); } out: - D_DEBUG(DB_EPC, DF_CONT"[%d]: Aggregation ULT stopped\n", - DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), - dmi->dmi_tgt_id); + D_DEBUG(DB_EPC, DF_CONT "[%d]: %s Aggregation ULT stopped\n", + DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), dmi->dmi_tgt_id, + param->ap_vos_agg ? "VOS" : "EC"); } static int diff --git a/src/include/daos_srv/vos_types.h b/src/include/daos_srv/vos_types.h index 15c30c158c9..c8bb4cc6b33 100644 --- a/src/include/daos_srv/vos_types.h +++ b/src/include/daos_srv/vos_types.h @@ -1,5 +1,5 @@ /** - * (C) Copyright 2015-2023 Intel Corporation. + * (C) Copyright 2015-2024 Intel Corporation. 
* * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -339,29 +339,31 @@ D_CASSERT((VOS_USE_TIMESTAMPS & (VOS_GET_MAX | VOS_GET_MIN | VOS_GET_DKEY | enum { /** The absence of any flags means iterate all unsorted extents */ - VOS_IT_RECX_ALL = 0, + VOS_IT_RECX_ALL = 0, /** Include visible extents in sorted iteration */ - VOS_IT_RECX_VISIBLE = (1 << 0), + VOS_IT_RECX_VISIBLE = (1 << 0), /** Include covered extents, implies VOS_IT_RECX_VISIBLE */ - VOS_IT_RECX_COVERED = (1 << 1) | VOS_IT_RECX_VISIBLE, + VOS_IT_RECX_COVERED = (1 << 1) | VOS_IT_RECX_VISIBLE, /** Include hole extents in sorted iteration * Only applicable if VOS_IT_RECX_COVERED is not set */ - VOS_IT_RECX_SKIP_HOLES = (1 << 2), + VOS_IT_RECX_SKIP_HOLES = (1 << 2), /** When sorted iteration is enabled, iterate in reverse */ - VOS_IT_RECX_REVERSE = (1 << 3), + VOS_IT_RECX_REVERSE = (1 << 3), /** The iterator is for purge operation */ - VOS_IT_FOR_PURGE = (1 << 4), + VOS_IT_FOR_PURGE = (1 << 4), /** The iterator is for data migration scan */ - VOS_IT_FOR_MIGRATION = (1 << 5), + VOS_IT_FOR_MIGRATION = (1 << 5), /** Iterate only show punched records in interval */ - VOS_IT_PUNCHED = (1 << 6), + VOS_IT_PUNCHED = (1 << 6), /** Cleanup stale DTX entry. 
*/ - VOS_IT_FOR_DISCARD = (1 << 7), + VOS_IT_FOR_DISCARD = (1 << 7), /** Entry is not committed */ - VOS_IT_UNCOMMITTED = (1 << 8), + VOS_IT_UNCOMMITTED = (1 << 8), + /** The iterator is for an aggregation operation (EC or VOS) */ + VOS_IT_FOR_AGG = (1 << 9), /** Mask for all flags */ - VOS_IT_MASK = (1 << 9) - 1, + VOS_IT_MASK = (1 << 10) - 1, }; typedef struct { diff --git a/src/object/obj_internal.h b/src/object/obj_internal.h index 4d750c87332..50a094eadc9 100644 --- a/src/object/obj_internal.h +++ b/src/object/obj_internal.h @@ -590,6 +590,8 @@ struct obj_pool_metrics { struct d_tm_node_t *opm_update_ec_full; /** Total number of EC partial update operations (type = counter) */ struct d_tm_node_t *opm_update_ec_partial; + /** Total number of EC agg conflicts with VOS aggregation or discard */ + struct d_tm_node_t *opm_ec_agg_blocked; }; void diff --git a/src/object/obj_utils.c b/src/object/obj_utils.c index f85409aee9b..55e526b58ae 100644 --- a/src/object/obj_utils.c +++ b/src/object/obj_utils.c @@ -231,6 +231,15 @@ obj_metrics_alloc_internal(const char *path, int tgt_id, bool server) if (rc) D_WARN("Failed to create EC partial update counter: " DF_RC "\n", DP_RC(rc)); + /** Total number of times EC aggregation conflicts with discard or VOS + * aggregation + */ + rc = d_tm_add_metric(&metrics->opm_ec_agg_blocked, D_TM_COUNTER, + "total number of EC agg pauses due to VOS discard or agg", NULL, + "%s/EC_agg/blocked%s", path, tgt_path); + if (rc) + D_WARN("Failed to create EC agg blocked counter: " DF_RC "\n", DP_RC(rc)); + return metrics; } diff --git a/src/object/srv_ec_aggregate.c b/src/object/srv_ec_aggregate.c index 59d35160c0c..c73def48a8d 100644 --- a/src/object/srv_ec_aggregate.c +++ b/src/object/srv_ec_aggregate.c @@ -1438,6 +1438,15 @@ agg_peer_update(struct ec_agg_entry *entry, bool write_parity) D_ASSERT(!write_parity || entry->ae_sgl.sg_iovs[AGG_IOV_PARITY].iov_buf); + agg_param = container_of(entry, struct ec_agg_param, ap_agg_entry); + + /* If 
rebuild started, abort it before sending RPC to save conflict window with rebuild + * (see obj_inflight_io_check()). + */ + if (agg_param->ap_pool_info.api_pool->sp_rebuilding > 0) { + D_DEBUG(DB_EPC, DF_UOID " abort as rebuild started\n", DP_UOID(entry->ae_oid)); + return -1; + } rc = agg_get_obj_handle(entry); if (rc) { @@ -1445,7 +1454,6 @@ agg_peer_update(struct ec_agg_entry *entry, bool write_parity) return rc; } - agg_param = container_of(entry, struct ec_agg_param, ap_agg_entry); rc = pool_map_find_failed_tgts(agg_param->ap_pool_info.api_pool->sp_map, &targets, &failed_tgts_cnt); if (rc) { @@ -1706,6 +1714,15 @@ agg_process_holes(struct ec_agg_entry *entry) int tid, rc = 0; int *status; + agg_param = container_of(entry, struct ec_agg_param, ap_agg_entry); + /* If rebuild started, abort it before sending RPC to save conflict window with rebuild + * (see obj_inflight_io_check()). + */ + if (agg_param->ap_pool_info.api_pool->sp_rebuilding > 0) { + D_DEBUG(DB_EPC, DF_UOID " abort as rebuild started\n", DP_UOID(entry->ae_oid)); + return -1; + } + D_ALLOC_ARRAY(stripe_ud.asu_recxs, entry->ae_cur_stripe.as_extent_cnt + 1); if (stripe_ud.asu_recxs == NULL) { @@ -1723,8 +1740,6 @@ agg_process_holes(struct ec_agg_entry *entry) if (rc) goto out; - agg_param = container_of(entry, struct ec_agg_param, - ap_agg_entry); rc = ABT_eventual_create(sizeof(*status), &stripe_ud.asu_eventual); if (rc != ABT_SUCCESS) { rc = dss_abterr2der(rc); @@ -2583,10 +2598,12 @@ static int cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, uint32_t flags, struct agg_param *agg_param) { + struct obj_pool_metrics *opm; struct ec_agg_param *ec_agg_param = agg_param->ap_data; vos_iter_param_t iter_param = { 0 }; struct vos_iter_anchors anchors = { 0 }; int rc = 0; + int blocks = 0; /* * Avoid calling into vos_aggregate() when aborting aggregation @@ -2602,6 +2619,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, return rc; } + 
ec_agg_param->ap_min_unagg_eph = DAOS_EPOCH_MAX; if (flags & VOS_AGG_FL_FORCE_SCAN) { /** We don't want to use the latest container aggregation epoch for the filter * in this case. We instead use the lower bound of the epoch range. @@ -2619,15 +2637,11 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, goto update_hae; } - rc = vos_aggregate_enter(cont->sc_hdl, epr); - if (rc) - goto update_hae; - iter_param.ip_hdl = cont->sc_hdl; iter_param.ip_epr.epr_lo = epr->epr_lo; iter_param.ip_epr.epr_hi = epr->epr_hi; iter_param.ip_epc_expr = VOS_IT_EPC_RR; - iter_param.ip_flags = VOS_IT_RECX_VISIBLE; + iter_param.ip_flags = VOS_IT_RECX_VISIBLE | VOS_IT_FOR_AGG; iter_param.ip_recx.rx_idx = 0ULL; iter_param.ip_recx.rx_nr = ~PARITY_INDICATOR; iter_param.ip_filter_cb = agg_filter; @@ -2635,9 +2649,9 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL); - ec_agg_param->ap_min_unagg_eph = DAOS_EPOCH_MAX; - rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, - agg_iterate_pre_cb, agg_iterate_post_cb, ec_agg_param, NULL); +retry: + rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, agg_iterate_pre_cb, + agg_iterate_post_cb, ec_agg_param, NULL); /* Post_cb may not being executed in some cases */ agg_clear_extents(&ec_agg_param->ap_agg_entry); @@ -2648,17 +2662,28 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, ec_agg_param->ap_agg_entry.ae_obj_hdl = DAOS_HDL_INVAL; } - if (cont->sc_pool->spc_pool->sp_rebuilding > 0 && !cont->sc_stopping) { - /* There is rebuild going on, and we can't proceed EC aggregate boundary, - * Let's wait for 5 seconds for another EC aggregation. + if (rc == -DER_BUSY && cont->sc_pool->spc_pool->sp_rebuilding == 0) { + /** Hit an object conflict VOS aggregation or discard. Rather than exiting, let's + * yield and try again. 
*/ - D_ASSERT(cont->sc_ec_agg_req != NULL); - sched_req_sleep(cont->sc_ec_agg_req, 5 * 1000); + opm = cont->sc_pool->spc_metrics[DAOS_OBJ_MODULE]; + d_tm_inc_counter(opm->opm_ec_agg_blocked, 1); + blocks++; + /** Warn once if it goes over 20 times */ + D_CDEBUG(blocks == 20, DLOG_WARN, DB_EPC, + "EC agg hit conflict with VOS agg or discard (nr=%d), retrying...\n", + blocks); + ec_aggregate_yield(ec_agg_param); + goto retry; } - vos_aggregate_exit(cont->sc_hdl); - update_hae: + /* clear the flag before next turn's cont_aggregate_runnable(), to save conflict + * window with rebuild (see obj_inflight_io_check()). + */ + if (cont->sc_pool->spc_pool->sp_rebuilding > 0) + cont->sc_ec_agg_active = 0; + if (rc == 0) { cont->sc_ec_agg_eph = max(cont->sc_ec_agg_eph, epr->epr_hi); if (!cont->sc_stopping && cont->sc_ec_query_agg_eph) { diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index 8b8368c6bc5..11a26749695 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -2479,8 +2479,8 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc) dkey = (daos_key_t *)&oer->er_dkey; iod = (daos_iod_t *)&oer->er_iod; iod_csums = oer->er_iod_csums.ca_arrays; - rc = vos_update_begin(ioc.ioc_coc->sc_hdl, oer->er_oid, oer->er_epoch_range.epr_hi, 0, - dkey, 1, iod, iod_csums, 0, &ioh, NULL); + rc = vos_update_begin(ioc.ioc_coc->sc_hdl, oer->er_oid, oer->er_epoch_range.epr_hi, + VOS_OF_REBUILD, dkey, 1, iod, iod_csums, 0, &ioh, NULL); if (rc) { D_ERROR(DF_UOID" Update begin failed: "DF_RC"\n", DP_UOID(oer->er_oid), DP_RC(rc)); @@ -2555,9 +2555,8 @@ ds_obj_ec_agg_handler(crt_rpc_t *rpc) D_ASSERT(ioc.ioc_coc != NULL); dkey = (daos_key_t *)&oea->ea_dkey; if (parity_bulk != CRT_BULK_NULL) { - rc = vos_update_begin(ioc.ioc_coc->sc_hdl, oea->ea_oid, - oea->ea_epoch_range.epr_hi, 0, dkey, 1, - iod, iod_csums, 0, &ioh, NULL); + rc = vos_update_begin(ioc.ioc_coc->sc_hdl, oea->ea_oid, oea->ea_epoch_range.epr_hi, + VOS_OF_REBUILD, dkey, 1, iod, iod_csums, 0, &ioh, NULL); if (rc) { D_ERROR(DF_UOID" 
Update begin failed: "DF_RC"\n", DP_UOID(oea->ea_oid), DP_RC(rc)); diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 483a574eab4..62efb663be8 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -1136,7 +1136,7 @@ __migrate_fetch_update_parity(struct migrate_one *mrone, daos_handle_t oh, ptr += size * iods[i].iod_size; offset = recx->rx_idx; size = recx->rx_nr; - parity_eph = ephs[i][j]; + parity_eph = encode ? ephs[i][j] : mrone->mo_epoch; } if (size > 0) @@ -1198,9 +1198,8 @@ migrate_fetch_update_parity(struct migrate_one *mrone, daos_handle_t oh, update_eph = mrone->mo_iods_update_ephs_from_parity[i][j]; update_eph_p = &update_eph; - rc = __migrate_fetch_update_parity(mrone, oh, &iod, fetch_eph, &update_eph_p, - mrone->mo_iods_num_from_parity, ds_cont, - true); + rc = __migrate_fetch_update_parity(mrone, oh, &iod, fetch_eph, + &update_eph_p, 1, ds_cont, true); if (rc) return rc; } @@ -1553,8 +1552,9 @@ migrate_fetch_update_bulk(struct migrate_one *mrone, daos_handle_t oh, fetch_eph = mrone->mo_iods_update_ephs_from_parity[i][j]; update_eph = mrone->mo_iods_update_ephs_from_parity[i][j]; - rc = __migrate_fetch_update_bulk(mrone, oh, &iod, 1, fetch_eph, update_eph, - DIOF_EC_RECOV_FROM_PARITY, ds_cont); + rc = __migrate_fetch_update_bulk( + mrone, oh, &iod, 1, fetch_eph, update_eph, + DIOF_EC_RECOV_FROM_PARITY | DIOF_FOR_MIGRATION, ds_cont); if (rc != 0) D_GOTO(out, rc); } diff --git a/src/vos/tests/vts_aggregate.c b/src/vos/tests/vts_aggregate.c index 2b2b92082af..19cbffcab75 100644 --- a/src/vos/tests/vts_aggregate.c +++ b/src/vos/tests/vts_aggregate.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2022 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. 
* * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -206,7 +206,7 @@ lookup_object(struct io_test_args *arg, daos_unit_oid_t oid) vos_hdl2cont(arg->ctx.tc_co_hdl), oid, &epr, 0, VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, &obj, 0); if (rc == 0) - vos_obj_release(vos_obj_cache_current(true), obj, false); + vos_obj_release(vos_obj_cache_current(true), obj, 0, false); return rc; } diff --git a/src/vos/tests/vts_io.c b/src/vos/tests/vts_io.c index 9747881041e..47ba1429034 100644 --- a/src/vos/tests/vts_io.c +++ b/src/vos/tests/vts_io.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2023 Intel Corporation. + * (C) Copyright 2016-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -996,11 +996,18 @@ io_obj_cache_test(void **state) &objs[0], 0); assert_rc_equal(rc, 0); - rc = vos_obj_discard_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &obj1); + /** Hold object for discard */ + rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_DISCARD, + DAOS_INTENT_DISCARD, &obj1, 0); assert_rc_equal(rc, 0); - /** Should be prevented because object already held for discard */ - rc = vos_obj_discard_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &obj2); - assert_rc_equal(rc, -DER_UPDATE_AGAIN); + /** Second discard should fail */ + rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_DISCARD, + DAOS_INTENT_DISCARD, &obj2, 0); + assert_rc_equal(rc, -DER_BUSY); + /** Should prevent simultaneous aggregation */ + rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_AGGREGATE, + DAOS_INTENT_PURGE, &obj2, 0); + assert_rc_equal(rc, -DER_BUSY); /** Should prevent simultaneous hold for create as well */ rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_CREATE | VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, @@ -1010,17 +1017,43 @@ io_obj_cache_test(void **state) /** Need to be able to hold for read though or iteration won't work */ rc = vos_obj_hold(occ, 
vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, &obj2, 0); - vos_obj_discard_release(occ, obj2); - vos_obj_discard_release(occ, obj1); + vos_obj_release(occ, obj2, 0, false); + vos_obj_release(occ, obj1, VOS_OBJ_DISCARD, false); + + /** Hold object for aggregation */ + rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_AGGREGATE, + DAOS_INTENT_PURGE, &obj1, 0); + assert_rc_equal(rc, 0); + /** Discard should fail */ + rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_DISCARD, + DAOS_INTENT_DISCARD, &obj2, 0); + assert_rc_equal(rc, -DER_BUSY); + /** Second aggregation should fail */ + rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_AGGREGATE, + DAOS_INTENT_PURGE, &obj2, 0); + assert_rc_equal(rc, -DER_BUSY); + /** Simultaneous create should work */ + rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, + VOS_OBJ_CREATE | VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, &obj2, 0); + assert_rc_equal(rc, 0); + vos_obj_release(occ, obj2, 0, false); + + /** Need to be able to hold for read though or iteration won't work */ + rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_VISIBLE, + DAOS_INTENT_DEFAULT, &obj2, 0); + vos_obj_release(occ, obj2, 0, false); + vos_obj_release(occ, obj1, VOS_OBJ_AGGREGATE, false); + /** Now that other one is done, this should work */ - rc = vos_obj_discard_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &obj2); + rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_DISCARD, + DAOS_INTENT_DISCARD, &obj2, 0); assert_rc_equal(rc, 0); - vos_obj_discard_release(occ, obj2); + vos_obj_release(occ, obj2, VOS_OBJ_DISCARD, false); rc = umem_tx_end(ummg, 0); assert_rc_equal(rc, 0); - vos_obj_release(occ, objs[0], false); + vos_obj_release(occ, objs[0], 0, false); rc = umem_tx_begin(umml, NULL); assert_rc_equal(rc, 0); @@ -1029,7 +1062,7 @@ io_obj_cache_test(void 
**state) VOS_OBJ_CREATE | VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, &objs[0], 0); assert_rc_equal(rc, 0); - vos_obj_release(occ, objs[0], false); + vos_obj_release(occ, objs[0], 0, false); rc = umem_tx_end(umml, 0); assert_rc_equal(rc, 0); @@ -1047,20 +1080,20 @@ io_obj_cache_test(void **state) VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, &objs[16], 0); assert_rc_equal(rc, 0); - vos_obj_release(occ, objs[16], false); + vos_obj_release(occ, objs[16], 0, false); for (i = 0; i < 5; i++) - vos_obj_release(occ, objs[i], false); + vos_obj_release(occ, objs[i], 0, false); for (i = 10; i < 15; i++) - vos_obj_release(occ, objs[i], false); + vos_obj_release(occ, objs[i], 0, false); rc = hold_objects(objs, occ, &l_coh, &oids[1], 15, 20, true, 0); assert_int_equal(rc, 0); for (i = 5; i < 10; i++) - vos_obj_release(occ, objs[i], false); + vos_obj_release(occ, objs[i], 0, false); for (i = 15; i < 20; i++) - vos_obj_release(occ, objs[i], false); + vos_obj_release(occ, objs[i], 0, false); rc = vos_cont_close(l_coh); assert_rc_equal(rc, 0); diff --git a/src/vos/vos_aggregate.c b/src/vos/vos_aggregate.c index b06ce33e57b..ccc974a23af 100644 --- a/src/vos/vos_aggregate.c +++ b/src/vos/vos_aggregate.c @@ -1,10 +1,10 @@ /** - * (C) Copyright 2019-2023 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ /** - * Implementation for aggregation and discard + * Implementation for aggregation and discard */ #define D_LOGFAC DD_FAC(vos) @@ -2486,6 +2486,10 @@ aggregate_enter(struct vos_container *cont, int agg_mode, daos_epoch_range_t *ep struct vos_agg_metrics *vam = agg_cont2metrics(cont); int rc; + /** TODO: Now that we have per object mutual exclusion, perhaps we can + * remove the top level mutual exclusion. Keep it for now to avoid too + * much change at once. 
+ */ switch (agg_mode) { default: D_ASSERT(0); @@ -2499,7 +2503,7 @@ aggregate_enter(struct vos_container *cont, int agg_mode, daos_epoch_range_t *ep } if (cont->vc_obj_discard_count != 0) { - D_ERROR(DF_CONT": In object discard epr["DF_U64", "DF_U64"]\n", + D_ERROR(DF_CONT ": In object discard epr[" DF_U64 ", " DF_U64 "]\n", DP_CONT(cont->vc_pool->vp_id, cont->vc_id), cont->vc_epr_discard.epr_lo, cont->vc_epr_discard.epr_hi); return -DER_BUSY; @@ -2526,13 +2530,6 @@ aggregate_enter(struct vos_container *cont, int agg_mode, daos_epoch_range_t *ep return -DER_BUSY; } - if (cont->vc_obj_discard_count != 0) { - D_ERROR(DF_CONT": In object discard epr["DF_U64", "DF_U64"]\n", - DP_CONT(cont->vc_pool->vp_id, cont->vc_id), - cont->vc_epr_discard.epr_lo, cont->vc_epr_discard.epr_hi); - return -DER_BUSY; - } - if (cont->vc_in_discard && cont->vc_epr_discard.epr_lo <= epr->epr_hi) { D_ERROR(DF_CONT": Discard epr["DF_U64", "DF_U64"], " @@ -2552,21 +2549,18 @@ aggregate_enter(struct vos_container *cont, int agg_mode, daos_epoch_range_t *ep break; case AGG_MODE_OBJ_DISCARD: + /** Theoretically, this could overlap with vos_discard as well + * as aggregation but it makes the logic in vos_obj_hold more + * complicated so defer for now and just disallow it. We can + * conflict with aggregation, however without issues. 
+ */ if (cont->vc_in_discard) { - D_ERROR(DF_CONT": In discard epr["DF_U64", "DF_U64"]\n", + D_ERROR(DF_CONT ": Already in discard epr[" DF_U64 ", " DF_U64 "]\n", DP_CONT(cont->vc_pool->vp_id, cont->vc_id), cont->vc_epr_discard.epr_lo, cont->vc_epr_discard.epr_hi); return -DER_BUSY; } - if (cont->vc_in_aggregation) { - D_ERROR(DF_CONT": In aggregation epr["DF_U64", "DF_U64"]\n", - DP_CONT(cont->vc_pool->vp_id, cont->vc_id), - cont->vc_epr_aggregation.epr_lo, cont->vc_epr_aggregation.epr_hi); - return -DER_BUSY; - } - - /** Allow discard from multiple objects */ cont->vc_obj_discard_count++; break; } @@ -2645,12 +2639,14 @@ vos_aggregate(daos_handle_t coh, daos_epoch_range_t *epr, int (*yield_func)(void *arg), void *yield_arg, uint32_t flags) { struct vos_container *cont = vos_hdl2cont(coh); + struct vos_agg_metrics *vam = agg_cont2metrics(cont); struct agg_data *ad; uint64_t feats; daos_epoch_t agg_write; bool has_agg_write; int rc; bool run_agg = false; + int blocks = 0; D_DEBUG(DB_TRACE, "epr: %lu -> %lu\n", epr->epr_lo, epr->epr_hi); D_ASSERT(epr != NULL); @@ -2705,11 +2701,25 @@ vos_aggregate(daos_handle_t coh, daos_epoch_range_t *epr, merge_window_init(&ad->ad_agg_param.ap_window); ad->ad_agg_param.ap_flags = flags; - ad->ad_iter_param.ip_flags |= VOS_IT_FOR_PURGE; + ad->ad_iter_param.ip_flags |= VOS_IT_FOR_PURGE | VOS_IT_FOR_AGG; +retry: rc = vos_iterate(&ad->ad_iter_param, VOS_ITER_OBJ, true, &ad->ad_anchors, vos_aggregate_pre_cb, vos_aggregate_post_cb, &ad->ad_agg_param, NULL); - if (rc != 0 || ad->ad_agg_param.ap_nospc_err) { + if (rc == -DER_BUSY) { + /** Hit a conflict with obj_discard. Rather than exiting, let's + * yield and try again. 
+ */ + if (vam && vam->vam_agg_blocked) + d_tm_inc_counter(vam->vam_agg_blocked, 1); + blocks++; + /** Warn once if it goes over 20 times */ + D_CDEBUG(blocks == 20, DLOG_WARN, DB_EPC, + "VOS aggrregation hit conflict (nr=%d), retrying...\n", blocks); + close_merge_window(&ad->ad_agg_param.ap_window, rc); + vos_aggregate_yield(&ad->ad_agg_param); + goto retry; + } else if (rc != 0 || ad->ad_agg_param.ap_nospc_err) { close_merge_window(&ad->ad_agg_param.ap_window, rc); goto exit; } else if (ad->ad_agg_param.ap_csum_err) { @@ -2744,8 +2754,6 @@ vos_aggregate(daos_handle_t coh, daos_epoch_range_t *epr, D_FREE(ad); if (rc < 0) { - struct vos_agg_metrics *vam = agg_cont2metrics(cont); - if (vam && vam->vam_fail_count) d_tm_inc_counter(vam->vam_fail_count, 1); } @@ -2757,12 +2765,13 @@ int vos_discard(daos_handle_t coh, daos_unit_oid_t *oidp, daos_epoch_range_t *epr, int (*yield_func)(void *arg), void *yield_arg) { - struct vos_container *cont = vos_hdl2cont(coh); - struct vos_object *obj; + struct vos_container *cont = vos_hdl2cont(coh); + struct vos_agg_metrics *vam = agg_cont2metrics(cont); struct agg_data *ad; int type = VOS_ITER_OBJ; int rc; int mode = oidp == NULL ? 
AGG_MODE_DISCARD : AGG_MODE_OBJ_DISCARD; + int blocks = 0; D_ASSERT(epr != NULL); D_ASSERTF(epr->epr_lo <= epr->epr_hi, @@ -2773,22 +2782,9 @@ vos_discard(daos_handle_t coh, daos_unit_oid_t *oidp, daos_epoch_range_t *epr, if (ad == NULL) return -DER_NOMEM; - if (oidp != NULL) { - rc = vos_obj_discard_hold(vos_obj_cache_current(cont->vc_pool->vp_sysdb), - cont, *oidp, &obj); - if (rc != 0) { - if (rc == -DER_NONEXIST) - rc = 0; - goto free_agg_data; - } - - D_ASSERT(obj != NULL); - D_ASSERT(obj->obj_discard); - } - rc = aggregate_enter(cont, mode, epr); if (rc != 0) - goto release_obj; + goto exit; if (oidp != NULL) { D_DEBUG(DB_EPC, "Discard "DF_UOID" epr "DF_X64"-"DF_X64"\n", DP_UOID(*oidp), @@ -2823,17 +2819,26 @@ vos_discard(daos_handle_t coh, daos_unit_oid_t *oidp, daos_epoch_range_t *epr, ad->ad_agg_param.ap_yield_arg = yield_arg; ad->ad_iter_param.ip_flags |= VOS_IT_FOR_DISCARD; - rc = vos_iterate(&ad->ad_iter_param, type, true, &ad->ad_anchors, - vos_aggregate_pre_cb, vos_aggregate_post_cb, - &ad->ad_agg_param, NULL); +retry: + rc = vos_iterate(&ad->ad_iter_param, type, true, &ad->ad_anchors, vos_aggregate_pre_cb, + vos_aggregate_post_cb, &ad->ad_agg_param, NULL); + if (rc == -DER_BUSY) { + /** Hit an object conflict with EC aggregation. Rather than exiting, let's + * yield and try again. 
+ */ + blocks++; + /** Warn once if it goes over 20 times */ + D_CDEBUG(blocks == 20, DLOG_WARN, DB_EPC, + "VOS discard hit conflict (nr=%d), retrying...\n", blocks); + if (vam && vam->vam_discard_blocked) + d_tm_inc_counter(vam->vam_discard_blocked, 1); + vos_aggregate_yield(&ad->ad_agg_param); + goto retry; + } aggregate_exit(cont, mode); -release_obj: - if (oidp != NULL) - vos_obj_discard_release(vos_obj_cache_current(cont->vc_pool->vp_sysdb), obj); - -free_agg_data: +exit: D_FREE(ad); return rc; diff --git a/src/vos/vos_common.c b/src/vos/vos_common.c index 2a59197f709..b813bb3e76d 100644 --- a/src/vos/vos_common.c +++ b/src/vos/vos_common.c @@ -754,6 +754,19 @@ vos_metrics_alloc(const char *path, int tgt_id) if (rc) D_WARN("Failed to create 'merged_size' telemetry : "DF_RC"\n", DP_RC(rc)); + /* VOS aggregation conflicts with discard */ + rc = d_tm_add_metric(&vam->vam_agg_blocked, D_TM_COUNTER, "aggregation blocked by discard", + NULL, "%s/%s/agg_blocked/tgt_%u", path, VOS_AGG_DIR, tgt_id); + if (rc) + D_WARN("Failed to create 'agg_blocked' telemetry : " DF_RC "\n", DP_RC(rc)); + + /* VOS discard conflicts with aggregation */ + rc = d_tm_add_metric(&vam->vam_discard_blocked, D_TM_COUNTER, + "discard blocked by aggregation", NULL, "%s/%s/discard_blocked/tgt_%u", + path, VOS_AGG_DIR, tgt_id); + if (rc) + D_WARN("Failed to create 'discard_blocked' telemetry : " DF_RC "\n", DP_RC(rc)); + /* VOS aggregation failed */ rc = d_tm_add_metric(&vam->vam_fail_count, D_TM_COUNTER, "aggregation failures", NULL, "%s/%s/fail_count/tgt_%u", path, VOS_AGG_DIR, tgt_id); diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 6d28c935ef8..741ec25c87c 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -2665,7 +2665,7 @@ vos_dtx_mark_sync(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch) sizeof(obj->obj_df->vo_sync), UMEM_COMMIT_IMMEDIATE); } - vos_obj_release(occ, obj, false); + vos_obj_release(occ, obj, 0, false); return 0; } diff --git a/src/vos/vos_internal.h 
b/src/vos/vos_internal.h index 92e69cc8ad5..2cc8c62a425 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -29,40 +29,42 @@ #define VOS_MINOR_EPC_MAX EVT_MINOR_EPC_MAX -#define VOS_TX_LOG_FAIL(rc, ...) \ - do { \ - bool __is_err = true; \ - \ - if (rc >= 0) \ - break; \ - switch (rc) { \ - case -DER_TX_RESTART: \ - case -DER_INPROGRESS: \ - case -DER_EXIST: \ - case -DER_NONEXIST: \ - __is_err = false; \ - break; \ - } \ - D_CDEBUG(__is_err, DLOG_ERR, DB_IO, \ - __VA_ARGS__); \ +#define VOS_TX_LOG_FAIL(rc, ...) \ + do { \ + bool __is_err = true; \ + \ + if (rc >= 0) \ + break; \ + switch (rc) { \ + case -DER_TX_RESTART: \ + case -DER_INPROGRESS: \ + case -DER_UPDATE_AGAIN: \ + case -DER_BUSY: \ + case -DER_EXIST: \ + case -DER_NONEXIST: \ + __is_err = false; \ + break; \ + } \ + D_CDEBUG(__is_err, DLOG_ERR, DB_IO, __VA_ARGS__); \ } while (0) -#define VOS_TX_TRACE_FAIL(rc, ...) \ - do { \ - bool __is_err = true; \ - \ - if (rc >= 0) \ - break; \ - switch (rc) { \ - case -DER_TX_RESTART: \ - case -DER_INPROGRESS: \ - case -DER_EXIST: \ - case -DER_NONEXIST: \ - __is_err = false; \ - break; \ - } \ - D_CDEBUG(__is_err, DLOG_ERR, DB_TRACE, \ - __VA_ARGS__); \ +#define VOS_TX_TRACE_FAIL(rc, ...) 
\ + do { \ + bool __is_err = true; \ + \ + if (rc >= 0) \ + break; \ + switch (rc) { \ + case -DER_TX_RESTART: \ + case -DER_INPROGRESS: \ + case -DER_UPDATE_AGAIN: \ + case -DER_BUSY: \ + case -DER_EXIST: \ + case -DER_NONEXIST: \ + __is_err = false; \ + break; \ + } \ + D_CDEBUG(__is_err, DLOG_ERR, DB_TRACE, __VA_ARGS__); \ } while (0) #define VOS_CONT_ORDER 20 /* Order of container tree */ @@ -186,6 +188,8 @@ struct vos_agg_metrics { struct d_tm_node_t *vam_merge_recs; /* Total merged EV records */ struct d_tm_node_t *vam_merge_size; /* Total merged size */ struct d_tm_node_t *vam_fail_count; /* Aggregation failed */ + struct d_tm_node_t *vam_agg_blocked; /* Aggregation waiting for discard */ + struct d_tm_node_t *vam_discard_blocked; /* Discard waiting for aggregation */ }; struct vos_gc_metrics { @@ -1056,13 +1060,11 @@ struct vos_iterator { vos_iter_type_t it_type; enum vos_iter_state it_state; uint32_t it_ref_cnt; - uint32_t it_from_parent:1, - it_for_purge:1, - it_for_discard:1, - it_for_migration:1, - it_show_uncommitted:1, - it_ignore_uncommitted:1, - it_for_sysdb:1; + /** Note: it_for_agg is only set at object level as it's only used for + * mutual exclusion between aggregation and object discard. 
+ */ + uint32_t it_from_parent : 1, it_for_purge : 1, it_for_discard : 1, it_for_migration : 1, + it_show_uncommitted : 1, it_ignore_uncommitted : 1, it_for_sysdb : 1, it_for_agg : 1; }; /* Auxiliary structure for passing information between parent and nested diff --git a/src/vos/vos_io.c b/src/vos/vos_io.c index a27af4be11a..cfba470bb36 100644 --- a/src/vos/vos_io.c +++ b/src/vos/vos_io.c @@ -571,8 +571,8 @@ vos_ioc_destroy(struct vos_io_context *ioc, bool evict) dcs_csum_info_list_fini(&ioc->ic_csum_list); if (ioc->ic_obj) - vos_obj_release(vos_obj_cache_current(ioc->ic_cont->vc_pool->vp_sysdb), - ioc->ic_obj, evict); + vos_obj_release(vos_obj_cache_current(ioc->ic_cont->vc_pool->vp_sysdb), ioc->ic_obj, + 0, evict); vos_ioc_reserve_fini(ioc); vos_ilog_fetch_finish(&ioc->ic_dkey_info); diff --git a/src/vos/vos_iterator.c b/src/vos/vos_iterator.c index 817ea2e56c3..040736226f4 100644 --- a/src/vos/vos_iterator.c +++ b/src/vos/vos_iterator.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2023 Intel Corporation. + * (C) Copyright 2016-2024 Intel Corporation. 
* * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -331,6 +331,82 @@ vos_iter_finish(daos_handle_t ih) return rc || prc; } +static inline void +vos_iter_sched_sync(struct vos_iterator *iter) +{ + iter->it_seq = vos_sched_seq(!!iter->it_for_sysdb); +} + +static inline bool +vos_iter_sched_check(struct vos_iterator *iter) +{ + uint64_t seq = vos_sched_seq(!!iter->it_for_sysdb); + bool ret = iter->it_seq != seq; + + iter->it_seq = seq; + return ret; +} + +static int +vos_iter_validate_internal(struct vos_iterator *iter) +{ + daos_anchor_t *anchor; + int rc; + struct dtx_handle *old; + bool is_sysdb = !!iter->it_for_sysdb; + + D_ASSERT(iter->it_anchors != NULL); + + if (!vos_iter_sched_check(iter)) + return 0; /* No interleaving operations so no need to revalidate */ + + if (iter->it_parent) { + rc = vos_iter_validate_internal(iter->it_parent); + if (rc != 0) + return rc; + } else { + D_ASSERT(iter->it_type == VOS_ITER_OBJ); + } + + switch (iter->it_type) { + case VOS_ITER_OBJ: + anchor = &iter->it_anchors->ia_obj; + break; + case VOS_ITER_DKEY: + anchor = &iter->it_anchors->ia_dkey; + break; + case VOS_ITER_AKEY: + anchor = &iter->it_anchors->ia_akey; + break; + case VOS_ITER_SINGLE: + anchor = &iter->it_anchors->ia_sv; + break; + case VOS_ITER_RECX: + anchor = &iter->it_anchors->ia_sv; + break; + default: + D_ASSERTF(0, "Unexpected iterator type %d\n", iter->it_type); + } + + old = vos_dth_get(is_sysdb); + vos_dth_set(iter->it_dth, is_sysdb); + rc = iter->it_ops->iop_probe(iter, anchor, VOS_ITER_PROBE_AGAIN); + vos_dth_set(old, is_sysdb); + + if (rc == 0) + return 0; + + iter->it_anchors->ia_probe_level = iter->it_type; + + return iter->it_type; +} + +int +vos_iter_validate(daos_handle_t ih) +{ + return vos_iter_validate_internal(vos_hdl2iter(ih)); +} + int vos_iter_probe_ex(daos_handle_t ih, daos_anchor_t *anchor, uint32_t flags) { @@ -676,22 +752,6 @@ advance_stage(vos_iter_type_t type, unsigned int acts, vos_iter_param_t *param, return rc; } -static inline 
void -vos_iter_sched_sync(struct vos_iterator *iter) -{ - iter->it_seq = vos_sched_seq(!!iter->it_for_sysdb); -} - -static inline bool -vos_iter_sched_check(struct vos_iterator *iter) -{ - uint64_t seq = vos_sched_seq(!!iter->it_for_sysdb); - bool ret = iter->it_seq != seq; - - iter->it_seq = seq; - return ret; -} - static inline int vos_iter_cb(vos_iter_cb_t iter_cb, daos_handle_t ih, vos_iter_entry_t *iter_ent, vos_iter_type_t type, vos_iter_param_t *param, void *arg, unsigned int *acts) @@ -702,8 +762,18 @@ vos_iter_cb(vos_iter_cb_t iter_cb, daos_handle_t ih, vos_iter_entry_t *iter_ent, vos_iter_sched_sync(iter); D_ASSERT(iter_cb != NULL); rc = iter_cb(ih, iter_ent, type, param, arg, acts); - if (vos_iter_sched_check(iter)) + if (vos_iter_sched_check(iter)) { *acts |= VOS_ITER_CB_YIELD; + if (rc == 0 && iter->it_parent != NULL && + (param->ip_flags & + (VOS_IT_RECX_VISIBLE | VOS_IT_FOR_AGG | VOS_IT_FOR_DISCARD)) == 0) { + /** If scanning the whole tree, we need to revalidate the parent + * chain and possibly jump back to another level to continue */ + rc = vos_iter_validate_internal(iter->it_parent); + if (rc > 0) + rc = 0; + } + } return rc; } @@ -986,63 +1056,3 @@ vos_iterate(vos_iter_param_t *param, vos_iter_type_t type, bool recursive, return vos_iterate_internal(param, type, recursive, false, anchors, pre_cb, post_cb, arg, dth); } - -static int -vos_iter_validate_internal(struct vos_iterator *iter) -{ - daos_anchor_t *anchor; - int rc; - struct dtx_handle *old; - bool is_sysdb = !!iter->it_for_sysdb; - - D_ASSERT(iter->it_anchors != NULL); - - if (!vos_iter_sched_check(iter)) - return 0; /* No interleaving operations so no need to revalidate */ - - if (iter->it_parent) { - rc = vos_iter_validate_internal(iter->it_parent); - if (rc != 0) - return rc; - } else { - D_ASSERT(iter->it_type == VOS_ITER_OBJ); - } - - switch (iter->it_type) { - case VOS_ITER_OBJ: - anchor = &iter->it_anchors->ia_obj; - break; - case VOS_ITER_DKEY: - anchor = 
&iter->it_anchors->ia_dkey; - break; - case VOS_ITER_AKEY: - anchor = &iter->it_anchors->ia_akey; - break; - case VOS_ITER_SINGLE: - anchor = &iter->it_anchors->ia_sv; - break; - case VOS_ITER_RECX: - anchor = &iter->it_anchors->ia_sv; - break; - default: - D_ASSERTF(0, "Unexpected iterator type %d\n", iter->it_type); - } - - old = vos_dth_get(is_sysdb); - vos_dth_set(iter->it_dth, is_sysdb); - rc = iter->it_ops->iop_probe(iter, anchor, VOS_ITER_PROBE_AGAIN); - vos_dth_set(old, is_sysdb); - - if (rc == 0) - return 0; - - iter->it_anchors->ia_probe_level = iter->it_type; - - return iter->it_type; -} - -int -vos_iter_validate(daos_handle_t ih) -{ - return vos_iter_validate_internal(vos_hdl2iter(ih)); -} diff --git a/src/vos/vos_obj.c b/src/vos/vos_obj.c index 97a30e6bb07..5692a5f14bb 100644 --- a/src/vos/vos_obj.c +++ b/src/vos/vos_obj.c @@ -511,8 +511,8 @@ vos_obj_punch(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, rc = vos_mark_agg(cont, &obj->obj_df->vo_tree, &cont->vc_cont_df->cd_obj_root, epoch); - vos_obj_release(vos_obj_cache_current(cont->vc_pool->vp_sysdb), - obj, rc != 0); + vos_obj_release(vos_obj_cache_current(cont->vc_pool->vp_sysdb), obj, 0, + rc != 0); } } @@ -620,7 +620,7 @@ vos_obj_key2anchor(daos_handle_t coh, daos_unit_oid_t oid, daos_key_t *dkey, dao key_tree_release(toh, false); out: - vos_obj_release(occ, obj, false); + vos_obj_release(occ, obj, 0, false); return rc; } @@ -656,7 +656,7 @@ vos_obj_delete_internal(daos_handle_t coh, daos_unit_oid_t oid, bool only_delete rc = umem_tx_end(umm, rc); out: - vos_obj_release(occ, obj, true); + vos_obj_release(occ, obj, 0, true); return rc; } @@ -737,7 +737,7 @@ vos_obj_del_key(daos_handle_t coh, daos_unit_oid_t oid, daos_key_t *dkey, out_tx: rc = umem_tx_end(umm, rc); out: - vos_obj_release(occ, obj, true); + vos_obj_release(occ, obj, 0, true); return rc; } @@ -1651,6 +1651,8 @@ vos_obj_iter_prep(vos_iter_type_t type, vos_iter_param_t *param, oiter->it_iter.it_for_discard = 1; if 
(param->ip_flags & VOS_IT_FOR_MIGRATION) oiter->it_iter.it_for_migration = 1; + if (param->ip_flags & VOS_IT_FOR_AGG) + oiter->it_iter.it_for_agg = 1; if (is_sysdb) oiter->it_iter.it_for_sysdb = 1; if (param->ip_flags == VOS_IT_KEY_TREE) { @@ -1772,16 +1774,22 @@ nested_dkey_iter_init(struct vos_obj_iter *oiter, struct vos_iter_info *info) { int rc; struct vos_container *cont = vos_hdl2cont(info->ii_hdl); + uint64_t flags = 0; + + if ((oiter->it_flags & VOS_IT_PUNCHED) == 0) + flags |= VOS_OBJ_VISIBLE; + if (oiter->it_iter.it_for_agg) + flags |= VOS_OBJ_AGGREGATE; + if (oiter->it_iter.it_for_discard) + flags |= VOS_OBJ_DISCARD; /* XXX the condition epoch ranges could cover multiple versions of * the object/key if it's punched more than once. However, rebuild * system should guarantee this will never happen. */ - rc = vos_obj_hold(vos_obj_cache_current(cont->vc_pool->vp_sysdb), cont, - info->ii_oid, &info->ii_epr, oiter->it_iter.it_bound, - (oiter->it_flags & VOS_IT_PUNCHED) ? 0 : - VOS_OBJ_VISIBLE, vos_iter_intent(&oiter->it_iter), - &oiter->it_obj, NULL); + rc = vos_obj_hold(vos_obj_cache_current(cont->vc_pool->vp_sysdb), cont, info->ii_oid, + &info->ii_epr, oiter->it_iter.it_bound, flags, + vos_iter_intent(&oiter->it_iter), &oiter->it_obj, NULL); D_ASSERTF(rc != -DER_NONEXIST, "Nested iterator called without setting probe"); @@ -1808,7 +1816,8 @@ nested_dkey_iter_init(struct vos_obj_iter *oiter, struct vos_iter_info *info) return 0; failed: - vos_obj_release(vos_obj_cache_current(cont->vc_pool->vp_sysdb), oiter->it_obj, false); + vos_obj_release(vos_obj_cache_current(cont->vc_pool->vp_sysdb), oiter->it_obj, flags, + false); return rc; } @@ -1926,6 +1935,7 @@ vos_obj_iter_fini(struct vos_iterator *iter) struct vos_obj_iter *oiter = vos_iter2oiter(iter); int rc; struct vos_object *object; + uint64_t flags = 0; if (daos_handle_is_inval(oiter->it_hdl)) D_GOTO(out, rc = -DER_NO_HDL); @@ -1952,9 +1962,16 @@ vos_obj_iter_fini(struct vos_iterator *iter) */ object = 
oiter->it_obj; if (oiter->it_flags != VOS_IT_KEY_TREE && object != NULL && - (iter->it_type == VOS_ITER_DKEY || !iter->it_from_parent)) - vos_obj_release(vos_obj_cache_current(object->obj_cont->vc_pool->vp_sysdb), - object, false); + (iter->it_type == VOS_ITER_DKEY || !iter->it_from_parent)) { + if (iter->it_type == VOS_ITER_DKEY) { + if (iter->it_for_discard) + flags = VOS_OBJ_DISCARD; + else if (iter->it_for_agg) + flags = VOS_OBJ_AGGREGATE; + } + vos_obj_release(vos_obj_cache_current(object->obj_cont->vc_pool->vp_sysdb), object, + flags, false); + } vos_ilog_fetch_finish(&oiter->it_ilog_info); D_FREE(oiter); diff --git a/src/vos/vos_obj.h b/src/vos/vos_obj.h index e4a8c11fd7a..c687cd77f9c 100644 --- a/src/vos/vos_obj.h +++ b/src/vos/vos_obj.h @@ -49,19 +49,25 @@ struct vos_object { struct vos_container *obj_cont; /** nobody should access this object */ bool obj_zombie; - /** Object is in discard */ - bool obj_discard; + /** Object is held for discard */ + uint32_t obj_discard : 1, + /** If non-zero, object is held for aggregation */ + obj_aggregate : 1; }; enum { /** Only return the object if it's visible */ - VOS_OBJ_VISIBLE = (1 << 0), + VOS_OBJ_VISIBLE = (1 << 0), /** Create the object if it doesn't exist */ - VOS_OBJ_CREATE = (1 << 1), - /** Hold for object specific discard */ - VOS_OBJ_DISCARD = (1 << 2), + VOS_OBJ_CREATE = (1 << 1), + /** Hold for discard */ + VOS_OBJ_DISCARD = (1 << 2), + /** Hold for VOS or EC aggregation */ + VOS_OBJ_AGGREGATE = (1 << 3), /** Hold the object for delete dkey */ - VOS_OBJ_KILL_DKEY = (1 << 3), + VOS_OBJ_KILL_DKEY = (1 << 4), + /** Don't actually complete the hold, just check for conflicts */ + VOS_OBJ_NO_HOLD = (1 << 5), }; /** @@ -101,7 +107,7 @@ vos_obj_hold(struct daos_lru_cache *occ, struct vos_container *cont, * \param obj [IN] Reference to be released. 
*/ void -vos_obj_release(struct daos_lru_cache *occ, struct vos_object *obj, bool evict); +vos_obj_release(struct daos_lru_cache *occ, struct vos_object *obj, uint64_t flags, bool evict); /** Evict an object reference from the cache */ void vos_obj_evict(struct daos_lru_cache *occ, struct vos_object *obj); diff --git a/src/vos/vos_obj_cache.c b/src/vos/vos_obj_cache.c index 82a3e510471..38dc935618f 100644 --- a/src/vos/vos_obj_cache.c +++ b/src/vos/vos_obj_cache.c @@ -206,7 +206,7 @@ vos_obj_cache_current(bool standalone) static __thread struct vos_object obj_local = {0}; void -vos_obj_release(struct daos_lru_cache *occ, struct vos_object *obj, bool evict) +vos_obj_release(struct daos_lru_cache *occ, struct vos_object *obj, uint64_t flags, bool evict) { if (obj == &obj_local) { @@ -216,6 +216,10 @@ vos_obj_release(struct daos_lru_cache *occ, struct vos_object *obj, bool evict) } D_ASSERT((occ != NULL) && (obj != NULL)); + if (flags & VOS_OBJ_AGGREGATE) + obj->obj_aggregate = 0; + else if (flags & VOS_OBJ_DISCARD) + obj->obj_discard = 0; if (evict) daos_lru_ref_evict(occ, &obj->obj_llink); @@ -223,35 +227,6 @@ vos_obj_release(struct daos_lru_cache *occ, struct vos_object *obj, bool evict) daos_lru_ref_release(occ, &obj->obj_llink); } -int -vos_obj_discard_hold(struct daos_lru_cache *occ, struct vos_container *cont, daos_unit_oid_t oid, - struct vos_object **objp) -{ - struct vos_object *obj = NULL; - daos_epoch_range_t epr = {0, DAOS_EPOCH_MAX}; - int rc; - - rc = vos_obj_hold(occ, cont, oid, &epr, 0, VOS_OBJ_DISCARD, - DAOS_INTENT_DISCARD, &obj, NULL); - if (rc != 0) - return rc; - - D_ASSERTF(!obj->obj_discard, "vos_obj_hold should return an error if already in discard\n"); - - obj->obj_discard = true; - *objp = obj; - - return 0; -} - -void -vos_obj_discard_release(struct daos_lru_cache *occ, struct vos_object *obj) -{ - obj->obj_discard = false; - - vos_obj_release(occ, obj, false); -} - /** Move local object to the lru cache */ static inline int 
cache_object(struct daos_lru_cache *occ, struct vos_object **objp) @@ -296,6 +271,37 @@ cache_object(struct daos_lru_cache *occ, struct vos_object **objp) return 0; } +static bool +vos_obj_op_conflict(struct vos_object *obj, uint64_t flags, uint32_t intent, bool create) +{ + bool discard = flags & VOS_OBJ_DISCARD; + bool agg = flags & VOS_OBJ_AGGREGATE; + + /* VOS aggregation is mutually exclusive with VOS discard. + * Object discard is mutually exclusive with VOS discard. + * EC aggregation is not mutually exclusive with anything. + * For simplicity, we do make all of them mutually exclusive on the same + * object. + */ + + if (obj->obj_discard) { + /** Mutually exclusive with create, discard and aggregation */ + if (create || discard || agg) { + D_DEBUG(DB_EPC, "Conflict detected, discard already running on object\n"); + return true; + } + } else if (obj->obj_aggregate) { + /** Mutually exclusive with discard and aggregation */ + if (discard || agg) { + D_DEBUG(DB_EPC, + "Conflict detected, aggregation already running on object\n"); + return true; + } + } + + return false; +} + int vos_obj_hold(struct daos_lru_cache *occ, struct vos_container *cont, daos_unit_oid_t oid, daos_epoch_range_t *epr, daos_epoch_t bound, @@ -310,12 +316,13 @@ vos_obj_hold(struct daos_lru_cache *occ, struct vos_container *cont, uint32_t cond_mask = 0; bool create; void *create_flag = NULL; - bool visible_only; + bool visible_only; D_ASSERT(cont != NULL); D_ASSERT(cont->vc_pool); - *obj_p = NULL; + if (obj_p != NULL) + *obj_p = NULL; if (cont->vc_pool->vp_dying) return -DER_SHUTDOWN; @@ -340,6 +347,10 @@ vos_obj_hold(struct daos_lru_cache *occ, struct vos_container *cont, rc = daos_lru_ref_hold(occ, &lkey, sizeof(lkey), create_flag, &lret); if (rc == -DER_NONEXIST) { D_ASSERT(obj_local.obj_cont == NULL); + if (flags & VOS_OBJ_NO_HOLD) { + /** Object is not cached, so there can be no other holders */ + return 0; + } obj = &obj_local; init_object(obj, oid, cont); } else if (rc != 0) { @@ -406,14 +417,25 @@
vos_obj_hold(struct daos_lru_cache *occ, struct vos_container *cont, } check_object: - if (obj->obj_discard && (create || (flags & VOS_OBJ_DISCARD) != 0)) { - /** Cleanup before assert so unit test that triggers doesn't corrupt the state */ - vos_obj_release(occ, obj, false); + if (vos_obj_op_conflict(obj, flags, intent, create)) { + /** Cleanup so unit test that triggers doesn't corrupt the state */ + vos_obj_release(occ, obj, 0, false); /* Update request will retry with this error */ - rc = -DER_UPDATE_AGAIN; + if (create) + rc = -DER_UPDATE_AGAIN; + else + rc = -DER_BUSY; goto failed_2; } + if (flags & VOS_OBJ_NO_HOLD) { + /** Just checking for conflicts, so we are done */ + vos_obj_release(occ, obj, 0, false); + return 0; + } + + D_ASSERT(obj_p != NULL); + if ((flags & VOS_OBJ_DISCARD) || intent == DAOS_INTENT_KILL || intent == DAOS_INTENT_PUNCH) goto out; @@ -497,13 +519,18 @@ vos_obj_hold(struct daos_lru_cache *occ, struct vos_container *cont, goto failed_2; } + if (flags & VOS_OBJ_AGGREGATE) + obj->obj_aggregate = 1; + else if (flags & VOS_OBJ_DISCARD) + obj->obj_discard = 1; *obj_p = obj; return 0; failed: - vos_obj_release(occ, obj, true); + vos_obj_release(occ, obj, 0, true); failed_2: - VOS_TX_LOG_FAIL(rc, "failed to hold object, rc="DF_RC"\n", DP_RC(rc)); + VOS_TX_LOG_FAIL(rc, "failed to hold object " DF_UOID ", rc=" DF_RC "\n", DP_UOID(oid), + DP_RC(rc)); return rc; } diff --git a/src/vos/vos_obj_index.c b/src/vos/vos_obj_index.c index 58d1dba1021..5445993186c 100644 --- a/src/vos/vos_obj_index.c +++ b/src/vos/vos_obj_index.c @@ -847,11 +847,13 @@ oi_iter_aggregate(daos_handle_t ih, bool range_discard) { struct vos_iterator *iter = vos_hdl2iter(ih); struct vos_oi_iter *oiter = iter2oiter(iter); + struct vos_container *cont = oiter->oit_cont; struct vos_obj_df *obj; daos_unit_oid_t oid; d_iov_t rec_iov; bool delete = false, invisible = false; int rc; + uint64_t base_flag = range_discard ? 
VOS_OBJ_DISCARD : VOS_OBJ_AGGREGATE; D_ASSERT(iter->it_type == VOS_ITER_OBJ); @@ -865,6 +867,18 @@ oi_iter_aggregate(daos_handle_t ih, bool range_discard) obj = (struct vos_obj_df *)rec_iov.iov_buf; oid = obj->vo_id; + rc = vos_obj_hold(vos_obj_cache_current(cont->vc_pool->vp_sysdb), cont, oid, + &oiter->oit_epr, iter->it_bound, base_flag | VOS_OBJ_NO_HOLD, + DAOS_INTENT_PURGE, NULL, NULL); + if (rc != 0) { + /** -DER_BUSY means the object is in-use already. We will retry after a yield in this + * case. + */ + D_CDEBUG(rc == -DER_BUSY, DB_EPC, DLOG_ERR, "Hold check failed for " DF_UOID "\n", + DP_UOID(oid)); + return rc; + } + rc = umem_tx_begin(vos_cont2umm(oiter->oit_cont), NULL); if (rc != 0) goto exit; diff --git a/src/vos/vos_query.c b/src/vos/vos_query.c index b7007ec6b87..8e0938c81db 100644 --- a/src/vos/vos_query.c +++ b/src/vos/vos_query.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2022 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -787,7 +787,7 @@ vos_obj_query_key(daos_handle_t coh, daos_unit_oid_t oid, uint32_t flags, *max_write = obj->obj_df->vo_max_write; if (obj != NULL) - vos_obj_release(vos_obj_cache_current(is_sysdb), obj, false); + vos_obj_release(vos_obj_cache_current(is_sysdb), obj, 0, false); if (rc == 0 || rc == -DER_NONEXIST) { if (vos_ts_wcheck(query->qt_ts_set, obj_epr.epr_hi,