Skip to content

Commit

Permalink
Merge branch 'master' into amd/dfuse-chunk-read
Browse files Browse the repository at this point in the history
Features: dfuse,-test_dfuse_find
  • Loading branch information
ashleypittman committed Apr 30, 2024
2 parents d2b9fa9 + e7aa7a8 commit f79cb98
Show file tree
Hide file tree
Showing 24 changed files with 448 additions and 1,108 deletions.
2 changes: 2 additions & 0 deletions src/chk/chk_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,8 @@ chk_ins_init(struct chk_instance **p_ins)
out_init:
if (rc == 0)
*p_ins = ins;
else
D_FREE(ins);

return rc;
}
Expand Down
2 changes: 1 addition & 1 deletion src/chk/chk_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -2933,7 +2933,7 @@ chk_engine_pool_start(uint64_t gen, uuid_t uuid, uint32_t phase, uint32_t flags)
D_GOTO(put, rc = (rc == -DER_NONEXIST ? 1 : rc));

if (cbk->cb_phase < phase) {
cbk->cb_phase = cbk->cb_phase;
cbk->cb_phase = phase;
/* QUEST: How to estimate the left time? */
cbk->cb_time.ct_left_time = CHK__CHECK_SCAN_PHASE__CSP_DONE - cbk->cb_phase;
rc = chk_bk_update_pool(cbk, uuid_str);
Expand Down
3 changes: 1 addition & 2 deletions src/chk/chk_leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -3385,8 +3385,7 @@ chk_leader_prop(chk_prop_cb_t prop_cb, void *buf)
{
struct chk_property *prop = &chk_leader->ci_prop;

return prop_cb(buf, (struct chk_policy *)prop->cp_policies,
CHK_POLICY_MAX - 1, prop->cp_flags);
return prop_cb(buf, prop->cp_policies, CHK_POLICY_MAX - 1, prop->cp_flags);
}

static int
Expand Down
31 changes: 12 additions & 19 deletions src/chk/chk_upcall.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ chk_report_upcall(uint64_t gen, uint64_t seq, uint32_t cla, uint32_t act, int re
D_ASPRINTF(report.pool_uuid, DF_UUIDF, DP_UUID(*pool));
if (report.pool_uuid == NULL)
D_GOTO(out, rc = -DER_NOMEM);
} else {
report.pool_uuid = NULL;
}

report.pool_label = pool_label;
Expand All @@ -104,8 +102,6 @@ chk_report_upcall(uint64_t gen, uint64_t seq, uint32_t cla, uint32_t act, int re
D_ASPRINTF(report.cont_uuid, DF_UUIDF, DP_UUID(*cont));
if (report.cont_uuid == NULL)
D_GOTO(out, rc = -DER_NOMEM);
} else {
report.cont_uuid = NULL;
}

report.cont_label = cont_label;
Expand All @@ -114,24 +110,18 @@ chk_report_upcall(uint64_t gen, uint64_t seq, uint32_t cla, uint32_t act, int re
D_ASPRINTF(report.objid, DF_UOID, DP_UOID(*obj));
if (report.objid == NULL)
D_GOTO(out, rc = -DER_NOMEM);
} else {
report.objid = NULL;
}

if (!daos_iov_empty(dkey)) {
D_ASPRINTF(report.dkey, DF_KEY, DP_KEY(dkey));
if (report.dkey == NULL)
D_GOTO(out, rc = -DER_NOMEM);
} else {
report.dkey = NULL;
}

if (!daos_iov_empty(akey)) {
D_ASPRINTF(report.akey, DF_KEY, DP_KEY(akey));
if (report.akey == NULL)
D_GOTO(out, rc = -DER_NOMEM);
} else {
report.akey = NULL;
}

D_ASPRINTF(report.timestamp, "%s", ctime(&tm));
Expand All @@ -150,20 +140,23 @@ chk_report_upcall(uint64_t gen, uint64_t seq, uint32_t cla, uint32_t act, int re
goto out;

report.n_act_details = rc;
} else {
report.n_act_details = 0;
report.act_details = NULL;
}

rc = ds_chk_report_upcall(&report);

out:
D_FREE(report.pool_uuid);
D_FREE(report.cont_uuid);
D_FREE(report.objid);
D_FREE(report.dkey);
D_FREE(report.akey);
D_FREE(report.timestamp);
if (report.pool_uuid != protobuf_c_empty_string)
D_FREE(report.pool_uuid);
if (report.cont_uuid != protobuf_c_empty_string)
D_FREE(report.cont_uuid);
if (report.objid != protobuf_c_empty_string)
D_FREE(report.objid);
if (report.dkey != protobuf_c_empty_string)
D_FREE(report.dkey);
if (report.akey != protobuf_c_empty_string)
D_FREE(report.akey);
if (report.timestamp != protobuf_c_empty_string)
D_FREE(report.timestamp);
chk_sg_free(report.act_details, report.n_act_details);

D_CDEBUG(rc != 0, DLOG_ERR, DLOG_INFO,
Expand Down
2 changes: 1 addition & 1 deletion src/client/dfs/duns.c
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,7 @@ duns_set_sys_name(struct duns_attr_t *attrp, const char *sys)
{
if (attrp == NULL)
return EINVAL;
D_STRNDUP(attrp->da_sys, sys, DAOS_SYS_NAME_MAX_LEN);
D_STRNDUP(attrp->da_sys, sys, DAOS_SYS_NAME_MAX);
if (attrp->da_sys == NULL)
return ENOMEM;

Expand Down
5 changes: 2 additions & 3 deletions src/client/dfuse/ops/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_o
position + reply_len - 1, position + reply_len, position + len - 1);
}

DFUSE_IE_STAT_ADD(oh->doh_ie, DS_PRE_READ);
DFUSE_REPLY_BUFQ(oh, req, oh->doh_readahead->dra_ev->de_iov.iov_buf + position, reply_len);
return true;
}
Expand Down Expand Up @@ -470,10 +471,8 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct
replied = dfuse_readahead_reply(req, len, position, oh);
D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);

