Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backports to address b/336317519 #14561

Merged
merged 5 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/container/container_iv.c
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ cont_iv_ent_update(struct ds_iv_entry *entry, struct ds_iv_key *key,
}
if (entry->iv_class->iv_class_id == IV_CONT_CAPA &&
!uuid_is_null(civ_key->cont_uuid)) {
rc = ds_cont_tgt_close(civ_key->cont_uuid);
rc = ds_cont_tgt_close(entry->ns->iv_pool_uuid, civ_key->cont_uuid);
if (rc)
D_GOTO(out, rc);
}
Expand Down
5 changes: 3 additions & 2 deletions src/container/srv_container.c
Original file line number Diff line number Diff line change
Expand Up @@ -1801,8 +1801,9 @@ ds_cont_tgt_refresh_agg_eph(uuid_t pool_uuid, uuid_t cont_uuid,
uuid_copy(arg.cont_uuid, cont_uuid);
arg.min_eph = eph;

rc = dss_task_collective(cont_refresh_vos_agg_eph_one, &arg,
DSS_ULT_FL_PERIODIC);
rc = ds_pool_task_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_refresh_vos_agg_eph_one, &arg, DSS_ULT_FL_PERIODIC);
return rc;
}

Expand Down
3 changes: 2 additions & 1 deletion src/container/srv_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ int ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid,
int ds_cont_tgt_snapshots_update(uuid_t pool_uuid, uuid_t cont_uuid,
uint64_t *snapshots, int snap_count);
int ds_cont_tgt_snapshots_refresh(uuid_t pool_uuid, uuid_t cont_uuid);
int ds_cont_tgt_close(uuid_t cont_hdl_uuid);
int
ds_cont_tgt_close(uuid_t pool_uuid, uuid_t cont_hdl_uuid);
int ds_cont_tgt_refresh_agg_eph(uuid_t pool_uuid, uuid_t cont_uuid,
daos_epoch_t eph);
int ds_cont_tgt_prop_update(uuid_t pool_uuid, uuid_t cont_uuid, daos_prop_t *prop);
Expand Down
62 changes: 35 additions & 27 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,9 @@ ds_cont_tgt_destroy(uuid_t pool_uuid, uuid_t cont_uuid)
cont_iv_entry_delete(pool->sp_iv_ns, pool_uuid, cont_uuid);
ds_pool_put(pool);

