Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-16635 control: Pass meta_sz to pool extend+reintegrate API #15459

Merged
merged 4 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
476 changes: 243 additions & 233 deletions src/control/common/proto/mgmt/pool.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion src/control/server/mgmt_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,8 @@ func (svc *mgmtSvc) poolCreate(parent context.Context, req *mgmtpb.PoolCreateReq
return nil, err
}

ps = system.NewPoolService(poolUUID, req.TierBytes, ranklist.RanksFromUint32(req.GetRanks()))
ps = system.NewPoolService(poolUUID, req.TierBytes, req.MemRatio,
ranklist.RanksFromUint32(req.GetRanks()))
ps.PoolLabel = poolLabel
if err := svc.sysdb.AddPoolService(ctx, ps); err != nil {
return nil, err
Expand Down Expand Up @@ -878,6 +879,7 @@ func (svc *mgmtSvc) PoolExtend(ctx context.Context, req *mgmtpb.PoolExtendReq) (
return nil, err
}
req.TierBytes = ps.Storage.PerRankTierStorage
req.MemRatio = ps.Storage.MemRatio

svc.log.Debugf("MgmtSvc.PoolExtend forwarding modified req:%+v\n", req)

Expand Down Expand Up @@ -920,6 +922,7 @@ func (svc *mgmtSvc) PoolReintegrate(ctx context.Context, req *mgmtpb.PoolReinteg
}

req.TierBytes = ps.Storage.PerRankTierStorage
req.MemRatio = ps.Storage.MemRatio

dresp, err := svc.makeLockedPoolServiceCall(ctx, drpc.MethodPoolReintegrate, req)
if err != nil {
Expand Down
290 changes: 265 additions & 25 deletions src/control/server/mgmt_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@ import (
"github.com/daos-stack/daos/src/control/system/raft"
)

var (
mockTierBytes = []uint64{uint64(1), uint64(2)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor - I don't think these need to be casted within the array

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will fix if I have to repush

mockSvcRanks = []uint32{0}
mockMemRatio = float32(0.5)
mockPoolService = &system.PoolService{
PoolUUID: uuid.MustParse(mockUUID),
State: system.PoolServiceStateReady,
Replicas: []ranklist.Rank{0},
Storage: &system.PoolServiceStorage{
CreationRankStr: "0",
CurrentRankStr: "0",
PerRankTierStorage: mockTierBytes,
MemRatio: mockMemRatio,
},
}
)

func getPoolLockCtx(t *testing.T, parent context.Context, sysdb poolDatabase, poolUUID uuid.UUID) (*raft.PoolLock, context.Context) {
t.Helper()

Expand Down Expand Up @@ -157,6 +174,7 @@ func TestServer_MgmtSvc_PoolCreateAlreadyExists(t *testing.T) {
Storage: &system.PoolServiceStorage{
CreationRankStr: "1",
PerRankTierStorage: []uint64{1, 2},
MemRatio: 0.5,
},
Replicas: []ranklist.Rank{1},
}); err != nil {
Expand Down Expand Up @@ -1347,49 +1365,277 @@ func TestServer_MgmtSvc_PoolExtend(t *testing.T) {
missingSB := newTestMgmtSvc(t, log)
missingSB.harness.instances[0].(*EngineInstance)._superblock = nil
notAP := newTestMgmtSvc(t, log)
scmAllocation := uint64(1)
nvmeAllocation := uint64(2)
mockRanks := []uint32{1}
mockFaultDomains := []uint32{1, 1, 1, 1}

for name, tc := range map[string]struct {
nilReq bool
getMockDrpc func(error) *mockDrpcClient
mgmtSvc *mgmtSvc
reqIn *mgmtpb.PoolExtendReq
drpcResp *mgmtpb.PoolExtendResp
expDrpcReq *mgmtpb.PoolExtendReq
expErr error
}{
"nil request": {
nilReq: true,
expErr: errors.New("nil request"),
},
"wrong system": {
reqIn: &mgmtpb.PoolExtendReq{Id: mockUUID, Sys: "bad"},
expErr: FaultWrongSystem("bad", build.DefaultSystemName),
},
"missing superblock": {
mgmtSvc: missingSB,
expErr: errors.New("not an access point"),
},
"not access point": {
mgmtSvc: notAP,
expErr: errors.New("not an access point"),
},
"dRPC send fails": {
expErr: errors.New("send failure"),
},
"garbage resp": {
getMockDrpc: func(err error) *mockDrpcClient {
// dRPC call returns junk in the message body
badBytes := makeBadBytes(42)

return getMockDrpcClientBytes(badBytes, err)
},
expErr: errors.New("unmarshal"),
},
"missing uuid": {
reqIn: &mgmtpb.PoolExtendReq{Ranks: mockRanks},
expErr: errors.New("empty pool id"),
},
"successfully extended": {
drpcResp: &mgmtpb.PoolExtendResp{
TierBytes: mockTierBytes,
},
// Expect that the last request contains updated params from ps entry.
expDrpcReq: &mgmtpb.PoolExtendReq{
Sys: build.DefaultSystemName,
SvcRanks: mockSvcRanks,
FaultDomains: mockFaultDomains,
Id: mockUUID,
Ranks: mockRanks,
TierBytes: mockTierBytes,
MemRatio: mockMemRatio,
},
},
} {
t.Run(name, func(t *testing.T) {
buf.Reset()
defer test.ShowBufferOnFailure(t, buf)

if tc.reqIn == nil && !tc.nilReq {
tc.reqIn = &mgmtpb.PoolExtendReq{Id: mockUUID, Ranks: mockRanks}
}
if tc.mgmtSvc == nil {
tc.mgmtSvc = newTestMgmtSvc(t, log)
}
addTestPoolService(t, tc.mgmtSvc.sysdb, mockPoolService)

if tc.getMockDrpc == nil {
tc.getMockDrpc = func(err error) *mockDrpcClient {
return getMockDrpcClient(tc.drpcResp, err)
}
}
mdc := tc.getMockDrpc(tc.expErr)
setupSvcDrpcClient(tc.mgmtSvc, 0, mdc)

if tc.reqIn != nil && tc.reqIn.Sys == "" {
tc.reqIn.Sys = build.DefaultSystemName
}

_, err := tc.mgmtSvc.membership.Add(system.MockMember(t, 1,
system.MemberStateJoined))
if err != nil {
t.Fatal(err)
}

gotResp, gotErr := tc.mgmtSvc.PoolExtend(test.Context(t), tc.reqIn)
test.CmpErr(t, tc.expErr, gotErr)
if tc.expErr != nil {
return
}

cmpOpts := test.DefaultCmpOpts()
if diff := cmp.Diff(tc.drpcResp, gotResp, cmpOpts...); diff != "" {
t.Fatalf("unexpected response (-want, +got)\n%s\n", diff)
}

// Check extend gets called with correct params from PS entry.
lastReq := new(mgmtpb.PoolExtendReq)
if err := proto.Unmarshal(getLastMockCall(mdc).Body, lastReq); err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(tc.expDrpcReq, lastReq, test.DefaultCmpOpts()...); diff != "" {
t.Fatalf("unexpected final dRPC request (-want, +got):\n%s\n", diff)
}
})
}
}

func TestServer_MgmtSvc_PoolReintegrate(t *testing.T) {
log, buf := logging.NewTestLogger(t.Name())
missingSB := newTestMgmtSvc(t, log)
missingSB.harness.instances[0].(*EngineInstance)._superblock = nil
notAP := newTestMgmtSvc(t, log)

for name, tc := range map[string]struct {
nilReq bool
getMockDrpc func(error) *mockDrpcClient
mgmtSvc *mgmtSvc
reqIn *mgmtpb.PoolReintegrateReq
drpcResp *mgmtpb.PoolReintegrateResp
expDrpcReq *mgmtpb.PoolReintegrateReq
expErr error
}{
"nil request": {
nilReq: true,
expErr: errors.New("nil request"),
},
"wrong system": {
reqIn: &mgmtpb.PoolReintegrateReq{Id: mockUUID, Sys: "bad"},
expErr: FaultWrongSystem("bad", build.DefaultSystemName),
},
"missing superblock": {
mgmtSvc: missingSB,
expErr: errors.New("not an access point"),
},
"not access point": {
mgmtSvc: notAP,
expErr: errors.New("not an access point"),
},
"dRPC send fails": {
expErr: errors.New("send failure"),
},
"garbage resp": {
getMockDrpc: func(err error) *mockDrpcClient {
// dRPC call returns junk in the message body
badBytes := makeBadBytes(42)

return getMockDrpcClientBytes(badBytes, err)
},
expErr: errors.New("unmarshal"),
},
"missing uuid": {
reqIn: &mgmtpb.PoolReintegrateReq{Rank: 1},
expErr: errors.New("empty pool id"),
},
"successfully extended": {
drpcResp: &mgmtpb.PoolReintegrateResp{},
// Expect that the last request contains updated params from ps entry.
expDrpcReq: &mgmtpb.PoolReintegrateReq{
Sys: build.DefaultSystemName,
SvcRanks: mockSvcRanks,
Id: mockUUID,
Rank: 1,
TierBytes: mockTierBytes,
MemRatio: mockMemRatio,
},
},
} {
t.Run(name, func(t *testing.T) {
buf.Reset()
defer test.ShowBufferOnFailure(t, buf)

if tc.reqIn == nil && !tc.nilReq {
tc.reqIn = &mgmtpb.PoolReintegrateReq{Id: mockUUID, Rank: 1}
}
if tc.mgmtSvc == nil {
tc.mgmtSvc = newTestMgmtSvc(t, log)
}
addTestPoolService(t, tc.mgmtSvc.sysdb, mockPoolService)

if tc.getMockDrpc == nil {
tc.getMockDrpc = func(err error) *mockDrpcClient {
return getMockDrpcClient(tc.drpcResp, err)
}
}
mdc := tc.getMockDrpc(tc.expErr)
setupSvcDrpcClient(tc.mgmtSvc, 0, mdc)

if tc.reqIn != nil && tc.reqIn.Sys == "" {
tc.reqIn.Sys = build.DefaultSystemName
}

_, err := tc.mgmtSvc.membership.Add(system.MockMember(t, 1,
system.MemberStateJoined))
if err != nil {
t.Fatal(err)
}

gotResp, gotErr := tc.mgmtSvc.PoolReintegrate(test.Context(t), tc.reqIn)
test.CmpErr(t, tc.expErr, gotErr)
if tc.expErr != nil {
return
}

cmpOpts := test.DefaultCmpOpts()
if diff := cmp.Diff(tc.drpcResp, gotResp, cmpOpts...); diff != "" {
t.Fatalf("unexpected response (-want, +got)\n%s\n", diff)
}

// Check extend gets called with correct params from PS entry.
lastReq := new(mgmtpb.PoolReintegrateReq)
if err := proto.Unmarshal(getLastMockCall(mdc).Body, lastReq); err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(tc.expDrpcReq, lastReq, test.DefaultCmpOpts()...); diff != "" {
t.Fatalf("unexpected final dRPC request (-want, +got):\n%s\n", diff)
}
})
}
}

func TestServer_MgmtSvc_PoolExclude(t *testing.T) {
log, buf := logging.NewTestLogger(t.Name())
missingSB := newTestMgmtSvc(t, log)
missingSB.harness.instances[0].(*EngineInstance)._superblock = nil
notAP := newTestMgmtSvc(t, log)
testPoolService := &system.PoolService{
PoolUUID: uuid.MustParse(mockUUID),
State: system.PoolServiceStateReady,
Replicas: []ranklist.Rank{0},
Storage: &system.PoolServiceStorage{
CreationRankStr: "0",
CurrentRankStr: "0",
PerRankTierStorage: []uint64{scmAllocation, nvmeAllocation},
},
}

for name, tc := range map[string]struct {
mgmtSvc *mgmtSvc
setupMockDrpc func(_ *mgmtSvc, _ error)
req *mgmtpb.PoolExtendReq
expResp *mgmtpb.PoolExtendResp
req *mgmtpb.PoolExcludeReq
expResp *mgmtpb.PoolExcludeResp
expErr error
}{
"nil request": {
expErr: errors.New("nil request"),
},
"wrong system": {
req: &mgmtpb.PoolExtendReq{Id: mockUUID, Sys: "bad"},
req: &mgmtpb.PoolExcludeReq{Id: mockUUID, Sys: "bad"},
expErr: FaultWrongSystem("bad", build.DefaultSystemName),
},
"missing superblock": {
mgmtSvc: missingSB,
req: &mgmtpb.PoolExtendReq{Id: mockUUID, Ranks: []uint32{1}},
req: &mgmtpb.PoolExcludeReq{Id: mockUUID, Rank: 2, TargetIdx: []uint32{1, 2}},
expErr: errors.New("not an access point"),
},
"not access point": {
mgmtSvc: notAP,
req: &mgmtpb.PoolExtendReq{Id: mockUUID, Ranks: []uint32{1}},
req: &mgmtpb.PoolExcludeReq{Id: mockUUID, Rank: 2, TargetIdx: []uint32{1, 2}},
expErr: errors.New("not an access point"),
},
"dRPC send fails": {
req: &mgmtpb.PoolExtendReq{Id: mockUUID, Ranks: []uint32{1}},
req: &mgmtpb.PoolExcludeReq{Id: mockUUID, Rank: 2, TargetIdx: []uint32{1, 2}},
expErr: errors.New("send failure"),
},
"zero target count": {
req: &mgmtpb.PoolExcludeReq{Id: mockUUID, Rank: 2, TargetIdx: []uint32{1, 2}},
expErr: errors.New("zero target count"),
},
"garbage resp": {
req: &mgmtpb.PoolExtendReq{Id: mockUUID, Ranks: []uint32{1}},
req: &mgmtpb.PoolExcludeReq{Id: mockUUID, Rank: 2, TargetIdx: []uint32{1, 2}},
setupMockDrpc: func(svc *mgmtSvc, err error) {
// dRPC call returns junk in the message body
badBytes := makeBadBytes(42)
Expand All @@ -1399,14 +1645,12 @@ func TestServer_MgmtSvc_PoolExtend(t *testing.T) {
expErr: errors.New("unmarshal"),
},
"missing uuid": {
req: &mgmtpb.PoolExtendReq{Ranks: []uint32{1}},
req: &mgmtpb.PoolExcludeReq{Rank: 2, TargetIdx: []uint32{1, 2}},
expErr: errors.New("empty pool id"),
},
"successfully extended": {
req: &mgmtpb.PoolExtendReq{Id: mockUUID, Ranks: []uint32{1}},
expResp: &mgmtpb.PoolExtendResp{
TierBytes: []uint64{scmAllocation, nvmeAllocation},
},
"successful drained": {
req: &mgmtpb.PoolExcludeReq{Id: mockUUID, Rank: 2, TargetIdx: []uint32{1, 2}},
expResp: &mgmtpb.PoolExcludeResp{},
},
} {
t.Run(name, func(t *testing.T) {
Expand All @@ -1429,11 +1673,7 @@ func TestServer_MgmtSvc_PoolExtend(t *testing.T) {
tc.req.Sys = build.DefaultSystemName
}

if _, err := tc.mgmtSvc.membership.Add(system.MockMember(t, 1, system.MemberStateJoined)); err != nil {
t.Fatal(err)
}

gotResp, gotErr := tc.mgmtSvc.PoolExtend(test.Context(t), tc.req)
gotResp, gotErr := tc.mgmtSvc.PoolExclude(test.Context(t), tc.req)
test.CmpErr(t, tc.expErr, gotErr)
if tc.expErr != nil {
return
Expand Down
Loading
Loading