if (replied) {
DFUSE_IE_STAT_ADD(oh->doh_ie, DS_PRE_READ);
if (replied)
return;
}
}

if (chunk_read(req, len, position, oh))
Expand Down
22 changes: 21 additions & 1 deletion src/control/server/mgmt_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,13 @@ func (svc *mgmtSvc) SystemCheckQuery(ctx context.Context, req *mgmtpb.CheckQuery
req.Shallow = true
}

uuids := common.NewStringSet(req.Uuids...)
wantUUID := func(uuid string) bool {
return len(uuids) == 0 || uuids.Has(uuid)
}

reports := []*chkpb.CheckReport{}

if !req.Shallow {
dResp, err := svc.makePoolCheckerCall(ctx, drpc.MethodCheckerQuery, req)
if err != nil {
Expand All @@ -340,16 +347,29 @@ func (svc *mgmtSvc) SystemCheckQuery(ctx context.Context, req *mgmtpb.CheckQuery
if err = proto.Unmarshal(dResp.Body, resp); err != nil {
return nil, errors.Wrap(err, "unmarshal CheckQuery response")
}

for _, r := range resp.Reports {
if wantUUID(r.PoolUuid) {
reports = append(reports, r)
}
}
}

// Collect saved older reports
cfList, err := svc.sysdb.GetCheckerFindings(req.GetSeqs()...)
if err != nil {
return nil, err
}

for _, f := range cfList {
resp.Reports = append(resp.Reports, &f.CheckReport)
if wantUUID(f.PoolUuid) {
reports = append(reports, &f.CheckReport)
}
}
sort.Slice(reports, func(i, j int) bool {
return reports[i].Seq < reports[j].Seq
})
resp.Reports = reports

return resp, nil
}
Expand Down
208 changes: 208 additions & 0 deletions src/control/server/mgmt_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,211 @@ func TestServer_mgmtSvc_SystemCheckSetPolicy(t *testing.T) {
})
}
}