rc = dss_thread_collective(cont_child_destroy_one, &in, 0);
rc = ds_pool_thread_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_child_destroy_one, &in, 0);
if (rc)
D_ERROR(DF_UUID"/"DF_UUID" container child destroy failed: %d\n",
DP_UUID(pool_uuid), DP_UUID(cont_uuid), rc);
Expand Down Expand Up @@ -1631,9 +1633,7 @@ ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid,
uuid_t cont_uuid, uint64_t flags, uint64_t sec_capas,
uint32_t status_pm_ver)
{
struct cont_tgt_open_arg arg = { 0 };
struct dss_coll_ops coll_ops = { 0 };
struct dss_coll_args coll_args = { 0 };
struct cont_tgt_open_arg arg = {0};
int rc;

uuid_copy(arg.pool_uuid, pool_uuid);
Expand All @@ -1647,22 +1647,9 @@ ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid,
D_DEBUG(DB_TRACE, "open pool/cont/hdl "DF_UUID"/"DF_UUID"/"DF_UUID"\n",
DP_UUID(pool_uuid), DP_UUID(cont_uuid), DP_UUID(cont_hdl_uuid));

/* collective operations */
coll_ops.co_func = cont_open_one;
coll_args.ca_func_args = &arg;

/* setting aggregator args */
rc = ds_pool_get_failed_tgt_idx(pool_uuid, &coll_args.ca_exclude_tgts,
&coll_args.ca_exclude_tgts_cnt);
if (rc) {
D_ERROR(DF_UUID "failed to get index : rc "DF_RC"\n",
DP_UUID(pool_uuid), DP_RC(rc));
return rc;
}

rc = dss_thread_collective_reduce(&coll_ops, &coll_args, 0);
D_FREE(coll_args.ca_exclude_tgts);

rc = ds_pool_thread_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_open_one, &arg, 0);
if (rc != 0) {
/* Once it exclude the target from the pool, since the target
* might still in the cart group, so IV cont open might still
Expand Down Expand Up @@ -1732,12 +1719,14 @@ cont_close_one_hdl(void *vin)
}

int
ds_cont_tgt_close(uuid_t hdl_uuid)
ds_cont_tgt_close(uuid_t pool_uuid, uuid_t hdl_uuid)
{
struct coll_close_arg arg;

uuid_copy(arg.uuid, hdl_uuid);
return dss_thread_collective(cont_close_one_hdl, &arg, 0);
return ds_pool_thread_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_close_one_hdl, &arg, 0);
}

struct xstream_cont_query {
Expand Down Expand Up @@ -1840,6 +1829,7 @@ ds_cont_tgt_query_handler(crt_rpc_t *rpc)
struct dss_coll_ops coll_ops;
struct dss_coll_args coll_args = { 0 };
struct xstream_cont_query pack_args;
struct ds_pool_hdl *pool_hdl;

out->tqo_hae = DAOS_EPOCH_MAX;

Expand All @@ -1858,9 +1848,17 @@ ds_cont_tgt_query_handler(crt_rpc_t *rpc)
coll_args.ca_aggregator = &pack_args;
coll_args.ca_func_args = &coll_args.ca_stream_args;

rc = dss_task_collective_reduce(&coll_ops, &coll_args, 0);
pool_hdl = ds_pool_hdl_lookup(in->tqi_pool_uuid);
if (pool_hdl == NULL)
D_GOTO(out, rc = -DER_NO_HDL);

rc = ds_pool_task_collective_reduce(pool_hdl->sph_pool->sp_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
&coll_ops, &coll_args, 0);
D_ASSERTF(rc == 0, ""DF_RC"\n", DP_RC(rc));

ds_pool_hdl_put(pool_hdl);
out:
out->tqo_hae = MIN(out->tqo_hae, pack_args.xcq_hae);
out->tqo_rc = (rc == 0 ? 0 : 1);

Expand Down Expand Up @@ -1943,9 +1941,13 @@ ds_cont_tgt_snapshots_update(uuid_t pool_uuid, uuid_t cont_uuid,
uuid_copy(args.cont_uuid, cont_uuid);
args.snap_count = snap_count;
args.snapshots = snapshots;

D_DEBUG(DB_EPC, DF_UUID": refreshing snapshots %d\n",
DP_UUID(cont_uuid), snap_count);
return dss_task_collective(cont_snap_update_one, &args, 0);

return ds_pool_task_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_snap_update_one, &args, false);
}

void
Expand Down Expand Up @@ -2032,7 +2034,9 @@ ds_cont_tgt_snapshot_notify_handler(crt_rpc_t *rpc)
args.snap_opts = in->tsi_opts;
args.oit_oid = in->tsi_oit_oid;

out->tso_rc = dss_thread_collective(cont_snap_notify_one, &args, 0);
out->tso_rc = ds_pool_thread_collective(
in->tsi_pool_uuid, PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_snap_notify_one, &args, 0);
if (out->tso_rc != 0)
D_ERROR(DF_CONT": Snapshot notify failed: "DF_RC"\n",
DP_CONT(in->tsi_pool_uuid, in->tsi_cont_uuid),
Expand Down Expand Up @@ -2077,7 +2081,9 @@ ds_cont_tgt_epoch_aggregate_handler(crt_rpc_t *rpc)
if (out->tao_rc != 0)
return;

rc = dss_task_collective(cont_epoch_aggregate_one, NULL, 0);
rc = ds_pool_task_collective(in->tai_pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_epoch_aggregate_one, NULL, 0);
if (rc != 0)
D_ERROR(DF_CONT": Aggregation failed: "DF_RC"\n",
DP_CONT(in->tai_pool_uuid, in->tai_cont_uuid),
Expand Down Expand Up @@ -2525,7 +2531,9 @@ ds_cont_tgt_prop_update(uuid_t pool_uuid, uuid_t cont_uuid, daos_prop_t *prop)
uuid_copy(arg.cpa_cont_uuid, cont_uuid);
uuid_copy(arg.cpa_pool_uuid, pool_uuid);
arg.cpa_prop = prop;
rc = dss_task_collective(cont_child_prop_update, &arg, 0);
rc = ds_pool_task_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_child_prop_update, &arg, 0);
if (rc)
D_ERROR("collective cont_write_data_turn_off failed, "DF_RC"\n",
DP_RC(rc));
Expand Down
4 changes: 3 additions & 1 deletion src/dtx/dtx_resync.c
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,9 @@ dtx_resync_ult(void *data)
if (DAOS_FAIL_CHECK(DAOS_DTX_RESYNC_DELAY))
dss_sleep(5 * 1000);

rc = dss_thread_collective(dtx_resync_one, arg, DSS_ULT_DEEP_STACK);
rc = ds_pool_thread_collective(arg->pool_uuid,
PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT | PO_COMP_ST_NEW,
dtx_resync_one, arg, DSS_ULT_DEEP_STACK);
if (rc) {
/* If dtx resync fails, then let's still update
* sp_dtx_resync_version, so the rebuild can go ahead,
Expand Down
64 changes: 56 additions & 8 deletions src/engine/ult.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops,
int xs_nr;
int rc;
int tid;
int tgt_id = dss_get_module_info()->dmi_tgt_id;
uint32_t bm_len;
bool self = false;

if (ops == NULL || args == NULL || ops->co_func == NULL) {
D_DEBUG(DB_MD, "mandatory args missing dss_collective_reduce");
Expand All @@ -115,6 +118,7 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops,
return -DER_CANCELED;
}

bm_len = args->ca_tgt_bitmap_sz << 3;
xs_nr = dss_tgt_nr;
stream_args = &args->ca_stream_args;
D_ALLOC_ARRAY(stream_args->csa_streams, xs_nr);
Expand Down Expand Up @@ -156,19 +160,18 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops,
stream = &stream_args->csa_streams[tid];
stream->st_coll_args = &carg;

if (args->ca_exclude_tgts_cnt) {
int i;

for (i = 0; i < args->ca_exclude_tgts_cnt; i++)
if (args->ca_exclude_tgts[i] == tid)
break;

if (i < args->ca_exclude_tgts_cnt) {
if (args->ca_tgt_bitmap != NULL) {
if (tid >= bm_len || isclr(args->ca_tgt_bitmap, tid)) {
D_DEBUG(DB_TRACE, "Skip tgt %d\n", tid);
rc = ABT_future_set(future, (void *)stream);
D_ASSERTF(rc == ABT_SUCCESS, "%d\n", rc);
continue;
}

if (tgt_id == tid && flags & DSS_USE_CURRENT_ULT) {
self = true;
continue;
}
}

dx = dss_get_xstream(DSS_MAIN_XS_ID(tid));
Expand Down Expand Up @@ -209,6 +212,12 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops,
}
}

if (self) {
stream = &stream_args->csa_streams[tgt_id];
stream->st_coll_args = &carg;
collective_func(stream);
}

ABT_future_wait(future);

rc = aggregator.at_rc;
Expand Down Expand Up @@ -350,6 +359,45 @@ sched_ult2xs_multisocket(int xs_type, int tgt_id)
return target;
}

int
dss_build_coll_bitmap(int *exclude_tgts, uint32_t exclude_cnt, uint8_t **p_bitmap,
uint32_t *bitmap_sz)
{
uint8_t *bitmap = NULL;
uint32_t size = ((dss_tgt_nr - 1) >> 3) + 1;
uint32_t bits = size << 3;
int rc = 0;
int i;

D_ALLOC(bitmap, size);
if (bitmap == NULL)
D_GOTO(out, rc = -DER_NOMEM);

for (i = 0; i < size; i++)
bitmap[i] = 0xff;

for (i = dss_tgt_nr; i < bits; i++)
clrbit(bitmap, i);

if (exclude_tgts == NULL)
goto out;

for (i = 0; i < exclude_cnt; i++) {
D_ASSERT(exclude_tgts[i] < dss_tgt_nr);
clrbit(bitmap, exclude_tgts[i]);
}

out:
if (rc == 0) {
*p_bitmap = bitmap;
*bitmap_sz = size;
} else {
D_ERROR("Failed to build bitmap for collective task: " DF_RC "\n", DP_RC(rc));
}

return rc;
}

/* ============== ULT create functions =================================== */

static inline int
Expand Down
3 changes: 2 additions & 1 deletion src/include/daos_srv/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ int ds_cont_list(uuid_t pool_uuid, struct daos_pool_cont_info **conts, uint64_t
int ds_cont_filter(uuid_t pool_uuid, daos_pool_cont_filter_t *filt,
struct daos_pool_cont_info2 **conts, uint64_t *ncont);
int ds_cont_upgrade(uuid_t pool_uuid, struct cont_svc *svc);
int ds_cont_tgt_close(uuid_t hdl_uuid);
int
ds_cont_tgt_close(uuid_t pool_uuid, uuid_t hdl_uuid);
int ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid,
uuid_t cont_uuid, uint64_t flags, uint64_t sec_capas,
uint32_t status_pm_ver);
Expand Down
19 changes: 15 additions & 4 deletions src/include/daos_srv/daos_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,11 @@ int dss_parameters_set(unsigned int key_id, uint64_t value);

enum dss_ult_flags {
/* Periodically created ULTs */
DSS_ULT_FL_PERIODIC = (1 << 0),
DSS_ULT_FL_PERIODIC = (1 << 0),
/* Use DSS_DEEP_STACK_SZ as the stack size */
DSS_ULT_DEEP_STACK = (1 << 1),
DSS_ULT_DEEP_STACK = (1 << 1),
/* Use current ULT (instead of creating new one) for the task. */
DSS_USE_CURRENT_ULT = (1 << 2),
};

int dss_ult_create(void (*func)(void *), void *arg, int xs_type, int tgt_id,
Expand Down Expand Up @@ -491,8 +493,14 @@ struct dss_coll_args {
/** Arguments for dss_collective func (Mandatory) */
void *ca_func_args;
void *ca_aggregator;
int *ca_exclude_tgts;
unsigned int ca_exclude_tgts_cnt;
/* Specify on which targets to execute the task. */
uint8_t *ca_tgt_bitmap;
/*
* The size (in byte) of ca_tgt_bitmap. It may be smaller than dss_tgt_nr if only some
* VOS targets are involved. It also may be larger than dss_tgt_nr if dss_tgt_nr is not
* 2 ^ n aligned.
*/
uint32_t ca_tgt_bitmap_sz;
/** Stream arguments for all streams */
struct dss_coll_stream_args ca_stream_args;
};
Expand All @@ -514,6 +522,9 @@ dss_thread_collective_reduce(struct dss_coll_ops *ops,
unsigned int flags);
int dss_task_collective(int (*func)(void *), void *arg, unsigned int flags);
int dss_thread_collective(int (*func)(void *), void *arg, unsigned int flags);
int
dss_build_coll_bitmap(int *exclude_tgts, uint32_t exclude_cnt, uint8_t **p_bitmap,
uint32_t *bitmap_sz);

/**
* Loaded module management metholds
Expand Down
20 changes: 19 additions & 1 deletion src/include/daos_srv/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ int ds_pool_tgt_map_update(struct ds_pool *pool, struct pool_buf *buf,
unsigned int map_version);

int ds_pool_start(uuid_t uuid);
void ds_pool_stop(uuid_t uuid);
int
ds_pool_stop(uuid_t uuid);
int ds_pool_extend(uuid_t pool_uuid, int ntargets, const d_rank_list_t *rank_list, int ndomains,
const uint32_t *domains, d_rank_list_t *svc_ranks);
int ds_pool_target_update_state(uuid_t pool_uuid, d_rank_list_t *ranks,
Expand Down Expand Up @@ -320,6 +321,23 @@ int ds_pool_tgt_discard(uuid_t pool_uuid, uint64_t epoch);
int
ds_pool_mark_upgrade_completed(uuid_t pool_uuid, int ret);

struct dss_coll_args;
struct dss_coll_ops;

int
ds_pool_thread_collective_reduce(uuid_t pool_uuid, uint32_t ex_status,
struct dss_coll_ops *coll_ops, struct dss_coll_args *coll_args,
uint32_t flags);
int
ds_pool_task_collective_reduce(uuid_t pool_uuid, uint32_t ex_status, struct dss_coll_ops *coll_ops,
struct dss_coll_args *coll_args, uint32_t flags);
int
ds_pool_thread_collective(uuid_t pool_uuid, uint32_t ex_status, int (*coll_func)(void *), void *arg,
uint32_t flags);
int
ds_pool_task_collective(uuid_t pool_uuid, uint32_t ex_status, int (*coll_func)(void *), void *arg,
uint32_t flags);

/**
* Verify if pool status satisfy Redundancy Factor requirement, by checking
* pool map device status.
Expand Down
Loading
Loading