Skip to content

Commit

Permalink
DAOS-15874 control: Add optional credential cache to agent
Browse files Browse the repository at this point in the history
On heavily-loaded client nodes where many processes are being
launched by the same user or users, the admin may optionally
enable the credential cache in the agent in order to lower
agent overhead caused by generating identical credentials
for each process owned by a user. The agent-generated
credential is presented by the client process during pool/container
connection and is used to evaluate ACL permissions for
that connection.

Example config:
credential_config:
  cache_lifetime: 1m

Features: control
Required-githooks: true
Change-Id: I6ae2a8be1dd97ef14e0ccef0283d65bc1fabc4ed
Signed-off-by: Michael MacDonald <mjmac@google.com>
  • Loading branch information
mjmac committed May 22, 2024
1 parent b2edc29 commit 944c359
Show file tree
Hide file tree
Showing 13 changed files with 424 additions and 105 deletions.
2 changes: 2 additions & 0 deletions src/control/cmd/daos_agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ disable_caching: true
cache_expiration: 30
disable_auto_evict: true
credential_config:
cache_lifetime: 10m
client_user_map:
1000:
user: frodo
Expand Down Expand Up @@ -140,6 +141,7 @@ transport_config:
CacheExpiration: refreshMinutes(30 * time.Minute),
DisableAutoEvict: true,
CredentialConfig: &security.CredentialConfig{
CacheLifetime: time.Minute * 10,
ClientUserMap: map[uint32]*security.MappedClientUser{
1000: {
User: "frodo",
Expand Down
75 changes: 63 additions & 12 deletions src/control/cmd/daos_agent/infocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,22 @@ func getFabricScanFn(log logging.Logger, cfg *Config, scanner *hardware.FabricSc
}

type cacheItem struct {
sync.Mutex
sync.RWMutex
lastCached time.Time
refreshInterval time.Duration
}

// isStale returns true if the cache item is stale.
// NB: Should be run under a lock to protect lastCached.
func (ci *cacheItem) isStale() bool {
if ci.refreshInterval == 0 {
return false
}
return ci.lastCached.Add(ci.refreshInterval).Before(time.Now())
}

// isCached returns true if the cache item is cached.
// NB: Should be run under at least a read lock to protect lastCached.
func (ci *cacheItem) isCached() bool {
return !ci.lastCached.Equal(time.Time{})
}
Expand Down Expand Up @@ -130,16 +134,30 @@ func (ci *cachedAttachInfo) Key() string {
return sysAttachInfoKey(ci.system)
}

// NeedsRefresh checks whether the cached data needs to be refreshed.
func (ci *cachedAttachInfo) NeedsRefresh() bool {
// needsRefresh checks whether the cached data needs to be refreshed.
func (ci *cachedAttachInfo) needsRefresh() bool {
if ci == nil {
return false
}
return !ci.isCached() || ci.isStale()
}

// Refresh contacts the remote management server and refreshes the GetAttachInfo cache.
func (ci *cachedAttachInfo) Refresh(ctx context.Context) error {
// RefreshIfNeeded refreshes the cached data if it needs to be refreshed.
func (ci *cachedAttachInfo) RefreshIfNeeded(ctx context.Context) (func(), bool, error) {
if ci == nil {
return cache.NoopRelease, false, errors.New("cachedAttachInfo is nil")
}

ci.Lock()
if ci.needsRefresh() {
return ci.Unlock, true, ci.refresh(ctx)
}
return ci.Unlock, false, nil
}

// refresh implements the actual refresh logic.
// NB: Should be run under a lock.
func (ci *cachedAttachInfo) refresh(ctx context.Context) error {
if ci == nil {
return errors.New("cachedAttachInfo is nil")
}
Expand All @@ -155,6 +173,16 @@ func (ci *cachedAttachInfo) Refresh(ctx context.Context) error {
return nil
}

// Refresh contacts the remote management server and refreshes the GetAttachInfo cache.
func (ci *cachedAttachInfo) Refresh(ctx context.Context) (func(), error) {
if ci == nil {
return cache.NoopRelease, errors.New("cachedAttachInfo is nil")
}

ci.Lock()
return ci.Unlock, ci.refresh(ctx)
}

type cachedFabricInfo struct {
cacheItem
fetch fabricScanFn
Expand All @@ -172,17 +200,31 @@ func (cfi *cachedFabricInfo) Key() string {
return fabricKey
}

// NeedsRefresh indicates that the fabric information does not need to be refreshed unless it has
// needsRefresh indicates that the fabric information does not need to be refreshed unless it has
// never been populated.
func (cfi *cachedFabricInfo) NeedsRefresh() bool {
func (cfi *cachedFabricInfo) needsRefresh() bool {
if cfi == nil {
return false
}
return !cfi.isCached()
}

// Refresh scans the hardware for information about the fabric devices and caches the result.
func (cfi *cachedFabricInfo) Refresh(ctx context.Context) error {
// RefreshIfNeeded refreshes the cached fabric information if it needs to be refreshed.
func (cfi *cachedFabricInfo) RefreshIfNeeded(ctx context.Context) (func(), bool, error) {
if cfi == nil {
return cache.NoopRelease, false, errors.New("cachedFabricInfo is nil")
}

cfi.Lock()
if cfi.needsRefresh() {
return cfi.Unlock, true, cfi.refresh(ctx)
}
return cfi.Unlock, false, nil
}

// refresh implements the actual refresh logic.
// NB: Should be run under a lock.
func (cfi *cachedFabricInfo) refresh(ctx context.Context) error {
if cfi == nil {
return errors.New("cachedFabricInfo is nil")
}
Expand All @@ -197,6 +239,16 @@ func (cfi *cachedFabricInfo) Refresh(ctx context.Context) error {
return nil
}

// Refresh scans the hardware for information about the fabric devices and caches the result.
func (cfi *cachedFabricInfo) Refresh(ctx context.Context) (func(), error) {
if cfi == nil {
return cache.NoopRelease, errors.New("cachedFabricInfo is nil")
}

cfi.Lock()
return cfi.Unlock, cfi.refresh(ctx)
}

// InfoCache is a cache for the results of expensive operations needed by the agent.
type InfoCache struct {
log logging.Logger
Expand Down Expand Up @@ -350,15 +402,14 @@ func (c *InfoCache) GetAttachInfo(ctx context.Context, sys string) (*control.Get
}
createItem := func() (cache.Item, error) {
c.log.Debugf("cache miss for %s", sysAttachInfoKey(sys))
cai := newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo)
return cai, nil
return newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo), nil
}

item, release, err := c.cache.GetOrCreate(ctx, sysAttachInfoKey(sys), createItem)
defer release()
if err != nil {
return nil, errors.Wrap(err, "getting attach info from cache")
}
defer release()

cai, ok := item.(*cachedAttachInfo)
if !ok {
Expand Down
36 changes: 28 additions & 8 deletions src/control/cmd/daos_agent/infocache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,31 @@ func TestAgent_cachedAttachInfo_Key(t *testing.T) {
}
}

func TestAgent_cachedAttachInfo_NeedsRefresh(t *testing.T) {
func TestAgent_cachedAttachInfo_RefreshIfNeeded(t *testing.T) {
log, buf := logging.NewTestLogger(t.Name())
defer test.ShowBufferOnFailure(t, buf)
mockClient := control.NewMockInvoker(log, &control.MockInvokerConfig{})

noopGetAttachInfo := func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
return nil, nil
}

for name, tc := range map[string]struct {
ai *cachedAttachInfo
expResult bool
}{
"nil": {},
"never cached": {
ai: newCachedAttachInfo(0, "test", nil, nil),
ai: newCachedAttachInfo(0, "test", mockClient, noopGetAttachInfo),
expResult: true,
},
"no refresh": {
ai: &cachedAttachInfo{
cacheItem: cacheItem{
lastCached: time.Now().Add(-time.Minute),
},
rpcClient: mockClient,
fetch: noopGetAttachInfo,
lastResponse: &control.GetAttachInfoResp{},
},
},
Expand All @@ -173,6 +183,8 @@ func TestAgent_cachedAttachInfo_NeedsRefresh(t *testing.T) {
lastCached: time.Now().Add(-time.Minute),
refreshInterval: time.Second,
},
rpcClient: mockClient,
fetch: noopGetAttachInfo,
lastResponse: &control.GetAttachInfoResp{},
},
expResult: true,
Expand All @@ -183,12 +195,15 @@ func TestAgent_cachedAttachInfo_NeedsRefresh(t *testing.T) {
lastCached: time.Now().Add(-time.Second),
refreshInterval: time.Minute,
},
rpcClient: mockClient,
fetch: noopGetAttachInfo,
lastResponse: &control.GetAttachInfoResp{},
},
},
} {
t.Run(name, func(t *testing.T) {
test.AssertEqual(t, tc.expResult, tc.ai.NeedsRefresh(), "")
_, refreshed, _ := tc.ai.RefreshIfNeeded(test.Context(t))
test.AssertEqual(t, tc.expResult, refreshed, "")
})
}
}
Expand Down Expand Up @@ -271,7 +286,8 @@ func TestAgent_cachedAttachInfo_Refresh(t *testing.T) {
}
}

err := ai.Refresh(test.Context(t))
release, err := ai.Refresh(test.Context(t))
release()

test.CmpErr(t, tc.expErr, err)

Expand Down Expand Up @@ -320,7 +336,7 @@ func TestAgent_cachedFabricInfo_Key(t *testing.T) {
}
}

func TestAgent_cachedFabricInfo_NeedsRefresh(t *testing.T) {
func TestAgent_cachedFabricInfo_RefreshIfNeeded(t *testing.T) {
for name, tc := range map[string]struct {
nilCache bool
cacheTime time.Time
Expand All @@ -342,11 +358,14 @@ func TestAgent_cachedFabricInfo_NeedsRefresh(t *testing.T) {

var cfi *cachedFabricInfo
if !tc.nilCache {
cfi = newCachedFabricInfo(log, nil)
cfi = newCachedFabricInfo(log, func(_ context.Context, _ ...string) (*NUMAFabric, error) {
return nil, nil
})
cfi.cacheItem.lastCached = tc.cacheTime
}

test.AssertEqual(t, tc.expResult, cfi.NeedsRefresh(), "")
_, refreshed, _ := cfi.RefreshIfNeeded(test.Context(t))
test.AssertEqual(t, tc.expResult, refreshed, "")
})
}
}
Expand Down Expand Up @@ -416,7 +435,8 @@ func TestAgent_cachedFabricInfo_Refresh(t *testing.T) {
}
}

err := cfi.Refresh(test.Context(t))
release, err := cfi.Refresh(test.Context(t))
release()

test.CmpErr(t, tc.expErr, err)

Expand Down
11 changes: 4 additions & 7 deletions src/control/cmd/daos_agent/mgmt_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package main

import (
"net"
"sync"

"github.com/pkg/errors"
"golang.org/x/net/context"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/daos-stack/daos/src/control/drpc"
"github.com/daos-stack/daos/src/control/fault"
"github.com/daos-stack/daos/src/control/fault/code"
"github.com/daos-stack/daos/src/control/lib/atm"
"github.com/daos-stack/daos/src/control/lib/control"
"github.com/daos-stack/daos/src/control/lib/daos"
"github.com/daos-stack/daos/src/control/lib/hardware"
Expand All @@ -34,16 +34,13 @@ import (
// Management Service proxy, handling dRPCs sent by libdaos by forwarding them
// to MS.
type mgmtModule struct {
attachInfoMutex sync.RWMutex
fabricMutex sync.RWMutex

log logging.Logger
sys string
ctlInvoker control.Invoker
cache *InfoCache
monitor *procMon
cliMetricsSrc *promexp.ClientSource
useDefaultNUMA bool
useDefaultNUMA atm.Bool

numaGetter hardware.ProcessNUMAProvider
}
Expand Down Expand Up @@ -161,14 +158,14 @@ func (mod *mgmtModule) handleGetAttachInfo(ctx context.Context, reqb []byte, pid
}

func (mod *mgmtModule) getNUMANode(ctx context.Context, pid int32) (uint, error) {
if mod.useDefaultNUMA {
if mod.useDefaultNUMA.IsTrue() {
return 0, nil
}

numaNode, err := mod.numaGetter.GetNUMANodeIDForPID(ctx, pid)
if errors.Is(err, hardware.ErrNoNUMANodes) {
mod.log.Debug("system is not NUMA-aware")
mod.useDefaultNUMA = true
mod.useDefaultNUMA.SetTrue()
return 0, nil
} else if err != nil {
return 0, errors.Wrapf(err, "failed to get NUMA node ID for pid %d", pid)
Expand Down
3 changes: 2 additions & 1 deletion src/control/cmd/daos_agent/mgmt_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/daos-stack/daos/src/control/drpc"
"github.com/daos-stack/daos/src/control/fault"
"github.com/daos-stack/daos/src/control/fault/code"
"github.com/daos-stack/daos/src/control/lib/atm"
"github.com/daos-stack/daos/src/control/lib/control"
"github.com/daos-stack/daos/src/control/lib/daos"
"github.com/daos-stack/daos/src/control/lib/hardware"
Expand Down Expand Up @@ -334,7 +335,7 @@ func TestAgent_mgmtModule_getNUMANode(t *testing.T) {

mod := &mgmtModule{
log: log,
useDefaultNUMA: tc.useDefaultNUMA,
useDefaultNUMA: atm.NewBool(tc.useDefaultNUMA),
numaGetter: tc.numaGetter,
}

Expand Down
Loading

0 comments on commit 944c359

Please sign in to comment.