func TestServer_mgmtSvc_SystemCheckQuery(t *testing.T) {
uuids := testPoolUUIDs(3)
testFindingsMS := []*chkpb.CheckReport{}
testFindingsDrpc := []*chkpb.CheckReport{}
drpcPools := []*mgmtpb.CheckQueryPool{}
for i, uuid := range uuids {
testFindingsMS = append(testFindingsMS, &chkpb.CheckReport{
Seq: uint64(i + 1),
Class: chkpb.CheckInconsistClass_CIC_CONT_BAD_LABEL,
Action: chkpb.CheckInconsistAction_CIA_TRUST_MS,
PoolUuid: uuid,
})

testFindingsDrpc = append(testFindingsDrpc, &chkpb.CheckReport{
Seq: uint64(i + 1 + len(uuids)),
Class: chkpb.CheckInconsistClass_CIC_POOL_NONEXIST_ON_ENGINE,
Action: chkpb.CheckInconsistAction_CIA_TRUST_MS,
PoolUuid: uuid,
})

drpcPools = append(drpcPools, &mgmtpb.CheckQueryPool{
Uuid: uuid,
Status: chkpb.CheckPoolStatus(i),
Phase: chkpb.CheckScanPhase(i),
})
}

drpcResp := &mgmtpb.CheckQueryResp{
InsStatus: chkpb.CheckInstStatus_CIS_RUNNING,
InsPhase: chkpb.CheckScanPhase_CSP_AGGREGATION,
Pools: drpcPools,
Reports: testFindingsDrpc,
}

for name, tc := range map[string]struct {
createMS func(*testing.T, logging.Logger) *mgmtSvc
setupDrpc func(*testing.T, *mgmtSvc)
req *mgmtpb.CheckQueryReq
expResp *mgmtpb.CheckQueryResp
expErr error
}{
"not MS replica": {
createMS: func(t *testing.T, log logging.Logger) *mgmtSvc {
svc := newTestMgmtSvc(t, log)
svc.sysdb = raft.MockDatabaseWithCfg(t, log, &raft.DatabaseConfig{
SystemName: build.DefaultSystemName,
Replicas: []*net.TCPAddr{{IP: net.IP{111, 222, 1, 1}}},
})
return svc
},
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
},
expErr: errors.New("replica"),
},
"checker is not enabled": {
createMS: func(t *testing.T, log logging.Logger) *mgmtSvc {
return testSvcWithMemberState(t, log, system.MemberStateCheckerStarted, uuids)
},
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
},
expErr: checker.FaultCheckerNotEnabled,
},
"bad member states": {
createMS: func(t *testing.T, log logging.Logger) *mgmtSvc {
return testSvcCheckerEnabled(t, log, system.MemberStateJoined, uuids)
},
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
},
expErr: errors.New("expected states"),
},
"dRPC fails": {
setupDrpc: func(t *testing.T, ms *mgmtSvc) {
setupMockDrpcClient(ms, nil, errors.New("mock dRPC"))
},
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
},
expErr: errors.New("mock dRPC"),
},
"bad resp": {
setupDrpc: func(t *testing.T, ms *mgmtSvc) {
setupMockDrpcClientBytes(ms, []byte("garbage"), nil)
},
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
},
expErr: errors.New("unmarshal CheckQuery response"),
},
"success": {
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
},
expResp: &mgmtpb.CheckQueryResp{
InsStatus: chkpb.CheckInstStatus_CIS_RUNNING,
InsPhase: chkpb.CheckScanPhase_CSP_AGGREGATION,
Pools: drpcPools,
Reports: append(testFindingsMS, testFindingsDrpc...),
},
},
"shallow": {
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
Shallow: true,
},
setupDrpc: func(t *testing.T, ms *mgmtSvc) {
setupMockDrpcClient(ms, nil, errors.New("shouldn't call dRPC"))
},
expResp: &mgmtpb.CheckQueryResp{
Reports: testFindingsMS,
},
},
"request sequence numbers": {
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
Seqs: []uint64{2, 3},
},
setupDrpc: func(t *testing.T, ms *mgmtSvc) {
setupMockDrpcClient(ms, nil, errors.New("shouldn't call dRPC"))
},
expResp: &mgmtpb.CheckQueryResp{
Reports: []*chkpb.CheckReport{
testFindingsMS[1],
testFindingsMS[2],
},
},
},
"request invalid sequence number": {
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
Seqs: []uint64{2, 3, 25},
},
setupDrpc: func(t *testing.T, ms *mgmtSvc) {
setupMockDrpcClient(ms, nil, errors.New("shouldn't call dRPC"))
},
expErr: errors.New("not found"),
},
"request all uuids": {
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
Uuids: uuids,
},
expResp: &mgmtpb.CheckQueryResp{
InsStatus: chkpb.CheckInstStatus_CIS_RUNNING,
InsPhase: chkpb.CheckScanPhase_CSP_AGGREGATION,
Pools: drpcPools,
Reports: append(testFindingsMS, testFindingsDrpc...),
},
},
"filter uuids": {
req: &mgmtpb.CheckQueryReq{
Sys: "daos_server",
Uuids: []string{uuids[0], uuids[2]},
},
expResp: &mgmtpb.CheckQueryResp{
InsStatus: chkpb.CheckInstStatus_CIS_RUNNING,
InsPhase: chkpb.CheckScanPhase_CSP_AGGREGATION,
Pools: drpcPools,
Reports: []*chkpb.CheckReport{
testFindingsMS[0],
testFindingsMS[2],
testFindingsDrpc[0],
testFindingsDrpc[2],
},
},
},
} {
t.Run(name, func(t *testing.T) {
log, buf := logging.NewTestLogger(t.Name())
defer test.ShowBufferOnFailure(t, buf)

if tc.createMS == nil {
tc.createMS = func(t *testing.T, log logging.Logger) *mgmtSvc {
svc := testSvcCheckerEnabled(t, log, system.MemberStateCheckerStarted, uuids)
for _, f := range testFindingsMS {
if err := svc.sysdb.AddCheckerFinding(&checker.Finding{CheckReport: *f}); err != nil {
t.Fatalf("unable to add finding %+v: %s", f, err.Error())
}
}
return svc
}
}
svc := tc.createMS(t, log)

if tc.setupDrpc == nil {
tc.setupDrpc = func(t *testing.T, ms *mgmtSvc) {
setupMockDrpcClient(ms, drpcResp, nil)
}
}
tc.setupDrpc(t, svc)

resp, err := svc.SystemCheckQuery(test.Context(t), tc.req)

test.CmpErr(t, tc.expErr, err)
if diff := cmp.Diff(tc.expResp, resp,
cmpopts.IgnoreUnexported(
mgmtpb.CheckQueryResp{},
mgmtpb.CheckQueryPool{},
chkpb.CheckReport{}),
); diff != "" {
t.Fatalf("want-, got+:\n%s", diff)
}
})
}
}
2 changes: 0 additions & 2 deletions src/include/daos_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,6 @@ struct daos_pool_cont_info2 {
void *pci_reserved[2];
};

#define DAOS_SYS_NAME_MAX_LEN 127

/**
* Connect to the DAOS pool identified by \a pool, a label or UUID string.
* Upon a successful completion, \a poh returns the pool handle, and \a info
Expand Down
Loading

0 comments on commit f79cb98

Please sign in to comment.